Source code for plaso.output.mediator

# -*- coding: utf-8 -*-
"""The output mediator object."""

import glob
import os
import pytz

from plaso.engine import path_helper
from plaso.formatters import manager as formatters_manager
from plaso.formatters import yaml_formatters_file
from plaso.helpers.windows import languages
from plaso.lib import definitions
from plaso.output import winevt_rc

class OutputMediator(object):
  """Output mediator.

  The output mediator is the interface between the output modules and the
  storage: it resolves hostnames and usernames, holds the message formatters
  and source mappings, and keeps the output preferences (encoding, language
  and time zone).

  Attributes:
    data_location (Optional[str]): path of the formatter data files.
  """

  _DEFAULT_ENCODING = 'utf-8'

  # The default LCID 0x0409, which represents en-US.
  _DEFAULT_LCID = 0x0409

  _DEFAULT_TIME_ZONE = pytz.UTC

  _WINEVT_RC_DATABASE = 'winevt-rc.db'

  def __init__(
      self, storage_reader, data_location=None, dynamic_time=False,
      preferred_encoding='utf-8'):
    """Initializes an output mediator.

    Args:
      storage_reader (StorageReader): storage reader.
      data_location (Optional[str]): path of the formatter data files.
      dynamic_time (Optional[bool]): True if date and time values should be
          represented in their granularity or semantically.
      preferred_encoding (Optional[str]): preferred encoding to output.
    """
    super(OutputMediator, self).__init__()
    self._dynamic_time = dynamic_time
    self._hostname = None
    self._language_tag = None
    self._lcid = None
    self._message_formatters = {}
    self._preferred_encoding = preferred_encoding
    self._source_mappings = {}
    self._storage_reader = storage_reader
    self._system_configurations = None
    self._time_zone = None
    # Maps a user identifier (UID or SID) to the username read from the
    # storage, or to None if the storage has no matching user account.
    self._username_by_identifier = {}

    self.data_location = data_location

  @property
  def dynamic_time(self):
    """bool: True if dynamic time should be used."""
    return self._dynamic_time

  @property
  def encoding(self):
    """str: preferred encoding to output."""
    return self._preferred_encoding or self._DEFAULT_ENCODING

  @property
  def time_zone(self):
    """datetime.tzinfo: time zone."""
    return self._time_zone or self._DEFAULT_TIME_ZONE

  def _ReadMessageFormattersFile(self, path, override_existing=False):
    """Reads a message formatters configuration file.

    Args:
      path (str): path of file that contains the message formatters
          configuration.
      override_existing (bool): True if existing message formatters should
          be overridden.

    Raises:
      KeyError: if the message formatter is already set for the corresponding
          data type and override existing is False.
    """
    message_formatters_file = yaml_formatters_file.YAMLFormattersFile()
    for message_formatter in message_formatters_file.ReadFromFile(path):
      if (message_formatter.data_type in self._message_formatters and
          not override_existing):
        raise KeyError(
            'Message formatter for data type: {0:s} already exists'.format(
                message_formatter.data_type))

      # Custom helpers that are not registered with the formatters manager
      # are skipped.
      for identifier in message_formatter.custom_helpers:
        custom_formatter_helper = (
            formatters_manager.FormattersManager.GetEventFormatterHelper(
                identifier))
        if custom_formatter_helper:
          message_formatter.AddHelper(custom_formatter_helper)

      self._message_formatters[message_formatter.data_type] = message_formatter
      self._source_mappings[message_formatter.data_type] = (
          message_formatter.source_mapping)

  def _ReadUserAccount(self, user_identifier):
    """Reads a specific user account from the storage.

    Args:
      user_identifier (str): user identifier (UID or SID).

    Returns:
      UserAccountArtifact: user account or None if not available.
    """
    # TODO: get username related to the source.
    filter_expression = 'identifier == "{0:s}"'.format(user_identifier)
    user_accounts = list(self._storage_reader.GetAttributeContainers(
        'user_account', filter_expression=filter_expression))
    if not user_accounts:
      return None

    return user_accounts[0]

  def GetDisplayNameForPathSpec(self, path_spec):
    """Retrieves the display name for a path specification.

    Args:
      path_spec (dfvfs.PathSpec): path specification.

    Returns:
      str: human readable version of the path specification.
    """
    return path_helper.PathHelper.GetDisplayNameForPathSpec(path_spec)

  def GetHostname(self, event_data, default_hostname='-'):
    """Retrieves the hostname related to the event.

    The hostname of the event data takes precedence. Otherwise the name of
    the hostname artifact of the last system configuration in the storage is
    used, which is read once and cached.

    Args:
      event_data (EventData): event data.
      default_hostname (Optional[str]): default hostname.

    Returns:
      str: hostname.
    """
    hostname = getattr(event_data, 'hostname', None)
    if hostname:
      return hostname

    # The system configurations are read from the storage on first use only.
    if self._system_configurations is None:
      self._system_configurations = list(
          self._storage_reader.GetAttributeContainers('system_configuration'))

    # TODO: get hostname related to the source.
    if not self._hostname and self._system_configurations:
      hostname_artifact = self._system_configurations[-1].hostname
      if hostname_artifact:
        self._hostname = hostname_artifact.name

    return self._hostname or default_hostname

  def GetMACBRepresentation(self, event, event_data):
    """Retrieves the MACB representation.

    Args:
      event (EventObject): event.
      event_data (EventData): event data.

    Returns:
      str: MACB representation.
    """
    data_type = getattr(event_data, 'data_type', None)
    if not data_type:
      return '....'

    # The filestat parser is somewhat limited: its timestamp description can
    # hold multiple descriptions separated by ';'.
    if data_type == 'fs:stat':
      return self.GetMACBRepresentationFromDescriptions(
          event.timestamp_desc.split(';'))

    timestamp_description = event.timestamp_desc

    # Access time.
    if timestamp_description in (
        definitions.TIME_DESCRIPTION_LAST_ACCESS,
        definitions.TIME_DESCRIPTION_ACCOUNT_CREATED,
        definitions.TIME_DESCRIPTION_LAST_VISITED,
        definitions.TIME_DESCRIPTION_START,
        definitions.TIME_DESCRIPTION_LAST_SHUTDOWN,
        definitions.TIME_DESCRIPTION_LAST_LOGIN,
        definitions.TIME_DESCRIPTION_LAST_PASSWORD_RESET,
        definitions.TIME_DESCRIPTION_LAST_CONNECTED,
        definitions.TIME_DESCRIPTION_LAST_RUN,
        definitions.TIME_DESCRIPTION_LAST_PRINTED):
      return '.A..'

    # Content modification.
    if timestamp_description in (
        definitions.TIME_DESCRIPTION_MODIFICATION,
        definitions.TIME_DESCRIPTION_WRITTEN,
        definitions.TIME_DESCRIPTION_DELETED):
      return 'M...'

    # Content creation time.
    if timestamp_description in (
        definitions.TIME_DESCRIPTION_CREATION,
        definitions.TIME_DESCRIPTION_ADDED,
        definitions.TIME_DESCRIPTION_FILE_DOWNLOADED,
        definitions.TIME_DESCRIPTION_FIRST_CONNECTED):
      return '...B'

    # Metadata modification.
    if timestamp_description == (
        definitions.TIME_DESCRIPTION_METADATA_MODIFICATION):
      return '..C.'

    return '....'

  def GetMACBRepresentationFromDescriptions(self, timestamp_descriptions):
    """Determines the MACB representation from the timestamp descriptions.

    MACB representation is a shorthand for representing one or more of
    modification, access, change, birth timestamp descriptions as the letters
    "MACB" or a "." if the corresponding timestamp is not set.

    Note that this is an output format shorthand and does not guarantee that
    the timestamps represent the same occurrence.

    Args:
      timestamp_descriptions (list[str]): timestamp descriptions, which are
          defined in definitions.TIME_DESCRIPTIONS.

    Returns:
      str: MACB representation.
    """
    macb_representation = []

    if ('mtime' in timestamp_descriptions or
        definitions.TIME_DESCRIPTION_MODIFICATION in timestamp_descriptions):
      macb_representation.append('M')
    else:
      macb_representation.append('.')

    if ('atime' in timestamp_descriptions or
        definitions.TIME_DESCRIPTION_LAST_ACCESS in timestamp_descriptions):
      macb_representation.append('A')
    else:
      macb_representation.append('.')

    if ('ctime' in timestamp_descriptions or
        definitions.TIME_DESCRIPTION_METADATA_MODIFICATION in (
            timestamp_descriptions)):
      macb_representation.append('C')
    else:
      macb_representation.append('.')

    if ('crtime' in timestamp_descriptions or
        definitions.TIME_DESCRIPTION_CREATION in timestamp_descriptions):
      macb_representation.append('B')
    else:
      macb_representation.append('.')

    return ''.join(macb_representation)

  def GetMessageFormatter(self, data_type):
    """Retrieves the message formatter for a specific data type.

    Args:
      data_type (str): data type, which is matched case insensitively.

    Returns:
      EventFormatter: corresponding message formatter or None if not
          available.
    """
    data_type = data_type.lower()
    return self._message_formatters.get(data_type, None)

  def GetRelativePathForPathSpec(self, path_spec):
    """Retrieves the relative path for a path specification.

    Args:
      path_spec (dfvfs.PathSpec): path specification.

    Returns:
      str: relative path of the path specification.
    """
    return path_helper.PathHelper.GetRelativePathForPathSpec(path_spec)

  def GetSourceMapping(self, data_type):
    """Retrieves the source mapping for a specific data type.

    Args:
      data_type (str): data type, which is matched case insensitively.

    Returns:
      tuple[str, str]: short and long source mappings or (None, None) if not
          available.
    """
    data_type = data_type.lower()
    return self._source_mappings.get(data_type, (None, None))

  def GetUsername(self, event_data, default_username='-'):
    """Retrieves the username related to the event data.

    The username of the event data takes precedence. Otherwise the user
    identifier (user_sid) of the event data is resolved via the user accounts
    in the storage. The result of that lookup is cached per user identifier.

    Args:
      event_data (EventData): event data.
      default_username (Optional[str]): default username.

    Returns:
      str: username.
    """
    username = getattr(event_data, 'username', None)
    if username and username != '-':
      return username

    user_identifier = getattr(event_data, 'user_sid', None)
    if not user_identifier:
      return default_username

    if user_identifier not in self._username_by_identifier:
      user_account = self._ReadUserAccount(user_identifier)
      # Cache None instead of the default username when there is no match,
      # so the default of one call does not leak into later calls.
      self._username_by_identifier[user_identifier] = (
          user_account.username if user_account else None)

    return self._username_by_identifier[user_identifier] or default_username

  def GetWinevtResourcesHelper(self):
    """Retrieves a Windows EventLog resources helper.

    Returns:
      WinevtResourcesHelper: Windows EventLog resources helper.
    """
    # TODO: determine LCID from system configurations when no preferred
    # language identifier was set.
    lcid = self._lcid or self._DEFAULT_LCID

    return winevt_rc.WinevtResourcesHelper(
        self._storage_reader, self.data_location, lcid)

  def ReadMessageFormattersFromDirectory(self, path):
    """Reads message formatters from a directory.

    Only files with the .yaml extension directly in the directory are read.

    Args:
      path (str): path of directory that contains the message formatters
          configuration files.

    Raises:
      KeyError: if the message formatter is already set for the corresponding
          data type.
    """
    for formatters_file_path in glob.glob(os.path.join(path, '*.yaml')):
      self._ReadMessageFormattersFile(formatters_file_path)

  def ReadMessageFormattersFromFile(self, path, override_existing=False):
    """Reads message formatters from a file.

    Args:
      path (str): path of file that contains the message formatters
          configuration.
      override_existing (bool): True if existing message formatters should
          be overridden.

    Raises:
      KeyError: if the message formatter is already set for the corresponding
          data type and override existing is False.
    """
    self._ReadMessageFormattersFile(path, override_existing=override_existing)

  def SetPreferredLanguageIdentifier(self, language_tag):
    """Sets the preferred language identifier.

    An empty language tag resets the preferred language, after which the
    default LCID is used.

    Args:
      language_tag (str): language tag such as "en-US" for US English or
          "is-IS" for Icelandic.

    Raises:
      ValueError: if the language tag is not a string type or no LCID can
          be determined that corresponds with the language tag.
    """
    lcid = None
    if language_tag:
      if not isinstance(language_tag, str):
        raise ValueError('Language tag: {0!s} is not a string.'.format(
            language_tag))

      lcid = languages.WindowsLanguageHelper.GetLCIDForLanguageTag(language_tag)
      if not lcid:
        raise ValueError('No LCID found for language tag: {0:s}.'.format(
            language_tag))

    self._language_tag = language_tag
    self._lcid = lcid

  def SetTimeZone(self, time_zone):
    """Sets the time zone.

    An empty time zone resets the time zone, after which the default time
    zone is used.

    Args:
      time_zone (str): time zone.

    Raises:
      ValueError: if the time zone is not supported.
    """
    tzinfo = time_zone
    if time_zone:
      try:
        tzinfo = pytz.timezone(time_zone)
      except pytz.UnknownTimeZoneError:
        raise ValueError('Unsupported time zone: {0:s}'.format(time_zone))

    self._time_zone = tzinfo