# -*- coding: utf-8 -*-
"""The timeliner, which is used to generate events from event data."""
import collections
import copy
import datetime
import os
import pytz
from dfdatetime import interface as dfdatetime_interface
from dfdatetime import semantic_time as dfdatetime_semantic_time
from plaso.containers import events
from plaso.containers import warnings
from plaso.engine import yaml_timeliner_file
from plaso.lib import definitions
[docs]
class EventDataTimeliner(object):
"""The event data timeliner.
Attributes:
number_of_produced_events (int): number of produced events.
parsers_counter (collections.Counter): number of events per parser or
parser plugin.
"""
_DEFAULT_TIME_ZONE = pytz.UTC
_INT64_MIN = -1 << 63
_INT64_MAX = (1 << 63) - 1
_TIMELINER_CONFIGURATION_FILENAME = 'timeliner.yaml'
[docs]
def __init__(
self, data_location=None, preferred_year=None,
system_configurations=None):
"""Initializes an event data timeliner.
Args:
data_location (Optional[str]): path of the timeliner configuration file.
preferred_year (Optional[int]): preferred initial year value for date-less
date and time values.
system_configurations (Optional[list[SystemConfigurationArtifact]]):
system configurations.
"""
super(EventDataTimeliner, self).__init__()
self._attribute_mappings = {}
self._base_dates = {}
self._current_date = self._GetCurrentDate()
self._data_location = data_location
self._place_holder_event = set()
self._preferred_time_zone = None
self._preferred_year = preferred_year
self._time_zone_per_path_spec = None
self.number_of_produced_events = 0
self.parsers_counter = collections.Counter()
self._CreateTimeZonePerPathSpec(system_configurations)
self._ReadConfigurationFile()
def _CreateTimeZonePerPathSpec(self, system_configurations):
"""Creates the time zone per path specification lookup table.
Args:
system_configurations (list[SystemConfigurationArtifact]): system
configurations.
"""
self._time_zone_per_path_spec = {}
for system_configuration in system_configurations or []:
if system_configuration.time_zone:
for path_spec in system_configuration.path_specs:
if path_spec.parent:
self._time_zone_per_path_spec[path_spec.parent] = (
system_configuration.time_zone)
def _GetBaseDate(self, storage_writer, event_data):
"""Retrieves the base date.
Args:
storage_writer (StorageWriter): storage writer.
event_data (EventData): event data.
Returns:
tuple[int, int, int]: base date, as a tuple of year, month, day of month.
"""
# If preferred year is set considered it a user override, otherwise try
# to determine the year based on the date-less log helper or fallback to
# the current year.
if self._preferred_year:
current_date = (self._preferred_year, 1, 1)
else:
current_date = self._current_date
event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()
if not event_data_stream_identifier:
return current_date[0], 0, 0
lookup_key = event_data_stream_identifier.CopyToString()
base_date = self._base_dates.get(lookup_key, None)
if base_date:
return base_date
filter_expression = f'_event_data_stream_identifier == "{lookup_key:s}"'
date_less_log_helpers = list(storage_writer.GetAttributeContainers(
events.DateLessLogHelper.CONTAINER_TYPE,
filter_expression=filter_expression))
if not date_less_log_helpers:
message = (
f'missing date-less log helper, defaulting to date: '
f'{current_date[0]:d}-{current_date[1]:d}-{current_date[2]:d}')
self._ProduceTimeliningWarning(storage_writer, event_data, message)
base_date = (current_date[0], 0, 0)
else:
date_less_log_helper = date_less_log_helpers[0]
earliest_date = date_less_log_helper.GetEarliestDate()
last_relative_date = date_less_log_helper.GetLastRelativeDate()
latest_date = date_less_log_helper.GetLatestDate()
if date_less_log_helper.granularity == (
date_less_log_helper.GRANULARITY_NO_YEAR):
current_date = (current_date[0], 0, 0)
if earliest_date is None or last_relative_date is None:
last_date = None
else:
last_date = tuple(map(
lambda earliest, last_relative: earliest + last_relative,
earliest_date, last_relative_date))
if earliest_date is None and latest_date is None:
message = (
f'missing earliest and latest date in date-less log helper, '
f'defaulting to date: {current_date[0]:d}-{current_date[1]:d}-'
f'{current_date[2]:d}')
self._ProduceTimeliningWarning(storage_writer, event_data, message)
base_date = current_date
elif last_date < current_date:
base_date = earliest_date
elif latest_date < current_date:
message = (
f'earliest date: {earliest_date[0]:d}-{earliest_date[1]:d}-'
f'{earliest_date[2]:d} as base date would exceed : '
f'{current_date[0]:d}-{current_date[1]:d}-{current_date[2]:d} + '
f'{last_relative_date[0]:d}-{last_relative_date[1]:d}-'
f'{last_relative_date[2]:d}, using latest date: {latest_date[0]:d}-'
f'{latest_date[1]:d}-{latest_date[2]:d}')
self._ProduceTimeliningWarning(storage_writer, event_data, message)
base_date = tuple(map(
lambda latest, last_relative: latest - last_relative,
latest_date, last_relative_date))
else:
message = (
f'earliest date: {earliest_date[0]:d}-{earliest_date[1]:d}-'
f'{earliest_date[2]:d} and latest: date: {latest_date[0]:d}-'
f'{latest_date[1]:d}-{latest_date[2]:d} as base date would exceed '
f'date: {current_date[0]:d}-{current_date[1]:d}-'
f'{current_date[2]:d} + {last_relative_date[0]:d}-'
f'{last_relative_date[1]:d}-{last_relative_date[2]:d}, using date: '
f'{current_date[0]:d}-{current_date[1]:d}-{current_date[2]:d}')
self._ProduceTimeliningWarning(storage_writer, event_data, message)
base_date = tuple(map(
lambda current, last_relative: current - last_relative,
current_date, last_relative_date))
self._base_dates[lookup_key] = base_date
return base_date
def _GetCurrentDate(self):
"""Retrieves current date.
Returns:
tuple[int, int, int]: current date, as a tuple of year, month, day of
month.
"""
datetime_object = datetime.datetime.now(pytz.UTC)
return datetime_object.year, datetime_object.month, datetime_object.day
def _GetEvent(
self, storage_writer, event_data, event_data_stream, date_time,
date_time_description):
"""Retrieves an event.
Args:
storage_writer (StorageWriter): storage writer.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
date_time (dfdatetime.DateTimeValues): date and time values.
date_time_description (str): description of the meaning of the date and
time values.
Returns:
EventObject: event.
"""
timestamp = None
if date_time.is_delta:
base_date = self._GetBaseDate(storage_writer, event_data)
try:
date_time = date_time.NewFromDeltaAndDate(*base_date)
except ValueError as exception:
self._ProduceTimeliningWarning(
storage_writer, event_data, str(exception))
date_time = dfdatetime_semantic_time.InvalidTime()
timestamp = 0
if timestamp is None:
timestamp = date_time.GetPlasoTimestamp()
if timestamp is None:
self._ProduceTimeliningWarning(
storage_writer, event_data, 'unable to determine timestamp')
date_time = dfdatetime_semantic_time.InvalidTime()
timestamp = 0
# Check for out of bounds timestamps, for example if a data format has
# changed and the conversion leads to an incorrect large integer value.
# Integer values that are larger than 64-bit will cause an OverflowError
# in the SQLite storage.
elif timestamp < self._INT64_MIN or timestamp > self._INT64_MAX:
self._ProduceTimeliningWarning(
storage_writer, event_data, 'timestamp out of bounds')
date_time = dfdatetime_semantic_time.InvalidTime()
timestamp = 0
else:
if date_time.is_local_time:
time_zone = None
if date_time.time_zone_hint:
# TODO: cache time zones per hint.
try:
time_zone = pytz.timezone(date_time.time_zone_hint)
except pytz.UnknownTimeZoneError:
message = (
f'unsupported time zone hint: {date_time.time_zone_hint:s}, '
f'using default time zone')
self._ProduceTimeliningWarning(storage_writer, event_data, message)
if not time_zone and event_data_stream:
try:
time_zone = self._GetTimeZoneByPathSpec(event_data_stream.path_spec)
except pytz.UnknownTimeZoneError:
message = (
f'unsupported system time zone: {date_time.time_zone_hint:s}, '
f'using default time zone')
self._ProduceTimeliningWarning(storage_writer, event_data, message)
if not time_zone:
time_zone = self._preferred_time_zone or self._DEFAULT_TIME_ZONE
date_time = copy.deepcopy(date_time)
date_time.is_local_time = False
if time_zone != pytz.UTC:
datetime_object = datetime.datetime(
1970, 1, 1, 0, 0, 0, 0, tzinfo=None)
datetime_object += datetime.timedelta(microseconds=timestamp)
datetime_delta = time_zone.utcoffset(datetime_object, is_dst=False)
seconds_delta = int(datetime_delta.total_seconds())
timestamp -= seconds_delta * definitions.MICROSECONDS_PER_SECOND
date_time.time_zone_offset = seconds_delta // 60
event = events.EventObject()
event.date_time = date_time
event.timestamp = timestamp
event.timestamp_desc = date_time_description
event_data_identifier = event_data.GetIdentifier()
event.SetEventDataIdentifier(event_data_identifier)
return event
def _GetTimeZoneByPathSpec(self, path_spec):
"""Retrieves a time zone for a specific path specification.
Args:
path_spec (dfvfs.PathSpec): path specification.
Returns:
pytz.tzfile: time zone or None if not available.
Raises:
pytz.UnknownTimeZoneError: if the time zone is unknown.
"""
if not path_spec or not path_spec.parent:
return None
time_zone = self._time_zone_per_path_spec.get(path_spec.parent, None)
if not time_zone:
return None
if isinstance(time_zone, str):
try:
time_zone = pytz.timezone(time_zone)
self._time_zone_per_path_spec[path_spec.parent] = time_zone
except pytz.UnknownTimeZoneError as exeception:
self._time_zone_per_path_spec[path_spec.parent] = None
raise exeception
return time_zone
def _ProduceTimeliningWarning(self, storage_writer, event_data, message):
"""Produces a timelining warning.
Args:
storage_writer (StorageWriter): storage writer.
event_data (EventData): event data.
message (str): message of the warning.
"""
parser_chain = getattr(event_data, '_parser_chain', None)
path_spec = None
event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()
if event_data_stream_identifier:
event_data_stream = storage_writer.GetAttributeContainerByIdentifier(
events.EventDataStream.CONTAINER_TYPE, event_data_stream_identifier)
if event_data_stream:
path_spec = event_data_stream.path_spec
warning = warnings.TimeliningWarning(
message=message, parser_chain=parser_chain, path_spec=path_spec)
storage_writer.AddAttributeContainer(warning)
def _ReadConfigurationFile(self):
"""Reads a timeliner configuration file.
Raises:
KeyError: if the attribute mappings are already set for the corresponding
data type.
"""
path = os.path.join(
self._data_location, self._TIMELINER_CONFIGURATION_FILENAME)
configuration_file = yaml_timeliner_file.YAMLTimelinerConfigurationFile()
for timeliner_definition in configuration_file.ReadFromFile(path):
if timeliner_definition.data_type in self._attribute_mappings:
raise KeyError((
f'Attribute mappings for data type: '
f'{timeliner_definition.data_type:s} already set.'))
self._attribute_mappings[timeliner_definition.data_type] = (
timeliner_definition.attribute_mappings)
if timeliner_definition.place_holder_event:
self._place_holder_event.add(timeliner_definition.data_type)
[docs]
def ProcessEventData(self, storage_writer, event_data, event_data_stream):
"""Generate events from event data.
Args:
storage_writer (StorageWriter): storage writer.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
"""
self.number_of_produced_events = 0
attribute_mappings = self._attribute_mappings.get(
event_data.data_type) or {}
if (not attribute_mappings and
event_data.data_type not in self._place_holder_event):
return
parser_name = None
parser_chain = getattr(event_data, '_parser_chain', None)
if parser_chain:
parser_name = parser_chain.rsplit('/', maxsplit=1)[-1]
number_of_events = 0
for attribute_name, time_description in attribute_mappings.items():
attribute_values = getattr(event_data, attribute_name, None) or []
if not isinstance(attribute_values, list):
attribute_values = [attribute_values]
for attribute_value in attribute_values:
if not isinstance(attribute_value, dfdatetime_interface.DateTimeValues):
message = f'unsupported date time attribute: {attribute_name:s}'
self._ProduceTimeliningWarning(storage_writer, event_data, message)
continue
event = self._GetEvent(
storage_writer, event_data, event_data_stream, attribute_value,
time_description)
try:
storage_writer.AddAttributeContainer(event)
except OverflowError as exception:
message = f'unable to add event with error: {exception!s}'
self._ProduceTimeliningWarning(storage_writer, event_data, message)
continue
number_of_events += 1
if parser_name:
self.parsers_counter[parser_name] += 1
self.parsers_counter['total'] += 1
self.number_of_produced_events += 1
# Create a place holder event for event_data without date and time
# values to map.
if (not number_of_events and
event_data.data_type in self._place_holder_event):
date_time = dfdatetime_semantic_time.NotSet()
event = self._GetEvent(
storage_writer, event_data, event_data_stream, date_time,
definitions.TIME_DESCRIPTION_NOT_A_TIME)
storage_writer.AddAttributeContainer(event)
if parser_name:
self.parsers_counter[parser_name] += 1
self.parsers_counter['total'] += 1
self.number_of_produced_events += 1
[docs]
def SetPreferredTimeZone(self, time_zone_string):
"""Sets the preferred time zone for zone-less date and time values.
Args:
time_zone_string (str): time zone such as "Europe/Amsterdam" or None if
the time zone determined by preprocessing or the default should be
used.
Raises:
ValueError: if the time zone is not supported.
"""
time_zone = None
if time_zone_string:
try:
time_zone = pytz.timezone(time_zone_string)
except pytz.UnknownTimeZoneError:
raise ValueError(f'Unsupported time zone: {time_zone_string!s}')
self._preferred_time_zone = time_zone