Source code for plaso.preprocessors.manager

# -*- coding: utf-8 -*-
"""The preprocess plugins manager."""

from dfvfs.helpers import file_system_searcher
from dfvfs.helpers import windows_path_resolver
from dfvfs.lib import errors as dfvfs_errors
from dfwinreg import interface as dfwinreg_interface
from dfwinreg import regf as dfwinreg_regf
from dfwinreg import registry as dfwinreg_registry
from dfwinreg import registry_searcher

from plaso.lib import errors
from plaso.preprocessors import interface
from plaso.preprocessors import logger


class FileSystemWinRegistryFileReader(
    dfwinreg_interface.WinRegistryFileReader):
  """A file system-based Windows Registry file reader.

  Windows paths are resolved to path specifications with a Windows path
  resolver, after which the Windows Registry file is opened from the file
  system passed to the constructor.
  """

  def __init__(self, file_system, mount_point, environment_variables=None):
    """Initializes a Windows Registry file reader object.

    Args:
      file_system (dfvfs.FileSystem): file system.
      mount_point (dfvfs.PathSpec): mount point path specification.
      environment_variables (Optional[list[EnvironmentVariableArtifact]]):
          environment variables.
    """
    super(FileSystemWinRegistryFileReader, self).__init__()
    self._file_system = file_system
    self._path_resolver = self._CreateWindowsPathResolver(
        file_system, mount_point, environment_variables=environment_variables)

  def _CreateWindowsPathResolver(
      self, file_system, mount_point, environment_variables):
    """Create a Windows path resolver and sets the environment variables.

    Only the SystemRoot and UserProfile environment variables (matched
    case-insensitively by name) are set in the path resolver, all other
    environment variables are ignored.

    Args:
      file_system (dfvfs.FileSystem): file system.
      mount_point (dfvfs.PathSpec): mount point path specification.
      environment_variables (list[EnvironmentVariableArtifact]): environment
          variables, where None is treated as an empty list.

    Returns:
      dfvfs.WindowsPathResolver: Windows path resolver.
    """
    if environment_variables is None:
      environment_variables = []

    path_resolver = windows_path_resolver.WindowsPathResolver(
        file_system, mount_point)

    for environment_variable in environment_variables:
      name = environment_variable.name.lower()
      if name not in ('systemroot', 'userprofile'):
        continue

      # The name is passed with its original case, the lower case name is
      # only used for the comparison above.
      path_resolver.SetEnvironmentVariable(
          environment_variable.name, environment_variable.value)

    return path_resolver

  def _OpenPathSpec(self, path_specification, ascii_codepage='cp1252'):
    """Opens the Windows Registry file specified by the path specification.

    Args:
      path_specification (dfvfs.PathSpec): path specification.
      ascii_codepage (Optional[str]): ASCII string codepage.

    Returns:
      WinRegistryFile: Windows Registry file or None if the path specification
          is not set, the file entry or file object is not available, or the
          file could not be opened.
    """
    if not path_specification:
      return None

    file_entry = self._file_system.GetFileEntryByPathSpec(path_specification)
    if file_entry is None:
      return None

    file_object = file_entry.GetFileObject()
    if file_object is None:
      return None

    registry_file = dfwinreg_regf.REGFWinRegistryFile(
        ascii_codepage=ascii_codepage)

    try:
      registry_file.Open(file_object)
    except IOError as exception:
      # A Registry file that cannot be opened is reported and skipped instead
      # of aborting the caller.
      logger.warning(
          'Unable to open Windows Registry file with error: {0!s}'.format(
              exception))
      return None

    return registry_file

  def Open(self, path, ascii_codepage='cp1252'):
    """Opens the Windows Registry file specified by the path.

    Args:
      path (str): path of the Windows Registry file.
      ascii_codepage (Optional[str]): ASCII string codepage.

    Returns:
      WinRegistryFile: Windows Registry file or None if the path could not be
          resolved or the file could not be opened.
    """
    path_specification = None

    try:
      path_specification = self._path_resolver.ResolvePath(path)
    except dfvfs_errors.BackEndError as exception:
      logger.warning((
          'Unable to open Windows Registry file: {0:s} with error: '
          '{1!s}').format(path, exception))

    if path_specification is None:
      return None

    # Forward the requested codepage, otherwise _OpenPathSpec silently falls
    # back to its own default and the caller's choice is lost.
    return self._OpenPathSpec(
        path_specification, ascii_codepage=ascii_codepage)
