# -*- coding: utf-8 -*-
"""The timeliner, which is used to generate events from event data."""
import collections
import copy
import datetime
import os
import pytz
from dfdatetime import interface as dfdatetime_interface
from dfdatetime import semantic_time as dfdatetime_semantic_time
from plaso.containers import events
from plaso.containers import warnings
from plaso.engine import yaml_timeliner_file
from plaso.lib import definitions
class EventDataTimeliner(object):
  """The event data timeliner.

  Generates events (EventObject) from the date and time attributes of event
  data, using the attribute mappings read from the timeliner configuration
  file.

  Attributes:
    number_of_produced_events (int): number of events produced by the most
        recent call to ProcessEventData.
    parsers_counter (collections.Counter): number of events per parser or
        parser plugin.
  """

  _DEFAULT_TIME_ZONE = pytz.UTC

  # Bounds of a signed 64-bit integer. Timestamps outside of this range are
  # rejected since they would cause an OverflowError in the SQLite storage.
  _INT64_MIN = -1 << 63
  _INT64_MAX = (1 << 63) - 1

  _TIMELINER_CONFIGURATION_FILENAME = 'timeliner.yaml'

  def __init__(
      self, data_location=None, preferred_year=None,
      system_configurations=None):
    """Initializes an event data timeliner.

    Args:
      data_location (Optional[str]): path of the directory that contains the
          timeliner configuration file.
      preferred_year (Optional[int]): preferred initial year value for
          year-less date and time values.
      system_configurations (Optional[list[SystemConfigurationArtifact]]):
          system configurations.
    """
    super(EventDataTimeliner, self).__init__()
    self._attribute_mappings = {}
    self._base_years = {}
    self._current_year = self._GetCurrentYear()
    self._data_location = data_location
    self._place_holder_event = set()
    self._preferred_time_zone = None
    self._preferred_year = preferred_year
    self._time_zone_per_path_spec = None

    self.number_of_produced_events = 0
    self.parsers_counter = collections.Counter()

    self._CreateTimeZonePerPathSpec(system_configurations)
    self._ReadConfigurationFile()

  def _CreateTimeZonePerPathSpec(self, system_configurations):
    """Creates the time zone per path specification lookup table.

    The table is keyed on the parent of every path specification of a system
    configuration that has a time zone.

    Args:
      system_configurations (list[SystemConfigurationArtifact]): system
          configurations.
    """
    self._time_zone_per_path_spec = {}
    for system_configuration in system_configurations or []:
      if not system_configuration.time_zone:
        continue

      for path_spec in system_configuration.path_specs:
        if path_spec.parent:
          self._time_zone_per_path_spec[path_spec.parent] = (
              system_configuration.time_zone)

  def _GetBaseYear(self, storage_writer, event_data):
    """Retrieves the base year.

    Args:
      storage_writer (StorageWriter): storage writer.
      event_data (EventData): event data.

    Returns:
      int: base year.
    """
    # If the preferred year is set consider it a user override, otherwise try
    # to determine the year based on the year-less log helper or fall back to
    # the current year.
    if self._preferred_year:
      return self._preferred_year

    event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()
    if not event_data_stream_identifier:
      return self._current_year

    # Base years are cached per event data stream.
    lookup_key = event_data_stream_identifier.CopyToString()
    base_year = self._base_years.get(lookup_key, None)
    if base_year:
      return base_year

    filter_expression = '_event_data_stream_identifier == "{0:s}"'.format(
        lookup_key)
    year_less_log_helpers = list(storage_writer.GetAttributeContainers(
        events.YearLessLogHelper.CONTAINER_TYPE,
        filter_expression=filter_expression))

    if not year_less_log_helpers:
      message = (
          'missing year-less log helper, defaulting to current year: '
          '{0:d}').format(self._current_year)
      self._ProduceTimeliningWarning(storage_writer, event_data, message)
      base_year = self._current_year

    else:
      year_less_log_helper = year_less_log_helpers[0]
      earliest_year = year_less_log_helper.earliest_year
      latest_year = year_less_log_helper.latest_year
      # A missing relative year is treated as no offset, so that the
      # arithmetic below cannot fail on None.
      last_relative_year = year_less_log_helper.last_relative_year or 0

      if earliest_year is None and latest_year is None:
        message = (
            'missing earliest and latest year in year-less log helper, '
            'defaulting to current year: {0:d}').format(self._current_year)
        self._ProduceTimeliningWarning(storage_writer, event_data, message)
        base_year = self._current_year

      elif (earliest_year is not None and
            earliest_year + last_relative_year < self._current_year):
        base_year = earliest_year

      elif latest_year is not None and latest_year < self._current_year:
        # The earliest year can be None here, hence it is formatted with !s.
        message = (
            'earliest year: {0!s} as base year would exceed current year: '
            '{1:d} + {2:d}, using latest year: {3:d}').format(
                earliest_year, self._current_year, last_relative_year,
                latest_year)
        self._ProduceTimeliningWarning(storage_writer, event_data, message)
        base_year = latest_year - last_relative_year

      else:
        # Either year can be None here, hence both are formatted with !s.
        message = (
            'earliest year: {0!s} and latest: year: {1!s} as base year '
            'would exceed current year: {2:d} + {3:d}, using current '
            'year').format(
                earliest_year, latest_year, self._current_year,
                last_relative_year)
        self._ProduceTimeliningWarning(storage_writer, event_data, message)
        base_year = self._current_year - last_relative_year

    self._base_years[lookup_key] = base_year
    return base_year

  def _GetCurrentYear(self):
    """Retrieves current year.

    Returns:
      int: the current year, according to the local system clock.
    """
    datetime_object = datetime.datetime.now()
    return datetime_object.year

  def _GetEvent(
      self, storage_writer, event_data, event_data_stream, date_time,
      date_time_description):
    """Retrieves an event.

    Date and time values in local time are converted to UTC using, in order
    of preference: the time zone hint of the date and time values, the time
    zone of the system configuration that corresponds to the event data
    stream, the preferred time zone or the default time zone.

    Args:
      storage_writer (StorageWriter): storage writer.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.
      date_time (dfdatetime.DateTimeValues): date and time values.
      date_time_description (str): description of the meaning of the date and
          time values.

    Returns:
      EventObject: event.
    """
    if date_time.is_delta:
      base_year = self._GetBaseYear(storage_writer, event_data)
      date_time = date_time.NewFromDeltaAndYear(base_year)

    timestamp = date_time.GetPlasoTimestamp()
    if timestamp is None:
      self._ProduceTimeliningWarning(
          storage_writer, event_data, 'unable to determine timestamp')
      date_time = dfdatetime_semantic_time.InvalidTime()
      timestamp = 0

    # Check for out of bounds timestamps, for example if a data format has
    # changed and the conversion leads to an incorrect large integer value.
    # Integer values that are larger than 64-bit will cause an OverflowError
    # in the SQLite storage.
    elif timestamp < self._INT64_MIN or timestamp > self._INT64_MAX:
      self._ProduceTimeliningWarning(
          storage_writer, event_data, 'timestamp out of bounds')
      date_time = dfdatetime_semantic_time.InvalidTime()
      timestamp = 0

    elif date_time.is_local_time:
      time_zone = None
      if date_time.time_zone_hint:
        # TODO: cache time zones per hint.
        try:
          time_zone = pytz.timezone(date_time.time_zone_hint)
        except pytz.UnknownTimeZoneError:
          message = (
              'unsupported time zone hint: {0:s}, using default time '
              'zone').format(date_time.time_zone_hint)
          self._ProduceTimeliningWarning(storage_writer, event_data, message)

      if not time_zone and event_data_stream:
        try:
          time_zone = self._GetTimeZoneByPathSpec(event_data_stream.path_spec)
        except pytz.UnknownTimeZoneError as exception:
          # Report the offending system time zone carried by the exception.
          # The time zone hint is typically None at this point and must not
          # be used to format the message.
          message = (
              'unsupported system time zone: {0!s}, using default time '
              'zone').format(exception)
          self._ProduceTimeliningWarning(storage_writer, event_data, message)

      if not time_zone:
        time_zone = self._preferred_time_zone or self._DEFAULT_TIME_ZONE

      # Work on a copy so the date and time values that are part of the event
      # data are not modified.
      date_time = copy.deepcopy(date_time)
      date_time.is_local_time = False

      if time_zone != pytz.UTC:
        # The timestamp is in microseconds relative to the POSIX epoch.
        datetime_object = datetime.datetime(
            1970, 1, 1, 0, 0, 0, 0, tzinfo=None)
        datetime_object += datetime.timedelta(microseconds=timestamp)

        datetime_delta = time_zone.utcoffset(datetime_object, is_dst=False)
        seconds_delta = int(datetime_delta.total_seconds())
        timestamp -= seconds_delta * definitions.MICROSECONDS_PER_SECOND

        # seconds_delta // 60 is the UTC offset in whole minutes.
        date_time.time_zone_offset = seconds_delta // 60

    event = events.EventObject()
    event.date_time = date_time
    event.timestamp = timestamp
    event.timestamp_desc = date_time_description

    event_data_identifier = event_data.GetIdentifier()
    event.SetEventDataIdentifier(event_data_identifier)

    return event

  def _GetTimeZoneByPathSpec(self, path_spec):
    """Retrieves a time zone for a specific path specification.

    The lookup table initially contains the time zone values of the system
    configurations. A string value is resolved with pytz on first use and
    the resulting time zone object is stored back in the table.

    Args:
      path_spec (dfvfs.PathSpec): path specification.

    Returns:
      pytz.tzfile: time zone or None if not available.

    Raises:
      pytz.UnknownTimeZoneError: if the time zone is unknown. This is only
          raised on the first lookup, after which None is returned for the
          same parent path specification.
    """
    if not path_spec or not path_spec.parent:
      return None

    time_zone = self._time_zone_per_path_spec.get(path_spec.parent, None)
    if not time_zone:
      return None

    if isinstance(time_zone, str):
      try:
        time_zone = pytz.timezone(time_zone)
      except pytz.UnknownTimeZoneError:
        # Remember the failure so the warning is only produced once.
        self._time_zone_per_path_spec[path_spec.parent] = None
        raise

      self._time_zone_per_path_spec[path_spec.parent] = time_zone

    return time_zone

  def _ProduceTimeliningWarning(self, storage_writer, event_data, message):
    """Produces a timelining warning.

    Args:
      storage_writer (StorageWriter): storage writer.
      event_data (EventData): event data.
      message (str): message of the warning.
    """
    parser_chain = getattr(event_data, '_parser_chain', None)

    # The path specification is taken from the event data stream, when
    # available, so the warning can be related to its source.
    path_spec = None
    event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()
    if event_data_stream_identifier:
      event_data_stream = storage_writer.GetAttributeContainerByIdentifier(
          events.EventDataStream.CONTAINER_TYPE, event_data_stream_identifier)
      if event_data_stream:
        path_spec = event_data_stream.path_spec

    warning = warnings.TimeliningWarning(
        message=message, parser_chain=parser_chain, path_spec=path_spec)
    storage_writer.AddAttributeContainer(warning)

  def _ReadConfigurationFile(self):
    """Reads a timeliner configuration file.

    The file is expected in the data location. Note that os.path.join raises
    a TypeError if the data location is None.

    Raises:
      KeyError: if the attribute mappings are already set for the
          corresponding data type.
    """
    path = os.path.join(
        self._data_location, self._TIMELINER_CONFIGURATION_FILENAME)

    configuration_file = yaml_timeliner_file.YAMLTimelinerConfigurationFile()
    for timeliner_definition in configuration_file.ReadFromFile(path):
      if timeliner_definition.data_type in self._attribute_mappings:
        raise KeyError(
            'Attribute mappings for data type: {0:s} already set.'.format(
                timeliner_definition.data_type))

      self._attribute_mappings[timeliner_definition.data_type] = (
          timeliner_definition.attribute_mappings)

      if timeliner_definition.place_holder_event:
        self._place_holder_event.add(timeliner_definition.data_type)

  def ProcessEventData(self, storage_writer, event_data, event_data_stream):
    """Generate events from event data.

    Resets number_of_produced_events and then adds one event to the storage
    writer for every supported date and time value found in the mapped
    attributes of the event data. A place holder event is added for data
    types that are configured to have one when no other event was produced.

    Args:
      storage_writer (StorageWriter): storage writer.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.
    """
    self.number_of_produced_events = 0

    attribute_mappings = self._attribute_mappings.get(
        event_data.data_type) or {}
    if (not attribute_mappings and
        event_data.data_type not in self._place_holder_event):
      return

    # The parser name is the last segment of the parser chain.
    parser_name = None
    parser_chain = getattr(event_data, '_parser_chain', None)
    if parser_chain:
      parser_name = parser_chain.rsplit('/', maxsplit=1)[-1]

    number_of_events = 0
    for attribute_name, time_description in attribute_mappings.items():
      attribute_values = getattr(event_data, attribute_name, None) or []
      if not isinstance(attribute_values, list):
        attribute_values = [attribute_values]

      for attribute_value in attribute_values:
        if not isinstance(
            attribute_value, dfdatetime_interface.DateTimeValues):
          message = 'unsupported date time attribute: {0:s}'.format(
              attribute_name)
          self._ProduceTimeliningWarning(storage_writer, event_data, message)
          continue

        event = self._GetEvent(
            storage_writer, event_data, event_data_stream, attribute_value,
            time_description)

        try:
          storage_writer.AddAttributeContainer(event)
        except OverflowError as exception:
          message = 'unable to add event with error: {0!s}'.format(exception)
          self._ProduceTimeliningWarning(storage_writer, event_data, message)
          continue

        number_of_events += 1

        if parser_name:
          self.parsers_counter[parser_name] += 1
        self.parsers_counter['total'] += 1

        self.number_of_produced_events += 1

    # Create a place holder event for event_data without date and time
    # values to map.
    if (not number_of_events and
        event_data.data_type in self._place_holder_event):
      date_time = dfdatetime_semantic_time.NotSet()
      event = self._GetEvent(
          storage_writer, event_data, event_data_stream, date_time,
          definitions.TIME_DESCRIPTION_NOT_A_TIME)

      storage_writer.AddAttributeContainer(event)

      if parser_name:
        self.parsers_counter[parser_name] += 1
      self.parsers_counter['total'] += 1

      self.number_of_produced_events += 1

  def SetPreferredTimeZone(self, time_zone_string):
    """Sets the preferred time zone for zone-less date and time values.

    Args:
      time_zone_string (str): time zone such as "Europe/Amsterdam" or None if
          the time zone determined by preprocessing or the default should be
          used.

    Raises:
      ValueError: if the time zone is not supported.
    """
    time_zone = None
    if time_zone_string:
      try:
        time_zone = pytz.timezone(time_zone_string)
      except pytz.UnknownTimeZoneError as exception:
        raise ValueError('Unsupported time zone: {0!s}'.format(
            time_zone_string)) from exception

    self._preferred_time_zone = time_zone