# -*- coding: utf-8 -*-
"""The single process extraction engine."""
import collections
import os
import pdb
import threading
import time
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.resolver import resolver as path_spec_resolver
from plaso.containers import counts
from plaso.containers import event_sources
from plaso.containers import events
from plaso.engine import engine
from plaso.engine import extractors
from plaso.engine import logger
from plaso.engine import process_info
from plaso.engine import timeliner
from plaso.engine import worker
from plaso.lib import definitions
from plaso.lib import errors
from plaso.parsers import mediator as parsers_mediator
class SingleProcessEngine(engine.BaseEngine):
"""Single process extraction engine."""
_CONTAINER_TYPE_EVENT_DATA_STREAM = events.EventDataStream.CONTAINER_TYPE
# Maximum number of dfVFS file system objects to cache.
_FILE_SYSTEM_CACHE_SIZE = 3
def __init__(self, status_update_callback=None):
"""Initializes a single process extraction engine.
Args:
status_update_callback (Optional[function]): callback function for status
updates.
"""
super(SingleProcessEngine, self).__init__()
self._current_display_name = ''
self._event_data_timeliner = None
self._extraction_worker = None
self._file_system_cache = []
self._number_of_consumed_event_data = 0
self._number_of_consumed_sources = 0
self._number_of_produced_events = 0
self._parser_mediator = None
self._parsers_counter = None
self._path_spec_extractor = extractors.PathSpecExtractor()
self._pid = os.getpid()
self._process_information = process_info.ProcessInfo(self._pid)
self._processing_configuration = None
self._resolver_context = None
self._status = definitions.STATUS_INDICATOR_IDLE
self._status_update_active = False
self._status_update_callback = status_update_callback
self._status_update_thread = None
self._storage_writer = None
def _CacheFileSystem(self, file_system):
"""Caches a dfVFS file system object.
    Keeping an additional reference to a dfVFS file system object causes the
    object to remain cached in the resolver context. This minimizes the
    number of times the file system is re-opened.
Args:
file_system (dfvfs.FileSystem): file system.
"""
if file_system not in self._file_system_cache:
if len(self._file_system_cache) == self._FILE_SYSTEM_CACHE_SIZE:
self._file_system_cache.pop(0)
self._file_system_cache.append(file_system)
elif len(self._file_system_cache) == self._FILE_SYSTEM_CACHE_SIZE:
      # Move the file system to the end of the list to preserve the most
      # recently used file system object.
self._file_system_cache.remove(file_system)
self._file_system_cache.append(file_system)
def _CheckExcludedPathSpec(self, file_system, path_spec):
"""Determines if the path specification should be excluded from extraction.
Args:
file_system (dfvfs.FileSystem): file system which the path specification
is part of.
path_spec (dfvfs.PathSpec): path specification.
Returns:
bool: True if the path specification should be excluded from extraction.
"""
for find_spec in self._excluded_file_system_find_specs or []:
if find_spec.ComparePathSpecLocation(path_spec, file_system):
return True
return False
def _CollectInitialEventSources(
self, parser_mediator, file_system_path_specs):
"""Collects the initial event sources.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
file_system_path_specs (list[dfvfs.PathSpec]): path specifications of
the source file systems to process.
"""
self._status = definitions.STATUS_INDICATOR_COLLECTING
included_find_specs = self.GetCollectionIncludedFindSpecs()
for file_system_path_spec in file_system_path_specs:
if self._abort:
break
try:
file_system = path_spec_resolver.Resolver.OpenFileSystem(
file_system_path_spec, resolver_context=self._resolver_context)
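        # The initial collection is deliberately non-recursive: only the
        # path specifications matched here become event sources; further
        # event sources for nested file entries are produced by the
        # extraction worker during processing.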
path_spec_generator = self._path_spec_extractor.ExtractPathSpecs(
file_system_path_spec, find_specs=included_find_specs,
recurse_file_system=False, resolver_context=self._resolver_context)
for path_spec in path_spec_generator:
if self._abort:
break
if self._CheckExcludedPathSpec(file_system, path_spec):
display_name = parser_mediator.GetDisplayNameForPathSpec(path_spec)
logger.debug(f'Excluded from extraction: {display_name:s}.')
continue
# TODO: determine if event sources should be DataStream or FileEntry
# or both.
event_source = event_sources.FileEntryEventSource(path_spec=path_spec)
parser_mediator.ProduceEventSource(event_source)
except KeyboardInterrupt:
self._abort = True
self._processing_status.aborted = True
if self._status_update_callback:
self._status_update_callback(self._processing_status)
# All exceptions need to be caught here to prevent the process
# from being killed by an uncaught exception.
except Exception as exception: # pylint: disable=broad-except
parser_mediator.ProduceExtractionWarning((
f'unable to process path specification with error: '
f'{exception!s}'), file_system_path_spec)
def _ProcessEventData(self):
"""Generate events from event data."""
if self._processing_profiler:
self._processing_profiler.StartTiming('process_event_data')
self._status = definitions.STATUS_INDICATOR_TIMELINING
if self._processing_profiler:
self._processing_profiler.StartTiming('get_event_data')
event_data = self._storage_writer.GetFirstWrittenEventData()
if self._processing_profiler:
self._processing_profiler.StopTiming('get_event_data')
while event_data:
if self._abort:
break
event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()
event_data_stream = None
if event_data_stream_identifier:
if self._processing_profiler:
self._processing_profiler.StartTiming('get_event_data_stream')
event_data_stream = (
self._storage_writer.GetAttributeContainerByIdentifier(
self._CONTAINER_TYPE_EVENT_DATA_STREAM,
event_data_stream_identifier))
if self._processing_profiler:
self._processing_profiler.StopTiming('get_event_data_stream')
self._event_data_timeliner.ProcessEventData(
self._storage_writer, event_data, event_data_stream)
self._number_of_consumed_event_data += 1
self._number_of_produced_events += (
self._event_data_timeliner.number_of_produced_events)
# TODO: track number of consumed event data containers?
if self._processing_profiler:
self._processing_profiler.StartTiming('get_event_data')
event_data = self._storage_writer.GetNextWrittenEventData()
if self._processing_profiler:
self._processing_profiler.StopTiming('get_event_data')
if self._abort:
self._status = definitions.STATUS_INDICATOR_ABORTED
else:
self._status = definitions.STATUS_INDICATOR_COMPLETED
if self._processing_profiler:
self._processing_profiler.StopTiming('process_event_data')
def _ProcessEventSources(self, storage_writer, parser_mediator):
"""Processes event sources.
Args:
storage_writer (StorageWriter): storage writer for a session storage.
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
"""
self._status = definitions.STATUS_INDICATOR_RUNNING
if self._processing_profiler:
self._processing_profiler.StartTiming('get_event_source')
event_source = storage_writer.GetFirstWrittenEventSource()
if self._processing_profiler:
self._processing_profiler.StopTiming('get_event_source')
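    # The extraction worker writes additional event sources while file
    # entries are processed, for example for directory entries or files
    # embedded in archives; GetNextWrittenEventSource() below returns those
    # as well, so this loop also drives the recursive part of the collection.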
while event_source:
if self._abort:
break
self._ProcessPathSpec(parser_mediator, event_source.path_spec)
self._number_of_consumed_sources += 1
if self._processing_profiler:
self._processing_profiler.StartTiming('get_event_source')
event_source = storage_writer.GetNextWrittenEventSource()
if self._processing_profiler:
self._processing_profiler.StopTiming('get_event_source')
def _ProcessPathSpec(self, parser_mediator, path_spec):
"""Processes a path specification.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
path_spec (dfvfs.PathSpec): path specification.
"""
try:
self._current_display_name = parser_mediator.GetDisplayNameForPathSpec(
path_spec)
file_entry = path_spec_resolver.Resolver.OpenFileEntry(
path_spec, resolver_context=parser_mediator.resolver_context)
if file_entry is None:
logger.warning(
f'Unable to open file entry: {self._current_display_name:s}')
return
file_system = file_entry.GetFileSystem()
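      # Only cache file systems that are expensive to re-open: system-level
      # path specifications are served directly by the operating system and
      # a gzip "file system" wraps a single file, so (presumably) neither is
      # worth a cache slot.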
if (path_spec and not path_spec.IsSystemLevel() and
path_spec.type_indicator != dfvfs_definitions.TYPE_INDICATOR_GZIP):
self._CacheFileSystem(file_system)
if self._CheckExcludedPathSpec(file_system, path_spec):
logger.debug(
f'Excluded from extraction: {self._current_display_name:s}.')
return
self._extraction_worker.ProcessFileEntry(parser_mediator, file_entry)
except KeyboardInterrupt:
self._abort = True
self._processing_status.aborted = True
if self._status_update_callback:
self._status_update_callback(self._processing_status)
# All exceptions need to be caught here to prevent the process
# from being killed by an uncaught exception.
except Exception as exception: # pylint: disable=broad-except
parser_mediator.ProduceExtractionWarning((
f'unable to process path specification with error: '
f'{exception!s}'), path_spec=path_spec)
if getattr(self._processing_configuration, 'debug_output', False):
self._StopStatusUpdateThread()
logger.warning((
f'Unhandled exception while processing path spec: '
f'{self._current_display_name:s}.'))
logger.exception(exception)
pdb.post_mortem()
self._StartStatusUpdateThread()
def _ProcessSource(self, parser_mediator, file_system_path_specs):
"""Processes file systems within a source.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
file_system_path_specs (list[dfvfs.PathSpec]): path specifications of
the source file systems to process.
"""
self._current_display_name = ''
self._number_of_consumed_sources = 0
if self._processing_profiler:
self._processing_profiler.StartTiming('process_source')
self._CollectInitialEventSources(
parser_mediator, file_system_path_specs)
if not self._abort:
self._ProcessEventSources(self._storage_writer, parser_mediator)
if self._processing_profiler:
self._processing_profiler.StopTiming('process_source')
if self._abort:
self._status = definitions.STATUS_INDICATOR_ABORTED
else:
self._status = definitions.STATUS_INDICATOR_COMPLETED
def _StartStatusUpdateThread(self):
"""Starts the status update thread."""
self._status_update_active = True
self._status_update_thread = threading.Thread(
name='Status update', target=self._StatusUpdateThreadMain)
self._status_update_thread.start()
def _StatusUpdateThreadMain(self):
"""Main function of the status update thread."""
while self._status_update_active:
self._UpdateStatus()
time.sleep(self._status_update_interval)
def _StopStatusUpdateThread(self):
"""Stops the status update thread."""
if self._status_update_thread:
self._status_update_active = False
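      # Clearing the flag does not interrupt time.sleep() in the thread's
      # main loop, so the join below can block for up to one status update
      # interval before the thread exits.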
if self._status_update_thread.is_alive():
self._status_update_thread.join()
self._status_update_thread = None
def _UpdateStatus(self):
"""Updates the processing status."""
status = self._extraction_worker.processing_status
if status == definitions.STATUS_INDICATOR_IDLE:
status = self._status
if status == definitions.STATUS_INDICATOR_IDLE:
status = definitions.STATUS_INDICATOR_RUNNING
used_memory = self._process_information.GetUsedMemory() or 0
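    # The zero arguments below stand in for counters the single process
    # engine does not track; based on the current UpdateForemanStatus
    # signature these are likely the consumed events and the consumed and
    # produced event tags and reports.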
self._processing_status.UpdateForemanStatus(
self._name, status, self._pid, used_memory, self._current_display_name,
self._number_of_consumed_sources,
self._parser_mediator.number_of_produced_event_sources,
self._number_of_consumed_event_data,
self._parser_mediator.number_of_produced_event_data,
0, self._number_of_produced_events,
0, 0,
0, 0)
if self._status_update_callback:
self._status_update_callback(self._processing_status)
def _CreateParserMediator(
self, storage_writer, resolver_context, processing_configuration,
system_configurations, windows_event_log_providers):
"""Creates a parser mediator.
Args:
storage_writer (StorageWriter): storage writer for a session storage.
resolver_context (dfvfs.Context): resolver context.
processing_configuration (ProcessingConfiguration): processing
configuration.
system_configurations (list[SystemConfigurationArtifact]): system
configurations.
windows_event_log_providers (list[WindowsEventLogProviderArtifact]):
Windows EventLog providers.
Returns:
ParserMediator: parser mediator.
Raises:
BadConfigOption: if an invalid collection filter was specified.
"""
# TODO: get environment_variables per system_configuration
environment_variables = None
if self.knowledge_base:
environment_variables = self.knowledge_base.GetEnvironmentVariables()
user_accounts = list(storage_writer.GetAttributeContainers('user_account'))
try:
self.BuildCollectionFilters(
environment_variables, user_accounts,
artifact_filter_names=processing_configuration.artifact_filters,
filter_file_path=processing_configuration.filter_file)
except errors.InvalidFilter as exception:
raise errors.BadConfigOption(
f'Unable to build collection filters with error: {exception!s}')
parser_mediator = parsers_mediator.ParserMediator(
registry_find_specs=self._registry_find_specs,
resolver_context=resolver_context,
system_configurations=system_configurations)
parser_mediator.SetExtractWinEvtResources(
processing_configuration.extraction.extract_winevt_resources)
parser_mediator.SetExtractWinRegBinaryValues(
processing_configuration.extraction.extract_winreg_binary)
parser_mediator.SetPreferredCodepage(
processing_configuration.preferred_codepage)
parser_mediator.SetPreferredLanguage(
processing_configuration.preferred_language)
parser_mediator.SetTemporaryDirectory(
processing_configuration.temporary_directory)
parser_mediator.SetWindowsEventLogProviders(windows_event_log_providers)
return parser_mediator
def ProcessSource(
self, storage_writer, resolver_context, processing_configuration,
system_configurations, file_system_path_specs):
"""Processes file systems within a source.
Args:
storage_writer (StorageWriter): storage writer for a session storage.
resolver_context (dfvfs.Context): resolver context.
processing_configuration (ProcessingConfiguration): processing
configuration.
system_configurations (list[SystemConfigurationArtifact]): system
configurations.
file_system_path_specs (list[dfvfs.PathSpec]): path specifications of
the source file systems to process.
Returns:
ProcessingStatus: processing status.
Raises:
BadConfigOption: if an invalid collection filter was specified or if
the preferred time zone is invalid.
"""
if not self._artifacts_registry:
# TODO: refactor.
self.BuildArtifactsRegistry(
processing_configuration.artifact_definitions_path,
processing_configuration.custom_artifacts_path)
windows_event_log_providers = list(storage_writer.GetAttributeContainers(
'windows_eventlog_provider'))
parser_mediator = self._CreateParserMediator(
storage_writer, resolver_context, processing_configuration,
system_configurations, windows_event_log_providers)
parser_mediator.SetStorageWriter(storage_writer)
self._extraction_worker = worker.EventExtractionWorker(
force_parser=processing_configuration.force_parser,
parser_filter_expression=(
processing_configuration.parser_filter_expression))
self._extraction_worker.SetExtractionConfiguration(
processing_configuration.extraction)
self._event_data_timeliner = timeliner.EventDataTimeliner(
data_location=processing_configuration.data_location,
preferred_year=processing_configuration.preferred_year,
system_configurations=system_configurations)
try:
self._event_data_timeliner.SetPreferredTimeZone(
processing_configuration.preferred_time_zone)
except ValueError as exception:
raise errors.BadConfigOption(exception)
self._parser_mediator = parser_mediator
self._processing_configuration = processing_configuration
self._resolver_context = resolver_context
self._storage_writer = storage_writer
logger.debug('Processing started.')
parser_mediator.StartProfiling(
self._processing_configuration.profiling, self._name,
self._process_information)
self._StartProfiling(self._processing_configuration.profiling)
if self._analyzers_profiler:
self._extraction_worker.SetAnalyzersProfiler(self._analyzers_profiler)
if self._processing_profiler:
self._extraction_worker.SetProcessingProfiler(self._processing_profiler)
if self._serializers_profiler:
self._storage_writer.SetSerializersProfiler(self._serializers_profiler)
if self._storage_profiler:
self._storage_writer.SetStorageProfiler(self._storage_profiler)
self._StartStatusUpdateThread()
self._parsers_counter = collections.Counter({
parser_count.name: parser_count
for parser_count in self._storage_writer.GetAttributeContainers(
'parser_count')})
try:
self._ProcessSource(parser_mediator, file_system_path_specs)
self._ProcessEventData()
finally:
      # Stop the status update thread after the storage writer has been
      # closed so that the storage sync to disk is included in the status
      # updates.
self._StopStatusUpdateThread()
if self._analyzers_profiler:
self._extraction_worker.SetAnalyzersProfiler(None)
if self._processing_profiler:
self._extraction_worker.SetProcessingProfiler(None)
if self._serializers_profiler:
self._storage_writer.SetSerializersProfiler(None)
if self._storage_profiler:
self._storage_writer.SetStorageProfiler(None)
self._StopProfiling()
parser_mediator.StopProfiling()
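    # Merge the per-parser event counts of the timeliner and, until the
    # event and event data split is completed, of the parser mediator into
    # the parser_count attribute containers in storage.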
for key, value in self._event_data_timeliner.parsers_counter.items():
parser_count = self._parsers_counter.get(key, None)
if parser_count:
parser_count.number_of_events += value
self._storage_writer.UpdateAttributeContainer(parser_count)
else:
parser_count = counts.ParserCount(name=key, number_of_events=value)
self._parsers_counter[key] = parser_count
self._storage_writer.AddAttributeContainer(parser_count)
# TODO: remove after completion event and event data split.
for key, value in parser_mediator.parsers_counter.items():
parser_count = self._parsers_counter.get(key, None)
if parser_count:
parser_count.number_of_events += value
self._storage_writer.UpdateAttributeContainer(parser_count)
else:
parser_count = counts.ParserCount(name=key, number_of_events=value)
self._parsers_counter[key] = parser_count
self._storage_writer.AddAttributeContainer(parser_count)
if self._abort:
logger.debug('Processing aborted.')
self._processing_status.aborted = True
else:
logger.debug('Processing completed.')
# Update the status view one last time.
self._UpdateStatus()
self._event_data_timeliner = None
self._extraction_worker = None
self._file_system_cache = []
self._parser_mediator = None
self._processing_configuration = None
self._resolver_context = None
self._storage_writer = None
return self._processing_status
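# Illustrative usage sketch, not part of plaso: how a caller might drive the
# engine. The storage_writer, processing_configuration and
# system_configurations objects are assumed to have been created elsewhere,
# and the exact wiring depends on the plaso and dfVFS versions in use; see
# the log2timeline CLI tooling for the authoritative setup.
#
#   from dfvfs.lib import definitions as dfvfs_definitions
#   from dfvfs.path import factory as path_spec_factory
#   from dfvfs.resolver import context as dfvfs_context
#
#   os_path_spec = path_spec_factory.Factory.NewPathSpec(
#       dfvfs_definitions.TYPE_INDICATOR_OS, location='/path/to/source')
#
#   engine_object = SingleProcessEngine(status_update_callback=print)
#   processing_status = engine_object.ProcessSource(
#       storage_writer, dfvfs_context.Context(), processing_configuration,
#       system_configurations, [os_path_spec])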