Source code for plaso.multi_process.extraction_process

# -*- coding: utf-8 -*-
"""The multi-process extraction worker process."""

from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.resolver import context
from dfvfs.resolver import resolver as path_spec_resolver

from plaso.engine import worker
from plaso.lib import definitions
from plaso.lib import errors
from plaso.multi_process import logger
from plaso.multi_process import plaso_queue
from plaso.multi_process import task_process
from plaso.parsers import mediator as parsers_mediator


class ExtractionWorkerProcess(task_process.MultiProcessTaskProcess):
  """Multi-processing extraction worker process."""

  # Maximum number of dfVFS file system objects to cache in the worker process.
  _FILE_SYSTEM_CACHE_SIZE = 3

  def __init__(
      self, task_queue, processing_configuration, system_configurations,
      windows_event_log_providers, registry_find_specs, **kwargs):
    """Initializes an extraction worker process.

    Non-specified keyword arguments (kwargs) are directly passed to
    multiprocessing.Process.

    Args:
      task_queue (PlasoQueue): task queue.
      processing_configuration (ProcessingConfiguration): processing
          configuration.
      system_configurations (list[SystemConfigurationArtifact]): system
          configurations.
      windows_event_log_providers (list[WindowsEventLogProviderArtifact]):
          Windows EventLog providers.
      registry_find_specs (list[dfwinreg.FindSpec]): Windows Registry find
          specifications.
      kwargs: keyword arguments to pass to multiprocessing.Process.
    """
    super(ExtractionWorkerProcess, self).__init__(
        processing_configuration, **kwargs)
    self._abort = False
    self._buffer_size = 0
    self._current_display_name = ''
    self._extraction_worker = None
    self._file_system_cache = []
    self._number_of_consumed_sources = 0
    self._parser_mediator = None
    self._registry_find_specs = registry_find_specs
    self._resolver_context = None
    self._status = definitions.STATUS_INDICATOR_INITIALIZED
    self._task = None
    self._task_queue = task_queue
    self._system_configurations = system_configurations
    self._windows_event_log_providers = windows_event_log_providers
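
  # The file system cache below (_CacheFileSystem) behaves like a small
  # most-recently-used list: a hit against a full cache moves the entry to
  # the end, and a miss against a full cache evicts the oldest entry
  # (index 0). A minimal sketch of the eviction order with
  # _FILE_SYSTEM_CACHE_SIZE == 3, using placeholder strings instead of
  # dfvfs.FileSystem objects (hypothetical values):
  #
  #   for file_system in ('fs1', 'fs2', 'fs3', 'fs1', 'fs4'):
  #     worker_process._CacheFileSystem(file_system)
  #   # worker_process._file_system_cache == ['fs3', 'fs1', 'fs4']
  #   # 'fs2' was the oldest entry and was evicted when 'fs4' was cached.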

  def _CacheFileSystem(self, file_system):
    """Caches a dfVFS file system object.

    Keeping an additional reference to a dfVFS file system object causes the
    object to remain cached in the resolver context. This minimizes the number
    of times the file system is re-opened.

    Args:
      file_system (dfvfs.FileSystem): file system.
    """
    if file_system not in self._file_system_cache:
      if len(self._file_system_cache) == self._FILE_SYSTEM_CACHE_SIZE:
        self._file_system_cache.pop(0)
      self._file_system_cache.append(file_system)

    elif len(self._file_system_cache) == self._FILE_SYSTEM_CACHE_SIZE:
      # Move the file system to the end of the list to preserve the most
      # recently used file system object.
      self._file_system_cache.remove(file_system)
      self._file_system_cache.append(file_system)

  def _CreateParserMediator(
      self, resolver_context, processing_configuration, system_configurations,
      windows_event_log_providers):
    """Creates a parser mediator.

    Args:
      resolver_context (dfvfs.Context): resolver context.
      processing_configuration (ProcessingConfiguration): processing
          configuration.
      system_configurations (list[SystemConfigurationArtifact]): system
          configurations.
      windows_event_log_providers (list[WindowsEventLogProviderArtifact]):
          Windows EventLog providers.

    Returns:
      ParserMediator: parser mediator.
    """
    parser_mediator = parsers_mediator.ParserMediator(
        registry_find_specs=self._registry_find_specs,
        resolver_context=resolver_context,
        system_configurations=system_configurations)

    parser_mediator.SetExtractWinEvtResources(
        processing_configuration.extraction.extract_winevt_resources)
    parser_mediator.SetExtractWinRegBinaryValues(
        processing_configuration.extraction.extract_winreg_binary)
    parser_mediator.SetPreferredCodepage(
        processing_configuration.preferred_codepage)
    parser_mediator.SetPreferredLanguage(
        processing_configuration.preferred_language)
    parser_mediator.SetTemporaryDirectory(
        processing_configuration.temporary_directory)
    parser_mediator.SetWindowsEventLogProviders(windows_event_log_providers)

    return parser_mediator

  def _GetStatus(self):
    """Retrieves status information.

    Returns:
      dict[str, object]: status attributes, indexed by name.
    """
    if self._parser_mediator:
      number_of_produced_event_data = (
          self._parser_mediator.number_of_produced_event_data)
      number_of_produced_sources = (
          self._parser_mediator.number_of_produced_event_sources)
    else:
      number_of_produced_event_data = None
      number_of_produced_sources = None

    if self._extraction_worker and self._parser_mediator:
      last_activity_timestamp = max(
          self._extraction_worker.last_activity_timestamp,
          self._parser_mediator.last_activity_timestamp)
      processing_status = self._extraction_worker.processing_status
    else:
      last_activity_timestamp = 0.0
      processing_status = self._status

    task_identifier = getattr(self._task, 'identifier', '')

    if self._process_information:
      used_memory = self._process_information.GetUsedMemory() or 0
    else:
      used_memory = 0

    if self._memory_profiler:
      self._memory_profiler.Sample('main', used_memory)

    # XML RPC does not support integer values > 2 GiB so we format them
    # as a string.
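    # For example, 3 GiB (3221225472 bytes) of used memory is reported as
    # the string '3221225472'.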
    used_memory = f'{used_memory:d}'

    status = {
        'display_name': self._current_display_name,
        'identifier': self._name,
        'last_activity_timestamp': last_activity_timestamp,
        'number_of_consumed_event_data': None,
        'number_of_consumed_event_tags': None,
        'number_of_consumed_events': None,
        'number_of_consumed_sources': self._number_of_consumed_sources,
        'number_of_produced_event_data': number_of_produced_event_data,
        'number_of_produced_event_tags': None,
        'number_of_produced_events': None,
        'number_of_produced_sources': number_of_produced_sources,
        'processing_status': processing_status,
        'task_identifier': task_identifier,
        'used_memory': used_memory}

    return status

  def _Main(self):
    """The main loop."""
    # We need a resolver context per process to prevent multi-processing
    # issues with file objects stored in images.
    self._resolver_context = context.Context()

    for credential_configuration in self._processing_configuration.credentials:
      path_spec_resolver.Resolver.key_chain.SetCredential(
          credential_configuration.path_spec,
          credential_configuration.credential_type,
          credential_configuration.credential_data)

    self._parser_mediator = self._CreateParserMediator(
        self._resolver_context, self._processing_configuration,
        self._system_configurations, self._windows_event_log_providers)

    # We need to initialize the parser and hasher objects after the process
    # has forked, otherwise on Windows the "fork" will fail with
    # a PickleError for Python modules that cannot be pickled.
    self._extraction_worker = worker.EventExtractionWorker(
        parser_filter_expression=(
            self._processing_configuration.parser_filter_expression))

    self._extraction_worker.SetExtractionConfiguration(
        self._processing_configuration.extraction)

    self._parser_mediator.StartProfiling(
        self._processing_configuration.profiling, self._name,
        self._process_information)
    self._StartProfiling(self._processing_configuration.profiling)

    if self._analyzers_profiler:
      self._extraction_worker.SetAnalyzersProfiler(self._analyzers_profiler)

    if self._processing_profiler:
      self._extraction_worker.SetProcessingProfiler(self._processing_profiler)

    logger.debug(f'Worker: {self._name!s} (PID: {self._pid:d}) started.')

    self._status = definitions.STATUS_INDICATOR_RUNNING

    try:
      logger.debug(
          f'{self._name!s} (PID: {self._pid:d}) started monitoring task queue.')

      while not self._abort:
        try:
          task = self._task_queue.PopItem()
        except (errors.QueueClose, errors.QueueEmpty) as exception:
          exception_type = type(exception)
          logger.debug(
              f'ConsumeItems exiting with exception: {exception_type!s}.')
          break

        if isinstance(task, plaso_queue.QueueAbort):
          logger.debug('ConsumeItems exiting, dequeued QueueAbort object.')
          break

        self._ProcessTask(task)

      logger.debug(
          f'{self._name!s} (PID: {self._pid:d}) stopped monitoring task queue.')

    # All exceptions need to be caught here to prevent the process
    # from being killed by an uncaught exception.
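    # When that happens the worker sets self._abort below, so that its final
    # status is reported as aborted instead of completed.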
    except Exception as exception:  # pylint: disable=broad-except
      logger.warning((
          f'Unhandled exception in process: {self._name!s} '
          f'(PID: {self._pid:d}).'))
      logger.exception(exception)

      self._abort = True

    if self._analyzers_profiler:
      self._extraction_worker.SetAnalyzersProfiler(None)

    if self._processing_profiler:
      self._extraction_worker.SetProcessingProfiler(None)

    self._StopProfiling()
    self._parser_mediator.StopProfiling()

    self._extraction_worker = None
    self._file_system_cache = []
    self._parser_mediator = None
    self._resolver_context = None

    if self._abort:
      self._status = definitions.STATUS_INDICATOR_ABORTED
    else:
      self._status = definitions.STATUS_INDICATOR_COMPLETED

    logger.debug(f'Worker: {self._name!s} (PID: {self._pid:d}) stopped.')

    try:
      self._task_queue.Close(abort=self._abort)
    except errors.QueueAlreadyClosed:
      logger.error(f'Queue for {self.name:s} was already closed.')

  def _ProcessPathSpec(self, extraction_worker, parser_mediator, path_spec):
    """Processes a path specification.

    Args:
      extraction_worker (worker.EventExtractionWorker): extraction worker.
      parser_mediator (ParserMediator): parser mediator.
      path_spec (dfvfs.PathSpec): path specification.
    """
    self._current_display_name = parser_mediator.GetDisplayNameForPathSpec(
        path_spec)

    try:
      file_entry = path_spec_resolver.Resolver.OpenFileEntry(
          path_spec, resolver_context=parser_mediator.resolver_context)

      if file_entry is None:
        logger.warning(
            f'Unable to open file entry: {self._current_display_name:s}')
        return

      if (path_spec and not path_spec.IsSystemLevel() and
          path_spec.type_indicator != dfvfs_definitions.TYPE_INDICATOR_GZIP):
        file_system = file_entry.GetFileSystem()
        self._CacheFileSystem(file_system)

      extraction_worker.ProcessFileEntry(parser_mediator, file_entry)

    except Exception as exception:  # pylint: disable=broad-except
      parser_mediator.ProduceExtractionWarning((
          f'unable to process path specification with error: '
          f'{exception!s}'), path_spec=path_spec)

      if self._processing_configuration.debug_output:
        logger.warning((
            f'Unhandled exception while processing path specification: '
            f'{self._current_display_name:s}.'))
        logger.exception(exception)

  def _ProcessTask(self, task):
    """Processes a task.

    Args:
      task (Task): task.
    """
    logger.debug(f'Started processing task: {task.identifier:s}.')

    if self._tasks_profiler:
      self._tasks_profiler.Sample(task, 'processing_started')

    task.storage_format = self._processing_configuration.task_storage_format

    self._task = task

    task_storage_writer = self._storage_factory.CreateTaskStorageWriter(
        self._processing_configuration.task_storage_format)

    if self._serializers_profiler:
      task_storage_writer.SetSerializersProfiler(self._serializers_profiler)

    if self._storage_profiler:
      task_storage_writer.SetStorageProfiler(self._storage_profiler)

    self._parser_mediator.SetStorageWriter(task_storage_writer)

    storage_file_path = self._GetTaskStorageFilePath(
        self._processing_configuration.task_storage_format, task)
    task_storage_writer.Open(
        path=storage_file_path, session_identifier=task.session_identifier,
        task_identifier=task.identifier)

    try:
      task_storage_writer.AddAttributeContainer(task)

      # TODO: add support for more task types.
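      # For now a task is processed as a single dfVFS path specification
      # (task.path_spec), set by whichever engine component created the task.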
      self._ProcessPathSpec(
          self._extraction_worker, self._parser_mediator, task.path_spec)

      self._number_of_consumed_sources += 1

    finally:
      task.aborted = self._abort
      task_storage_writer.UpdateAttributeContainer(task)

      task_storage_writer.Close()

      self._parser_mediator.SetStorageWriter(None)

    try:
      self._FinalizeTaskStorageWriter(
          self._processing_configuration.task_storage_format, task)
    except IOError:
      pass

    self._task = None

    if self._tasks_profiler:
      self._tasks_profiler.Sample(task, 'processing_completed')

    logger.debug(f'Completed processing task: {task.identifier:s}.')
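
  # Life cycle of a task inside this worker, as implemented above:
  #
  #   task = self._task_queue.PopItem()      # _Main: dequeue the next task
  #   self._ProcessTask(task)                # open a task storage writer
  #   self._ProcessPathSpec(...)             # resolve and process the dfVFS
  #                                          # path specification of the task
  #   self._FinalizeTaskStorageWriter(...)   # finalize the task storage
  #                                          # (inherited helper)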

  def SignalAbort(self):
    """Signals the process to abort."""
    self._abort = True
    if self._extraction_worker:
      self._extraction_worker.SignalAbort()
    if self._parser_mediator:
      self._parser_mediator.SignalAbort()
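
# A minimal, hypothetical usage sketch (not part of plaso): a controlling
# process would create a task queue, start the worker and later stop it by
# enqueuing a QueueAbort object. The queue and configuration classes are
# assumptions based on the imports above and on sibling plaso modules.
#
#   from plaso.engine import configurations
#   from plaso.multi_process import multi_process_queue
#
#   task_queue = multi_process_queue.MultiProcessingQueue()
#   processing_configuration = configurations.ProcessingConfiguration()
#
#   worker_process = ExtractionWorkerProcess(
#       task_queue, processing_configuration, [], [], None, name='Worker_00')
#   worker_process.start()
#
#   # ... the controlling process pushes tasks onto task_queue ...
#
#   task_queue.PushItem(plaso_queue.QueueAbort(), block=False)
#   worker_process.join()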