Source code for plaso.output.formatting_helper

"""Output module field formatting helper."""

import abc
import datetime
import math
import pytz

from dfdatetime import posix_time as dfdatetime_posix_time
from dfvfs.lib import definitions as dfvfs_definitions

from plaso.containers import events
from plaso.formatters import default
from plaso.output import logger


[docs] class EventFormattingHelper: """Output module event formatting helper."""
[docs] @abc.abstractmethod def GetFieldValues( self, output_mediator, event, event_data, event_data_stream, event_tag): """Retrieves the output field values. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. event_tag (EventTag): event tag. Returns: list[str]: output field values. """
[docs] class FieldFormattingHelper: """Output module field formatting helper.""" _DEFAULT_MESSAGE_FORMATTER = default.DefaultEventFormatter() # Maps the name of a field to callback function that formats the field value. _FIELD_FORMAT_CALLBACKS = {}
[docs] def __init__(self): """Initializes a field formatting helper.""" event_data_stream = events.EventDataStream() super().__init__() self._callback_functions = {} self._event_data_stream_field_names = event_data_stream.GetAttributeNames() self._event_tag_field_names = [] for field_name, callback_name in self._FIELD_FORMAT_CALLBACKS.items(): if callback_name == '_FormatTag': self._event_tag_field_names.append(field_name) else: self._callback_functions[field_name] = getattr( self, callback_name, None)
# The field format callback methods require specific arguments hence # the check for unused arguments is disabled here. # pylint: disable=unused-argument def _FormatDateTime( self, output_mediator, event, event_data, event_data_stream): """Formats a date and time field in ISO 8601 format. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. Returns: str: date and time field with time zone offset, semantic time. """ if output_mediator.dynamic_time and event.date_time: iso8601_string = getattr(event.date_time, 'string', None) if iso8601_string: return iso8601_string date_time = event.date_time if event.date_time.is_local_time: # TODO: replace this by a more generic solution. if event.date_time.precision == '1s': timestamp, _ = divmod(event.timestamp, 1000000) date_time = dfdatetime_posix_time.PosixTime(timestamp=timestamp) else: date_time = dfdatetime_posix_time.PosixTimeInMicroseconds( timestamp=event.timestamp) iso8601_string = date_time.CopyToDateTimeStringISO8601() if not iso8601_string: return 'Invalid' if iso8601_string[-1] == 'Z': iso8601_string = f'{iso8601_string[:-1]:s}+00:00' if output_mediator.time_zone != pytz.UTC or date_time.time_zone_offset: # For output in a specific time zone overwrite the date, time in # seconds and time zone offset in the UTC ISO8601 string. year, month, day_of_month, hours, minutes, seconds = ( date_time.GetDateWithTimeOfDay()) try: datetime_object = datetime.datetime( year, month, day_of_month, hours, minutes, seconds, tzinfo=pytz.UTC) datetime_object = datetime_object.astimezone( output_mediator.time_zone) isoformat_string = datetime_object.isoformat() iso8601_string = ''.join([ isoformat_string[:19], iso8601_string[19:-6], isoformat_string[-6:]]) except (OSError, OverflowError, TypeError, ValueError): return 'Invalid' else: if not event.date_time or event.date_time.is_local_time: timestamp = event.timestamp else: timestamp, fraction_of_second = ( event.date_time.CopyToPosixTimestampWithFractionOfSecond()) timestamp = (timestamp or 0) * 1000000 if fraction_of_second: while fraction_of_second < 1000000: fraction_of_second *= 10 while fraction_of_second >= 1000000: fraction_of_second /= 10 if timestamp < 0: timestamp -= math.ceil(fraction_of_second) else: timestamp += math.ceil(fraction_of_second) # For now check if event.timestamp is set, to mimic existing behavior of # using 0000-00-00T00:00:00.000000+00:00 for 0 timestamp values if not timestamp: return '0000-00-00T00:00:00.000000+00:00' try: # Note that setting tzinfo to None can have unexpected results on # certain platforms. datetime_object = ( datetime.datetime(1970, 1, 1, tzinfo=pytz.UTC) + datetime.timedelta(microseconds=timestamp)) datetime_object = datetime_object.astimezone(output_mediator.time_zone) iso8601_string = datetime_object.isoformat() iso8601_string = ( f'{iso8601_string[:19]:s}.{datetime_object.microsecond:06d}' f'{iso8601_string[-6:]:s}') except (OSError, OverflowError, TypeError, ValueError) as exception: iso8601_string = '0000-00-00T00:00:00.000000+00:00' self._ReportEventError(event, event_data, ( f'unable to copy timestamp: {event.timestamp!s} to a human ' f'readable date and time with error: {exception!s}. Defaulting ' f'to: "{iso8601_string:s}"')) return iso8601_string def _FormatDisplayName( self, output_mediator, event, event_data, event_data_stream): """Formats the display name. The display_name field can be set as an attribute to event_data otherwise it is derived from the path specification. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. Returns: str: display name field. """ display_name = getattr(event_data, 'display_name', None) if not display_name: path_spec = getattr(event_data_stream, 'path_spec', None) if path_spec: display_name = output_mediator.GetDisplayNameForPathSpec(path_spec) else: display_name = '-' return display_name def _FormatFilename( self, output_mediator, event, event_data, event_data_stream): """Formats the filename. The filename field can be set as an attribute to event_data otherwise it is derived from the path specification. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. Returns: str: filename field. """ filename = getattr(event_data, 'filename', None) if not filename: path_spec = getattr(event_data_stream, 'path_spec', None) if path_spec: filename = output_mediator.GetRelativePathForPathSpec(path_spec) else: filename = '-' return filename def _FormatHostname( self, output_mediator, event, event_data, event_data_stream): """Formats a hostname field. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. Returns: str: hostname field. """ return output_mediator.GetHostname(event_data) def _FormatInode(self, output_mediator, event, event_data, event_data_stream): """Formats an inode field. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. Returns: str: inode field. """ inode = getattr(event_data, 'inode', None) # Note that inode can contain 0. if inode is None: path_specification = getattr(event_data_stream, 'path_spec', None) if path_specification: if path_specification.type_indicator in ( dfvfs_definitions.TYPE_INDICATOR_APFS, dfvfs_definitions.TYPE_INDICATOR_HFS): inode = getattr(path_specification, 'identifier', None) elif path_specification.type_indicator == ( dfvfs_definitions.TYPE_INDICATOR_NTFS): inode = getattr(path_specification, 'mft_entry', None) elif path_specification.type_indicator in ( dfvfs_definitions.TYPE_INDICATOR_EXT, dfvfs_definitions.TYPE_INDICATOR_TSK): # Note that inode can contain a TSK metadata address. inode = getattr(path_specification, 'inode', None) if inode is None: inode = '-' elif isinstance(inode, int): inode = f'{inode:d}' return inode def _FormatMACB(self, output_mediator, event, event_data, event_data_stream): """Formats a legacy MACB representation field. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. Returns: str: MACB field. """ return output_mediator.GetMACBRepresentation(event, event_data) def _FormatMessage( self, output_mediator, event, event_data, event_data_stream): """Formats a message field. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. Returns: str: message field. """ message_formatter = output_mediator.GetMessageFormatter( event_data.data_type) if not message_formatter: logger.warning( f'Using default message formatter for data type: ' f'{event_data.data_type:s}') message_formatter = self._DEFAULT_MESSAGE_FORMATTER event_values = event_data.CopyToDict() message_formatter.FormatEventValues(output_mediator, event_values) return message_formatter.GetMessage(event_values) def _FormatMessageShort( self, output_mediator, event, event_data, event_data_stream): """Formats a short message field. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. Returns: str: short message field. """ message_formatter = output_mediator.GetMessageFormatter( event_data.data_type) if not message_formatter: logger.warning( f'Using default message formatter for data type: ' f'{event_data.data_type:s}') message_formatter = self._DEFAULT_MESSAGE_FORMATTER event_values = event_data.CopyToDict() message_formatter.FormatEventValues(output_mediator, event_values) return message_formatter.GetMessageShort(event_values) def _FormatParser( self, output_mediator, event, event_data, event_data_stream): """Formats a parser field. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. Returns: str: parser field. """ return getattr(event_data, '_parser_chain', None) or '-' def _FormatSource( self, output_mediator, event, event_data, event_data_stream): """Formats a source field. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. Returns: str: source field. """ data_type = getattr(event_data, 'data_type', None) or '-' _, source = output_mediator.GetSourceMapping(data_type) return source or 'N/A' def _FormatSourceShort( self, output_mediator, event, event_data, event_data_stream): """Formats a short source field. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. Returns: str: short source field. """ data_type = getattr(event_data, 'data_type', None) or '-' source_short, _ = output_mediator.GetSourceMapping(data_type) return source_short or 'N/A' def _FormatTag(self, output_mediator, event_tag): """Formats an event tag field. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event_tag (EventTag): event tag or None if not set. Returns: str: event tag labels or "-" if event tag is not set. """ if not event_tag: return '-' return ' '.join(event_tag.labels) def _FormatTime(self, output_mediator, event, event_data, event_data_stream): """Formats a time field. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. Returns: str: time in seconds formatted as "HH:MM:SS" or "--:--:--" on error. """ # For now check if event.timestamp is set, to mimic existing behavior of # using --:--:-- for 0 timestamp values. if not event.timestamp: return '--:--:--' date_time = event.date_time if not date_time or date_time.is_local_time: date_time = dfdatetime_posix_time.PosixTimeInMicroseconds( timestamp=event.timestamp) year, month, day_of_month, hours, minutes, seconds = ( date_time.GetDateWithTimeOfDay()) if output_mediator.time_zone != pytz.UTC: try: datetime_object = datetime.datetime( year, month, day_of_month, hours, minutes, seconds, tzinfo=pytz.UTC) datetime_object = datetime_object.astimezone(output_mediator.time_zone) hours, minutes, seconds = ( datetime_object.hour, datetime_object.minute, datetime_object.second) except (OSError, OverflowError, TypeError, ValueError): hours, minutes, seconds = (None, None, None) if None in (hours, minutes, seconds): self._ReportEventError(event, event_data, ( f'unable to copy timestamp: {event.timestamp!s} to a human readable ' f'time. Defaulting to: "--:--:--"')) return '--:--:--' return f'{hours:02d}:{minutes:02d}:{seconds:02d}' def _FormatTimeZone( self, output_mediator, event, event_data, event_data_stream): """Formats a time zone field. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. Returns: str: time zone field. """ if not event.timestamp: return '-' date_time = event.date_time if not date_time or date_time.is_local_time: date_time = dfdatetime_posix_time.PosixTimeInMicroseconds( timestamp=event.timestamp) if output_mediator.time_zone == pytz.UTC: return 'UTC' year, month, day_of_month, hours, minutes, seconds = ( date_time.GetDateWithTimeOfDay()) try: # For tzname to work the datetime object must be naive (without # a time zone). datetime_object = datetime.datetime( year, month, day_of_month, hours, minutes, seconds) return output_mediator.time_zone.tzname(datetime_object) except (OverflowError, TypeError, ValueError): self._ReportEventError(event, event_data, ( f'unable to copy timestamp: {event.timestamp!s} to a human readable ' f'time zone. Defaulting to: "-"')) return '-' def _FormatUsername( self, output_mediator, event, event_data, event_data_stream): """Formats an username field. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. Returns: str: username field. """ return output_mediator.GetUsername(event_data) def _FormatValues( self, output_mediator, event, event_data, event_data_stream): """Formats a values. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. Returns: str: values field. """ values = getattr(event_data, 'values', None) if isinstance(values, list) and event_data.data_type in ( 'windows:registry:key_value', 'windows:registry:service'): values = ' '.join( f'{name or "(default)"}: [{data_type:s}] {data or "(empty)"}' for name, data_type, data in sorted(values)) return values # pylint: enable=unused-argument def _ReportEventError(self, event, event_data, error_message): """Reports an event related error. Args: event (EventObject): event. event_data (EventData): event data. error_message (str): error message. """ event_identifier = event.GetIdentifier() event_identifier_string = event_identifier.CopyToString() display_name = getattr(event_data, 'display_name', None) or 'N/A' parser_chain = getattr(event_data, '_parser_chain', None) or 'N/A' logger.error(( f'Event: {event_identifier_string!s} description: ' f'{event.timestamp_desc:s} data type: {event_data.data_type:s} ' f'display name: {display_name:s} parser chain: {parser_chain:s} with ' f'error: {error_message:s}'))
[docs] def GetFormattedField( self, output_mediator, field_name, event, event_data, event_data_stream, event_tag): """Formats the specified field. Args: output_mediator (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. field_name (str): name of the field. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. event_tag (EventTag): event tag. Returns: str: value of the field or None if not available. """ if field_name in self._event_tag_field_names: return self._FormatTag(output_mediator, event_tag) callback_function = self._callback_functions.get(field_name) if callback_function: output_value = callback_function( output_mediator, event, event_data, event_data_stream) elif field_name in self._event_data_stream_field_names: output_value = getattr(event_data_stream, field_name, None) else: output_value = getattr(event_data, field_name, None) if output_value is not None and not isinstance(output_value, str): output_value = f'{output_value!s}' return output_value