Source code for plaso.preprocessors.manager

# -*- coding: utf-8 -*-
"""The preprocess plugins manager."""

from dfvfs.helpers import file_system_searcher
from dfvfs.helpers import windows_path_resolver
from dfvfs.lib import errors as dfvfs_errors
from dfwinreg import interface as dfwinreg_interface
from dfwinreg import regf as dfwinreg_regf
from dfwinreg import registry as dfwinreg_registry
from dfwinreg import registry_searcher

from plaso.lib import errors
from plaso.preprocessors import interface
from plaso.preprocessors import logger


[docs] class FileSystemWinRegistryFileReader(dfwinreg_interface.WinRegistryFileReader): """A file system-based Windows Registry file reader."""
[docs] def __init__(self, file_system, mount_point, environment_variables=None): """Initializes a Windows Registry file reader object. Args: file_system (dfvfs.FileSystem): file system. mount_point (dfvfs.PathSpec): mount point path specification. environment_variables (Optional[list[EnvironmentVariableArtifact]]): environment variables. """ super(FileSystemWinRegistryFileReader, self).__init__() self._file_system = file_system self._path_resolver = self._CreateWindowsPathResolver( file_system, mount_point, environment_variables=environment_variables)
def _CreateWindowsPathResolver( self, file_system, mount_point, environment_variables): """Create a Windows path resolver and sets the environment variables. Args: file_system (dfvfs.FileSystem): file system. mount_point (dfvfs.PathSpec): mount point path specification. environment_variables (list[EnvironmentVariableArtifact]): environment variables. Returns: dfvfs.WindowsPathResolver: Windows path resolver. """ if environment_variables is None: environment_variables = [] path_resolver = windows_path_resolver.WindowsPathResolver( file_system, mount_point) for environment_variable in environment_variables: name = environment_variable.name.lower() if name not in ('systemroot', 'userprofile'): continue path_resolver.SetEnvironmentVariable( environment_variable.name, environment_variable.value) return path_resolver def _OpenPathSpec(self, path_specification, ascii_codepage='cp1252'): """Opens the Windows Registry file specified by the path specification. Args: path_specification (dfvfs.PathSpec): path specification. ascii_codepage (Optional[str]): ASCII string codepage. Returns: WinRegistryFile: Windows Registry file or None. """ if not path_specification: return None file_entry = self._file_system.GetFileEntryByPathSpec(path_specification) if file_entry is None: return None file_object = file_entry.GetFileObject() if file_object is None: return None registry_file = dfwinreg_regf.REGFWinRegistryFile( ascii_codepage=ascii_codepage) try: registry_file.Open(file_object) except IOError as exception: logger.warning( f'Unable to open Windows Registry file with error: {exception!s}') return None return registry_file
[docs] def Open(self, path, ascii_codepage='cp1252'): """Opens the Windows Registry file specified by the path. Args: path (str): path of the Windows Registry file. ascii_codepage (Optional[str]): ASCII string codepage. Returns: WinRegistryFile: Windows Registry file or None. """ path_specification = None try: path_specification = self._path_resolver.ResolvePath(path) except dfvfs_errors.BackEndError as exception: logger.warning(( f'Unable to open Windows Registry file: {path:s} with error: ' f'{exception!s}')) if path_specification is None: return None return self._OpenPathSpec(path_specification)
[docs] class PreprocessPluginsManager(object): """Preprocess plugins manager.""" _plugins = {} _file_system_plugins = {} # TODO: rename knowledge base plugins. _knowledge_base_plugins = {} _windows_registry_plugins = {}
[docs] @classmethod def CollectFromFileSystem( cls, artifacts_registry, mediator, searcher, file_system): """Collects values from Windows Registry values. Args: artifacts_registry (artifacts.ArtifactDefinitionsRegistry): artifacts definitions registry. mediator (PreprocessMediator): mediates interactions between preprocess plugins and other components, such as storage and knowledge base. searcher (dfvfs.FileSystemSearcher): file system searcher to preprocess the file system. file_system (dfvfs.FileSystem): file system to be preprocessed. """ for preprocess_plugin in cls._file_system_plugins.values(): artifact_definition = None if preprocess_plugin.ARTIFACT_DEFINITION_NAME: artifact_definition = artifacts_registry.GetDefinitionByName( preprocess_plugin.ARTIFACT_DEFINITION_NAME) if not artifact_definition: artifact_definition = artifacts_registry.GetDefinitionByAlias( preprocess_plugin.ARTIFACT_DEFINITION_NAME) if not artifact_definition: logger.warning(( f'Missing artifact definition: ' f'{preprocess_plugin.ARTIFACT_DEFINITION_NAME:s}')) continue class_name = preprocess_plugin.__class__.__name__ definition_name = preprocess_plugin.ARTIFACT_DEFINITION_NAME or 'N/A' logger.debug(( f'Running file system preprocessor plugin: {class_name:s} with ' f'artifact definition: {definition_name:s}')) try: preprocess_plugin.Collect( mediator, artifact_definition, searcher, file_system) except (IOError, errors.PreProcessFail) as exception: logger.warning(( f'Preprocessor plugin: {class_name:s} with artifact definition: ' f'{definition_name:s} was unable to collect value with error: ' f'{exception!s}'))
[docs] @classmethod def CollectFromKnowledgeBase(cls, mediator): """Collects values from knowledge base values. Args: mediator (PreprocessMediator): mediates interactions between preprocess plugins and other components, such as storage and knowledge base. """ for preprocess_plugin in cls._knowledge_base_plugins.values(): class_name = preprocess_plugin.__class__.__name__ logger.debug( f'Running knowledge base preprocessor plugin: {class_name:s}') try: preprocess_plugin.Collect(mediator) except errors.PreProcessFail as exception: logger.warning( f'Unable to collect knowledge base value with error: {exception!s}')
[docs] @classmethod def CollectFromWindowsRegistry(cls, artifacts_registry, mediator, searcher): """Collects values from Windows Registry values. Args: artifacts_registry (artifacts.ArtifactDefinitionsRegistry): artifacts definitions registry. mediator (PreprocessMediator): mediates interactions between preprocess plugins and other components, such as storage and knowledge base. searcher (dfwinreg.WinRegistrySearcher): Windows Registry searcher to preprocess the Windows Registry. """ # TODO: define preprocessing plugin dependency and sort preprocess_plugins # for now sort alphabetically to ensure WindowsAvailableTimeZones is run # before WindowsTimezone. for _, preprocess_plugin in sorted(cls._windows_registry_plugins.items()): artifact_definition = artifacts_registry.GetDefinitionByName( preprocess_plugin.ARTIFACT_DEFINITION_NAME) if not artifact_definition: artifact_definition = artifacts_registry.GetDefinitionByAlias( preprocess_plugin.ARTIFACT_DEFINITION_NAME) if not artifact_definition: logger.warning(( f'Missing artifact definition: ' f'{preprocess_plugin.ARTIFACT_DEFINITION_NAME:s}')) continue logger.debug(( f'Running Windows Registry preprocessor plugin: ' f'{preprocess_plugin.ARTIFACT_DEFINITION_NAME:s}')) try: preprocess_plugin.Collect(mediator, artifact_definition, searcher) except (IOError, errors.PreProcessFail) as exception: logger.warning(( f'Unable to collect value from artifact definition: ' f'{preprocess_plugin.ARTIFACT_DEFINITION_NAME:s} with error: ' f'{exception!s}'))
[docs] @classmethod def DeregisterPlugin(cls, plugin_class): """Deregisters an preprocess plugin class. Args: plugin_class (type): preprocess plugin class. Raises: KeyError: if plugin class is not set for the corresponding name. TypeError: if the source type of the plugin class is not supported. """ name = (getattr(plugin_class, 'ARTIFACT_DEFINITION_NAME', None) or plugin_class.__name__) name = name.lower() if name not in cls._plugins: raise KeyError(f'Artifact plugin class not set for name: {name:s}.') del cls._plugins[name] if name in cls._file_system_plugins: del cls._file_system_plugins[name] if name in cls._knowledge_base_plugins: del cls._knowledge_base_plugins[name] if name in cls._windows_registry_plugins: del cls._windows_registry_plugins[name]
[docs] @classmethod def GetNames(cls): """Retrieves the names of the registered artifact definitions. Returns: list[str]: registered artifact definitions names. """ names = [] for plugin_class in cls._plugins.values(): name = getattr(plugin_class, 'ARTIFACT_DEFINITION_NAME', None) if name: names.append(name) return names
[docs] @classmethod def RegisterPlugin(cls, plugin_class): """Registers an preprocess plugin class. Args: plugin_class (type): preprocess plugin class. Raises: KeyError: if plugin class is already set for the corresponding name. TypeError: if the source type of the plugin class is not supported. """ name = (getattr(plugin_class, 'ARTIFACT_DEFINITION_NAME', None) or plugin_class.__name__) name = name.lower() if name in cls._plugins: raise KeyError(f'Artifact plugin class already set for name: {name:s}.') preprocess_plugin = plugin_class() cls._plugins[name] = preprocess_plugin if isinstance( preprocess_plugin, interface.FileSystemArtifactPreprocessorPlugin): cls._file_system_plugins[name] = preprocess_plugin elif isinstance( preprocess_plugin, interface.KnowledgeBasePreprocessorPlugin): cls._knowledge_base_plugins[name] = preprocess_plugin elif isinstance( preprocess_plugin, interface.WindowsRegistryKeyArtifactPreprocessorPlugin): cls._windows_registry_plugins[name] = preprocess_plugin
[docs] @classmethod def RegisterPlugins(cls, plugin_classes): """Registers preprocess plugin classes. Args: plugin_classes (list[type]): preprocess plugin classes. Raises: KeyError: if plugin class is already set for the corresponding name. """ for plugin_class in plugin_classes: cls.RegisterPlugin(plugin_class)
[docs] @classmethod def RunPlugins(cls, artifacts_registry, file_system, mount_point, mediator): """Runs the preprocessing plugins. Args: artifacts_registry (artifacts.ArtifactDefinitionsRegistry): artifacts definitions registry. file_system (dfvfs.FileSystem): file system to be preprocessed. mount_point (dfvfs.PathSpec): mount point path specification that refers to the base location of the file system. mediator (PreprocessMediator): mediates interactions between preprocess plugins and other components, such as storage and knowledge base. """ searcher = file_system_searcher.FileSystemSearcher(file_system, mount_point) cls.CollectFromFileSystem( artifacts_registry, mediator, searcher, file_system) # Run the Registry plugins separately so we do not have to open # Registry files for every preprocess plugin. environment_variables = mediator.GetEnvironmentVariables() registry_file_reader = FileSystemWinRegistryFileReader( file_system, mount_point, environment_variables=environment_variables) win_registry = dfwinreg_registry.WinRegistry( registry_file_reader=registry_file_reader) searcher = registry_searcher.WinRegistrySearcher(win_registry) cls.CollectFromWindowsRegistry(artifacts_registry, mediator, searcher) cls.CollectFromKnowledgeBase(mediator)