# Source code for plaso.output.dynamic

"""Dynamic selected delimiter separated values output module."""

import datetime
import pytz

from dfdatetime import posix_time as dfdatetime_posix_time

from plaso.output import formatting_helper
from plaso.output import manager
from plaso.output import shared_dsv


class DynamicFieldFormattingHelper(formatting_helper.FieldFormattingHelper):
    """Dynamic output module field formatting helper."""

    # TODO: determine why _FormatTimestampDescription is mapped to both
    # timestamp_desc and type.

    # Maps the name of a field to the name of the callback method that
    # formats the field value.
    _FIELD_FORMAT_CALLBACKS = {
        "date": "_FormatDate",
        "datetime": "_FormatDateTime",
        "description": "_FormatMessage",
        "description_short": "_FormatMessageShort",
        "display_name": "_FormatDisplayName",
        "filename": "_FormatFilename",
        "host": "_FormatHostname",
        "hostname": "_FormatHostname",
        "inode": "_FormatInode",
        "macb": "_FormatMACB",
        "message": "_FormatMessage",
        "message_short": "_FormatMessageShort",
        "parser": "_FormatParser",
        "source": "_FormatSourceShort",
        "sourcetype": "_FormatSource",
        "source_long": "_FormatSource",
        "tag": "_FormatTag",
        "time": "_FormatTime",
        "timestamp_desc": "_FormatTimestampDescription",
        "timezone": "_FormatTimeZone",
        "type": "_FormatTimestampDescription",
        "user": "_FormatUsername",
        "username": "_FormatUsername",
        "values": "_FormatValues",
        "yara_match": "_FormatYaraMatch",
        "zone": "_FormatTimeZone",
    }

    # The field format callback methods require specific arguments hence
    # the check for unused arguments is disabled here.
    # pylint: disable=unused-argument

    def _FormatDate(self, output_mediator, event, event_data, event_data_stream):
        """Formats a date field.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          str: date formatted as "YYYY-MM-DD" or "0000-00-00" on error.
        """
        # For now check if event.timestamp is set, to mimic existing behavior of
        # using 0000-00-00 for 0 timestamp values.
        if not event.timestamp:
            return "0000-00-00"

        # Fall back to a date and time value built from the raw timestamp when
        # the event has no date_time or its date_time is in local time.
        date_time = event.date_time
        if not date_time or date_time.is_local_time:
            date_time = dfdatetime_posix_time.PosixTimeInMicroseconds(
                timestamp=event.timestamp
            )

        # Note that GetDateWithTimeOfDay will return the date and time in UTC,
        # so no adjustment for date_time.time_zone_offset is needed.
        year, month, day_of_month, hours, minutes, seconds = (
            date_time.GetDateWithTimeOfDay()
        )

        # Only the date components are needed, but the time of day is required
        # to shift the value into the output time zone correctly.
        if output_mediator.time_zone != pytz.UTC:
            try:
                datetime_object = datetime.datetime(
                    year, month, day_of_month, hours, minutes, seconds, tzinfo=pytz.UTC
                )
                datetime_object = datetime_object.astimezone(output_mediator.time_zone)

                year = datetime_object.year
                month = datetime_object.month
                day_of_month = datetime_object.day

            except (OSError, OverflowError, TypeError, ValueError):
                # Conversion failed (presumably None or out-of-range components);
                # clear the date so the error path below is taken.
                year, month, day_of_month = (None, None, None)

        if None in (year, month, day_of_month):
            message = (
                f"unable to copy timestamp: {event.timestamp!s} to a human readable "
                'date. Defaulting to: "0000-00-00"'
            )
            self._ReportEventError(
                event,
                event_data,
                message,
            )
            return "0000-00-00"

        return f"{year:04d}-{month:02d}-{day_of_month:02d}"

    def _FormatTimestampDescription(
        self, output_mediator, event, event_data, event_data_stream
    ):
        """Formats a timestamp description field.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          str: timestamp description field, or "-" if the event has none.
        """
        return event.timestamp_desc or "-"

    def _FormatYaraMatch(self, output_mediator, event, event_data, event_data_stream):
        """Formats a Yara match field.

        Args:
          output_mediator (OutputMediator): mediates interactions between output
              modules and other components, such as storage and dfVFS.
          event (EventObject): event.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.

        Returns:
          str: Yara match field, the matches joined with "; " or "-" if there
              are none.
        """
        # event_data_stream can be None or lack the attribute, hence getattr
        # with a default and the fallback to an empty list.
        yara_match = getattr(event_data_stream, "yara_match", None) or []
        return "; ".join(yara_match) or "-"

    # pylint: enable=unused-argument
class DynamicOutputModule(shared_dsv.DSVOutputModule):
    """Dynamic selected delimiter separated values (DSV) output module."""

    NAME = "dynamic"
    DESCRIPTION = "Dynamic selection of fields for a separated value output format."

    SUPPORTS_ADDITIONAL_FIELDS = True
    SUPPORTS_CUSTOM_FIELDS = True

    # Field names handed to the base class initializer; presumably the fields
    # that are output when none are selected explicitly - confirm against
    # shared_dsv.DSVOutputModule.
    _DEFAULT_NAMES = [
        "datetime",
        "timestamp_desc",
        "source",
        "source_long",
        "message",
        "parser",
        "display_name",
        "tag",
    ]

    # NOTE(review): not referenced in this module; presumably consumed by the
    # base class to build a sort key - verify.
    _SORT_KEY_FIELD_NAMES = ["datetime", "display_name", "message"]

    def __init__(self):
        """Initializes an output module."""
        # The dynamic-specific helper supplies the field format callbacks.
        field_formatting_helper = DynamicFieldFormattingHelper()
        super().__init__(field_formatting_helper, self._DEFAULT_NAMES)
# Register the dynamic output module class with the output manager; as a
# module-level call this runs when the module is imported.
manager.OutputManager.RegisterOutput(DynamicOutputModule)