"""The processing engine."""
import os
from artifacts import errors as artifacts_errors
from artifacts import reader as artifacts_reader
from artifacts import registry as artifacts_registry
from dfvfs.lib import errors as dfvfs_errors
from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import resolver as path_spec_resolver
from plaso.containers import artifacts
from plaso.containers import sessions
from plaso.engine import artifact_filters
from plaso.engine import knowledge_base
from plaso.engine import logger
from plaso.engine import path_filters
from plaso.engine import processing_status
from plaso.engine import profilers
from plaso.engine import yaml_filter_file
from plaso.lib import errors
from plaso.preprocessors import manager as preprocess_manager
from plaso.preprocessors import mediator as preprocess_mediator
class BaseEngine:
    """Processing engine interface.

    Attributes:
        knowledge_base (KnowledgeBase): knowledge base.
    """

    # Names of the private attributes that hold the profilers, in the order
    # in which they are stopped.
    _PROFILER_ATTRIBUTE_NAMES = (
        "_memory_profiler",
        "_analyzers_profiler",
        "_processing_profiler",
        "_serializers_profiler",
        "_storage_profiler",
        "_task_queue_profiler",
    )

    _WINDOWS_REGISTRY_FILES_ARTIFACT_NAMES = [
        "WindowsSystemRegistryFiles",
        "WindowsUserRegistryFiles",
    ]

    def __init__(self):
        """Initializes an engine."""
        super().__init__()
        self._abort = False
        self._analyzers_profiler = None
        self._artifacts_registry = None
        self._artifacts_trie = None
        self._excluded_file_system_find_specs = None
        self._included_file_system_find_specs = None
        self._memory_profiler = None
        self._name = "Main"
        self._processing_status = processing_status.ProcessingStatus()
        self._processing_profiler = None
        self._registry_find_specs = None
        self._serializers_profiler = None
        # The interval of status updates in number of seconds.
        self._status_update_interval = 0.5
        self._storage_profiler = None
        self._task_queue_profiler = None

        self.knowledge_base = knowledge_base.KnowledgeBase()

    def _StartProfiling(self, configuration):
        """Starts profiling.

        A profiler is created and started for every profiling type the
        configuration reports as enabled. Nothing is started when no
        configuration is provided.

        Args:
            configuration (ProfilingConfiguration): profiling configuration.
        """
        if not configuration:
            return

        if configuration.HaveProfileMemory():
            self._memory_profiler = profilers.MemoryProfiler(self._name, configuration)
            self._memory_profiler.Start()

        if configuration.HaveProfileAnalyzers():
            self._analyzers_profiler = profilers.AnalyzersProfiler(
                f"{self._name:s}-analyzers", configuration
            )
            self._analyzers_profiler.Start()

        if configuration.HaveProfileProcessing():
            self._processing_profiler = profilers.ProcessingProfiler(
                f"{self._name:s}-processing", configuration
            )
            self._processing_profiler.Start()

        if configuration.HaveProfileSerializers():
            self._serializers_profiler = profilers.SerializersProfiler(
                f"{self._name:s}-serializers", configuration
            )
            self._serializers_profiler.Start()

        if configuration.HaveProfileStorage():
            self._storage_profiler = profilers.StorageProfiler(
                self._name, configuration
            )
            self._storage_profiler.Start()

        if configuration.HaveProfileTaskQueue():
            self._task_queue_profiler = profilers.TaskQueueProfiler(
                self._name, configuration
            )
            self._task_queue_profiler.Start()

    def _StopProfiling(self):
        """Stops profiling.

        Every profiler that is currently set is stopped and its attribute is
        reset to None.
        """
        for attribute_name in self._PROFILER_ATTRIBUTE_NAMES:
            profiler = getattr(self, attribute_name)
            if profiler:
                profiler.Stop()
                setattr(self, attribute_name, None)

    @staticmethod
    def _ReadArtifactDefinitions(registry, reader, path, description):
        """Reads artifact definitions from a directory or file into a registry.

        Args:
            registry (ArtifactDefinitionsRegistry): registry to read the
                definitions into.
            reader (YamlArtifactsReader): artifact definitions reader.
            path (str): path to an artifact definitions directory or file.
            description (str): description of the definitions, which is used
                in the error message.

        Raises:
            BadConfigOption: if the artifact definitions cannot be read.
        """
        try:
            if os.path.isdir(path):
                registry.ReadFromDirectory(reader, path)
            else:
                registry.ReadFromFile(reader, path)

        except (KeyError, artifacts_errors.FormatError) as exception:
            # Chain explicitly so the traceback shows the underlying cause.
            raise errors.BadConfigOption(
                f"Unable to read {description:s} from: {path:s} with error: "
                f"{exception!s}"
            ) from exception

    def BuildArtifactsRegistry(self, artifact_definitions_path, custom_artifacts_path):
        """Builds an artifacts definition registry.

        The registry is only stored on the engine after all definitions were
        read successfully.

        Args:
            artifact_definitions_path (str): path to artifact definitions
                directory or file.
            custom_artifacts_path (str): path to custom artifact definitions
                directory or file.

        Raises:
            BadConfigOption: if the artifact definitions path is missing or
                artifact definitions cannot be read.
        """
        if not artifact_definitions_path:
            raise errors.BadConfigOption("Missing artifact definitions path.")

        registry = artifacts_registry.ArtifactDefinitionsRegistry()
        reader = artifacts_reader.YamlArtifactsReader()

        self._ReadArtifactDefinitions(
            registry, reader, artifact_definitions_path, "artifact definitions"
        )
        if custom_artifacts_path:
            self._ReadArtifactDefinitions(
                registry, reader, custom_artifacts_path, "custom artifact definitions"
            )

        self._artifacts_registry = registry

    def BuildCollectionFilters(
        self,
        environment_variables,
        user_accounts,
        artifact_filter_names=None,
        filter_file_path=None,
        enable_artifacts_map=False,
    ):
        """Builds collection filters from artifacts or filter file if available.

        Artifact filter names take precedence over the filter file. No filters
        are built when neither is provided.

        Args:
            environment_variables (list[EnvironmentVariableArtifact]):
                environment variables.
            user_accounts (list[UserAccountArtifact]): user accounts.
            artifact_filter_names (Optional[list[str]]): names of artifact
                definitions that are used for filtering file system and
                Windows Registry key paths.
            filter_file_path (Optional[str]): path of filter file.
            enable_artifacts_map (Optional[bool]): True if the artifacts path
                map should be generated. Defaults to False.

        Raises:
            InvalidFilter: if no valid file system find specifications are
                built.
        """
        if artifact_filter_names:
            names = ", ".join(artifact_filter_names)
            logger.debug(f"building find specification based on artifacts: {names:s}")

            filters_helper = artifact_filters.ArtifactDefinitionsFiltersHelper(
                self._artifacts_registry
            )
            filters_helper.BuildFindSpecs(
                artifact_filter_names,
                environment_variables=environment_variables,
                user_accounts=user_accounts,
                enable_artifacts_map=enable_artifacts_map,
            )

            # If the user selected Windows Registry artifacts we have to ensure
            # the Windows Registry files are parsed.
            if filters_helper.registry_find_specs:
                filters_helper.BuildFindSpecs(
                    self._WINDOWS_REGISTRY_FILES_ARTIFACT_NAMES,
                    environment_variables=environment_variables,
                    user_accounts=user_accounts,
                    enable_artifacts_map=enable_artifacts_map,
                    original_registry_artifact_filter_names=(
                        filters_helper.registry_find_specs_artifact_names
                    ),
                )

            if not filters_helper.file_system_find_specs:
                raise errors.InvalidFilter(
                    "No valid file system find specifications were built from "
                    "artifacts."
                )

            self._included_file_system_find_specs = (
                filters_helper.file_system_find_specs
            )
            self._registry_find_specs = filters_helper.registry_find_specs
            self._artifacts_trie = filters_helper.artifacts_trie

        elif filter_file_path:
            logger.debug(
                f"building find specification based on filter file: "
                f"{filter_file_path:s}"
            )

            filter_file_object = yaml_filter_file.YAMLFilterFile()
            filter_file_path_filters = filter_file_object.ReadFromFile(filter_file_path)

            filters_helper = path_filters.PathCollectionFiltersHelper()
            filters_helper.BuildFindSpecs(
                filter_file_path_filters, environment_variables=environment_variables
            )

            if (
                not filters_helper.excluded_file_system_find_specs
                and not filters_helper.included_file_system_find_specs
            ):
                raise errors.InvalidFilter(
                    f"No valid file system find specifications were built from filter "
                    f"file: {filter_file_path:s}."
                )

            self._excluded_file_system_find_specs = (
                filters_helper.excluded_file_system_find_specs
            )
            self._included_file_system_find_specs = (
                filters_helper.included_file_system_find_specs
            )

    # pylint: disable=too-many-arguments
    @classmethod
    def CreateSession(
        cls,
        artifact_filter_names=None,
        command_line_arguments=None,
        debug_mode=False,
        filter_file_path=None,
        preferred_encoding="utf-8",
    ):
        """Creates a session attribute container.

        Args:
            artifact_filter_names (Optional[list[str]]): names of artifact
                definitions that are used for filtering file system and
                Windows Registry key paths.
            command_line_arguments (Optional[str]): the command line arguments.
            debug_mode (Optional[bool]): True if debug mode was enabled.
            filter_file_path (Optional[str]): path to a file with find
                specifications.
            preferred_encoding (Optional[str]): preferred encoding.

        Returns:
            Session: session attribute container.
        """
        session = sessions.Session()

        session.artifact_filters = artifact_filter_names
        session.command_line_arguments = command_line_arguments
        session.debug_mode = debug_mode
        session.filter_file = filter_file_path
        session.preferred_encoding = preferred_encoding

        return session

    def GetArtifactsTrie(self):
        """Retrieves the artifacts trie.

        Returns:
            ArtifactsTrie: artifacts trie, or None if no trie was set by
                BuildCollectionFilters.
        """
        return self._artifacts_trie

    def GetCollectionExcludedFindSpecs(self):
        """Retrieves find specifications to exclude from collection.

        Returns:
            list[dfvfs.FindSpec]: find specifications to exclude from
                collection, which is an empty list if none are set.
        """
        return self._excluded_file_system_find_specs or []

    def GetCollectionIncludedFindSpecs(self):
        """Retrieves find specifications to include in collection.

        Returns:
            list[dfvfs.FindSpec]: find specifications to include in
                collection, which is an empty list if none are set.
        """
        return self._included_file_system_find_specs or []

    def GetSourceFileSystem(self, file_system_path_spec, resolver_context=None):
        """Retrieves the file system of the source.

        Args:
            file_system_path_spec (dfvfs.PathSpec): path specifications of
                the source file system to process.
            resolver_context (Optional[dfvfs.Context]): resolver context.

        Returns:
            tuple[dfvfs.FileSystem, path.PathSpec]: file system and mount point
                path specification. The mount point path specification refers
                to either a directory or a volume on a storage media device or
                image. It is needed by the dfVFS file system to indicate the
                base location of the file system.

        Raises:
            RuntimeError: if source file system path specification is not set.
        """
        if not file_system_path_spec:
            raise RuntimeError("Missing source file system path specification.")

        file_system = path_spec_resolver.Resolver.OpenFileSystem(
            file_system_path_spec, resolver_context=resolver_context
        )

        # A system-level path specification is its own mount point, otherwise
        # the parent path specification is used.
        type_indicator = file_system_path_spec.type_indicator
        if path_spec_factory.Factory.IsSystemLevelTypeIndicator(type_indicator):
            mount_point = file_system_path_spec
        else:
            mount_point = file_system_path_spec.parent

        return file_system, mount_point

    def PreprocessSource(
        self, file_system_path_specs, storage_writer, resolver_context=None
    ):
        """Preprocesses a source.

        The preprocessing plugins are run on every file system that can be
        opened. A system configuration is created for every file system on
        which an operating system was detected.

        Args:
            file_system_path_specs (list[dfvfs.PathSpec]): path specifications
                of the source file systems to process.
            storage_writer (StorageWriter): storage writer.
            resolver_context (Optional[dfvfs.Context]): resolver context.

        Returns:
            list[SystemConfigurationArtifact]: system configurations found in
                the source.
        """
        mediator = preprocess_mediator.PreprocessMediator(storage_writer)

        system_configurations = []
        for path_spec in file_system_path_specs:
            try:
                file_system, mount_point = self.GetSourceFileSystem(
                    path_spec, resolver_context=resolver_context
                )
            except (RuntimeError, dfvfs_errors.BackEndError) as exception:
                # A file system that cannot be opened is logged and skipped.
                logger.error(exception)
                continue

            preprocess_manager.PreprocessPluginsManager.RunPlugins(
                self._artifacts_registry, file_system, mount_point, mediator
            )

            operating_system = mediator.GetValue("operating_system")
            if not operating_system:
                continue

            system_configuration = artifacts.SystemConfigurationArtifact(
                code_page=mediator.code_page, language=mediator.language
            )

            # Ensure environment_variables is a list otherwise serialization
            # will fail.
            system_configuration.environment_variables = list(
                mediator.GetEnvironmentVariables()
            )
            system_configuration.hostname = mediator.hostname
            system_configuration.keyboard_layout = mediator.GetValue("keyboard_layout")
            system_configuration.operating_system = operating_system
            system_configuration.operating_system_product = mediator.GetValue(
                "operating_system_product"
            )
            system_configuration.operating_system_version = mediator.GetValue(
                "operating_system_version"
            )

            # TODO: add support for multi file system system configurations.
            system_configuration.path_specs = [path_spec]

            if mediator.time_zone:
                system_configuration.time_zone = mediator.time_zone.zone

            system_configurations.append(system_configuration)

            mediator.Reset()

        if system_configurations:
            # TODO: kept for backwards compatibility.
            # The environment variables are taken from the same (first) system
            # configuration as the other knowledge base values, instead of from
            # whichever configuration the loop above happened to create last.
            first_configuration = system_configurations[0]
            self.knowledge_base.ReadSystemConfigurationArtifact(first_configuration)

            for environment_variable in first_configuration.environment_variables:
                self.knowledge_base.AddEnvironmentVariable(environment_variable)

        return system_configurations

    def SetStatusUpdateInterval(self, status_update_interval):
        """Sets the status update interval.

        Args:
            status_update_interval (float): status update interval, in number
                of seconds.
        """
        self._status_update_interval = status_update_interval