Source code for plaso.output.l2t_csv

"""Output module for the log2timeline (L2T) CSV format.

For documentation on the L2T CSV format see:
  https://forensics.wiki/l2t_csv
"""

import datetime
import pytz

from acstore.containers import interface as containers_interface

from dfdatetime import interface as dfdatetime_interface
from dfdatetime import posix_time as dfdatetime_posix_time

from plaso.output import formatting_helper
from plaso.output import logger
from plaso.output import manager
from plaso.output import shared_dsv
from plaso.output import text_file


class L2TCSVEventFormattingHelper(shared_dsv.DSVEventFormattingHelper):
    """Event formatting helper of the L2T CSV output module."""

    def GetFormattedMACBGroup(self, output_mediator, macb_group):
        """Builds the delimiter separated output line of a MACB group.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          macb_group (list[tuple[event, event_data, event_data_stream,
              event_tag]]): group of event, event_data, event_data_stream and
              event_tag objects with identical timestamps, attributes and
              values.

        Returns:
          str: string representation of the MACB group.
        """
        descriptions = [
            grouped_event.timestamp_desc for grouped_event, _, _, _ in macb_group
        ]

        formatted_values = []
        for name in self._field_names:
            if name == "MACB":
                # One combined MACB representation for the whole group.
                value = output_mediator.GetMACBRepresentationFromDescriptions(
                    descriptions
                )

            elif name == "type":
                # TODO: fix timestamp description in source.
                value = "; ".join(descriptions)

            else:
                # Every other field is taken from the first entry of the group.
                first_event, first_data, first_stream, first_tag = macb_group[0]
                value = self._field_formatting_helper.GetFormattedField(
                    output_mediator,
                    name,
                    first_event,
                    first_data,
                    first_stream,
                    first_tag,
                )

            # A missing value is written as "-" before sanitizing.
            value = "-" if value is None else value
            formatted_values.append(self._SanitizeField(value))

        return self.field_delimiter.join(formatted_values)
class L2TCSVFieldFormattingHelper(formatting_helper.FieldFormattingHelper):
    """L2T CSV output module field formatting helper."""

    # Maps the name of a field to the name of the callback method that formats
    # the field value.
    _FIELD_FORMAT_CALLBACKS = {
        "date": "_FormatDate",
        "desc": "_FormatMessage",
        "extra": "_FormatExtraAttributes",
        "filename": "_FormatDisplayName",
        "format": "_FormatParser",
        "host": "_FormatHostname",
        "inode": "_FormatInode",
        "MACB": "_FormatMACB",
        "notes": "_FormatTag",
        "short": "_FormatMessageShort",
        "source": "_FormatSourceShort",
        "sourcetype": "_FormatSource",
        "time": "_FormatTime",
        "timezone": "_FormatTimeZone",
        "type": "_FormatType",
        "user": "_FormatUsername",
        "values": "_FormatValues",
        "version": "_FormatVersion",
    }

    # Note that L2T CSV defines certain fields as part of the format. These
    # attribute names are never written to the "extra" field.
    _RESERVED_VARIABLE_NAMES = frozenset(
        [
            "_event_values_hash",
            "_parser_chain",
            "body",
            "data_type",
            "date_time",
            "display_name",
            "filename",
            "hostname",
            "http_headers",
            "inode",
            "mapped_files",
            "metadata",
            "offset",
            "path_spec",
            "query",
            "source_long",
            "source_short",
            "tag",
            "timestamp",
            "timestamp_desc",
            "timezone",
            "username",
        ]
    )

    # The field format callback methods require specific arguments hence
    # the check for unused arguments is disabled here.
    # pylint: disable=unused-argument

    def _FormatDate(self, output_mediator, event, event_data, event_data_stream):
        """Formats a date field.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          str: date formatted as "MM/DD/YYYY" or "00/00/0000" on error.
        """
        # For now check if event.timestamp is set, to mimic existing behavior of
        # using 00/00/0000 for 0 timestamp values.
        if not event.timestamp:
            return "00/00/0000"

        # Fall back to the POSIX timestamp when no date and time values are
        # available or when they are in local time.
        date_time = event.date_time
        if not date_time or date_time.is_local_time:
            date_time = dfdatetime_posix_time.PosixTimeInMicroseconds(
                timestamp=event.timestamp
            )

        # Note that GetDateWithTimeOfDay will return the date and time in UTC,
        # so no adjustment for date_time.time_zone_offset is needed.
        year, month, day_of_month, hours, minutes, seconds = (
            date_time.GetDateWithTimeOfDay()
        )

        if output_mediator.time_zone != pytz.UTC:
            try:
                datetime_object = datetime.datetime(
                    year,
                    month,
                    day_of_month,
                    hours,
                    minutes,
                    seconds,
                    tzinfo=pytz.UTC,
                )
                datetime_object = datetime_object.astimezone(
                    output_mediator.time_zone
                )

                year = datetime_object.year
                month = datetime_object.month
                day_of_month = datetime_object.day

            except (OSError, OverflowError, TypeError, ValueError):
                # A TypeError is also raised when any of the date values is
                # None; all such failures use the error path below.
                year, month, day_of_month = (None, None, None)

        if None in (year, month, day_of_month):
            message = (
                f"unable to copy timestamp: {event.timestamp!s} to a human readable "
                'date. Defaulting to: "00/00/0000"'
            )
            self._ReportEventError(event, event_data, message)
            return "00/00/0000"

        return f"{month:02d}/{day_of_month:02d}/{year:04d}"

    def _FormatExtraAttributes(
        self, output_mediator, event, event_data, event_data_stream
    ):
        """Formats an extra attributes field.

        The field contains the event data attributes that are neither used by
        the message format string nor reserved by the L2T CSV format, followed
        by the event data stream attributes except for the path specification.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          str: extra attributes field, formatted as sorted "name: value" pairs
              separated by "; " with line feeds replaced by "-" and carriage
              returns removed.
        """
        message_formatter = output_mediator.GetMessageFormatter(event_data.data_type)
        if not message_formatter:
            # Use !s instead of :s, since the s format specifier raises a
            # TypeError for values that are not a string, such as None.
            logger.warning(
                f"Using default message formatter for data type: "
                f"{event_data.data_type!s}"
            )
            message_formatter = self._DEFAULT_MESSAGE_FORMATTER

        # Copy into a new set so that the set returned by the message formatter
        # is never modified in place.
        formatted_attribute_names = set(
            message_formatter.GetFormatStringAttributeNames()
        )
        formatted_attribute_names.update(self._RESERVED_VARIABLE_NAMES)

        extra_attributes = []
        for attribute_name, attribute_value in event_data.GetAttributes():
            if attribute_name in formatted_attribute_names:
                continue

            # Ignore attribute container identifier and date and time values.
            if isinstance(
                attribute_value,
                (
                    containers_interface.AttributeContainerIdentifier,
                    dfdatetime_interface.DateTimeValues,
                ),
            ):
                continue

            # Ignore lists of date and time values; only the first element is
            # inspected.
            if (
                isinstance(attribute_value, list)
                and attribute_value
                and isinstance(attribute_value[0], dfdatetime_interface.DateTimeValues)
            ):
                continue

            # Some parsers have written bytes values to storage.
            if isinstance(attribute_value, bytes):
                attribute_value = attribute_value.decode("utf-8", "replace")
                logger.warning(
                    f'Found bytes value for attribute "{attribute_name:s}" for '
                    f"data type: {event_data.data_type!s}. Value was converted to "
                    f'UTF-8: "{attribute_value:s}"'
                )

            # With ! in {name!s} we force a string conversion since some of the
            # extra attributes values can be integer, float point or boolean
            # values.
            extra_attributes.append(f"{attribute_name:s}: {attribute_value!s}")

        if event_data_stream:
            for attribute_name, attribute_value in event_data_stream.GetAttributes():
                if attribute_name != "path_spec":
                    extra_attributes.append(f"{attribute_name:s}: {attribute_value!s}")

        extra_attributes = "; ".join(sorted(extra_attributes))

        # Keep the field on a single output line.
        return extra_attributes.replace("\n", "-").replace("\r", "")

    def _FormatType(self, output_mediator, event, event_data, event_data_stream):
        """Formats a type field.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          str: type field, which is the timestamp description of the event or
              "-" if the event has no timestamp_desc attribute.
        """
        return getattr(event, "timestamp_desc", "-")

    def _FormatVersion(self, output_mediator, event, event_data, event_data_stream):
        """Formats a version field.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          str: version field, which is always "2".
        """
        return "2"

    # pylint: enable=unused-argument
