Source code for plaso.output.dynamic
"""Dynamic selected delimiter separated values output module."""
import datetime
import pytz
from dfdatetime import posix_time as dfdatetime_posix_time
from plaso.output import formatting_helper
from plaso.output import manager
from plaso.output import shared_dsv
[docs]
class DynamicFieldFormattingHelper(formatting_helper.FieldFormattingHelper):
"""Dynamic output module field formatting helper."""
# TODO: determine why _FormatTimestampDescription is mapped to both
# timestamp_desc and type.
# Maps the name of a fields to a a callback function that formats
# the field value.
_FIELD_FORMAT_CALLBACKS = {
"date": "_FormatDate",
"datetime": "_FormatDateTime",
"description": "_FormatMessage",
"description_short": "_FormatMessageShort",
"display_name": "_FormatDisplayName",
"filename": "_FormatFilename",
"host": "_FormatHostname",
"hostname": "_FormatHostname",
"inode": "_FormatInode",
"macb": "_FormatMACB",
"message": "_FormatMessage",
"message_short": "_FormatMessageShort",
"parser": "_FormatParser",
"source": "_FormatSourceShort",
"sourcetype": "_FormatSource",
"source_long": "_FormatSource",
"tag": "_FormatTag",
"time": "_FormatTime",
"timestamp_desc": "_FormatTimestampDescription",
"timezone": "_FormatTimeZone",
"type": "_FormatTimestampDescription",
"user": "_FormatUsername",
"username": "_FormatUsername",
"values": "_FormatValues",
"yara_match": "_FormatYaraMatch",
"zone": "_FormatTimeZone",
}
# The field format callback methods require specific arguments hence
# the check for unused arguments is disabled here.
# pylint: disable=unused-argument
def _FormatDate(self, output_mediator, event, event_data, event_data_stream):
"""Formats a date field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: date formatted as "YYYY-MM-DD" or "0000-00-00" on error.
"""
# For now check if event.timestamp is set, to mimic existing behavior of
# using 0000-00-00 for 0 timestamp values.
if not event.timestamp:
return "0000-00-00"
date_time = event.date_time
if not date_time or date_time.is_local_time:
date_time = dfdatetime_posix_time.PosixTimeInMicroseconds(
timestamp=event.timestamp
)
# Note that GetDateWithTimeOfDay will return the date and time in UTC,
# so no adjustment for date_time.time_zone_offset is needed.
year, month, day_of_month, hours, minutes, seconds = (
date_time.GetDateWithTimeOfDay()
)
if output_mediator.time_zone != pytz.UTC:
try:
datetime_object = datetime.datetime(
year, month, day_of_month, hours, minutes, seconds, tzinfo=pytz.UTC
)
datetime_object = datetime_object.astimezone(output_mediator.time_zone)
year = datetime_object.year
month = datetime_object.month
day_of_month = datetime_object.day
except (OSError, OverflowError, TypeError, ValueError):
year, month, day_of_month = (None, None, None)
if None in (year, month, day_of_month):
message = (
f"unable to copy timestamp: {event.timestamp!s} to a human readable "
f'date. Defaulting to: "0000-00-00"'
)
self._ReportEventError(
event,
event_data,
message,
)
return "0000-00-00"
return f"{year:04d}-{month:02d}-{day_of_month:02d}"
def _FormatTimestampDescription(
self, output_mediator, event, event_data, event_data_stream
):
"""Formats a timestamp description field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: timestamp description field.
"""
return event.timestamp_desc or "-"
def _FormatYaraMatch(self, output_mediator, event, event_data, event_data_stream):
"""Formats a Yara match field.
Args:
output_mediator (OutputMediator): mediates interactions between output
modules and other components, such as storage and dfVFS.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
Returns:
str: Yara match field.
"""
yara_match = getattr(event_data_stream, "yara_match", None) or []
return "; ".join(yara_match) or "-"
# pylint: enable=unused-argument
[docs]
class DynamicOutputModule(shared_dsv.DSVOutputModule):
"""Dynamic selected delimiter separated values (DSV) output module."""
NAME = "dynamic"
DESCRIPTION = "Dynamic selection of fields for a separated value output format."
SUPPORTS_ADDITIONAL_FIELDS = True
SUPPORTS_CUSTOM_FIELDS = True
_DEFAULT_NAMES = [
"datetime",
"timestamp_desc",
"source",
"source_long",
"message",
"parser",
"display_name",
"tag",
]
_SORT_KEY_FIELD_NAMES = ["datetime", "display_name", "message"]
[docs]
def __init__(self):
"""Initializes an output module."""
field_formatting_helper = DynamicFieldFormattingHelper()
super().__init__(field_formatting_helper, self._DEFAULT_NAMES)
manager.OutputManager.RegisterOutput(DynamicOutputModule)