Source code for plaso.multi_process.output_engine

# -*- coding: utf-8 -*-
"""The output and formatting multi-processing engine."""

import heapq
import os

from plaso.containers import events
from plaso.engine import processing_status
from plaso.lib import bufferlib
from plaso.lib import definitions
from plaso.lib import errors
from plaso.multi_process import engine
from plaso.multi_process import logger
from plaso.output import mediator as output_mediator
from plaso.storage import time_range as storage_time_range


class PsortEventHeap(object):
  """Psort event heap."""
  def __init__(self):
    """Initializes a psort events heap."""
    super(PsortEventHeap, self).__init__()
    self._heap = []
  @property
  def number_of_events(self):
    """int: number of events on the heap."""
    return len(self._heap)
  def PopEvent(self):
    """Pops an event from the heap.

    Returns:
      tuple: containing:

        str: hash of the event values or 'UNKNOWN' if not set.
        EventObject: event.
        EventData: event data.
        EventDataStream: event data stream.

      or None if the heap is empty.
    """
    try:
      (event_values_hash, _, event, event_data,
       event_data_stream) = heapq.heappop(self._heap)
      return event_values_hash, event, event_data, event_data_stream

    except IndexError:
      return None
  def PopEvents(self):
    """Pops events from the heap.

    Yields:
      tuple: containing:

        str: hash of the event values or 'UNKNOWN' if not set.
        EventObject: event.
        EventData: event data.
        EventDataStream: event data stream.
    """
    heap_values = self.PopEvent()
    while heap_values:
      yield heap_values
      heap_values = self.PopEvent()
  def PushEvent(self, event, event_data, event_data_stream):
    """Pushes an event onto the heap.

    Args:
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.
    """
    event_values_hash = getattr(event_data, '_event_values_hash', None)
    if event_values_hash is None:
      logger.warning('Missing _event_values_hash attribute')
      event_values_hash = 'UNKNOWN'

    timestamp_desc = event.timestamp_desc
    if timestamp_desc is None:
      logger.warning('Missing timestamp_desc attribute')
      timestamp_desc = definitions.TIME_DESCRIPTION_UNKNOWN

    # Note that only events with the same timestamp are stored in the event
    # heap. The event values hash is stored first to cluster events with
    # similar event values.
    heapq.heappush(self._heap, (
        event_values_hash, timestamp_desc, event, event_data,
        event_data_stream))
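A minimal usage sketch of PsortEventHeap follows. The _FakeEvent and _FakeEventData classes are hypothetical stand-ins, not part of plaso; they only provide the attributes the heap reads (the real EventObject and EventData containers are defined in plaso.containers.events):

# Hypothetical stand-ins for illustration only.
class _FakeEvent(object):
  def __init__(self, timestamp_desc):
    self.timestamp_desc = timestamp_desc


class _FakeEventData(object):
  def __init__(self, event_values_hash):
    self._event_values_hash = event_values_hash


heap = PsortEventHeap()
heap.PushEvent(_FakeEvent('desc-2'), _FakeEventData('hash-b'), None)
heap.PushEvent(_FakeEvent('desc-1'), _FakeEventData('hash-a'), None)

# Events pop clustered by event values hash first: 'hash-a' before 'hash-b'.
for event_values_hash, event, event_data, event_data_stream in heap.PopEvents():
  print(event_values_hash, event.timestamp_desc)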
class OutputAndFormattingMultiProcessEngine(engine.MultiProcessEngine):
  """Output and formatting multi-processing engine."""

  # TODO: move this to a single process engine.
  # pylint: disable=abstract-method

  _HEAP_MAXIMUM_EVENTS = 100000

  _MESSAGE_FORMATTERS_DIRECTORY_NAME = 'formatters'
  _MESSAGE_FORMATTERS_FILE_NAME = 'formatters.yaml'
  def __init__(self):
    """Initializes an output and formatting multi-processing engine."""
    super(OutputAndFormattingMultiProcessEngine, self).__init__()

    # The export event heap is used to make sure the events are sorted in
    # a deterministic way.
    self._events_status = processing_status.EventsStatus()
    self._export_event_heap = PsortEventHeap()
    self._export_event_timestamp = 0
    self._number_of_consumed_events = 0
    self._output_mediator = None
    self._processing_configuration = None
    self._status = definitions.STATUS_INDICATOR_IDLE
    self._status_update_callback = None
  def _CreateOutputMediator(self, storage_reader, processing_configuration):
    """Creates an output mediator.

    Args:
      storage_reader (StorageReader): storage reader.
      processing_configuration (ProcessingConfiguration): processing
          configuration.

    Returns:
      OutputMediator: mediates interactions between output modules and other
          components, such as storage and dfVFS.

    Raises:
      BadConfigOption: if the message formatters file or directory cannot be
          read.
    """
    mediator = output_mediator.OutputMediator(
        storage_reader, data_location=processing_configuration.data_location,
        dynamic_time=processing_configuration.dynamic_time,
        preferred_encoding=processing_configuration.preferred_encoding)

    if processing_configuration.preferred_language:
      try:
        mediator.SetPreferredLanguageIdentifier(
            processing_configuration.preferred_language)
      except (KeyError, TypeError):
        logger.warning('Unable to set preferred language: {0!s}.'.format(
            processing_configuration.preferred_language))

    mediator.SetTimeZone(processing_configuration.preferred_time_zone)

    self._ReadMessageFormatters(
        mediator, processing_configuration.data_location,
        processing_configuration.custom_formatters_path)

    return mediator

  def _ExportEvent(
      self, storage_reader, output_module, event, event_data,
      event_data_stream, deduplicate_events=True):
    """Exports an event using an output module.

    Args:
      storage_reader (StorageReader): storage reader.
      output_module (OutputModule): output module.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.
      deduplicate_events (Optional[bool]): True if events should be
          deduplicated.
    """
    if (event.timestamp != self._export_event_timestamp or
        self._export_event_heap.number_of_events > self._HEAP_MAXIMUM_EVENTS):
      self._FlushExportBuffer(
          storage_reader, output_module,
          deduplicate_events=deduplicate_events)
      self._export_event_timestamp = event.timestamp

    self._export_event_heap.PushEvent(event, event_data, event_data_stream)

  def _ExportEvents(
      self, storage_reader, output_module, deduplicate_events=True,
      event_filter=None, time_slice=None, use_time_slicer=False):
    """Exports events using an output module.

    Args:
      storage_reader (StorageReader): storage reader.
      output_module (OutputModule): output module.
      deduplicate_events (Optional[bool]): True if events should be
          deduplicated.
      event_filter (Optional[EventObjectFilter]): event filter.
      time_slice (Optional[TimeRange]): time range that defines a time slice
          to filter events.
      use_time_slicer (Optional[bool]): True if the 'time slicer' should be
          used. The 'time slicer' will provide a context of events around
          an event of interest.
""" self._status = definitions.STATUS_INDICATOR_EXPORTING time_slice_buffer = None time_slice_range = None if time_slice: if time_slice.event_timestamp is not None: time_slice_range = storage_time_range.TimeRange( time_slice.start_timestamp, time_slice.end_timestamp) if use_time_slicer: time_slice_buffer = bufferlib.CircularBuffer(time_slice.duration) filter_limit = getattr(event_filter, 'limit', None) forward_entries = 0 self._events_status.number_of_filtered_events = 0 self._events_status.number_of_events_from_time_slice = 0 for event in storage_reader.GetSortedEvents(time_range=time_slice_range): event_data_identifier = event.GetEventDataIdentifier() event_data = storage_reader.GetAttributeContainerByIdentifier( events.EventData.CONTAINER_TYPE, event_data_identifier) event_data_stream_identifier = event_data.GetEventDataStreamIdentifier() if event_data_stream_identifier: event_data_stream = storage_reader.GetAttributeContainerByIdentifier( events.EventDataStream.CONTAINER_TYPE, event_data_stream_identifier) else: event_data_stream = None event_identifier = event.GetIdentifier() event_tag = storage_reader.GetEventTagByEventIdentifer(event_identifier) if time_slice_range and event.timestamp != time_slice.event_timestamp: self._events_status.number_of_events_from_time_slice += 1 if event_filter: filter_match = event_filter.Match( event, event_data, event_data_stream, event_tag) else: filter_match = None # pylint: disable=singleton-comparison if filter_match == False: if not time_slice_buffer: self._events_status.number_of_filtered_events += 1 elif forward_entries == 0: time_slice_buffer.Append((event, event_data)) self._events_status.number_of_filtered_events += 1 elif forward_entries <= time_slice_buffer.size: self._ExportEvent( storage_reader, output_module, event, event_data, event_data_stream, deduplicate_events=deduplicate_events) self._number_of_consumed_events += 1 self._events_status.number_of_events_from_time_slice += 1 forward_entries += 1 else: # We reached the maximum size of the time slice and don't need to # include other entries. self._events_status.number_of_filtered_events += 1 forward_entries = 0 else: # pylint: disable=singleton-comparison if filter_match == True and time_slice_buffer: # Empty the time slice buffer. for event_in_buffer, event_data_in_buffer in ( time_slice_buffer.Flush()): self._ExportEvent( storage_reader, output_module, event_in_buffer, event_data_in_buffer, event_data_stream, deduplicate_events=deduplicate_events) self._number_of_consumed_events += 1 self._events_status.number_of_filtered_events += 1 self._events_status.number_of_events_from_time_slice += 1 forward_entries = 1 self._ExportEvent( storage_reader, output_module, event, event_data, event_data_stream, deduplicate_events=deduplicate_events) self._number_of_consumed_events += 1 # pylint: disable=singleton-comparison if (filter_match == True and filter_limit and filter_limit == self._number_of_consumed_events): break self._FlushExportBuffer(storage_reader, output_module) def _FlushExportBuffer( self, storage_reader, output_module, deduplicate_events=True): """Flushes buffered events and writes them to the output module. Args: storage_reader (StorageReader): storage reader. output_module (OutputModule): output module. deduplicate_events (Optional[bool]): True if events should be deduplicated. 
""" last_event_values_hash = None last_macb_group_identifier = None last_timestamp_desc = None macb_group = [] for (event_values_hash, event, event_data, event_data_stream) in self._export_event_heap.PopEvents(): timestamp_desc = event.timestamp_desc if (deduplicate_events and timestamp_desc == last_timestamp_desc and event_values_hash == last_event_values_hash): self._events_status.number_of_duplicate_events += 1 continue event_identifier = event.GetIdentifier() event_tag = storage_reader.GetEventTagByEventIdentifer(event_identifier) if timestamp_desc in ( definitions.TIME_DESCRIPTION_LAST_ACCESS, definitions.TIME_DESCRIPTION_CREATION, definitions.TIME_DESCRIPTION_METADATA_MODIFICATION, definitions.TIME_DESCRIPTION_MODIFICATION): macb_group_identifier = event_values_hash else: macb_group_identifier = None if macb_group_identifier is None: if macb_group: output_module.WriteFieldValuesOfMACBGroup( self._output_mediator, macb_group) macb_group = [] field_values = output_module.GetFieldValues( self._output_mediator, event, event_data, event_data_stream, event_tag) output_module.WriteFieldValues(self._output_mediator, field_values) else: if (last_macb_group_identifier == macb_group_identifier or not macb_group): macb_group.append((event, event_data, event_data_stream, event_tag)) else: output_module.WriteFieldValuesOfMACBGroup( self._output_mediator, macb_group) macb_group = [(event, event_data, event_data_stream, event_tag)] self._events_status.number_of_macb_grouped_events += 1 last_event_values_hash = event_values_hash last_macb_group_identifier = macb_group_identifier last_timestamp_desc = timestamp_desc if macb_group: output_module.WriteFieldValuesOfMACBGroup( self._output_mediator, macb_group) def _ReadMessageFormatters( self, output_mediator_object, data_location, custom_formatters_path): """Reads the message formatters from a formatters file or directory. Args: output_mediator_object (OutputMediator): mediates interactions between output modules and other components, such as storage and dfVFS. data_location (str): path to the data files. custom_formatters_path (str): path to custom formatter definitions file. Raises: BadConfigOption: if the message formatters file or directory cannot be read. 
""" formatters_directory = os.path.join( data_location, self._MESSAGE_FORMATTERS_DIRECTORY_NAME) formatters_file = os.path.join( data_location, self._MESSAGE_FORMATTERS_FILE_NAME) if os.path.isdir(formatters_directory): try: output_mediator_object.ReadMessageFormattersFromDirectory( formatters_directory) except KeyError as exception: raise errors.BadConfigOption(( 'Unable to read message formatters from directory: {0:s} with ' 'error: {1!s}').format(formatters_directory, exception)) elif os.path.isfile(formatters_file): try: output_mediator_object.ReadMessageFormattersFromFile(formatters_file) except KeyError as exception: raise errors.BadConfigOption(( 'Unable to read message formatters from file: {0:s} with error: ' '{1!s}').format(formatters_file, exception)) else: raise errors.BadConfigOption('Missing formatters file and directory.') if custom_formatters_path: try: output_mediator_object.ReadMessageFormattersFromFile( custom_formatters_path, override_existing=True) except KeyError as exception: raise errors.BadConfigOption(( 'Unable to read custrom message formatters from file: {0:s} with ' 'error: {1!s}').format(formatters_file, exception)) def _UpdateForemanProcessStatus(self): """Update the foreman process status.""" used_memory = self._process_information.GetUsedMemory() or 0 self._processing_status.UpdateForemanStatus( self._name, self._status, self._pid, used_memory, '', 0, 0, 0, 0, self._number_of_consumed_events, 0, 0, 0, 0, 0) self._processing_status.UpdateEventsStatus(self._events_status) def _UpdateStatus(self): """Update the status.""" self._UpdateForemanProcessStatus() if self._status_update_callback: self._status_update_callback(self._processing_status)
  def ExportEvents(
      self, storage_reader, output_module, processing_configuration,
      deduplicate_events=True, event_filter=None, status_update_callback=None,
      time_slice=None, use_time_slicer=False):
    """Exports events using an output module.

    Args:
      storage_reader (StorageReader): storage reader.
      output_module (OutputModule): output module.
      processing_configuration (ProcessingConfiguration): processing
          configuration.
      deduplicate_events (Optional[bool]): True if events should be
          deduplicated.
      event_filter (Optional[EventObjectFilter]): event filter.
      status_update_callback (Optional[function]): callback function for
          status updates.
      time_slice (Optional[TimeSlice]): slice of time to output.
      use_time_slicer (Optional[bool]): True if the 'time slicer' should be
          used. The 'time slicer' will provide a context of events around
          an event of interest.

    Raises:
      BadConfigOption: if the message formatters file or directory cannot be
          read.
    """
    self._events_status = processing_status.EventsStatus()
    self._processing_configuration = processing_configuration
    self._status_update_callback = status_update_callback

    total_number_of_events = 0
    if storage_reader.HasAttributeContainers('parser_count'):
      parsers_counter = {
          parser_count.name: parser_count.number_of_events
          for parser_count in storage_reader.GetAttributeContainers(
              'parser_count')}
      total_number_of_events = parsers_counter['total']

    self._events_status.total_number_of_events = total_number_of_events

    self._output_mediator = self._CreateOutputMediator(
        storage_reader, processing_configuration)

    output_module.WriteHeader(self._output_mediator)

    self._StartStatusUpdateThread()

    self._StartProfiling(self._processing_configuration.profiling)

    try:
      self._ExportEvents(
          storage_reader, output_module,
          deduplicate_events=deduplicate_events, event_filter=event_filter,
          time_slice=time_slice, use_time_slicer=use_time_slicer)

      self._status = definitions.STATUS_INDICATOR_COMPLETED

    finally:
      # Stop the status update thread after close of the storage writer
      # so we include the storage sync to disk in the status updates.
      self._StopStatusUpdateThread()

    output_module.WriteFooter()

    # Update the status view one last time.
    self._UpdateStatus()

    self._StopProfiling()

    # Reset values.
    self._events_status = None
    self._output_mediator = None
    self._processing_configuration = None
    self._status_update_callback = None
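For reference, a small self-contained sketch of how ExportEvents derives the total event count from 'parser_count' attribute containers; _FakeParserCount is a hypothetical stand-in exposing only the 'name' and 'number_of_events' attributes the method reads, and the counts are made-up example values:

class _FakeParserCount(object):
  """Hypothetical stand-in for a 'parser_count' attribute container."""

  def __init__(self, name, number_of_events):
    self.name = name
    self.number_of_events = number_of_events


parser_counts = [
    _FakeParserCount('filestat', 250),
    _FakeParserCount('winreg', 750),
    _FakeParserCount('total', 1000)]

# Same dictionary comprehension as in ExportEvents: map each parser name to
# its event count, then read the aggregate 'total' entry.
parsers_counter = {
    parser_count.name: parser_count.number_of_events
    for parser_count in parser_counts}
print(parsers_counter['total'])  # 1000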