Source code for plaso.multi_process.extraction_process
"""The multi-process extraction worker process."""
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.resolver import context
from dfvfs.resolver import resolver as path_spec_resolver
from plaso.engine import worker
from plaso.lib import definitions
from plaso.lib import errors
from plaso.multi_process import logger
from plaso.multi_process import plaso_queue
from plaso.multi_process import task_process
from plaso.parsers import mediator as parsers_mediator
[docs]
class ExtractionWorkerProcess(task_process.MultiProcessTaskProcess):
    """Multi-processing extraction worker process.

    The worker process pops tasks from a task queue and, for each task,
    processes the task's path specification with an event extraction worker,
    writing the results to a task storage writer.
    """

    # Maximum number of dfVFS file system objects to cache in the worker process.
    _FILE_SYSTEM_CACHE_SIZE = 3

    def __init__(
        self,
        task_queue,
        processing_configuration,
        system_configurations,
        windows_event_log_providers,
        registry_find_specs,
        **kwargs,
    ):
        """Initializes an extraction worker process.

        Non-specified keyword arguments (kwargs) are directly passed to
        multiprocessing.Process.

        Args:
          task_queue (PlasoQueue): task queue.
          processing_configuration (ProcessingConfiguration): processing
              configuration.
          system_configurations (list[SystemConfigurationArtifact]): system
              configurations.
          windows_event_log_providers (list[WindowsEventLogProviderArtifact]):
              Windows EventLog providers.
          registry_find_specs (list[dfwinreg.FindSpec]): Windows Registry find
              specifications.
          kwargs: keyword arguments to pass to multiprocessing.Process.
        """
        super().__init__(processing_configuration, **kwargs)
        self._abort = False
        self._buffer_size = 0
        self._current_display_name = ""
        # The extraction worker, parser mediator and resolver context are
        # created in _Main.
        self._extraction_worker = None
        # Cached dfVFS file system objects, ordered from least to most
        # recently used.
        self._file_system_cache = []
        self._number_of_consumed_sources = 0
        self._parser_mediator = None
        self._registry_find_specs = registry_find_specs
        self._resolver_context = None
        self._status = definitions.STATUS_INDICATOR_INITIALIZED
        self._task = None
        self._task_queue = task_queue
        self._system_configurations = system_configurations
        self._windows_event_log_providers = windows_event_log_providers

    def _CacheFileSystem(self, file_system):
        """Caches a dfVFS file system object.

        Keeping an additional reference to a dfVFS file system object causes the
        object to remain cached in the resolver context. This minimizes the
        number of times the file system is re-opened.

        The cache holds at most _FILE_SYSTEM_CACHE_SIZE objects. When a new file
        system object is added to a full cache, the least recently used one is
        evicted.

        Args:
          file_system (dfvfs.FileSystem): file system.
        """
        if file_system in self._file_system_cache:
            # Always move a cache hit to the end of the list, not only when the
            # cache is full, so that the list order reflects recency of use and
            # a recently used file system is never evicted before an older one.
            self._file_system_cache.remove(file_system)
        elif len(self._file_system_cache) >= self._FILE_SYSTEM_CACHE_SIZE:
            # Evict the least recently used file system object.
            self._file_system_cache.pop(0)
        self._file_system_cache.append(file_system)

    def _CreateParserMediator(
        self,
        resolver_context,
        processing_configuration,
        system_configurations,
        windows_event_log_providers,
    ):
        """Creates a parser mediator.

        Args:
          resolver_context (dfvfs.Context): resolver context.
          processing_configuration (ProcessingConfiguration): processing
              configuration.
          system_configurations (list[SystemConfigurationArtifact]): system
              configurations.
          windows_event_log_providers (list[WindowsEventLogProviderArtifact]):
              Windows EventLog providers.

        Returns:
          ParserMediator: parser mediator.
        """
        parser_mediator = parsers_mediator.ParserMediator(
            registry_find_specs=self._registry_find_specs,
            resolver_context=resolver_context,
            system_configurations=system_configurations,
        )
        parser_mediator.SetExtractWinEvtResources(
            processing_configuration.extraction.extract_winevt_resources
        )
        parser_mediator.SetExtractWinRegBinaryValues(
            processing_configuration.extraction.extract_winreg_binary
        )
        parser_mediator.SetPreferredCodepage(
            processing_configuration.preferred_codepage
        )
        parser_mediator.SetPreferredLanguage(
            processing_configuration.preferred_language
        )
        parser_mediator.SetTemporaryDirectory(
            processing_configuration.temporary_directory
        )
        parser_mediator.SetWindowsEventLogProviders(windows_event_log_providers)
        return parser_mediator

    def _GetStatus(self):
        """Retrieves status information.

        Returns:
          dict[str, object]: status attributes, indexed by name.
        """
        if self._parser_mediator:
            number_of_produced_event_data = (
                self._parser_mediator.number_of_produced_event_data
            )
            number_of_produced_sources = (
                self._parser_mediator.number_of_produced_event_sources
            )
        else:
            number_of_produced_event_data = None
            number_of_produced_sources = None

        if self._extraction_worker and self._parser_mediator:
            # The most recent activity of either the worker or the mediator.
            last_activity_timestamp = max(
                self._extraction_worker.last_activity_timestamp,
                self._parser_mediator.last_activity_timestamp,
            )
            processing_status = self._extraction_worker.processing_status
        else:
            last_activity_timestamp = 0.0
            processing_status = self._status

        task_identifier = getattr(self._task, "identifier", "")

        if self._process_information:
            used_memory = self._process_information.GetUsedMemory() or 0
        else:
            used_memory = 0

        if self._memory_profiler:
            self._memory_profiler.Sample("main", used_memory)

        # XML RPC does not support integer values > 2 GiB so we format them
        # as a string.
        used_memory = f"{used_memory:d}"

        status = {
            "display_name": self._current_display_name,
            "identifier": self._name,
            "last_activity_timestamp": last_activity_timestamp,
            "number_of_consumed_event_data": None,
            "number_of_consumed_event_tags": None,
            "number_of_consumed_events": None,
            "number_of_consumed_sources": self._number_of_consumed_sources,
            "number_of_produced_event_data": number_of_produced_event_data,
            "number_of_produced_event_tags": None,
            "number_of_produced_events": None,
            "number_of_produced_sources": number_of_produced_sources,
            "processing_status": processing_status,
            "task_identifier": task_identifier,
            "used_memory": used_memory,
        }
        return status

    def _Main(self):
        """The main loop."""
        # We need a resolver context per process to prevent multi processing
        # issues with file objects stored in images.
        self._resolver_context = context.Context()

        for credential_configuration in self._processing_configuration.credentials:
            path_spec_resolver.Resolver.key_chain.SetCredential(
                credential_configuration.path_spec,
                credential_configuration.credential_type,
                credential_configuration.credential_data,
            )

        self._parser_mediator = self._CreateParserMediator(
            self._resolver_context,
            self._processing_configuration,
            self._system_configurations,
            self._windows_event_log_providers,
        )

        # We need to initialize the parser and hasher objects after the process
        # has forked otherwise on Windows the "fork" will fail with
        # a PickleError for Python modules that cannot be pickled.
        self._extraction_worker = worker.EventExtractionWorker(
            parser_filter_expression=(
                self._processing_configuration.parser_filter_expression
            )
        )
        self._extraction_worker.SetExtractionConfiguration(
            self._processing_configuration.extraction
        )

        self._parser_mediator.StartProfiling(
            self._processing_configuration.profiling,
            self._name,
            self._process_information,
        )
        self._StartProfiling(self._processing_configuration.profiling)

        if self._analyzers_profiler:
            self._extraction_worker.SetAnalyzersProfiler(self._analyzers_profiler)

        if self._processing_profiler:
            self._extraction_worker.SetProcessingProfiler(self._processing_profiler)

        logger.debug(f"Worker: {self._name!s} (PID: {self._pid:d}) started.")

        self._status = definitions.STATUS_INDICATOR_RUNNING

        try:
            logger.debug(
                f"{self._name!s} (PID: {self._pid:d}) started monitoring task queue."
            )

            while not self._abort:
                try:
                    task = self._task_queue.PopItem()
                except (errors.QueueClose, errors.QueueEmpty) as exception:
                    exception_type = type(exception)
                    logger.debug(
                        f"ConsumeItems exiting with exception: {exception_type!s}."
                    )
                    break

                # A QueueAbort object signals that no more tasks will follow.
                if isinstance(task, plaso_queue.QueueAbort):
                    logger.debug("ConsumeItems exiting, dequeued QueueAbort object.")
                    break

                self._ProcessTask(task)

            logger.debug(
                f"{self._name!s} (PID: {self._pid:d}) stopped monitoring task queue."
            )

        # All exceptions need to be caught here to prevent the process
        # from being killed by an uncaught exception.
        except Exception as exception:  # pylint: disable=broad-except
            logger.warning(
                f"Unhandled exception in process: {self._name!s} "
                f"(PID: {self._pid:d})."
            )
            logger.exception(exception)

            self._abort = True

        if self._analyzers_profiler:
            self._extraction_worker.SetAnalyzersProfiler(None)

        if self._processing_profiler:
            self._extraction_worker.SetProcessingProfiler(None)

        self._StopProfiling()
        self._parser_mediator.StopProfiling()

        # Drop the per-process objects, including the cached file system
        # references.
        self._extraction_worker = None
        self._file_system_cache = []
        self._parser_mediator = None
        self._resolver_context = None

        if self._abort:
            self._status = definitions.STATUS_INDICATOR_ABORTED
        else:
            self._status = definitions.STATUS_INDICATOR_COMPLETED

        logger.debug(f"Worker: {self._name!s} (PID: {self._pid:d}) stopped.")

        try:
            self._task_queue.Close(abort=self._abort)
        except errors.QueueAlreadyClosed:
            logger.error(f"Queue for {self._name:s} was already closed.")

    def _ProcessPathSpec(self, extraction_worker, parser_mediator, path_spec):
        """Processes a path specification.

        Exceptions raised while processing are not propagated; they are
        recorded as an extraction warning instead.

        Args:
          extraction_worker (worker.ExtractionWorker): extraction worker.
          parser_mediator (ParserMediator): parser mediator.
          path_spec (dfvfs.PathSpec): path specification.
        """
        self._current_display_name = parser_mediator.GetDisplayNameForPathSpec(
            path_spec
        )

        try:
            file_entry = path_spec_resolver.Resolver.OpenFileEntry(
                path_spec, resolver_context=parser_mediator.resolver_context
            )

            if file_entry is None:
                logger.warning(
                    f"Unable to open file entry: {self._current_display_name:s}"
                )
                return

            # Keep a reference to the file system of non system-level, non gzip
            # path specifications so that it remains cached in the resolver
            # context.
            if (
                path_spec
                and not path_spec.IsSystemLevel()
                and path_spec.type_indicator != dfvfs_definitions.TYPE_INDICATOR_GZIP
            ):
                file_system = file_entry.GetFileSystem()
                self._CacheFileSystem(file_system)

            extraction_worker.ProcessFileEntry(parser_mediator, file_entry)

        except Exception as exception:  # pylint: disable=broad-except
            parser_mediator.ProduceExtractionWarning(
                f"unable to process path specification with error: {exception!s}",
                path_spec=path_spec,
            )

            if self._processing_configuration.debug_output:
                logger.warning(
                    f"Unhandled exception while processing path specification: "
                    f"{self._current_display_name:s}."
                )
                logger.exception(exception)

    def _ProcessTask(self, task):
        """Processes a task.

        Args:
          task (Task): task.
        """
        logger.debug(f"Started processing task: {task.identifier:s}.")

        if self._tasks_profiler:
            self._tasks_profiler.Sample(task, "processing_started")

        task.storage_format = self._processing_configuration.task_storage_format
        self._task = task

        task_storage_writer = self._storage_factory.CreateTaskStorageWriter(
            self._processing_configuration.task_storage_format
        )

        if self._serializers_profiler:
            task_storage_writer.SetSerializersProfiler(self._serializers_profiler)

        if self._storage_profiler:
            task_storage_writer.SetStorageProfiler(self._storage_profiler)

        self._parser_mediator.SetStorageWriter(task_storage_writer)

        storage_file_path = self._GetTaskStorageFilePath(
            self._processing_configuration.task_storage_format, task
        )
        task_storage_writer.Open(
            path=storage_file_path,
            session_identifier=task.session_identifier,
            task_identifier=task.identifier,
        )

        try:
            task_storage_writer.AddAttributeContainer(task)

            # TODO: add support for more task types.
            self._ProcessPathSpec(
                self._extraction_worker, self._parser_mediator, task.path_spec
            )
            self._number_of_consumed_sources += 1

        finally:
            # Record whether the task was aborted and close the storage writer,
            # even if processing raised.
            task.aborted = self._abort
            task_storage_writer.UpdateAttributeContainer(task)

            task_storage_writer.Close()

            self._parser_mediator.SetStorageWriter(None)

        try:
            self._FinalizeTaskStorageWriter(
                self._processing_configuration.task_storage_format, task
            )
        except OSError as exception:
            # Finalizing is best-effort and must not stop the worker, but the
            # failure is logged rather than silently discarded.
            logger.warning(
                f"Unable to finalize task storage writer for task: "
                f"{task.identifier:s} with error: {exception!s}"
            )

        self._task = None

        if self._tasks_profiler:
            self._tasks_profiler.Sample(task, "processing_completed")

        logger.debug(f"Completed processing task: {task.identifier:s}.")

    def SignalAbort(self):
        """Signals the process to abort."""
        self._abort = True
        if self._extraction_worker:
            self._extraction_worker.SignalAbort()
        if self._parser_mediator:
            self._parser_mediator.SignalAbort()