# -*- coding: utf-8 -*-
"""The parsers and plugins interface classes."""
import abc
import os
from plaso.lib import errors
[docs]
class BaseFileEntryFilter(object):
"""File entry filter interface."""
# pylint: disable=redundant-returns-doc
[docs]
@abc.abstractmethod
def Match(self, file_entry):
"""Determines if a file entry matches the filter.
Args:
file_entry (dfvfs.FileEntry): a file entry.
Returns:
bool: True if the file entry matches the filter.
"""
[docs]
class FileNameFileEntryFilter(BaseFileEntryFilter):
"""File name file entry filter."""
[docs]
def __init__(self, filename):
"""Initializes a file entry filter.
Args:
filename (str): name of the file.
"""
super(FileNameFileEntryFilter, self).__init__()
self._filename = filename.lower()
[docs]
def Match(self, file_entry):
"""Determines if a file entry matches the filter.
Args:
file_entry (dfvfs.FileEntry): a file entry.
Returns:
bool: True if the file entry matches the filter.
"""
if not file_entry:
return False
filename = file_entry.name.lower()
return filename == self._filename
[docs]
class BaseParser(object):
"""The parser interface."""
# The name of the parser. This is the name that is used in the registration
# and used for parser/plugin selection, so this needs to be concise and unique
# for all plugins/parsers, such as 'Chrome', 'Safari' or 'UserAssist'.
NAME = 'base_parser'
# Data format supported by the parser plugin. This information is used by
# the parser manager to generate parser and plugin information.
DATA_FORMAT = ''
# List of filters that should match for the parser to be applied.
FILTERS = frozenset()
ALL_PLUGINS = set(['*'])
# Every derived parser class that implements plugins should define
# its own _plugin_classes dict:
# _plugin_classes = {}
# We deliberately don't define it here to make sure the plugins of
# different parser classes don't end up in the same dict.
_plugin_classes = None
[docs]
def __init__(self):
"""Initializes a parser.
By default all plugins will be enabled. To only enable specific plugins use
the EnablePlugins method and pass it a list of strings containing the names
of the plugins to enable.
The default plugin, named "{self.NAME:s}_default", if it exists, is always
enabled and cannot be disabled.
"""
super(BaseParser, self).__init__()
self._default_plugin = None
self._default_plugin_name = '{0:s}_default'.format(self.NAME)
self._plugins_per_name = None
self.EnablePlugins(self.ALL_PLUGINS)
[docs]
@classmethod
def DeregisterPlugin(cls, plugin_class):
"""Deregisters a plugin class.
The plugin classes are identified based on their lower case name.
Args:
plugin_class (type): class of the plugin.
Raises:
KeyError: if plugin class is not set for the corresponding name.
"""
plugin_name = plugin_class.NAME.lower()
if plugin_name not in cls._plugin_classes:
raise KeyError(
'Plugin class not set for name: {0:s}.'.format(
plugin_class.NAME))
del cls._plugin_classes[plugin_name]
[docs]
def EnablePlugins(self, plugin_includes):
"""Enables parser plugins.
Args:
plugin_includes (set[str]): names of the plugins to enable, where
set(['*']) represents all plugins. Note the default plugin, if
it exists, is always enabled and cannot be disabled.
"""
self._plugins_per_name = {}
if not self._plugin_classes:
return
for plugin_name, plugin_class in self._plugin_classes.items():
if plugin_name == self._default_plugin_name:
self._default_plugin = plugin_class()
continue
if (plugin_includes != self.ALL_PLUGINS and
plugin_name not in plugin_includes):
continue
plugin_object = plugin_class()
self._plugins_per_name[plugin_name] = plugin_object
# TODO: move this to a filter.
# pylint: disable=redundant-returns-doc
[docs]
@classmethod
def GetPluginNames(cls):
"""Retrieves the names of registered plugins.
Returns:
list[str]: names of the plugins.
"""
return list(cls._plugin_classes.keys())
[docs]
@classmethod
def GetPluginObjectByName(cls, plugin_name):
"""Retrieves a specific plugin object by its name.
Args:
plugin_name (str): name of the plugin.
Returns:
BasePlugin: a plugin object or None if not available.
"""
plugin_class = cls._plugin_classes.get(plugin_name, None)
if plugin_class:
return plugin_class()
return None
[docs]
@classmethod
def GetPlugins(cls):
"""Retrieves the registered plugins.
Yields:
tuple[str, type]: name and class of the plugin.
"""
for plugin_name, plugin_class in cls._plugin_classes.items():
yield plugin_name, plugin_class
[docs]
@classmethod
def RegisterPlugin(cls, plugin_class):
"""Registers a plugin class.
The plugin classes are identified based on their lower case name.
Args:
plugin_class (type): class of the plugin.
Raises:
KeyError: if plugin class is already set for the corresponding name.
"""
plugin_name = plugin_class.NAME.lower()
if plugin_name in cls._plugin_classes:
raise KeyError((
'Plugin class already set for name: {0:s}.').format(
plugin_class.NAME))
cls._plugin_classes[plugin_name] = plugin_class
[docs]
@classmethod
def RegisterPlugins(cls, plugin_classes):
"""Registers plugin classes.
Args:
plugin_classes (list[type]): classes of plugins.
Raises:
KeyError: if plugin class is already set for the corresponding name.
"""
for plugin_class in plugin_classes:
cls.RegisterPlugin(plugin_class)
[docs]
@classmethod
def SupportsPlugins(cls):
"""Determines if a parser supports plugins.
Returns:
bool: True if the parser supports plugins.
"""
return cls._plugin_classes is not None
[docs]
class FileEntryParser(BaseParser):
"""The file entry parser interface."""
[docs]
def Parse(self, parser_mediator):
"""Parses a file entry.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
Raises:
WrongParser: when the file cannot be parsed.
"""
file_entry = parser_mediator.GetFileEntry()
if not file_entry:
raise errors.WrongParser('Invalid file entry')
parser_mediator.AppendToParserChain(self.NAME)
parser_chain = parser_mediator.GetParserChain()
parser_mediator.SampleStartTiming(parser_chain)
try:
self.ParseFileEntry(parser_mediator, file_entry)
finally:
parser_mediator.SampleStopTiming(parser_chain)
parser_mediator.PopFromParserChain()
[docs]
@abc.abstractmethod
def ParseFileEntry(self, parser_mediator, file_entry):
"""Parses a file entry.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
file_entry (dfvfs.FileEntry): a file entry to parse.
Raises:
WrongParser: when the file cannot be parsed.
"""
[docs]
class FileObjectParser(BaseParser):
"""The file-like object parser interface."""
# The initial file offset. Set this value to None if no initial file offset
# seek needs to be performed.
_INITIAL_FILE_OFFSET = 0
# The maximum file size supported by the parser. Set this value to None if no
# file size check needs to be performed.
_MAXIMUM_FILE_SIZE = None
# The minimum file size supported by the parser. Set this value to None if no
# file size check needs to be performed.
_MINIMUM_FILE_SIZE = None
[docs]
def Parse(self, parser_mediator, file_object):
"""Parses a single file-like object.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
file_object (dfvfs.FileIO): a file-like object to parse.
Raises:
WrongParser: when the file cannot be parsed.
"""
if not file_object:
raise errors.WrongParser('Invalid file object')
file_size = file_object.get_size()
if file_size == 0:
return
if (self._MINIMUM_FILE_SIZE is not None and
file_size < self._MINIMUM_FILE_SIZE):
raise errors.WrongParser(
'File size: {0:d} too small, minimum: {1:d}.'.format(
file_size, self._MINIMUM_FILE_SIZE))
if (self._MAXIMUM_FILE_SIZE is not None and
file_size > self._MAXIMUM_FILE_SIZE):
raise errors.WrongParser(
'File size: {0:d} too large, maximum: {1:d}.'.format(
file_size, self._MAXIMUM_FILE_SIZE))
if self._INITIAL_FILE_OFFSET is not None:
file_object.seek(self._INITIAL_FILE_OFFSET, os.SEEK_SET)
parser_mediator.AppendToParserChain(self.NAME)
parser_chain = parser_mediator.GetParserChain()
parser_mediator.SampleStartTiming(parser_chain)
try:
self.ParseFileObject(parser_mediator, file_object)
finally:
parser_mediator.SampleStopTiming(parser_chain)
parser_mediator.PopFromParserChain()
[docs]
@abc.abstractmethod
def ParseFileObject(self, parser_mediator, file_object):
"""Parses a file-like object.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
file_object (dfvfs.FileIO): a file-like object to parse.
Raises:
WrongParser: when the file cannot be parsed.
"""