Source code for plaso.parsers.bencode_parser

# -*- coding: utf-8 -*-
"""Parser for bencoded files."""

import re
import os

import bencode

from plaso.lib import errors
from plaso.parsers import interface
from plaso.parsers import logger
from plaso.parsers import manager


class BencodeValues(object):
  """Bencode values.

  Thin wrapper around a mapping of decoded bencode values that hides the
  bencode 3.0.1 behavior where keys and string values are bytes.
  """

  def __init__(self, decoded_values):
    """Initializes bencode values.

    Args:
      decoded_values (collections.OrderedDict[bytes|str, object]): decoded
          values, or None if no decoded values are available.
    """
    super(BencodeValues, self).__init__()
    self._decoded_values = decoded_values

  def GetDecodedValue(self, name):
    """Retrieves a decoded value.

    The value is looked up by its str name first and, if that yields nothing,
    by the UTF-8 encoded name. A bytes value is decoded as UTF-8 before it
    is returned; values of any other type are returned unchanged.

    Args:
      name (str): name of the value.

    Returns:
      object: decoded value or None if not available.
    """
    if self._decoded_values is None:
      # Nothing was decoded (for example the owning file was closed); report
      # the value as unavailable instead of failing on None.get().
      return None

    value = self._decoded_values.get(name, None)
    if value is None:
      # Work-around for issue in bencode 3.0.1 where keys are bytes.
      name_as_byte_stream = name.encode('utf-8')
      value = self._decoded_values.get(name_as_byte_stream, None)

    if isinstance(value, bytes):
      # Work-around for issue in bencode 3.0.1 where string values are bytes.
      value = value.decode('utf-8')

    return value
class BencodeFile(object):
  """Bencode file.

  Attributes:
    decoded_values (collections.OrderedDict[bytes|str, object]): decoded
        values, or None when no file is open.
  """

  def __init__(self):
    """Initializes a bencode file."""
    super(BencodeFile, self).__init__()
    self._key_names = set()
    self.decoded_values = None

  @staticmethod
  def _DecodeKey(key):
    """Normalizes a key name to str.

    Args:
      key (bytes|str): key name.

    Returns:
      str: key name, where a bytes key is decoded as UTF-8.

    Raises:
      UnicodeDecodeError: if a bytes key is not valid UTF-8.
    """
    if isinstance(key, bytes):
      # Work-around for issue in bencode 3.0.1 where keys are bytes.
      return key.decode('utf-8')
    return key

  @property
  def keys(self):
    """set[str]: names of all the keys."""
    return self._key_names

  def Close(self):
    """Closes the file.

    Both the decoded values and the cached key names are dropped so that
    the keys property does not keep reporting keys of a closed file.
    """
    self.decoded_values = None
    self._key_names = set()

  def GetDecodedValue(self, name):
    """Retrieves a decoded value.

    Args:
      name (str): name of the value.

    Returns:
      object: decoded value or None if not available, which includes the
          case where no file is open.
    """
    if self.decoded_values is None:
      return None

    bencoded_values = BencodeValues(self.decoded_values)
    return bencoded_values.GetDecodedValue(name)

  def GetDecodedValues(self):
    """Retrieves the decoded values.

    Nothing is yielded when no file is open.

    Yields:
      tuple[str, object]: name and decoded value.
    """
    if self.decoded_values is None:
      return

    for key, value in self.decoded_values.items():
      yield self._DecodeKey(key), value

  def Open(self, file_object):
    """Opens a bencode file.

    The state of this object is only updated after the content has been
    decoded and all key names have been normalized, so a failed Open() does
    not leave decoded values without matching key names.

    Args:
      file_object (dfvfs.FileIO): file-like object.

    Raises:
      IOError: if the file-like object cannot be read, which includes
          content that is not a bencoded dictionary with UTF-8 key names.
      OSError: if the file-like object cannot be read.
      ValueError: if the file-like object is missing.
    """
    if not file_object:
      raise ValueError('Missing file object.')

    file_object.seek(0, os.SEEK_SET)

    try:
      decoded_values = bencode.bread(file_object)
    except bencode.BencodeDecodeError as exception:
      raise IOError(exception)

    if not hasattr(decoded_values, 'keys'):
      # A top-level list, integer or string cannot provide named values.
      raise IOError('Unsupported bencoded data: not a dictionary.')

    try:
      key_names = {self._DecodeKey(key) for key in decoded_values.keys()}
    except UnicodeDecodeError as exception:
      raise IOError(
          'Unable to decode key name with error: {0!s}'.format(exception))

    self.decoded_values = decoded_values
    self._key_names = key_names
class BencodeParser(interface.FileObjectParser):
  """Parser for bencoded files.

  The Plaso engine calls parsers by their Parse() method. The Parse() function
  deserializes bencoded files using the BitTorrent-bencode library and
  calls plugins (BencodePlugin) registered through the interface by their
  Process() to produce event objects.

  Plugins are how this parser understands the content inside a bencoded file,
  each plugin holds logic specific to a particular bencoded file. See the
  bencode_plugins/ directory for examples of how bencode plugins are
  implemented.
  """

  # Regex match for a bencode dictionary followed by a field size.
  _BENCODE_RE = re.compile(b'd[0-9]')

  NAME = 'bencode'
  DATA_FORMAT = 'Bencoded file'

  # NOTE(review): presumably filled in by plugin registration on the base
  # class; nothing in this module adds to it. Confirm against interface.
  _plugin_classes = {}

  def ParseFileObject(self, parser_mediator, file_object):
    """Parses a bencoded file-like object.

    The first 2 bytes are checked against the bencode dictionary signature
    before the file is decoded. Every plugin whose required keys are present
    is then run on the decoded file; a failing plugin produces an extraction
    warning and does not stop the remaining plugins.

    Args:
      parser_mediator (ParserMediator): mediates interactions between parsers
          and other components, such as storage and dfvfs.
      file_object (dfvfs.FileIO): a file-like object.

    Raises:
      UnableToParseFile: when the file cannot be parsed.
    """
    header_data = file_object.read(2)
    if not self._BENCODE_RE.match(header_data):
      raise errors.UnableToParseFile('Not a valid Bencoded file.')

    bencode_file = BencodeFile()

    # The finally clause covers every exit path after the file object was
    # created, including the early return for missing decoded values.
    try:
      try:
        bencode_file.Open(file_object)
      except IOError as exception:
        display_name = parser_mediator.GetDisplayName()
        raise errors.UnableToParseFile(
            '[{0:s}] unable to parse file: {1:s} with error: {2!s}'.format(
                self.NAME, display_name, exception))

      if not bencode_file.decoded_values:
        parser_mediator.ProduceExtractionWarning(
            'missing decoded Bencode values')
        return

      for plugin in self._plugins:
        if parser_mediator.abort:
          break

        if not plugin.CheckRequiredKeys(bencode_file):
          continue

        logger.debug('Bencode plugin used: {0:s}'.format(plugin.NAME))

        try:
          plugin.UpdateChainAndProcess(
              parser_mediator, bencode_file=bencode_file)

        except Exception as exception:  # pylint: disable=broad-except
          # Deliberately broad: a defect in one plugin must not prevent the
          # other plugins from processing the file.
          parser_mediator.ProduceExtractionWarning((
              'plugin: {0:s} unable to parse Bencode file with error: '
              '{1!s}').format(plugin.NAME, exception))

    finally:
      bencode_file.Close()
# Register the parser with the parsers manager when this module is imported.
manager.ParsersManager.RegisterParser(BencodeParser)