Source code for plaso.parsers.text_parser

# -*- coding: utf-8 -*-
"""Text log parser."""

import codecs
import io
import os

import pysigscan

from plaso.lib import errors
from plaso.parsers import interface
from plaso.parsers import logger
from plaso.parsers import manager


[docs] class EncodedTextReader(object): """Encoded text reader. Attributes: line_number (int): current line number. lines (str): lines of text. lines_size (int): size of the lines of text. """ BUFFER_SIZE = 65536 _READ_BUFFER_SIZE = 16 * BUFFER_SIZE
[docs] def __init__( self, file_object, encoding='utf-8', encoding_errors='strict'): """Initializes the encoded text reader object. Args: file_object (FileIO): a file-like object to read from. encoding (Optional[str]): text encoding. encoding_errors (Optional[str]): text encoding errors handler. """ stream_reader_class = codecs.getreader(encoding) super(EncodedTextReader, self).__init__() self._file_object = file_object self._stream_reader = stream_reader_class( file_object, errors=encoding_errors) self.line_number = 0 self.lines = '' self.lines_size = 0
[docs] def ReadLine(self): """Reads a line. Returns: str: line read from the lines buffer. """ if not self.lines: self.ReadLines() line, _, self.lines = self.lines.partition('\n') self.lines_size += len(line) + 1 self.line_number += 1 return line
[docs] def ReadLines(self): """Reads lines into the lines buffer.""" if self.lines_size < self.BUFFER_SIZE: current_offset = self._file_object.tell() # Consequative reads, decodes and joins are expensive hence we read # a larger buffer at once. decoded_data = self._stream_reader.read(size=self._READ_BUFFER_SIZE) if decoded_data: # Remove a byte-order mark at the start of the file. if current_offset == 0 and decoded_data[0] == '\ufeff': decoded_data = decoded_data[1:] # Strip carriage returns from the text. decoded_data = '\n'.join([ line.rstrip('\r') for line in decoded_data.split('\n')]) self.lines = ''.join([self.lines, decoded_data]) self.lines_size += len(decoded_data)
[docs] def SkipAhead(self, number_of_characters): """Skips ahead a number of characters. Args: number_of_characters (int): number of characters. """ while number_of_characters >= self.lines_size: number_of_characters -= self.lines_size self.lines = '' self.lines_size = 0 self.ReadLines() if self.lines_size == 0: return self.line_number += self.lines[:number_of_characters].count('\n') self.lines = self.lines[number_of_characters:] self.lines_size -= number_of_characters
# Note: that the following functions do not follow the style guide # because they are part of the file-like object interface. # pylint: disable=invalid-name
[docs] def get_offset(self): """Retrieves the current offset into the file-like object. Returns: int: current offset into the file-like object. """ return self._file_object.tell()
[docs] class TextLogParser(interface.FileObjectParser): """Text-based log file parser.""" NAME = 'text' DATA_FORMAT = 'text-based log file' _NON_TEXT_CHARACTERS = frozenset([ '\x00', '\x01', '\x02', '\x03', '\x04', '\x05', '\x06', '\x0b', '\x0e', '\x0f', '\x10', '\x11', '\x12', '\x13', '\x14', '\x15', '\x16', '\x17', '\x18', '\x19', '\x1a', '\x1c', '\x1d', '\x1e', '\x1f', '\x7f']) _plugin_classes = {}
[docs] def __init__(self): """Initializes a text-based log parser.""" super(TextLogParser, self).__init__() self._plugins_per_encoding = {} self._format_scanner = None self._non_sigscan_plugin_names = None self._plugin_name_per_format_identifier = {}
def _ContainsBinary(self, text): """Determines if the text contains binary (non-text) characters. Args: text (str): text. Returns: bool: True if the text contains binary (non-text) characters. """ return bool(self._NON_TEXT_CHARACTERS.intersection(set(text))) def _CreateFormatScanner(self, parser_mediator): """Creates a signature scanner for required format check. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. """ self._non_sigscan_plugin_names = set() self._plugin_name_per_format_identifier = {} scanner_object = pysigscan.scanner() scanner_object.set_scan_buffer_size(65536) for plugin_name, plugin in self._plugins_per_name.items(): if not plugin.VERIFICATION_LITERALS: self._non_sigscan_plugin_names.add(plugin_name) else: encoding = plugin.ENCODING if not encoding: encoding = parser_mediator.GetCodePage() for index, literal in enumerate(plugin.VERIFICATION_LITERALS): identifier = '{0:s}{1:d}'.format(plugin_name, index) encoded_literal = literal.encode(encoding) scanner_object.add_signature( identifier, 0, encoded_literal, pysigscan.signature_flags.NO_OFFSET) self._plugin_name_per_format_identifier[identifier] = plugin_name if self._plugin_name_per_format_identifier: self._format_scanner = scanner_object
[docs] def EnablePlugins(self, plugin_includes): """Enables parser plugins. Args: plugin_includes (set[str]): names of the plugins to enable, where set(['*']) represents all plugins. Note the default plugin, if it exists, is always enabled and cannot be disabled. """ self._plugins_per_name = {} self._plugins_per_encoding = {} if not self._plugin_classes: return for plugin_name, plugin_class in self._plugin_classes.items(): if plugin_name == self._default_plugin_name: self._default_plugin = plugin_class() continue if (plugin_includes != self.ALL_PLUGINS and plugin_name not in plugin_includes): continue plugin_object = plugin_class() self._plugins_per_name[plugin_name] = plugin_object encoding = plugin_class.ENCODING or 'default' if encoding not in self._plugins_per_encoding: self._plugins_per_encoding[encoding] = [] self._plugins_per_encoding[encoding].append(plugin_object)
[docs] def ParseFileObject(self, parser_mediator, file_object): """Parses a text log file-like object. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. file_object (dfvfs.FileIO): file-like object. Raises: WrongParser: when the file cannot be parsed. """ if not self._format_scanner and not self._non_sigscan_plugin_names: self._CreateFormatScanner(parser_mediator) file_object.seek(0, os.SEEK_SET) # Cache the first 64k of encoded data so it does not need to be read for # each encoding. encoded_data_buffer = file_object.read(EncodedTextReader.BUFFER_SIZE) encoded_data_file_object = io.BytesIO(encoded_data_buffer) plugins_with_matching_literals = set() if self._format_scanner: parser_mediator.SampleFormatCheckStartTiming('text_format_scanner') try: scan_state = pysigscan.scan_state() self._format_scanner.scan_file_object( scan_state, encoded_data_file_object) for scan_result in iter(scan_state.scan_results): plugin_name = self._plugin_name_per_format_identifier.get( scan_result.identifier, None) plugins_with_matching_literals.add(plugin_name) finally: parser_mediator.SampleFormatCheckStopTiming('text_format_scanner') matching_plugin = False for encoding, plugins in self._plugins_per_encoding.items(): if parser_mediator.abort: break if encoding == 'default': encoding = parser_mediator.GetCodePage() text_reader = None for plugin in plugins: if parser_mediator.abort: break profiling_name = '/'.join([self.NAME, plugin.NAME]) parser_mediator.SampleFormatCheckStartTiming(profiling_name) try: logger.debug( 'Checking required format of: {0:s} in encoding: {1:s}'.format( plugin.NAME, encoding)) result = False if (plugin.NAME in plugins_with_matching_literals or plugin.NAME in self._non_sigscan_plugin_names): if not text_reader: encoded_data_file_object.seek(0, os.SEEK_SET) text_reader = EncodedTextReader( encoded_data_file_object, encoding=encoding) text_reader.ReadLines() # TODO: check if this works with xchatscrollback log. if self._ContainsBinary(text_reader.lines): logger.debug('Detected binary format') continue result = plugin.CheckRequiredFormat(parser_mediator, text_reader) except UnicodeDecodeError: logger.debug( 'Unable to read text-based log file with encoding: {0:s}'.format( encoding)) result = False finally: parser_mediator.SampleFormatCheckStopTiming(profiling_name) if result: matching_plugin = True parser_mediator.SampleStartTiming(profiling_name) try: plugin.UpdateChainAndProcess( parser_mediator, file_object=file_object) except Exception as exception: # pylint: disable=broad-except parser_mediator.ProduceExtractionWarning(( 'plugin: {0:s} unable to parse text file with error: ' '{1!s}').format(plugin.NAME, exception)) continue finally: parser_mediator.SampleStopTiming(profiling_name) if hasattr(plugin, 'GetDateLessLogHelper'): date_less_log_helper = plugin.GetDateLessLogHelper() parser_mediator.AddDateLessLogHelper(date_less_log_helper) break if matching_plugin: break if not matching_plugin: raise errors.WrongParser('No matching text-based log plugin found.')
manager.ParsersManager.RegisterParser(TextLogParser)