Source code for plaso.parsers.text_plugins.interface

# -*- coding: utf-8 -*-
"""This file contains the interface for text plugins."""

import abc
import codecs
import os

import pyparsing

from plaso.lib import errors
from plaso.parsers import logger
from plaso.parsers import plugins
from plaso.parsers import text_parser


[docs] class TextPlugin(plugins.BasePlugin): """The interface for text plugins.""" NAME = 'text_plugin' DATA_FORMAT = 'Text file' ENCODING = None # List of tuples of pyparsing expression per unique identifier that define # the supported grammar. _LINE_STRUCTURES = [] # PyParsing grammer used to verify the text-log file format. Note that since # this is called often it should optimize on failing fast. VERIFICATION_GRAMMAR = None VERIFICATION_LITERALS = None # The maximum number of consecutive lines that do not match the grammar before # aborting parsing. _MAXIMUM_CONSECUTIVE_LINE_FAILURES = 20
[docs] def __init__(self): """Initializes a parser.""" super(TextPlugin, self).__init__() self._current_offset = 0 self._parser_mediator = None self._pyparsing_grammar = None codecs.register_error('text_parser_handler', self._EncodingErrorHandler) self._SetLineStructures(self._LINE_STRUCTURES)
def _EncodingErrorHandler(self, exception): """Encoding error handler. Args: exception [UnicodeDecodeError]: exception. Returns: tuple[str, int]: replacement string and a position where encoding should continue. Raises: TypeError: if exception is not of type UnicodeDecodeError. """ if not isinstance(exception, UnicodeDecodeError): raise TypeError('Unsupported exception type.') if self._parser_mediator: self._parser_mediator.ProduceExtractionWarning( 'error decoding 0x{0:02x} at offset: {1:d}'.format( exception.object[exception.start], self._current_offset + exception.start)) escaped = '\\x{0:2x}'.format(exception.object[exception.start]) return escaped, exception.start + 1 def _GetDecimalValueFromStructure(self, structure, name): """Retrieves a decimal integer value from a Pyparsing structure. Args: structure (pyparsing.ParseResults): tokens from a parsed log line. name (str): name of the token. Returns: int: decimal integer value or None if not available or invalid. """ integer_value = self._GetValueFromStructure(structure, name) try: return int(integer_value, 10) except (TypeError, ValueError): pass return None def _GetStringValueFromStructure(self, structure, name): """Retrieves a string value from a Pyparsing structure. Args: structure (pyparsing.ParseResults): tokens from a parsed log line. name (str): name of the token. Returns: str: string value or None if not available or empty. """ string_value = self._GetValueFromStructure( structure, name, default_value='') return string_value.strip() or None def _GetValueFromStructure(self, structure, name, default_value=None): """Retrieves a token value from a Pyparsing structure. This method ensures the token value is set to the default value when the token is not present in the structure. Instead of returning the Pyparsing default value of an empty byte stream (b''). Args: structure (pyparsing.ParseResults): tokens from a parsed log line. name (str): name of the token. default_value (Optional[object]): default value. Returns: object: value in the token or default value if the token is not available in the structure. """ value = structure.get(name, default_value) if isinstance(value, pyparsing.ParseResults) and not value: # Ensure the return value is not an empty pyparsing.ParseResults otherwise # serialization will fail. return None return value def _ParseFinalize(self, parser_mediator): # pylint: disable=unused-argument """Finalizes parsing. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. """ return def _ParseHeader(self, parser_mediator, text_reader): # pylint: disable=unused-argument """Parses a text-log file header. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. text_reader (EncodedTextReader): text reader. Raises: ParseError: when the header cannot be parsed. """ return def _ParseLines(self, parser_mediator, text_reader): """Parses lines of text using a pyparsing definition. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. text_reader (EncodedTextReader): text reader. """ consecutive_line_failures = 0 try: text_reader.ReadLines() self._current_offset = text_reader.get_offset() except UnicodeDecodeError as exception: parser_mediator.ProduceExtractionWarning(( 'unable to read and decode log line at offset {0:d} with error: ' '{1!s}').format(self._current_offset, exception)) return while text_reader.lines: if parser_mediator.abort: break if consecutive_line_failures > self._MAXIMUM_CONSECUTIVE_LINE_FAILURES: parser_mediator.ProduceExtractionWarning( 'more than {0:d} consecutive failures to parse lines.'.format( self._MAXIMUM_CONSECUTIVE_LINE_FAILURES)) break try: key, structure, _, end = self._ParseString(text_reader.lines) except errors.ParseError as exception: line = text_reader.ReadLine() # Pyparsing does not appear to detect single empty lines hence that # we ignore them here. if not line: continue logger.debug('unable to parse string with error: {0!s}'.format( exception)) if len(line) > 80: line = '{0:s}...'.format(line[:77]) parser_mediator.ProduceExtractionWarning( 'unable to parse log line: {0:d} "{1:s}"'.format( text_reader.line_number, line)) consecutive_line_failures += 1 continue consecutive_line_failures = 0 try: # TODO: use a callback per key. self._ParseRecord(parser_mediator, key, structure) except errors.ParseError as exception: parser_mediator.ProduceExtractionWarning( 'unable to parse record: {0:s} with error: {1!s}'.format( key, exception)) text_reader.SkipAhead(end) try: text_reader.ReadLines() self._current_offset = text_reader.get_offset() except UnicodeDecodeError as exception: parser_mediator.ProduceExtractionWarning(( 'unable to read and decode log line at offset {0:d} with error: ' '{1!s}').format(self._current_offset, exception)) break @abc.abstractmethod def _ParseRecord(self, parser_mediator, key, structure): """Parses a pyparsing structure. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. key (str): name of the parsed structure. structure (pyparsing.ParseResults): tokens from a parsed log line. Raises: ParseError: when the structure type is unknown. """ def _ParseString(self, string): """Parses a string for known grammar. Args: string (str): string. Returns: tuple[str, pyparsing.ParseResults, int, int]: key, parsed tokens, start and end offset. Raises: ParseError: when the string cannot be parsed by the grammar. """ try: structure_generator = self._pyparsing_grammar.scan_string( string, max_matches=1) structure, start, end = next(structure_generator) except StopIteration: structure = None except pyparsing.ParseException as exception: raise errors.ParseError(exception) if not structure: raise errors.ParseError('No match found.') if start > 0 and '\n' in string[:start + 1]: raise errors.ParseError('Found a line preceeding match.') # Unwrap the line structure and retrieve its name (key). keys = list(structure.keys()) if len(keys) != 1: raise errors.ParseError('Missing key of line structructure.') return keys[0], structure[0], start, end def _SetLineStructures(self, line_structures): """Sets the line structures. Args: line_structures ([(str, pyparsing.ParserElement)]): tuples of pyparsing expressions to parse a line and their names. """ self._pyparsing_grammar = None for key, expression in line_structures: # Wrap the line structures in groups with a result name to build a single # pyparsing grammar. if not isinstance(expression, pyparsing.Group): expression = pyparsing.Group(expression).set_results_name(key) if not self._pyparsing_grammar: self._pyparsing_grammar = expression else: self._pyparsing_grammar ^= expression # Override Pyparsing's default replacement of tabs with spaces to # SkipAhead() the correct number of bytes after a match. self._pyparsing_grammar.parse_with_tabs() # Override Pyparsing's whitespace characters to spaces only. self._pyparsing_grammar.set_default_whitespace_chars(' ') def _VerifyString(self, string): """Checks a string for known grammar. Args: string (str): string. Returns: pyparsing.ParseResults: parsed tokens. Raises: ParseError: when the string cannot be parsed by the grammar. """ try: structure = self.VERIFICATION_GRAMMAR.parse_string(string) except pyparsing.ParseException as exception: raise errors.ParseError(exception) if not structure: raise errors.ParseError('No match found.') return structure
[docs] @abc.abstractmethod def CheckRequiredFormat(self, parser_mediator, text_reader): """Check if the log record has the minimal structure required by the plugin. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. text_reader (EncodedTextReader): text reader. Returns: bool: True if this is the correct plugin, False otherwise. """
# pylint: disable=arguments-differ
[docs] def Process(self, parser_mediator, file_object=None, **kwargs): """Extracts events from a text log file. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. file_object (Optional[dfvfs.FileIO]): a file-like object. """ # This will raise if unhandled keyword arguments are passed. super(TextPlugin, self).Process(parser_mediator) # Keep a reference to the parser mediator for the encoding error handler. self._parser_mediator = parser_mediator try: # Set the offset to the beginning of the file. file_object.seek(0, os.SEEK_SET) self._current_offset = 0 encoding = self.ENCODING if not encoding: encoding = parser_mediator.GetCodePage() text_reader = text_parser.EncodedTextReader( file_object, encoding=encoding, encoding_errors='text_parser_handler') try: text_reader.ReadLines() self._current_offset = text_reader.get_offset() except UnicodeDecodeError as exception: parser_mediator.ProduceExtractionWarning(( 'unable to read and decode log line at offset {0:d} with error: ' '{1!s}').format(self._current_offset, exception)) return try: self._ParseHeader(parser_mediator, text_reader) except UnicodeDecodeError as exception: parser_mediator.ProduceExtractionWarning(( 'unable to parser header with error: {0!s}').format(exception)) return self._ParseLines(parser_mediator, text_reader) self._ParseFinalize(parser_mediator) finally: self._parser_mediator = None
[docs] class TextPluginWithLineContinuation(TextPlugin): """The interface for text plugins with line continuation.""" # pylint: disable=abstract-method
[docs] def __init__(self): """Initializes a text parser plugin.""" super(TextPluginWithLineContinuation, self).__init__() self._last_string_match = None
def _ParseString(self, string): """Parses a string for known grammar. Args: string (str): string. Returns: tuple[str, pyparsing.ParseResults, int, int]: key, parsed tokens, start and end offset. Raises: ParseError: when the string cannot be parsed by the grammar. """ if self._last_string_match: last_string_match = self._last_string_match self._last_string_match = None return last_string_match try: structure_generator = self._pyparsing_grammar.scan_string( string, max_matches=1) structure, start, end = next(structure_generator) except StopIteration: structure = None except pyparsing.ParseException as exception: raise errors.ParseError(exception) if not structure: return '_line_continuation', string, 0, len(string) # Unwrap the line structure and retrieve its name (key). keys = list(structure.keys()) if len(keys) != 1: raise errors.ParseError('Missing key of line structructure.') if start == 0: return keys[0], structure[0], start, end self._last_string_match = (keys[0], structure[0], 0, end - start) return '_line_continuation', string[:start], 0, start