# Source code for plaso.engine.timeliner

"""The timeliner, which is used to generate events from event data."""

import collections
import copy
import datetime
import decimal
import os
import pytz

from dfdatetime import interface as dfdatetime_interface
from dfdatetime import semantic_time as dfdatetime_semantic_time

from plaso.containers import events
from plaso.containers import warnings
from plaso.engine import yaml_timeliner_file
from plaso.lib import definitions


class EventDataTimeliner:
    """The event data timeliner.

    Generates events from the date and time values of event data, based on
    the attribute mappings read from the timeliner configuration file.

    Attributes:
      data_types_counter (collections.Counter): number of events per data
          types.
      number_of_produced_events (int): number of events produced by the most
          recent call to ProcessEventData.
      parsers_counter (collections.Counter): number of events per parser or
          parser plugin.
    """

    _DEFAULT_TIME_ZONE = pytz.UTC

    _INT64_MIN = -1 << 63
    _INT64_MAX = (1 << 63) - 1

    _TIMELINER_CONFIGURATION_FILENAME = "timeliner.yaml"

    def __init__(
        self, data_location=None, preferred_year=None, system_configurations=None
    ):
        """Initializes an event data timeliner.

        Args:
          data_location (Optional[str]): path of the directory that contains
              the timeliner configuration file (timeliner.yaml). Note that the
              configuration file is read during initialization and that
              joining None with the file name raises TypeError.
          preferred_year (Optional[int]): preferred initial year value for
              date-less date and time values.
          system_configurations (Optional[list[SystemConfigurationArtifact]]):
              system configurations.
        """
        super().__init__()
        self._attribute_mappings = {}
        self._base_dates = {}
        self._current_date = self._GetCurrentDate()
        self._data_location = data_location
        self._place_holder_event = set()
        self._preferred_time_zone = None
        self._preferred_year = preferred_year
        self._time_zone_per_path_spec = None

        self.data_types_counter = collections.Counter()
        self.number_of_produced_events = 0
        self.parsers_counter = collections.Counter()

        self._CreateTimeZonePerPathSpec(system_configurations)
        self._ReadConfigurationFile()

    def _CreateTimeZonePerPathSpec(self, system_configurations):
        """Creates the time zone per path specification lookup table.

        The table maps the parent of every path specification of a system
        configuration that has a time zone to that time zone.

        Args:
          system_configurations (list[SystemConfigurationArtifact]): system
              configurations.
        """
        self._time_zone_per_path_spec = {}
        for system_configuration in system_configurations or []:
            if not system_configuration.time_zone:
                continue

            for path_spec in system_configuration.path_specs:
                if path_spec.parent:
                    self._time_zone_per_path_spec[path_spec.parent] = (
                        system_configuration.time_zone
                    )

    @staticmethod
    def _FormatDate(date_tuple):
        """Formats a date tuple for use in a warning message.

        Args:
          date_tuple (tuple[int, int, int]): date, as a tuple of year, month,
              day of month, or None if not available.

        Returns:
          str: date formatted as "year-month-day" or "unknown" if the date is
              not available.
        """
        if date_tuple is None:
            return "unknown"

        return f"{date_tuple[0]:d}-{date_tuple[1]:d}-{date_tuple[2]:d}"

    def _GetBaseDate(self, storage_writer, event_data):
        """Retrieves the base date.

        Args:
          storage_writer (StorageWriter): storage writer.
          event_data (EventData): event data.

        Returns:
          tuple[int, int, int]: base date, as a tuple of year, month, day of
              month.
        """
        # If preferred year is set consider it a user override, otherwise try
        # to determine the year based on the date-less log helper or fall back
        # to the current year.
        if self._preferred_year:
            return self._preferred_year, 0, 0

        current_date = self._current_date

        event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()
        if not event_data_stream_identifier:
            return current_date[0], 0, 0

        # Base dates are cached per event data stream.
        lookup_key = event_data_stream_identifier.CopyToString()
        base_date = self._base_dates.get(lookup_key)
        if base_date:
            return base_date

        filter_expression = f'_event_data_stream_identifier == "{lookup_key:s}"'
        date_less_log_helpers = list(
            storage_writer.GetAttributeContainers(
                events.DateLessLogHelper.CONTAINER_TYPE,
                filter_expression=filter_expression,
            )
        )
        if date_less_log_helpers:
            base_date = self._GetBaseDateFromHelper(
                storage_writer, event_data, date_less_log_helpers[0], current_date
            )
        else:
            message = (
                "missing date-less log helper, defaulting to date: "
                f"{self._FormatDate(current_date):s}"
            )
            self._ProduceTimeliningWarning(storage_writer, event_data, message)
            base_date = (current_date[0], 0, 0)

        self._base_dates[lookup_key] = base_date
        return base_date

    def _GetBaseDateFromHelper(
        self, storage_writer, event_data, date_less_log_helper, current_date
    ):
        """Determines the base date from a date-less log helper.

        Args:
          storage_writer (StorageWriter): storage writer.
          event_data (EventData): event data.
          date_less_log_helper (DateLessLogHelper): date-less log helper.
          current_date (tuple[int, int, int]): current date, as a tuple of
              year, month, day of month.

        Returns:
          tuple[int, int, int]: base date, as a tuple of year, month, day of
              month.
        """
        earliest_date = date_less_log_helper.GetEarliestDate()
        last_relative_date = date_less_log_helper.GetLastRelativeDate()
        latest_date = date_less_log_helper.GetLatestDate()

        if date_less_log_helper.granularity == (
            date_less_log_helper.GRANULARITY_NO_YEAR
        ):
            current_date = (current_date[0], 0, 0)

        current_text = self._FormatDate(current_date)

        if earliest_date is None and latest_date is None:
            message = (
                "missing earliest and latest date in date-less log helper, "
                f"defaulting to date: {current_text:s}"
            )
            self._ProduceTimeliningWarning(storage_writer, event_data, message)
            return current_date

        # A missing last relative date is treated as a zero offset, so that a
        # partially populated helper cannot cause None to be used in the date
        # arithmetic or comparisons below.
        relative_date = last_relative_date
        if relative_date is None:
            relative_date = (0, 0, 0)

        if earliest_date is not None:
            last_date = tuple(
                earliest + relative
                for earliest, relative in zip(earliest_date, relative_date)
            )
            if last_date < current_date:
                return earliest_date

        earliest_text = self._FormatDate(earliest_date)
        latest_text = self._FormatDate(latest_date)
        relative_text = self._FormatDate(relative_date)

        if latest_date is not None and latest_date < current_date:
            message = (
                f"earliest date: {earliest_text:s} as base date would exceed : "
                f"{current_text:s} + {relative_text:s}, using latest date: "
                f"{latest_text:s}"
            )
            self._ProduceTimeliningWarning(storage_writer, event_data, message)
            return tuple(
                latest - relative
                for latest, relative in zip(latest_date, relative_date)
            )

        message = (
            f"earliest date: {earliest_text:s} and latest date: "
            f"{latest_text:s} as base date would exceed date: "
            f"{current_text:s} + {relative_text:s}, using date: "
            f"{current_text:s}"
        )
        self._ProduceTimeliningWarning(storage_writer, event_data, message)
        return tuple(
            current - relative
            for current, relative in zip(current_date, relative_date)
        )

    def _GetCurrentDate(self):
        """Retrieves current date.

        Returns:
          tuple[int, int, int]: current date in UTC, as a tuple of year, month,
              day of month.
        """
        datetime_object = datetime.datetime.now(pytz.UTC)
        return datetime_object.year, datetime_object.month, datetime_object.day

    def _GetEvent(
        self,
        storage_writer,
        event_data,
        event_data_stream,
        date_time,
        date_time_description,
    ):
        """Retrieves an event.

        Date and time values that cannot be converted into a timestamp result
        in a timelining warning and an event with an invalid time and a
        timestamp of 0.

        Args:
          storage_writer (StorageWriter): storage writer.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.
          date_time (dfdatetime.DateTimeValues): date and time values.
          date_time_description (str): description of the meaning of the date
              and time values.

        Returns:
          EventObject: event.
        """
        timestamp = None
        if date_time.is_delta:
            base_date = self._GetBaseDate(storage_writer, event_data)

            try:
                date_time = date_time.NewFromDeltaAndDate(*base_date)
            except ValueError as exception:
                self._ProduceTimeliningWarning(
                    storage_writer, event_data, str(exception)
                )
                date_time = dfdatetime_semantic_time.InvalidTime()
                timestamp = 0

        if timestamp is None:
            try:
                timestamp = date_time.GetPlasoTimestamp()
            except decimal.InvalidOperation as exception:
                self._ProduceTimeliningWarning(
                    storage_writer, event_data, str(exception)
                )
                date_time = dfdatetime_semantic_time.InvalidTime()
                timestamp = 0

        if timestamp is None:
            self._ProduceTimeliningWarning(
                storage_writer, event_data, "unable to determine timestamp"
            )
            date_time = dfdatetime_semantic_time.InvalidTime()
            timestamp = 0

        # Check for out of bounds timestamps, for example if a data format has
        # changed and the conversion leads to an incorrect large integer value.
        # Integer values that are larger than 64-bit will cause an
        # OverflowError in the SQLite storage.
        elif timestamp < self._INT64_MIN or timestamp > self._INT64_MAX:
            self._ProduceTimeliningWarning(
                storage_writer, event_data, "timestamp out of bounds"
            )
            date_time = dfdatetime_semantic_time.InvalidTime()
            timestamp = 0

        elif date_time.is_local_time:
            time_zone = self._GetLocalTimeZone(
                storage_writer, event_data, event_data_stream, date_time
            )
            date_time, timestamp = self._ConvertLocalTimeToUTC(
                storage_writer, event_data, date_time, timestamp, time_zone
            )

        event = events.EventObject()
        event.date_time = date_time
        event.timestamp = timestamp
        event.timestamp_desc = date_time_description

        event_data_identifier = event_data.GetIdentifier()
        event.SetEventDataIdentifier(event_data_identifier)

        return event

    def _GetLocalTimeZone(
        self, storage_writer, event_data, event_data_stream, date_time
    ):
        """Determines the time zone of date and time values in local time.

        The time zone hint of the date and time values is tried first, then
        the time zone of the system the event data stream originates from,
        then the preferred time zone and finally the default time zone.

        Args:
          storage_writer (StorageWriter): storage writer.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.
          date_time (dfdatetime.DateTimeValues): date and time values.

        Returns:
          datetime.tzinfo: time zone.
        """
        time_zone = None

        time_zone_hint = date_time.time_zone_hint
        if time_zone_hint:
            # TODO: cache time zones per hint.
            try:
                time_zone = pytz.timezone(time_zone_hint)
            except pytz.UnknownTimeZoneError:
                message = (
                    f"unsupported time zone hint: "
                    f"{time_zone_hint:s}, using default time zone"
                )
                self._ProduceTimeliningWarning(storage_writer, event_data, message)

        if not time_zone and event_data_stream:
            try:
                time_zone = self._GetTimeZoneByPathSpec(event_data_stream.path_spec)
            except pytz.UnknownTimeZoneError as exception:
                # Report the system time zone that could not be resolved, which
                # is carried by the exception. The time zone hint must not be
                # used here since it typically is None on this code path.
                message = (
                    f"unsupported system time zone: "
                    f"{exception!s}, using default time zone"
                )
                self._ProduceTimeliningWarning(storage_writer, event_data, message)

        return time_zone or self._preferred_time_zone or self._DEFAULT_TIME_ZONE

    def _ConvertLocalTimeToUTC(
        self, storage_writer, event_data, date_time, timestamp, time_zone
    ):
        """Converts date and time values in local time to UTC.

        Args:
          storage_writer (StorageWriter): storage writer.
          event_data (EventData): event data.
          date_time (dfdatetime.DateTimeValues): date and time values in local
              time, which are not modified.
          timestamp (int): timestamp of the date and time values, in number of
              microseconds since January 1, 1970 00:00:00.
          time_zone (datetime.tzinfo): time zone of the date and time values.

        Returns:
          tuple[dfdatetime.DateTimeValues, int]: copy of the date and time
              values marked as not being in local time and the timestamp in
              UTC, or an invalid time and 0 if the timestamp cannot be
              represented as a datetime object.
        """
        # Copy the date and time values since they are owned by the event data.
        date_time = copy.deepcopy(date_time)
        date_time.is_local_time = False

        if time_zone == pytz.UTC:
            return date_time, timestamp

        try:
            datetime_object = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=None)
            datetime_object += datetime.timedelta(microseconds=timestamp)
            datetime_delta = time_zone.utcoffset(datetime_object, is_dst=False)
        except OverflowError:
            # A timestamp can fit in a 64-bit integer but still fall outside
            # of the range of years supported by datetime.
            self._ProduceTimeliningWarning(
                storage_writer,
                event_data,
                "timestamp out of bounds for local time conversion",
            )
            return dfdatetime_semantic_time.InvalidTime(), 0

        seconds_delta = int(datetime_delta.total_seconds())
        timestamp -= seconds_delta * definitions.MICROSECONDS_PER_SECOND

        # The time zone offset is stored in number of minutes.
        date_time.time_zone_offset = seconds_delta // 60

        return date_time, timestamp

    def _GetTimeZoneByPathSpec(self, path_spec):
        """Retrieves a time zone for a specific path specification.

        Time zone names in the lookup table are replaced by their time zone
        object on first use. An unknown time zone name is replaced by None, so
        that the error is only raised once per parent path specification.

        Args:
          path_spec (dfvfs.PathSpec): path specification.

        Returns:
          pytz.tzfile: time zone or None if not available.

        Raises:
          pytz.UnknownTimeZoneError: if the time zone is unknown.
        """
        if not path_spec or not path_spec.parent:
            return None

        time_zone = self._time_zone_per_path_spec.get(path_spec.parent)
        if not time_zone:
            return None

        if isinstance(time_zone, str):
            try:
                time_zone = pytz.timezone(time_zone)
            except pytz.UnknownTimeZoneError:
                self._time_zone_per_path_spec[path_spec.parent] = None
                raise

            self._time_zone_per_path_spec[path_spec.parent] = time_zone

        return time_zone

    def _ProduceTimeliningWarning(self, storage_writer, event_data, message):
        """Produces a timelining warning.

        Args:
          storage_writer (StorageWriter): storage writer.
          event_data (EventData): event data.
          message (str): message of the warning.
        """
        parser_chain = getattr(event_data, "_parser_chain", None)

        path_spec = None
        event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()
        if event_data_stream_identifier:
            event_data_stream = storage_writer.GetAttributeContainerByIdentifier(
                events.EventDataStream.CONTAINER_TYPE, event_data_stream_identifier
            )
            if event_data_stream:
                path_spec = event_data_stream.path_spec

        warning = warnings.TimeliningWarning(
            message=message, parser_chain=parser_chain, path_spec=path_spec
        )
        storage_writer.AddAttributeContainer(warning)

    def _ReadConfigurationFile(self):
        """Reads a timeliner configuration file.

        The file is read from the data location and populates the attribute
        mappings and the set of data types that need a place holder event.

        Raises:
          KeyError: if the attribute mappings are already set for the
              corresponding data type.
        """
        path = os.path.join(
            self._data_location, self._TIMELINER_CONFIGURATION_FILENAME
        )

        configuration_file = yaml_timeliner_file.YAMLTimelinerConfigurationFile()
        for timeliner_definition in configuration_file.ReadFromFile(path):
            data_type = timeliner_definition.data_type
            if data_type in self._attribute_mappings:
                raise KeyError(
                    f"Attribute mappings for data type: "
                    f"{timeliner_definition.data_type:s} already set."
                )

            self._attribute_mappings[data_type] = (
                timeliner_definition.attribute_mappings
            )
            if timeliner_definition.place_holder_event:
                self._place_holder_event.add(data_type)

    def _UpdateCounters(self, data_type_name, parser_name):
        """Updates the counters for a single produced event.

        Args:
          data_type_name (str): data type of the event data or None if not
              available.
          parser_name (str): name of the parser or parser plugin or None if
              not available.
        """
        if data_type_name:
            self.data_types_counter[data_type_name] += 1
        self.data_types_counter["total"] += 1

        if parser_name:
            self.parsers_counter[parser_name] += 1
        self.parsers_counter["total"] += 1

        self.number_of_produced_events += 1

    def ProcessEventData(self, storage_writer, event_data, event_data_stream):
        """Generate events from event data.

        An event is generated for every date and time value of the attributes
        that are mapped for the data type of the event data. If no event was
        generated and the data type is configured to have a place holder
        event, an event without a time is generated instead.

        Args:
          storage_writer (StorageWriter): storage writer.
          event_data (EventData): event data.
          event_data_stream (EventDataStream): event data stream.
        """
        self.number_of_produced_events = 0

        attribute_mappings = self._attribute_mappings.get(event_data.data_type) or {}
        needs_place_holder = event_data.data_type in self._place_holder_event
        if not attribute_mappings and not needs_place_holder:
            return

        data_type_name = getattr(event_data, "data_type", None)

        # The parser name is the last segment of the parser chain.
        parser_name = None
        parser_chain = getattr(event_data, "_parser_chain", None)
        if parser_chain:
            parser_name = parser_chain.rsplit("/", maxsplit=1)[-1]

        number_of_events = 0
        for attribute_name, time_description in attribute_mappings.items():
            attribute_values = getattr(event_data, attribute_name, None) or []
            if not isinstance(attribute_values, list):
                attribute_values = [attribute_values]

            for attribute_value in attribute_values:
                if not isinstance(
                    attribute_value, dfdatetime_interface.DateTimeValues
                ):
                    message = f"unsupported date time attribute: {attribute_name:s}"
                    self._ProduceTimeliningWarning(
                        storage_writer, event_data, message
                    )
                    continue

                event = self._GetEvent(
                    storage_writer,
                    event_data,
                    event_data_stream,
                    attribute_value,
                    time_description,
                )

                try:
                    storage_writer.AddAttributeContainer(event)
                except OverflowError as exception:
                    message = f"unable to add event with error: {exception!s}"
                    self._ProduceTimeliningWarning(
                        storage_writer, event_data, message
                    )
                    continue

                number_of_events += 1
                self._UpdateCounters(data_type_name, parser_name)

        # Create a place holder event for event_data without date and time
        # values to map.
        if not number_of_events and needs_place_holder:
            date_time = dfdatetime_semantic_time.NotSet()
            event = self._GetEvent(
                storage_writer,
                event_data,
                event_data_stream,
                date_time,
                definitions.TIME_DESCRIPTION_NOT_A_TIME,
            )
            storage_writer.AddAttributeContainer(event)

            self._UpdateCounters(data_type_name, parser_name)

    def SetPreferredTimeZone(self, time_zone_string):
        """Sets the preferred time zone for zone-less date and time values.

        Args:
          time_zone_string (str): time zone such as "Europe/Amsterdam" or None
              if the time zone determined by preprocessing or the default
              should be used.

        Raises:
          ValueError: if the time zone is not supported.
        """
        time_zone = None
        if time_zone_string:
            try:
                time_zone = pytz.timezone(time_zone_string)
            except pytz.UnknownTimeZoneError as exception:
                raise ValueError(
                    f"Unsupported time zone: {time_zone_string!s}"
                ) from exception

        self._preferred_time_zone = time_zone