Source code for plaso.multi_process.extraction_process

"""The multi-process extraction worker process."""

from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.resolver import context
from dfvfs.resolver import resolver as path_spec_resolver

from plaso.engine import worker
from plaso.lib import definitions
from plaso.lib import errors
from plaso.multi_process import logger
from plaso.multi_process import plaso_queue
from plaso.multi_process import task_process
from plaso.parsers import mediator as parsers_mediator


class ExtractionWorkerProcess(task_process.MultiProcessTaskProcess):
    """Multi-processing extraction worker process.

    The worker pops tasks from a task queue, processes the path specification
    of each task with an event extraction worker and writes the results to
    a task storage writer.
    """

    # Maximum number of dfVFS file system objects to cache in the worker process.
    _FILE_SYSTEM_CACHE_SIZE = 3

    def __init__(
        self,
        task_queue,
        processing_configuration,
        system_configurations,
        windows_event_log_providers,
        registry_find_specs,
        **kwargs,
    ):
        """Initializes an extraction worker process.

        Non-specified keyword arguments (kwargs) are directly passed to
        multiprocessing.Process.

        Args:
          task_queue (PlasoQueue): task queue.
          processing_configuration (ProcessingConfiguration): processing
              configuration.
          system_configurations (list[SystemConfigurationArtifact]): system
              configurations.
          windows_event_log_providers (list[WindowsEventLogProviderArtifact]):
              Windows EventLog providers.
          registry_find_specs (list[dfwinreg.FindSpec]): Windows Registry find
              specifications.
          kwargs: keyword arguments to pass to multiprocessing.Process.
        """
        super().__init__(processing_configuration, **kwargs)
        self._abort = False
        self._buffer_size = 0
        self._current_display_name = ""
        self._extraction_worker = None
        self._file_system_cache = []
        self._number_of_consumed_sources = 0
        self._parser_mediator = None
        self._registry_find_specs = registry_find_specs
        self._resolver_context = None
        self._status = definitions.STATUS_INDICATOR_INITIALIZED
        self._task = None
        self._task_queue = task_queue
        self._system_configurations = system_configurations
        self._windows_event_log_providers = windows_event_log_providers

    def _CacheFileSystem(self, file_system):
        """Caches a dfVFS file system object.

        Keeping an additional reference to a dfVFS file system object causes
        the object to remain cached in the resolver context. This minimizes
        the number of times the file system is re-opened.

        The cache is kept in least-recently-used order: the first element is
        the next one to be dropped once _FILE_SYSTEM_CACHE_SIZE is reached.

        Args:
          file_system (dfvfs.FileSystem): file system.
        """
        if file_system in self._file_system_cache:
            # Always refresh the position of an already cached file system,
            # also when the cache is not yet full. Otherwise a file system
            # that was just used could be the first one to be evicted once
            # the cache fills up.
            self._file_system_cache.remove(file_system)

        elif len(self._file_system_cache) >= self._FILE_SYSTEM_CACHE_SIZE:
            # Drop the least recently used file system to make room.
            self._file_system_cache.pop(0)

        # The end of the list holds the most recently used file system.
        self._file_system_cache.append(file_system)

    def _CreateParserMediator(
        self,
        resolver_context,
        processing_configuration,
        system_configurations,
        windows_event_log_providers,
    ):
        """Creates a parser mediator.

        Args:
          resolver_context (dfvfs.Context): resolver context.
          processing_configuration (ProcessingConfiguration): processing
              configuration.
          system_configurations (list[SystemConfigurationArtifact]): system
              configurations.
          windows_event_log_providers (list[WindowsEventLogProviderArtifact]):
              Windows EventLog providers.

        Returns:
          ParserMediator: parser mediator.
        """
        parser_mediator = parsers_mediator.ParserMediator(
            registry_find_specs=self._registry_find_specs,
            resolver_context=resolver_context,
            system_configurations=system_configurations,
        )

        # Copy the relevant settings of the processing configuration onto
        # the parser mediator.
        parser_mediator.SetExtractWinEvtResources(
            processing_configuration.extraction.extract_winevt_resources
        )
        parser_mediator.SetExtractWinRegBinaryValues(
            processing_configuration.extraction.extract_winreg_binary
        )
        parser_mediator.SetPreferredCodepage(
            processing_configuration.preferred_codepage
        )
        parser_mediator.SetPreferredLanguage(
            processing_configuration.preferred_language
        )
        parser_mediator.SetTemporaryDirectory(
            processing_configuration.temporary_directory
        )
        parser_mediator.SetWindowsEventLogProviders(windows_event_log_providers)

        return parser_mediator

    def _GetStatus(self):
        """Retrieves status information.

        Returns:
          dict[str, object]: status attributes, indexed by name.
        """
        if self._parser_mediator:
            number_of_produced_event_data = (
                self._parser_mediator.number_of_produced_event_data
            )
            number_of_produced_sources = (
                self._parser_mediator.number_of_produced_event_sources
            )
        else:
            number_of_produced_event_data = None
            number_of_produced_sources = None

        if self._extraction_worker and self._parser_mediator:
            # Report the most recent activity of either component.
            last_activity_timestamp = max(
                self._extraction_worker.last_activity_timestamp,
                self._parser_mediator.last_activity_timestamp,
            )
            processing_status = self._extraction_worker.processing_status
        else:
            # Outside of the main loop fall back to the process level status.
            last_activity_timestamp = 0.0
            processing_status = self._status

        # self._task is None when no task is being processed.
        task_identifier = getattr(self._task, "identifier", "")

        if self._process_information:
            used_memory = self._process_information.GetUsedMemory() or 0
        else:
            used_memory = 0

        if self._memory_profiler:
            self._memory_profiler.Sample("main", used_memory)

        # XML RPC does not support integer values > 2 GiB so we format them
        # as a string.
        used_memory = f"{used_memory:d}"

        status = {
            "display_name": self._current_display_name,
            "identifier": self._name,
            "last_activity_timestamp": last_activity_timestamp,
            "number_of_consumed_event_data": None,
            "number_of_consumed_event_tags": None,
            "number_of_consumed_events": None,
            "number_of_consumed_sources": self._number_of_consumed_sources,
            "number_of_produced_event_data": number_of_produced_event_data,
            "number_of_produced_event_tags": None,
            "number_of_produced_events": None,
            "number_of_produced_sources": number_of_produced_sources,
            "processing_status": processing_status,
            "task_identifier": task_identifier,
            "used_memory": used_memory,
        }

        return status

    def _Main(self):
        """The main loop.

        Sets up the per-process resolver context, parser mediator and
        extraction worker, processes tasks from the task queue until the queue
        is closed or empty, a QueueAbort object is dequeued or an abort is
        signalled, and finally tears everything down and closes the queue.
        """
        # We need a resolver context per process to prevent multi processing
        # issues with file objects stored in images.
        self._resolver_context = context.Context()

        for credential_configuration in self._processing_configuration.credentials:
            path_spec_resolver.Resolver.key_chain.SetCredential(
                credential_configuration.path_spec,
                credential_configuration.credential_type,
                credential_configuration.credential_data,
            )

        self._parser_mediator = self._CreateParserMediator(
            self._resolver_context,
            self._processing_configuration,
            self._system_configurations,
            self._windows_event_log_providers,
        )

        # We need to initialize the parser and hasher objects after the process
        # has forked otherwise on Windows the "fork" will fail with
        # a PickleError for Python modules that cannot be pickled.
        self._extraction_worker = worker.EventExtractionWorker(
            parser_filter_expression=(
                self._processing_configuration.parser_filter_expression
            )
        )

        self._extraction_worker.SetExtractionConfiguration(
            self._processing_configuration.extraction
        )

        self._parser_mediator.StartProfiling(
            self._processing_configuration.profiling,
            self._name,
            self._process_information,
        )
        self._StartProfiling(self._processing_configuration.profiling)

        if self._analyzers_profiler:
            self._extraction_worker.SetAnalyzersProfiler(self._analyzers_profiler)

        if self._processing_profiler:
            self._extraction_worker.SetProcessingProfiler(self._processing_profiler)

        logger.debug(f"Worker: {self._name!s} (PID: {self._pid:d}) started.")

        self._status = definitions.STATUS_INDICATOR_RUNNING

        try:
            logger.debug(
                f"{self._name!s} (PID: {self._pid:d}) started monitoring task queue."
            )

            while not self._abort:
                try:
                    task = self._task_queue.PopItem()
                except (errors.QueueClose, errors.QueueEmpty) as exception:
                    exception_type = type(exception)
                    logger.debug(
                        f"ConsumeItems exiting with exception: {exception_type!s}."
                    )
                    break

                if isinstance(task, plaso_queue.QueueAbort):
                    logger.debug("ConsumeItems exiting, dequeued QueueAbort object.")
                    break

                self._ProcessTask(task)

            logger.debug(
                f"{self._name!s} (PID: {self._pid:d}) stopped monitoring task queue."
            )

        # All exceptions need to be caught here to prevent the process
        # from being killed by an uncaught exception.
        except Exception as exception:  # pylint: disable=broad-except
            logger.warning(
                (
                    f"Unhandled exception in process: {self._name!s} "
                    f"(PID: {self._pid:d})."
                )
            )
            logger.exception(exception)

            self._abort = True

        # Detach the profilers from the extraction worker before they are
        # stopped.
        if self._analyzers_profiler:
            self._extraction_worker.SetAnalyzersProfiler(None)

        if self._processing_profiler:
            self._extraction_worker.SetProcessingProfiler(None)

        self._StopProfiling()
        self._parser_mediator.StopProfiling()

        # Release the per-process helper objects and the cached file systems.
        self._extraction_worker = None
        self._file_system_cache = []
        self._parser_mediator = None
        self._resolver_context = None

        if self._abort:
            self._status = definitions.STATUS_INDICATOR_ABORTED
        else:
            self._status = definitions.STATUS_INDICATOR_COMPLETED

        logger.debug(f"Worker: {self._name!s} (PID: {self._pid:d}) stopped.")

        try:
            self._task_queue.Close(abort=self._abort)
        except errors.QueueAlreadyClosed:
            logger.error(f"Queue for {self.name:s} was already closed.")

    def _ProcessPathSpec(self, extraction_worker, parser_mediator, path_spec):
        """Processes a path specification.

        Exceptions raised while opening or processing the file entry are not
        propagated, instead an extraction warning is produced.

        Args:
          extraction_worker (worker.EventExtractionWorker): extraction worker.
          parser_mediator (ParserMediator): parser mediator.
          path_spec (dfvfs.PathSpec): path specification.
        """
        self._current_display_name = parser_mediator.GetDisplayNameForPathSpec(
            path_spec
        )

        try:
            file_entry = path_spec_resolver.Resolver.OpenFileEntry(
                path_spec, resolver_context=parser_mediator.resolver_context
            )
            if file_entry is None:
                logger.warning(
                    f"Unable to open file entry: {self._current_display_name:s}"
                )
                return

            # File systems of system level and GZIP path specifications are
            # not kept in the file system cache.
            if (
                path_spec
                and not path_spec.IsSystemLevel()
                and path_spec.type_indicator != dfvfs_definitions.TYPE_INDICATOR_GZIP
            ):
                file_system = file_entry.GetFileSystem()
                self._CacheFileSystem(file_system)

            extraction_worker.ProcessFileEntry(parser_mediator, file_entry)

        except Exception as exception:  # pylint: disable=broad-except
            parser_mediator.ProduceExtractionWarning(
                (f"unable to process path specification with error: " f"{exception!s}"),
                path_spec=path_spec,
            )

            # The traceback is only logged when debug output is enabled.
            if self._processing_configuration.debug_output:
                logger.warning(
                    (
                        f"Unhandled exception while processing path specification: "
                        f"{self._current_display_name:s}."
                    )
                )
                logger.exception(exception)

    def _ProcessTask(self, task):
        """Processes a task.

        Args:
          task (Task): task.
        """
        logger.debug(f"Started processing task: {task.identifier:s}.")

        if self._tasks_profiler:
            self._tasks_profiler.Sample(task, "processing_started")

        task.storage_format = self._processing_configuration.task_storage_format

        self._task = task

        task_storage_writer = self._storage_factory.CreateTaskStorageWriter(
            self._processing_configuration.task_storage_format
        )

        if self._serializers_profiler:
            task_storage_writer.SetSerializersProfiler(self._serializers_profiler)

        if self._storage_profiler:
            task_storage_writer.SetStorageProfiler(self._storage_profiler)

        self._parser_mediator.SetStorageWriter(task_storage_writer)

        storage_file_path = self._GetTaskStorageFilePath(
            self._processing_configuration.task_storage_format, task
        )
        task_storage_writer.Open(
            path=storage_file_path,
            session_identifier=task.session_identifier,
            task_identifier=task.identifier,
        )

        try:
            task_storage_writer.AddAttributeContainer(task)

            # TODO: add support for more task types.
            self._ProcessPathSpec(
                self._extraction_worker, self._parser_mediator, task.path_spec
            )
            self._number_of_consumed_sources += 1

        finally:
            # Record whether the task was aborted and make sure the task
            # storage is closed, also when processing raised an exception.
            task.aborted = self._abort
            task_storage_writer.UpdateAttributeContainer(task)

            task_storage_writer.Close()

            self._parser_mediator.SetStorageWriter(None)

        try:
            self._FinalizeTaskStorageWriter(
                self._processing_configuration.task_storage_format, task
            )
        except OSError:
            # NOTE(review): finalization failures are deliberately ignored
            # here; presumably the task is retried or reported elsewhere -
            # confirm against the task manager.
            pass

        self._task = None

        if self._tasks_profiler:
            self._tasks_profiler.Sample(task, "processing_completed")

        logger.debug(f"Completed processing task: {task.identifier:s}.")

    def SignalAbort(self):
        """Signals the process to abort.

        The abort is also forwarded to the extraction worker and the parser
        mediator, when they have been created.
        """
        self._abort = True

        if self._extraction_worker:
            self._extraction_worker.SignalAbort()

        if self._parser_mediator:
            self._parser_mediator.SignalAbort()