# Source code for plaso.output.l2t_csv
"""Output module for the log2timeline (L2T) CSV format.
For documentation on the L2T CSV format see:
https://forensics.wiki/l2t_csv
"""
import datetime
import pytz
from acstore.containers import interface as containers_interface
from dfdatetime import interface as dfdatetime_interface
from dfdatetime import posix_time as dfdatetime_posix_time
from plaso.output import formatting_helper
from plaso.output import logger
from plaso.output import manager
from plaso.output import shared_dsv
from plaso.output import text_file
[docs]
class L2TCSVEventFormattingHelper(shared_dsv.DSVEventFormattingHelper):
    """Event formatting helper of the L2T CSV output module."""

    def GetFormattedMACBGroup(self, output_mediator, macb_group):
        """Builds the output string that represents a MACB group.

        The "MACB" and "type" fields are derived from the timestamp
        descriptions of every event in the group. All other fields are
        formatted from the first entry of the group.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          macb_group (list[tuple[event, event_data, event_data_stream, event_tag]]):
              group of event, event_data, event_data_stream and event_tag objects
              with identical timestamps, attributes and values.

        Returns:
          str: string representation of the MACB group.
        """
        descriptions = []
        for group_event, _, _, _ in macb_group:
            descriptions.append(group_event.timestamp_desc)

        sanitized_values = []
        for name in self._field_names:
            if name == "type":
                # TODO: fix timestamp description in source.
                value = "; ".join(descriptions)

            elif name == "MACB":
                value = output_mediator.GetMACBRepresentationFromDescriptions(
                    descriptions
                )

            else:
                # Fields other than "MACB" and "type" are taken from the first
                # entry of the group.
                first_event, first_data, first_stream, first_tag = macb_group[0]
                value = self._field_formatting_helper.GetFormattedField(
                    output_mediator,
                    name,
                    first_event,
                    first_data,
                    first_stream,
                    first_tag,
                )

            # A missing value is written as a dash.
            if value is None:
                value = "-"

            sanitized_values.append(self._SanitizeField(value))

        return self.field_delimiter.join(sanitized_values)
