Source code for plaso.output.l2t_csv

# -*- coding: utf-8 -*-
"""Output module for the log2timeline (L2T) CSV format.

For documentation on the L2T CSV format see:
  https://forensics.wiki/l2t_csv
"""

import datetime
import pytz

from acstore.containers import interface as containers_interface

from dfdatetime import interface as dfdatetime_interface
from dfdatetime import posix_time as dfdatetime_posix_time

from plaso.output import formatting_helper
from plaso.output import logger
from plaso.output import manager
from plaso.output import shared_dsv
from plaso.output import text_file


class L2TCSVEventFormattingHelper(shared_dsv.DSVEventFormattingHelper):
  """Event formatting helper of the L2T CSV output module."""

  def GetFormattedMACBGroup(self, output_mediator, macb_group):
    """Builds the delimited output string that represents a MACB group.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      macb_group (list[tuple[event, event_data, event_data_stream,
          event_tag]]): group of event, event_data, event_data_stream and
          event_tag objects with identical timestamps, attributes and values.

    Returns:
      str: string representation of the MACB group.
    """
    descriptions = []
    for group_event, _, _, _ in macb_group:
      descriptions.append(group_event.timestamp_desc)

    formatted_values = []
    for name in self._field_names:
      if name == 'MACB':
        value = output_mediator.GetMACBRepresentationFromDescriptions(
            descriptions)

      elif name == 'type':
        # TODO: fix timestamp description in source.
        value = '; '.join(descriptions)

      else:
        # Every other field is formatted from the first entry of the group.
        first_event, first_data, first_stream, first_tag = macb_group[0]
        value = self._field_formatting_helper.GetFormattedField(
            output_mediator, name, first_event, first_data, first_stream,
            first_tag)

      # A dash stands in for fields without a value.
      if value is None:
        value = '-'

      formatted_values.append(self._SanitizeField(value))

    return self.field_delimiter.join(formatted_values)
class L2TCSVFieldFormattingHelper(formatting_helper.FieldFormattingHelper):
  """L2T CSV output module field formatting helper."""

  # Maps the name of a field to the name of the callback method that formats
  # the field value.
  _FIELD_FORMAT_CALLBACKS = {
      'date': '_FormatDate',
      'desc': '_FormatMessage',
      'extra': '_FormatExtraAttributes',
      'filename': '_FormatDisplayName',
      'format': '_FormatParser',
      'host': '_FormatHostname',
      'inode': '_FormatInode',
      'MACB': '_FormatMACB',
      'notes': '_FormatTag',
      'short': '_FormatMessageShort',
      'source': '_FormatSourceShort',
      'sourcetype': '_FormatSource',
      'time': '_FormatTime',
      'timezone': '_FormatTimeZone',
      'type': '_FormatType',
      'user': '_FormatUsername',
      'values': '_FormatValues',
      'version': '_FormatVersion'}

  # Note that L2T CSV defines certain fields as part of the format. These
  # names are excluded from the "extra" attributes field.
  _RESERVED_VARIABLE_NAMES = frozenset([
      '_event_values_hash',
      '_parser_chain',
      'body',
      'data_type',
      'date_time',
      'display_name',
      'filename',
      'hostname',
      'http_headers',
      'inode',
      'mapped_files',
      'metadata',
      'offset',
      'path_spec',
      'query',
      'source_long',
      'source_short',
      'tag',
      'timestamp',
      'timestamp_desc',
      'timezone',
      'username'])

  # The field format callback methods require specific arguments hence
  # the check for unused arguments is disabled here.
  # pylint: disable=unused-argument

  def _FormatDate(self, output_mediator, event, event_data, event_data_stream):
    """Formats a date field.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.

    Returns:
      str: date formatted as "MM/DD/YYYY" or "00/00/0000" on error.
    """
    # For now check if event.timestamp is set, to mimic existing behavior of
    # using 00/00/0000 for 0 timestamp values.
    if not event.timestamp:
      return '00/00/0000'

    date_time = event.date_time
    if not date_time or date_time.is_local_time:
      date_time = dfdatetime_posix_time.PosixTimeInMicroseconds(
          timestamp=event.timestamp)

    # Note that GetDateWithTimeOfDay will return the date and time in UTC,
    # so no adjustment for date_time.time_zone_offset is needed.
    year, month, day_of_month, hours, minutes, seconds = (
        date_time.GetDateWithTimeOfDay())

    if output_mediator.time_zone != pytz.UTC:
      try:
        datetime_object = datetime.datetime(
            year, month, day_of_month, hours, minutes, seconds,
            tzinfo=pytz.UTC)
        datetime_object = datetime_object.astimezone(output_mediator.time_zone)

        # Only the date components are needed for this field.
        year = datetime_object.year
        month = datetime_object.month
        day_of_month = datetime_object.day

      except (OSError, OverflowError, TypeError, ValueError):
        year, month, day_of_month = (None, None, None)

    if None in (year, month, day_of_month):
      self._ReportEventError(event, event_data, (
          'unable to copy timestamp: {0!s} to a human readable date. '
          'Defaulting to: "00/00/0000"').format(event.timestamp))
      return '00/00/0000'

    return '{0:02d}/{1:02d}/{2:04d}'.format(month, day_of_month, year)

  def _FormatExtraAttributes(
      self, output_mediator, event, event_data, event_data_stream):
    """Formats an extra attributes field.

    The field consists of the event data attributes that are neither used by
    the message format string nor reserved by the L2T CSV format, followed by
    the event data stream attributes other than the path specification.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.

    Returns:
      str: extra attributes field, formatted as sorted "name: value" pairs
          separated by "; ".
    """
    message_formatter = output_mediator.GetMessageFormatter(
        event_data.data_type)
    if not message_formatter:
      # The !s conversion keeps the warning from raising on a data type that
      # is not a string.
      logger.warning(
          'Using default message formatter for data type: {0!s}'.format(
              event_data.data_type))
      message_formatter = self._DEFAULT_MESSAGE_FORMATTER

    # Copy the names before extending them, so that a collection owned by
    # the message formatter is never modified in place.
    formatted_attribute_names = set(
        message_formatter.GetFormatStringAttributeNames())
    formatted_attribute_names.update(self._RESERVED_VARIABLE_NAMES)

    extra_attributes = []
    for attribute_name, attribute_value in event_data.GetAttributes():
      if attribute_name in formatted_attribute_names:
        continue

      # Ignore attribute container identifier and date and time values.
      if isinstance(attribute_value, (
          containers_interface.AttributeContainerIdentifier,
          dfdatetime_interface.DateTimeValues)):
        continue

      if (isinstance(attribute_value, list) and attribute_value and
          isinstance(attribute_value[0], dfdatetime_interface.DateTimeValues)):
        continue

      # Some parsers have written bytes values to storage.
      if isinstance(attribute_value, bytes):
        attribute_value = attribute_value.decode('utf-8', 'replace')
        logger.warning(
            'Found bytes value for attribute "{0:s}" for data type: '
            '{1!s}. Value was converted to UTF-8: "{2:s}"'.format(
                attribute_name, event_data.data_type, attribute_value))

      # With ! in {1!s} we force a string conversion since some of
      # the extra attributes values can be integer, float point or
      # boolean values.
      extra_attributes.append('{0:s}: {1!s}'.format(
          attribute_name, attribute_value))

    if event_data_stream:
      for attribute_name, attribute_value in event_data_stream.GetAttributes():
        if attribute_name != 'path_spec':
          extra_attributes.append('{0:s}: {1!s}'.format(
              attribute_name, attribute_value))

    extra_attributes = '; '.join(sorted(extra_attributes))

    # Keep the field on a single output line.
    return extra_attributes.replace('\n', '-').replace('\r', '')

  def _FormatType(self, output_mediator, event, event_data, event_data_stream):
    """Formats a type field.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.

    Returns:
      str: type field, which is the timestamp description of the event or
          "-" if the event has none.
    """
    return getattr(event, 'timestamp_desc', '-')

  def _FormatVersion(
      self, output_mediator, event, event_data, event_data_stream):
    """Formats a version field.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.

    Returns:
      str: version field, which is always "2".
    """
    return '2'

  # pylint: enable=unused-argument
