"""The single process extraction engine."""
import collections
import os
import pdb
import threading
import time
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.resolver import resolver as path_spec_resolver
from plaso.containers import counts
from plaso.containers import event_sources
from plaso.containers import events
from plaso.engine import engine
from plaso.engine import extractors
from plaso.engine import logger
from plaso.engine import process_info
from plaso.engine import timeliner
from plaso.engine import worker
from plaso.lib import definitions
from plaso.lib import errors
from plaso.parsers import mediator as parsers_mediator
class SingleProcessEngine(engine.BaseEngine):
    """Single process extraction engine."""

    _CONTAINER_TYPE_EVENT_DATA_STREAM = events.EventDataStream.CONTAINER_TYPE

    # Maximum number of dfVFS file system objects to cache.
    _FILE_SYSTEM_CACHE_SIZE = 3

    def __init__(self, status_update_callback=None):
        """Initializes a single process extraction engine.

        Args:
            status_update_callback (Optional[function]): callback function for
                status updates.
        """
        super().__init__()
        self._current_display_name = ""
        self._data_types_counter = None
        self._event_data_timeliner = None
        self._extraction_worker = None
        self._file_system_cache = []
        self._number_of_consumed_event_data = 0
        self._number_of_consumed_sources = 0
        self._number_of_produced_events = 0
        self._parser_mediator = None
        self._parsers_counter = None
        self._path_spec_extractor = extractors.PathSpecExtractor()
        self._pid = os.getpid()
        self._process_information = process_info.ProcessInfo(self._pid)
        self._processing_configuration = None
        self._resolver_context = None
        self._status = definitions.STATUS_INDICATOR_IDLE
        self._status_update_active = False
        self._status_update_callback = status_update_callback
        self._status_update_thread = None
        self._storage_writer = None

    def _CacheFileSystem(self, file_system):
        """Caches a dfVFS file system object.

        Keeping an additional reference to a dfVFS file system object causes the
        object to remain cached in the resolver context. This minimizes the number
        of times the file system is re-opened.

        The cache list is kept ordered from least to most recently used. When the
        cache is full, the least recently used file system is dropped.

        Args:
            file_system (dfvfs.FileSystem): file system.
        """
        if file_system in self._file_system_cache:
            # Move the file system to the end of the list to mark it as the most
            # recently used one, also while the cache is not yet full. Otherwise
            # recency information gathered before the cache fills up is lost.
            self._file_system_cache.remove(file_system)
        elif len(self._file_system_cache) >= self._FILE_SYSTEM_CACHE_SIZE:
            # Evict the least recently used file system.
            self._file_system_cache.pop(0)

        self._file_system_cache.append(file_system)

    def _CheckExcludedPathSpec(self, file_system, path_spec):
        """Determines if the path specification should be excluded from extraction.

        Args:
            file_system (dfvfs.FileSystem): file system which the path
                specification is part of.
            path_spec (dfvfs.PathSpec): path specification.

        Returns:
            bool: True if the path specification should be excluded from
                extraction.
        """
        for find_spec in self._excluded_file_system_find_specs or []:
            if find_spec.ComparePathSpecLocation(path_spec, file_system):
                return True

        return False

    def _CollectInitialEventSources(self, parser_mediator, file_system_path_specs):
        """Collects the initial event sources.

        Args:
            parser_mediator (ParserMediator): mediates interactions between parsers
                and other components, such as storage and dfVFS.
            file_system_path_specs (list[dfvfs.PathSpec]): path specifications of
                the source file systems to process.
        """
        self._status = definitions.STATUS_INDICATOR_COLLECTING

        included_find_specs = self.GetCollectionIncludedFindSpecs()

        for file_system_path_spec in file_system_path_specs:
            if self._abort:
                break

            try:
                file_system = path_spec_resolver.Resolver.OpenFileSystem(
                    file_system_path_spec, resolver_context=self._resolver_context
                )

                path_spec_generator = self._path_spec_extractor.ExtractPathSpecs(
                    file_system_path_spec,
                    find_specs=included_find_specs,
                    recurse_file_system=False,
                    resolver_context=self._resolver_context,
                )

                for path_spec in path_spec_generator:
                    if self._abort:
                        break

                    if self._CheckExcludedPathSpec(file_system, path_spec):
                        display_name = parser_mediator.GetDisplayNameForPathSpec(
                            path_spec
                        )
                        logger.debug(f"Excluded from extraction: {display_name:s}.")
                        continue

                    # TODO: determine if event sources should be DataStream or
                    # FileEntry or both.
                    event_source = event_sources.FileEntryEventSource(
                        path_spec=path_spec
                    )
                    parser_mediator.ProduceEventSource(event_source)

            except KeyboardInterrupt:
                self._abort = True

                self._processing_status.aborted = True
                if self._status_update_callback:
                    self._status_update_callback(self._processing_status)

            # All exceptions need to be caught here to prevent the process
            # from being killed by an uncaught exception.
            except Exception as exception:  # pylint: disable=broad-except
                parser_mediator.ProduceExtractionWarning(
                    f"unable to process path specification with error: "
                    f"{exception!s}",
                    file_system_path_spec,
                )

    def _ProcessEventData(self):
        """Generates events from the event data in the storage writer."""
        if self._processing_profiler:
            self._processing_profiler.StartTiming("process_event_data")

        self._status = definitions.STATUS_INDICATOR_TIMELINING

        if self._processing_profiler:
            self._processing_profiler.StartTiming("get_event_data")

        event_data = self._storage_writer.GetFirstWrittenEventData()

        if self._processing_profiler:
            self._processing_profiler.StopTiming("get_event_data")

        while event_data:
            if self._abort:
                break

            event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()

            event_data_stream = None
            if event_data_stream_identifier:
                if self._processing_profiler:
                    self._processing_profiler.StartTiming("get_event_data_stream")

                event_data_stream = (
                    self._storage_writer.GetAttributeContainerByIdentifier(
                        self._CONTAINER_TYPE_EVENT_DATA_STREAM,
                        event_data_stream_identifier,
                    )
                )

                if self._processing_profiler:
                    self._processing_profiler.StopTiming("get_event_data_stream")

            self._event_data_timeliner.ProcessEventData(
                self._storage_writer, event_data, event_data_stream
            )

            self._number_of_consumed_event_data += 1
            self._number_of_produced_events += (
                self._event_data_timeliner.number_of_produced_events
            )

            if self._processing_profiler:
                self._processing_profiler.StartTiming("get_event_data")

            event_data = self._storage_writer.GetNextWrittenEventData()

            if self._processing_profiler:
                self._processing_profiler.StopTiming("get_event_data")

        if self._abort:
            self._status = definitions.STATUS_INDICATOR_ABORTED
        else:
            self._status = definitions.STATUS_INDICATOR_COMPLETED

        if self._processing_profiler:
            self._processing_profiler.StopTiming("process_event_data")

    def _ProcessEventSources(self, storage_writer, parser_mediator):
        """Processes event sources.

        Args:
            storage_writer (StorageWriter): storage writer for a session storage.
            parser_mediator (ParserMediator): mediates interactions between parsers
                and other components, such as storage and dfVFS.
        """
        self._status = definitions.STATUS_INDICATOR_RUNNING

        if self._processing_profiler:
            self._processing_profiler.StartTiming("get_event_source")

        event_source = storage_writer.GetFirstWrittenEventSource()

        if self._processing_profiler:
            self._processing_profiler.StopTiming("get_event_source")

        while event_source:
            if self._abort:
                break

            self._ProcessPathSpec(parser_mediator, event_source.path_spec)
            self._number_of_consumed_sources += 1

            if self._processing_profiler:
                self._processing_profiler.StartTiming("get_event_source")

            event_source = storage_writer.GetNextWrittenEventSource()

            if self._processing_profiler:
                self._processing_profiler.StopTiming("get_event_source")

    def _ProcessPathSpec(self, parser_mediator, path_spec):
        """Processes a path specification.

        Args:
            parser_mediator (ParserMediator): mediates interactions between parsers
                and other components, such as storage and dfVFS.
            path_spec (dfvfs.PathSpec): path specification.
        """
        try:
            self._current_display_name = parser_mediator.GetDisplayNameForPathSpec(
                path_spec
            )

            file_entry = path_spec_resolver.Resolver.OpenFileEntry(
                path_spec, resolver_context=parser_mediator.resolver_context
            )

            if file_entry is None:
                logger.warning(
                    f"Unable to open file entry: {self._current_display_name:s}"
                )
                return

            file_system = file_entry.GetFileSystem()

            if (
                path_spec
                and not path_spec.IsSystemLevel()
                and path_spec.type_indicator != dfvfs_definitions.TYPE_INDICATOR_GZIP
            ):
                self._CacheFileSystem(file_system)

            if self._CheckExcludedPathSpec(file_system, path_spec):
                logger.debug(
                    f"Excluded from extraction: {self._current_display_name:s}."
                )
                return

            self._extraction_worker.ProcessFileEntry(parser_mediator, file_entry)

        except KeyboardInterrupt:
            self._abort = True

            self._processing_status.aborted = True
            if self._status_update_callback:
                self._status_update_callback(self._processing_status)

        # All exceptions need to be caught here to prevent the process
        # from being killed by an uncaught exception.
        except Exception as exception:  # pylint: disable=broad-except
            parser_mediator.ProduceExtractionWarning(
                f"unable to process path specification with error: {exception!s}",
                path_spec=path_spec,
            )

            if getattr(self._processing_configuration, "debug_output", False):
                # Stop the status updates so they do not interfere with the
                # interactive post-mortem debugger session.
                self._StopStatusUpdateThread()

                logger.warning(
                    f"Unhandled exception while processing path spec: "
                    f"{self._current_display_name:s}."
                )
                logger.exception(exception)

                pdb.post_mortem()

                self._StartStatusUpdateThread()

    def _ProcessSource(self, parser_mediator, file_system_path_specs):
        """Processes file systems within a source.

        Args:
            parser_mediator (ParserMediator): mediates interactions between parsers
                and other components, such as storage and dfVFS.
            file_system_path_specs (list[dfvfs.PathSpec]): path specifications of
                the source file systems to process.
        """
        self._current_display_name = ""
        self._number_of_consumed_sources = 0

        if self._processing_profiler:
            self._processing_profiler.StartTiming("process_source")

        self._CollectInitialEventSources(parser_mediator, file_system_path_specs)

        if not self._abort:
            self._ProcessEventSources(self._storage_writer, parser_mediator)

        if self._processing_profiler:
            self._processing_profiler.StopTiming("process_source")

        if self._abort:
            self._status = definitions.STATUS_INDICATOR_ABORTED
        else:
            self._status = definitions.STATUS_INDICATOR_COMPLETED

    def _StartStatusUpdateThread(self):
        """Starts the status update thread."""
        self._status_update_active = True
        self._status_update_thread = threading.Thread(
            name="Status update", target=self._StatusUpdateThreadMain
        )
        self._status_update_thread.start()

    def _StatusUpdateThreadMain(self):
        """Main function of the status update thread."""
        while self._status_update_active:
            self._UpdateStatus()
            time.sleep(self._status_update_interval)

    def _StopStatusUpdateThread(self):
        """Stops the status update thread, waiting for it to finish."""
        if self._status_update_thread:
            self._status_update_active = False
            if self._status_update_thread.is_alive():
                self._status_update_thread.join()
            self._status_update_thread = None

    def _UpdateStatus(self):
        """Updates the processing status and reports it to the callback."""
        status = self._extraction_worker.processing_status
        if status == definitions.STATUS_INDICATOR_IDLE:
            status = self._status
        if status == definitions.STATUS_INDICATOR_IDLE:
            status = definitions.STATUS_INDICATOR_RUNNING

        used_memory = self._process_information.GetUsedMemory() or 0

        # The trailing counters are always reported as 0 by this engine.
        self._processing_status.UpdateForemanStatus(
            self._name,
            status,
            self._pid,
            used_memory,
            self._current_display_name,
            self._number_of_consumed_sources,
            self._parser_mediator.number_of_produced_event_sources,
            self._number_of_consumed_event_data,
            self._parser_mediator.number_of_produced_event_data,
            0,
            self._number_of_produced_events,
            0,
            0,
            0,
            0,
        )

        if self._status_update_callback:
            self._status_update_callback(self._processing_status)

    def _CreateParserMediator(
        self,
        storage_writer,
        resolver_context,
        processing_configuration,
        system_configurations,
        windows_event_log_providers,
    ):
        """Creates a parser mediator.

        Args:
            storage_writer (StorageWriter): storage writer for a session storage.
            resolver_context (dfvfs.Context): resolver context.
            processing_configuration (ProcessingConfiguration): processing
                configuration.
            system_configurations (list[SystemConfigurationArtifact]): system
                configurations.
            windows_event_log_providers (list[WindowsEventLogProviderArtifact]):
                Windows EventLog providers.

        Returns:
            ParserMediator: parser mediator.

        Raises:
            BadConfigOption: if an invalid collection filter was specified.
        """
        # TODO: get environment_variables per system_configuration
        environment_variables = None
        if self.knowledge_base:
            environment_variables = self.knowledge_base.GetEnvironmentVariables()

        user_accounts = list(storage_writer.GetAttributeContainers("user_account"))

        try:
            self.BuildCollectionFilters(
                environment_variables,
                user_accounts,
                artifact_filter_names=processing_configuration.artifact_filters,
                filter_file_path=processing_configuration.filter_file,
            )
        except errors.InvalidFilter as exception:
            raise errors.BadConfigOption(
                f"Unable to build collection filters with error: {exception!s}"
            ) from exception

        parser_mediator = parsers_mediator.ParserMediator(
            registry_find_specs=self._registry_find_specs,
            resolver_context=resolver_context,
            system_configurations=system_configurations,
        )

        parser_mediator.SetExtractWinEvtResources(
            processing_configuration.extraction.extract_winevt_resources
        )
        parser_mediator.SetExtractWinRegBinaryValues(
            processing_configuration.extraction.extract_winreg_binary
        )
        parser_mediator.SetPreferredCodepage(
            processing_configuration.preferred_codepage
        )
        parser_mediator.SetPreferredLanguage(
            processing_configuration.preferred_language
        )
        parser_mediator.SetTemporaryDirectory(
            processing_configuration.temporary_directory
        )
        parser_mediator.SetWindowsEventLogProviders(windows_event_log_providers)

        return parser_mediator

    def _MergeCounts(self, counts_by_name, new_counts, count_class):
        """Merges per-name event counts into the stored count containers.

        Containers that already exist are incremented and updated in storage;
        names without a container get a new one, which is added to storage.

        Args:
            counts_by_name (dict[str, AttributeContainer]): count containers
                by name. Updated in place with newly created containers.
            new_counts (dict[str, int]): number of events per name to add.
            count_class (type): count container class to instantiate for names
                that do not yet have a container, such as counts.ParserCount.
        """
        for name, number_of_events in new_counts.items():
            count_container = counts_by_name.get(name)
            if count_container is not None:
                count_container.number_of_events += number_of_events
                self._storage_writer.UpdateAttributeContainer(count_container)
            else:
                count_container = count_class(
                    name=name, number_of_events=number_of_events
                )
                counts_by_name[name] = count_container
                self._storage_writer.AddAttributeContainer(count_container)

    def ProcessSource(
        self,
        storage_writer,
        resolver_context,
        processing_configuration,
        system_configurations,
        file_system_path_specs,
    ):
        """Processes file systems within a source.

        Args:
            storage_writer (StorageWriter): storage writer for a session storage.
            resolver_context (dfvfs.Context): resolver context.
            processing_configuration (ProcessingConfiguration): processing
                configuration.
            system_configurations (list[SystemConfigurationArtifact]): system
                configurations.
            file_system_path_specs (list[dfvfs.PathSpec]): path specifications of
                the source file systems to process.

        Returns:
            ProcessingStatus: processing status.

        Raises:
            BadConfigOption: if an invalid collection filter was specified or if
                the preferred time zone is invalid.
        """
        if not self._artifacts_registry:
            # TODO: refactor.
            self.BuildArtifactsRegistry(
                processing_configuration.artifact_definitions_path,
                processing_configuration.custom_artifacts_path,
            )

        windows_event_log_providers = list(
            storage_writer.GetAttributeContainers("windows_eventlog_provider")
        )

        parser_mediator = self._CreateParserMediator(
            storage_writer,
            resolver_context,
            processing_configuration,
            system_configurations,
            windows_event_log_providers,
        )
        parser_mediator.SetStorageWriter(storage_writer)

        self._extraction_worker = worker.EventExtractionWorker(
            force_parser=processing_configuration.force_parser,
            parser_filter_expression=(
                processing_configuration.parser_filter_expression
            ),
        )
        self._extraction_worker.SetExtractionConfiguration(
            processing_configuration.extraction
        )

        self._event_data_timeliner = timeliner.EventDataTimeliner(
            data_location=processing_configuration.data_location,
            preferred_year=processing_configuration.preferred_year,
            system_configurations=system_configurations,
        )

        try:
            self._event_data_timeliner.SetPreferredTimeZone(
                processing_configuration.preferred_time_zone
            )
        except ValueError as exception:
            raise errors.BadConfigOption(exception) from exception

        self._parser_mediator = parser_mediator
        self._processing_configuration = processing_configuration
        self._resolver_context = resolver_context
        self._storage_writer = storage_writer

        # Load the data type and parser counts already in storage before any
        # profiling or the status update thread is started. If these reads fail,
        # no thread is left running: the status update thread is created without
        # daemon=True and would otherwise keep the process alive.
        self._data_types_counter = collections.Counter(
            {
                data_type_count.name: data_type_count
                for data_type_count in self._storage_writer.GetAttributeContainers(
                    "data_type_count"
                )
            }
        )
        self._parsers_counter = collections.Counter(
            {
                parser_count.name: parser_count
                for parser_count in self._storage_writer.GetAttributeContainers(
                    "parser_count"
                )
            }
        )

        logger.debug("Processing started.")

        parser_mediator.StartProfiling(
            self._processing_configuration.profiling,
            self._name,
            self._process_information,
        )
        self._StartProfiling(self._processing_configuration.profiling)

        if self._analyzers_profiler:
            self._extraction_worker.SetAnalyzersProfiler(self._analyzers_profiler)

        if self._processing_profiler:
            self._extraction_worker.SetProcessingProfiler(self._processing_profiler)

        if self._serializers_profiler:
            self._storage_writer.SetSerializersProfiler(self._serializers_profiler)

        if self._storage_profiler:
            self._storage_writer.SetStorageProfiler(self._storage_profiler)

        # Start the status update thread right before the try block so that
        # the finally clause is guaranteed to stop it.
        self._StartStatusUpdateThread()

        try:
            self._ProcessSource(parser_mediator, file_system_path_specs)
            self._ProcessEventData()

        finally:
            # Stop the status update thread before the profilers are detached.
            # A final status update is emitted explicitly below.
            self._StopStatusUpdateThread()

            if self._analyzers_profiler:
                self._extraction_worker.SetAnalyzersProfiler(None)

            if self._processing_profiler:
                self._extraction_worker.SetProcessingProfiler(None)

            if self._serializers_profiler:
                self._storage_writer.SetSerializersProfiler(None)

            if self._storage_profiler:
                self._storage_writer.SetStorageProfiler(None)

            self._StopProfiling()
            parser_mediator.StopProfiling()

        self._MergeCounts(
            self._data_types_counter,
            self._event_data_timeliner.data_types_counter,
            counts.DataTypeCount,
        )
        self._MergeCounts(
            self._parsers_counter,
            self._event_data_timeliner.parsers_counter,
            counts.ParserCount,
        )

        # TODO: remove after completion event and event data split.
        self._MergeCounts(
            self._parsers_counter,
            parser_mediator.parsers_counter,
            counts.ParserCount,
        )

        if self._abort:
            logger.debug("Processing aborted.")
            self._processing_status.aborted = True
        else:
            logger.debug("Processing completed.")

        # Update the status view one last time.
        self._UpdateStatus()

        self._event_data_timeliner = None
        self._extraction_worker = None
        self._file_system_cache = []
        self._parser_mediator = None
        self._processing_configuration = None
        self._resolver_context = None
        self._storage_writer = None

        return self._processing_status