[docs]
class L2TCSVFieldFormattingHelper(formatting_helper.FieldFormattingHelper):
    """Field formatting helper of the L2T CSV output module."""

    # Maps the name of a field to the callback function that formats
    # the field value.
    _FIELD_FORMAT_CALLBACKS = {
        "date": "_FormatDate",
        "desc": "_FormatMessage",
        "extra": "_FormatExtraAttributes",
        "filename": "_FormatDisplayName",
        "format": "_FormatParser",
        "host": "_FormatHostname",
        "inode": "_FormatInode",
        "MACB": "_FormatMACB",
        "notes": "_FormatTag",
        "short": "_FormatMessageShort",
        "source": "_FormatSourceShort",
        "sourcetype": "_FormatSource",
        "time": "_FormatTime",
        "timezone": "_FormatTimeZone",
        "type": "_FormatType",
        "user": "_FormatUsername",
        "values": "_FormatValues",
        "version": "_FormatVersion",
    }

    # Note that L2T CSV defines certain fields as part of the format. These
    # names are excluded from the "extra" field.
    _RESERVED_VARIABLE_NAMES = frozenset(
        (
            "_event_values_hash",
            "_parser_chain",
            "body",
            "data_type",
            "date_time",
            "display_name",
            "filename",
            "hostname",
            "http_headers",
            "inode",
            "mapped_files",
            "metadata",
            "offset",
            "path_spec",
            "query",
            "source_long",
            "source_short",
            "tag",
            "timestamp",
            "timestamp_desc",
            "timezone",
            "username",
        )
    )

    # The field format callback methods share one signature, so the check
    # for unused arguments is disabled for them.
    # pylint: disable=unused-argument

    def _FormatDate(self, output_mediator, event, event_data, event_data_stream):
        """Formats the date field of an event.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          str: date formatted as "MM/DD/YYYY" or "00/00/0000" on error.
        """
        # An unset (or 0) timestamp is written as 00/00/0000 to mimic existing
        # behavior.
        if not event.timestamp:
            return "00/00/0000"

        date_time_values = event.date_time
        if not date_time_values or date_time_values.is_local_time:
            date_time_values = dfdatetime_posix_time.PosixTimeInMicroseconds(
                timestamp=event.timestamp
            )

        # Note that GetDateWithTimeOfDay will return the date and time in UTC,
        # so no adjustment for date_time.time_zone_offset is needed.
        date_with_time_of_day = date_time_values.GetDateWithTimeOfDay()
        year, month, day_of_month, hours, minutes, seconds = date_with_time_of_day

        if output_mediator.time_zone != pytz.UTC:
            try:
                utc_datetime = datetime.datetime(
                    year, month, day_of_month, hours, minutes, seconds, tzinfo=pytz.UTC
                )
                converted_datetime = utc_datetime.astimezone(output_mediator.time_zone)
            except (OSError, OverflowError, TypeError, ValueError):
                # Conversion failed, mark the date as unavailable.
                year, month, day_of_month = None, None, None
            else:
                year = converted_datetime.year
                month = converted_datetime.month
                day_of_month = converted_datetime.day

        if any(part is None for part in (year, month, day_of_month)):
            message = (
                f"unable to copy timestamp: {event.timestamp!s} to a human readable "
                f'date. Defaulting to: "00/00/0000"'
            )
            self._ReportEventError(event, event_data, message)
            return "00/00/0000"

        return f"{month:02d}/{day_of_month:02d}/{year:04d}"

    def _FormatExtraAttributes(
        self, output_mediator, event, event_data, event_data_stream
    ):
        """Formats the extra attributes field of an event.

        The field lists, sorted and separated by "; ", the event data attributes
        that are neither used by the message formatter nor reserved, followed by
        the event data stream attributes other than "path_spec".

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          str: extra attributes field.
        """
        formatter = output_mediator.GetMessageFormatter(event_data.data_type)
        if not formatter:
            logger.warning(
                f"Using default message formatter for data type: "
                f"{event_data.data_type:s}"
            )
            formatter = self._DEFAULT_MESSAGE_FORMATTER

        excluded_names = formatter.GetFormatStringAttributeNames()
        excluded_names.update(self._RESERVED_VARIABLE_NAMES)

        # Attribute container identifier and date and time values are ignored.
        ignored_value_types = (
            containers_interface.AttributeContainerIdentifier,
            dfdatetime_interface.DateTimeValues,
        )

        entries = []
        for name, value in event_data.GetAttributes():
            if name in excluded_names:
                continue

            if isinstance(value, ignored_value_types):
                continue

            # A non-empty list is skipped when its first item is a date and
            # time value.
            holds_date_time_values = (
                isinstance(value, list)
                and value
                and isinstance(value[0], dfdatetime_interface.DateTimeValues)
            )
            if holds_date_time_values:
                continue

            # Some parsers have written bytes values to storage.
            if isinstance(value, bytes):
                value = value.decode("utf-8", "replace")
                logger.warning(
                    f'Found bytes value for attribute "{name:s}" for '
                    f"data type: {event_data.data_type!s}. Value was converted to "
                    f'UTF-8: "{value:s}"'
                )

            # With ! in {name!s} we force a string conversion since some of the
            # extra attributes values can be integer, float point or boolean
            # values.
            entries.append(f"{name:s}: {value!s}")

        if event_data_stream:
            entries.extend(
                f"{name:s}: {value!s}"
                for name, value in event_data_stream.GetAttributes()
                if name != "path_spec"
            )

        joined_entries = "; ".join(sorted(entries))

        # Line feeds are replaced by a dash and carriage returns are removed.
        without_line_feeds = joined_entries.replace("\n", "-")
        return without_line_feeds.replace("\r", "")

    def _FormatType(self, output_mediator, event, event_data, event_data_stream):
        """Formats the type field of an event.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          str: timestamp description of the event or "-" if the event has no
              timestamp_desc attribute.
        """
        try:
            return event.timestamp_desc
        except AttributeError:
            return "-"

    def _FormatVersion(self, output_mediator, event, event_data, event_data_stream):
        """Formats the version field of an event.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          str: version field, which is always "2".
        """
        return "2"

    # pylint: enable=unused-argument
[docs]
class L2TCSVOutputModule(text_file.SortedTextFileOutputModule):
    """Output module for the log2timeline CSV format, with 17 fixed fields."""

    NAME = "l2tcsv"
    DESCRIPTION = "CSV format used by legacy log2timeline, with 17 fixed fields."

    # Names of the output fields, in the order they are written.
    _FIELD_NAMES = [
        "date",
        "time",
        "timezone",
        "MACB",
        "source",
        "sourcetype",
        "type",
        "user",
        "host",
        "short",
        "desc",
        "version",
        "filename",
        "inode",
        "notes",
        "format",
        "extra",
    ]

    _SORT_KEY_FIELD_NAMES = ["time", "filename", "inode"]

    def __init__(self):
        """Initializes an output module."""
        field_helper = L2TCSVFieldFormattingHelper()
        event_helper = L2TCSVEventFormattingHelper(field_helper, self._FIELD_NAMES)
        super().__init__(event_helper)

    def _GetString(self, output_mediator, field_values):
        """Builds an output line from the field values.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          field_values (dict[str, str]): output field values per name.

        Returns:
          str: field values joined with the field delimiter, terminated by a
              line feed.
        """
        delimiter = self._event_formatting_helper.field_delimiter
        line = delimiter.join(field_values.values())
        return line + "\n"

    def WriteFieldValuesOfMACBGroup(self, output_mediator, macb_group):
        """Writes the formatted representation of a MACB group to the output.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          macb_group (list[tuple[event, event_data, event_data_stream, event_tag]]):
              group of event, event_data, event_data_stream and event_tag objects
              with identical timestamps, attributes and values.
        """
        self.WriteLine(
            self._event_formatting_helper.GetFormattedMACBGroup(
                output_mediator, macb_group
            )
        )

    def WriteHeader(self, output_mediator):
        """Writes the header, consisting of the field names, to the output.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
        """
        self.WriteLine(self._event_formatting_helper.GetFormattedFieldNames())
# Register the L2T CSV output module class with the output manager.
manager.OutputManager.RegisterOutput(L2TCSVOutputModule)