class PreprocessPluginsManager(object):
  """Preprocess plugins manager.

  Plugin instances are kept in class-level dictionaries keyed by the lower
  case plugin name: one that holds every registered plugin and one per
  supported plugin type.
  """

  _plugins = {}
  _file_system_plugins = {}
  _knowledge_base_plugins = {}
  _windows_registry_plugins = {}

  @classmethod
  def CollectFromFileSystem(
      cls, artifacts_registry, mediator, searcher, file_system):
    """Collects values from the file system.

    Plugins without a matching artifact definition are skipped with a
    warning. IOError and PreProcessFail raised by a plugin are logged as
    a warning and do not stop the remaining plugins.

    Args:
      artifacts_registry (artifacts.ArtifactDefinitionsRegistry): artifacts
          definitions registry.
      mediator (PreprocessMediator): mediates interactions between preprocess
          plugins and other components, such as storage and knowledge base.
      searcher (dfvfs.FileSystemSearcher): file system searcher to preprocess
          the file system.
      file_system (dfvfs.FileSystem): file system to be preprocessed.
    """
    for preprocess_plugin in cls._file_system_plugins.values():
      artifact_definition = artifacts_registry.GetDefinitionByName(
          preprocess_plugin.ARTIFACT_DEFINITION_NAME)
      if not artifact_definition:
        logger.warning('Missing artifact definition: {0:s}'.format(
            preprocess_plugin.ARTIFACT_DEFINITION_NAME))
        continue

      logger.debug('Running file system preprocessor plugin: {0:s}'.format(
          preprocess_plugin.ARTIFACT_DEFINITION_NAME))
      try:
        preprocess_plugin.Collect(
            mediator, artifact_definition, searcher, file_system)
      except (IOError, errors.PreProcessFail) as exception:
        logger.warning((
            'Unable to collect value from artifact definition: {0:s} '
            'with error: {1!s}').format(
                preprocess_plugin.ARTIFACT_DEFINITION_NAME, exception))

  @classmethod
  def CollectFromKnowledgeBase(cls, mediator):
    """Collects values from knowledge base values.

    PreProcessFail raised by a plugin is logged as a warning and does not
    stop the remaining plugins.

    Args:
      mediator (PreprocessMediator): mediates interactions between preprocess
          plugins and other components, such as storage and knowledge base.
    """
    for preprocess_plugin in cls._knowledge_base_plugins.values():
      logger.debug('Running knowledge base preprocessor plugin: {0:s}'.format(
          preprocess_plugin.__class__.__name__))
      try:
        preprocess_plugin.Collect(mediator)
      except errors.PreProcessFail as exception:
        logger.warning(
            'Unable to collect knowledge base value with error: {0!s}'.format(
                exception))

  @classmethod
  def CollectFromWindowsRegistry(cls, artifacts_registry, mediator, searcher):
    """Collects values from Windows Registry values.

    Plugins without a matching artifact definition are skipped with a
    warning. IOError and PreProcessFail raised by a plugin are logged as
    a warning and do not stop the remaining plugins.

    Args:
      artifacts_registry (artifacts.ArtifactDefinitionsRegistry): artifacts
          definitions registry.
      mediator (PreprocessMediator): mediates interactions between preprocess
          plugins and other components, such as storage and knowledge base.
      searcher (dfwinreg.WinRegistrySearcher): Windows Registry searcher to
          preprocess the Windows Registry.
    """
    # TODO: define preprocessing plugin dependency and sort preprocess_plugins
    # for now sort alphabetically to ensure WindowsAvailableTimeZones is run
    # before WindowsTimezone.
    for _, preprocess_plugin in sorted(cls._windows_registry_plugins.items()):
      artifact_definition = artifacts_registry.GetDefinitionByName(
          preprocess_plugin.ARTIFACT_DEFINITION_NAME)
      if not artifact_definition:
        logger.warning('Missing artifact definition: {0:s}'.format(
            preprocess_plugin.ARTIFACT_DEFINITION_NAME))
        continue

      logger.debug('Running Windows Registry preprocessor plugin: {0:s}'.format(
          preprocess_plugin.ARTIFACT_DEFINITION_NAME))
      try:
        preprocess_plugin.Collect(mediator, artifact_definition, searcher)
      except (IOError, errors.PreProcessFail) as exception:
        logger.warning((
            'Unable to collect value from artifact definition: {0:s} '
            'with error: {1!s}').format(
                preprocess_plugin.ARTIFACT_DEFINITION_NAME, exception))

  @classmethod
  def DeregisterPlugin(cls, plugin_class):
    """Deregisters a preprocess plugin class.

    The plugin is looked up by the lower case value of its
    ARTIFACT_DEFINITION_NAME attribute, or of its class name if the attribute
    is not defined.

    Args:
      plugin_class (type): preprocess plugin class.

    Raises:
      KeyError: if plugin class is not set for the corresponding name.
    """
    name = getattr(
        plugin_class, 'ARTIFACT_DEFINITION_NAME', plugin_class.__name__)
    name = name.lower()
    if name not in cls._plugins:
      raise KeyError(
          'Artifact plugin class not set for name: {0:s}.'.format(name))

    del cls._plugins[name]

    # A plugin is stored in at most one of the type-specific dictionaries,
    # hence a missing name is not an error here.
    cls._file_system_plugins.pop(name, None)
    cls._knowledge_base_plugins.pop(name, None)
    cls._windows_registry_plugins.pop(name, None)

  @classmethod
  def GetNames(cls):
    """Retrieves the names of the registered artifact definitions.

    Plugins without an ARTIFACT_DEFINITION_NAME value are omitted.

    Returns:
      list[str]: registered artifact definitions names.
    """
    artifact_names = (
        getattr(plugin, 'ARTIFACT_DEFINITION_NAME', None)
        for plugin in cls._plugins.values())
    return [name for name in artifact_names if name]

  @classmethod
  def RegisterPlugin(cls, plugin_class):
    """Registers a preprocess plugin class.

    The plugin class is instantiated without arguments and the instance is
    stored under the lower case value of its ARTIFACT_DEFINITION_NAME
    attribute, or of its class name if the attribute is not defined. An
    instance that is not one of the supported plugin types is only stored in
    the dictionary of all plugins and will not be run by the Collect methods.

    Args:
      plugin_class (type): preprocess plugin class.

    Raises:
      KeyError: if plugin class is already set for the corresponding name.
    """
    name = getattr(
        plugin_class, 'ARTIFACT_DEFINITION_NAME', plugin_class.__name__)
    name = name.lower()
    if name in cls._plugins:
      raise KeyError(
          'Artifact plugin class already set for name: {0:s}.'.format(name))

    preprocess_plugin = plugin_class()

    cls._plugins[name] = preprocess_plugin

    if isinstance(
        preprocess_plugin, interface.FileSystemArtifactPreprocessorPlugin):
      cls._file_system_plugins[name] = preprocess_plugin

    elif isinstance(
        preprocess_plugin, interface.KnowledgeBasePreprocessorPlugin):
      cls._knowledge_base_plugins[name] = preprocess_plugin

    elif isinstance(
        preprocess_plugin,
        interface.WindowsRegistryKeyArtifactPreprocessorPlugin):
      cls._windows_registry_plugins[name] = preprocess_plugin

  @classmethod
  def RegisterPlugins(cls, plugin_classes):
    """Registers preprocess plugin classes.

    Args:
      plugin_classes (list[type]): preprocess plugin classes.

    Raises:
      KeyError: if plugin class is already set for the corresponding name.
    """
    for plugin_class in plugin_classes:
      cls.RegisterPlugin(plugin_class)

  @classmethod
  def RunPlugins(cls, artifacts_registry, file_system, mount_point, mediator):
    """Runs the preprocessing plugins.

    The file system plugins are run first, followed by the Windows Registry
    plugins and finally the knowledge base plugins. A warning about missing
    user accounts is only logged when the mediator has a knowledge base.

    Args:
      artifacts_registry (artifacts.ArtifactDefinitionsRegistry): artifacts
          definitions registry.
      file_system (dfvfs.FileSystem): file system to be preprocessed.
      mount_point (dfvfs.PathSpec): mount point path specification that refers
          to the base location of the file system.
      mediator (PreprocessMediator): mediates interactions between preprocess
          plugins and other components, such as storage and knowledge base.
    """
    searcher = file_system_searcher.FileSystemSearcher(file_system, mount_point)

    cls.CollectFromFileSystem(
        artifacts_registry, mediator, searcher, file_system)

    # Run the Registry plugins separately so we do not have to open
    # Registry files for every preprocess plugin.

    environment_variables = None
    if mediator.knowledge_base:
      environment_variables = mediator.knowledge_base.GetEnvironmentVariables()

    registry_file_reader = FileSystemWinRegistryFileReader(
        file_system, mount_point, environment_variables=environment_variables)
    win_registry = dfwinreg_registry.WinRegistry(
        registry_file_reader=registry_file_reader)

    searcher = registry_searcher.WinRegistrySearcher(win_registry)

    cls.CollectFromWindowsRegistry(artifacts_registry, mediator, searcher)

    cls.CollectFromKnowledgeBase(mediator)

    # The knowledge base is optional (see the check above), so it must be
    # tested here as well before querying it for user accounts.
    if (mediator.knowledge_base and
        not mediator.knowledge_base.HasUserAccounts()):
      logger.warning('Unable to find any user accounts on the system.')