Source code for plaso.multi_process.analysis_process

"""The multi-process analysis worker process."""

import threading

from plaso.analysis import mediator as analysis_mediator
from plaso.containers import tasks
from plaso.lib import definitions
from plaso.lib import errors
from plaso.multi_process import logger
from plaso.multi_process import plaso_queue
from plaso.multi_process import task_process


class AnalysisProcess(task_process.MultiProcessTaskProcess):
    """Multi-processing analysis worker process.

    The process pops event tripples from an event queue, hands each of them
    to a single analysis plugin and writes the results, including the final
    analysis report, to a SQLite task storage file.
    """

    # Number of seconds to wait for the completion status to be queried
    # by the foreman process.
    _FOREMAN_STATUS_WAIT = 5 * 60

    def __init__(
        self,
        event_queue,
        analysis_plugin,
        processing_configuration,
        user_accounts,
        data_location=None,
        event_filter_expression=None,
        **kwargs,
    ):
        """Initializes an analysis worker process.

        Non-specified keyword arguments (kwargs) are directly passed to
        multiprocessing.Process.

        Args:
          event_queue (plaso_queue.Queue): event queue.
          analysis_plugin (AnalysisPlugin): plugin running in the process.
          processing_configuration (ProcessingConfiguration): processing
              configuration.
          user_accounts (list[UserAccountArtifact]): user accounts.
          data_location (Optional[str]): path to the location that data files
              should be loaded from.
          event_filter_expression (Optional[str]): event filter expression.
        """
        super().__init__(processing_configuration, **kwargs)
        self._abort = False
        self._analysis_mediator = None
        self._analysis_plugin = analysis_plugin
        self._data_location = data_location
        self._event_filter_expression = event_filter_expression
        self._event_queue = event_queue
        # Created in _Main, not here, see the comment in _Main.
        self._foreman_status_wait_event = None
        self._number_of_consumed_events = 0
        self._status = definitions.STATUS_INDICATOR_INITIALIZED
        self._task = None
        self._user_accounts = user_accounts

    def _GetStatus(self):
        """Retrieves status information.

        When the processing status is aborted or completed this also sets the
        foreman status wait event, if it still exists, which releases the wait
        at the end of _Main.

        Returns:
          dict[str, object]: status attributes, indexed by name.
        """
        logger.debug("Status update requested")

        # _Main resets both attributes to None when it finishes. Work on local
        # snapshots so a status request that arrives around that moment cannot
        # end up calling a method on None.
        # NOTE(review): presumably this method is invoked from a status RPC
        # thread while _Main runs in the main thread - confirm against
        # MultiProcessTaskProcess.
        mediator = self._analysis_mediator
        wait_event = self._foreman_status_wait_event

        if mediator:
            number_of_produced_event_tags = mediator.number_of_produced_event_tags
            number_of_produced_reports = (
                mediator.number_of_produced_analysis_reports
            )
        else:
            number_of_produced_event_tags = None
            number_of_produced_reports = None

        if self._process_information:
            used_memory = self._process_information.GetUsedMemory() or 0
        else:
            used_memory = 0

        if self._memory_profiler:
            self._memory_profiler.Sample("main", used_memory)

        status = {
            "display_name": "",
            "identifier": self._name,
            "number_of_consumed_event_data": None,
            "number_of_consumed_event_tags": None,
            "number_of_consumed_events": self._number_of_consumed_events,
            "number_of_consumed_reports": None,
            "number_of_consumed_sources": None,
            "number_of_produced_event_data": None,
            "number_of_produced_event_tags": number_of_produced_event_tags,
            "number_of_produced_events": None,
            "number_of_produced_reports": number_of_produced_reports,
            "number_of_produced_sources": None,
            "processing_status": self._status,
            "task_identifier": None,
            # XML RPC does not support integer values > 2 GiB so we format them
            # as a string.
            "used_memory": f"{used_memory:d}",
        }

        if self._status in (
            definitions.STATUS_INDICATOR_ABORTED,
            definitions.STATUS_INDICATOR_COMPLETED,
        ):
            # The event no longer exists once _Main has finished waiting, in
            # which case there is nothing left to signal.
            if wait_event is not None:
                logger.debug("Set foreman status wait event")
                wait_event.set()

        return status

    def _Main(self):
        """The main loop.

        Opens a SQLite task storage writer, consumes the event queue until it
        is closed, empty, aborted or a QueueAbort object is dequeued, produces
        the analysis report unless aborted, finalizes the task storage and
        then waits up to _FOREMAN_STATUS_WAIT seconds for the final status to
        be queried before closing the event queue.
        """
        self._StartProfiling(self._processing_configuration.profiling)

        logger.debug(f"Analysis plugin: {self._name:s} (PID: {self._pid:d}) started.")

        # Creating the threading event in the constructor will cause a pickle
        # error on Windows when an analysis process is created.
        self._foreman_status_wait_event = threading.Event()
        self._status = definitions.STATUS_INDICATOR_ANALYZING

        task = tasks.Task()
        task.storage_format = definitions.STORAGE_FORMAT_SQLITE
        # TODO: temporary solution.
        task.identifier = self._analysis_plugin.plugin_name

        self._task = task

        task_storage_writer = self._storage_factory.CreateTaskStorageWriter(
            definitions.STORAGE_FORMAT_SQLITE
        )

        if self._serializers_profiler:
            task_storage_writer.SetSerializersProfiler(self._serializers_profiler)

        if self._storage_profiler:
            task_storage_writer.SetStorageProfiler(self._storage_profiler)

        storage_file_path = self._GetTaskStorageFilePath(
            definitions.STORAGE_FORMAT_SQLITE, task
        )
        task_storage_writer.Open(path=storage_file_path)

        self._analysis_mediator = analysis_mediator.AnalysisMediator(
            data_location=self._data_location, user_accounts=self._user_accounts
        )

        # TODO: move into analysis process.
        self._analysis_mediator.SetStorageWriter(task_storage_writer)

        # TODO: set event_filter_expression in mediator.

        task_storage_writer.AddAttributeContainer(task)

        try:
            logger.debug(
                (
                    f"{self._name:s} (PID: {self._pid:d}) started monitoring event "
                    f"queue."
                )
            )

            while not self._abort:
                try:
                    queued_object = self._event_queue.PopItem()

                except (errors.QueueClose, errors.QueueEmpty) as exception:
                    exception_type = type(exception)
                    logger.debug(
                        f"ConsumeItems exiting with exception {exception_type!s}."
                    )
                    break

                if isinstance(queued_object, plaso_queue.QueueAbort):
                    logger.debug("ConsumeItems exiting, dequeued QueueAbort object.")
                    break

                self._ProcessEventTripple(self._analysis_mediator, queued_object)

                self._number_of_consumed_events += 1

            logger.debug(
                (
                    f"{self._name:s} (PID: {self._pid:d}) stopped monitoring event "
                    f"queue."
                )
            )

            if not self._abort:
                self._status = definitions.STATUS_INDICATOR_REPORTING

                self._analysis_mediator.ProduceAnalysisReport(self._analysis_plugin)

        # All exceptions need to be caught here to prevent the process
        # from being killed by an uncaught exception.
        except Exception as exception:  # pylint: disable=broad-except
            logger.warning(
                (
                    f"Unhandled exception in process: {self._name:s} (PID: "
                    f"{self._pid:d})."
                )
            )
            logger.exception(exception)

            self._abort = True

        finally:
            # Record the outcome in the task container and close the storage
            # writer regardless of how the loop ended.
            task.aborted = self._abort
            task_storage_writer.UpdateAttributeContainer(task)

            task_storage_writer.Close()

            if self._serializers_profiler:
                task_storage_writer.SetSerializersProfiler(None)

            if self._storage_profiler:
                task_storage_writer.SetStorageProfiler(None)

        try:
            self._FinalizeTaskStorageWriter(definitions.STORAGE_FORMAT_SQLITE, task)
        except OSError as exception:
            logger.warning(f"Unable to finalize task storage with error: {exception!s}")

        if self._abort:
            self._status = definitions.STATUS_INDICATOR_ABORTED
        else:
            self._status = definitions.STATUS_INDICATOR_COMPLETED

        # Keep the process around until the final status has been retrieved
        # via _GetStatus, SignalAbort is called or the wait times out.
        logger.debug("Wait for foreman status wait event")
        self._foreman_status_wait_event.clear()
        self._foreman_status_wait_event.wait(self._FOREMAN_STATUS_WAIT)

        logger.debug(f"Analysis plugin: {self._name:s} (PID: {self._pid:d}) stopped.")

        self._StopProfiling()

        self._analysis_mediator = None
        self._foreman_status_wait_event = None
        self._task = None

        try:
            self._event_queue.Close(abort=self._abort)
        except errors.QueueAlreadyClosed:
            logger.error(f"Queue for {self.name:s} was already closed.")

    def _ProcessEventTripple(self, mediator, event_tripple):
        """Processes an event tripple.

        Exceptions raised by the analysis plugin are logged and not
        propagated, so that a single event cannot stop the main loop.

        Args:
          mediator (AnalysisMediator): mediates interactions between
              analysis plugins and other components, such as storage and dfVFS.
          event_tripple (EventTripple): event tripple.
        """
        try:
            self._analysis_plugin.ExamineEvent(
                mediator,
                event_tripple.event,
                event_tripple.event_data,
                event_tripple.event_data_stream,
            )

        except Exception as exception:  # pylint: disable=broad-except
            # TODO: write analysis error and change logger to debug only.
            logger.warning("Unhandled exception while processing event object.")
            logger.exception(exception)

    def SignalAbort(self):
        """Signals the process to abort.

        Sets the abort flag, releases a pending wait on the foreman status
        wait event and forwards the abort to the analysis mediator, for as
        far as these currently exist.
        """
        self._abort = True

        # Snapshot the attributes since _Main resets them to None when it
        # finishes.
        wait_event = self._foreman_status_wait_event
        mediator = self._analysis_mediator

        if wait_event is not None:
            logger.debug("Abort foreman status wait event")
            wait_event.set()

        if mediator:
            mediator.SignalAbort()