class L2TCSVOutputModule(text_file.SortedTextFileOutputModule):
    """Output module for the legacy log2timeline CSV format of 17 fixed fields."""

    NAME = "l2tcsv"
    DESCRIPTION = "CSV format used by legacy log2timeline, with 17 fixed fields."

    # Names of the output fields, handed to the event formatting helper.
    _FIELD_NAMES = [
        "date",
        "time",
        "timezone",
        "MACB",
        "source",
        "sourcetype",
        "type",
        "user",
        "host",
        "short",
        "desc",
        "version",
        "filename",
        "inode",
        "notes",
        "format",
        "extra",
    ]

    # NOTE(review): not referenced in this class, presumably used by the
    # sorted text file base class to order the output - confirm.
    _SORT_KEY_FIELD_NAMES = ["time", "filename", "inode"]

    def __init__(self):
        """Initializes an output module."""
        super().__init__(
            L2TCSVEventFormattingHelper(
                L2TCSVFieldFormattingHelper(), self._FIELD_NAMES
            )
        )

    def _GetString(self, output_mediator, field_values):
        """Retrieves an output string.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          field_values (dict[str, str]): output field values per name.

        Returns:
          str: the field values joined by the field delimiter and terminated
              by a line feed.
        """
        delimiter = self._event_formatting_helper.field_delimiter
        return delimiter.join(field_values.values()) + "\n"

    def WriteFieldValuesOfMACBGroup(self, output_mediator, macb_group):
        """Writes field values of a MACB group to the output.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          macb_group (list[tuple[event, event_data, event_data_stream,
              event_tag]]): group of event, event_data, event_data_stream and
              event_tag objects with identical timestamps, attributes and
              values.
        """
        self.WriteLine(
            self._event_formatting_helper.GetFormattedMACBGroup(
                output_mediator, macb_group
            )
        )

    def WriteHeader(self, output_mediator):
        """Writes the header to the output.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
        """
        header_line = self._event_formatting_helper.GetFormattedFieldNames()
        self.WriteLine(header_line)
# Register the L2T CSV output module with the output manager. This call is a
# module-level statement, so it runs when the module is imported.
manager.OutputManager.RegisterOutput(L2TCSVOutputModule)