class L2TCSVOutputModule(text_file.SortedTextFileOutputModule):
  """Output module for the log2timeline CSV format with 17 fixed fields."""

  NAME = 'l2tcsv'
  DESCRIPTION = 'CSV format used by legacy log2timeline, with 17 fixed fields.'

  # Names of the output fields, in the order in which they are written.
  _FIELD_NAMES = [
      'date',
      'time',
      'timezone',
      'MACB',
      'source',
      'sourcetype',
      'type',
      'user',
      'host',
      'short',
      'desc',
      'version',
      'filename',
      'inode',
      'notes',
      'format',
      'extra']

  _SORT_KEY_FIELD_NAMES = ['time', 'filename', 'inode']

  def __init__(self):
    """Initializes an output module."""
    field_helper = L2TCSVFieldFormattingHelper()
    event_helper = L2TCSVEventFormattingHelper(
        field_helper, self._FIELD_NAMES)
    super().__init__(event_helper)

  def _GetString(self, output_mediator, field_values):
    """Retrieves an output string.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      field_values (dict[str, str]): output field values per name.

    Returns:
      str: field values joined by the field delimiter and terminated by
          a newline.
    """
    delimiter = self._event_formatting_helper.field_delimiter
    return delimiter.join(field_values.values()) + '\n'

  def WriteFieldValuesOfMACBGroup(self, output_mediator, macb_group):
    """Writes field values of a MACB group to the output.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
      macb_group (list[tuple[event, event_data, event_data_stream,
          event_tag]]): group of event, event_data, event_data_stream and
          event_tag objects with identical timestamps, attributes and values.
    """
    formatted_group = self._event_formatting_helper.GetFormattedMACBGroup(
        output_mediator, macb_group)
    self.WriteLine(formatted_group)

  def WriteHeader(self, output_mediator):
    """Writes the header to the output.

    Args:
      output_mediator (OutputMediator): mediates interactions between output
          modules and other components, such as storage and dfVFS.
    """
    header_line = self._event_formatting_helper.GetFormattedFieldNames()
    self.WriteLine(header_line)
# Register the L2T CSV output module with the output manager. This is
# a module-level statement, so it runs when this module is imported.
manager.OutputManager.RegisterOutput(L2TCSVOutputModule)