
# -*- coding: utf-8 -*-
"""Delimiter separated values (DSV) parser interface."""

from __future__ import unicode_literals

import abc
import csv
import os

from dfvfs.helpers import text_file

from plaso.lib import errors
from plaso.lib import specification
from plaso.parsers import interface


class DSVParser(interface.FileObjectParser):
  """Delimiter separated values (DSV) parser interface."""

  # A list that contains the names of all the fields in the log file. This
  # needs to be defined by each DSV parser.
  COLUMNS = []

  # The default delimiter is a comma, but other characters such as a tab or
  # pipe are also known to be used. Note that the delimiter must be a single
  # character string, otherwise the csv module can raise a TypeError
  # indicating that "delimiter" must be a single character string.
  DELIMITER = ','

  # If there is a header before the lines start it can be defined here, and
  # the number of header lines that need to be skipped before the parsing
  # starts.
  NUMBER_OF_HEADER_LINES = 0

  # If there is a special quote character used inside the structured text
  # it can be defined here.
  QUOTE_CHAR = '"'

  # The maximum size of a single field in the parser.
  FIELD_SIZE_LIMIT = csv.field_size_limit()

  # Value that should not appear inside the file, used to test whether the
  # actual file conforms to the expected format.
  _MAGIC_TEST_STRING = 'RegnThvotturMeistarans'

  def __init__(self, encoding=None):
    """Initializes a delimiter separated values (DSV) parser.

    Args:
      encoding (Optional[str]): encoding used in the DSV file, where None
          indicates the codepage of the parser mediator should be used.
    """
    super(DSVParser, self).__init__()
    self._encoding = encoding
    self._end_of_line = '\n'
    self._maximum_line_length = (
        len(self._end_of_line) +
        len(self.COLUMNS) * (self.FIELD_SIZE_LIMIT + len(self.DELIMITER)))

  def _CreateDictReader(self, line_reader):
    """Returns a reader that processes each row and yields dictionaries.

    csv.DictReader does this job well for single-character delimiters; parsers
    that need multi-character delimiters need to override this method.

    Args:
      line_reader (iter): yields lines from a file-like object.

    Returns:
      iter: a reader of dictionaries, as returned by csv.DictReader().
    """
    return csv.DictReader(
        line_reader, delimiter=self.DELIMITER, fieldnames=self.COLUMNS,
        quotechar=self.QUOTE_CHAR, restkey=self._MAGIC_TEST_STRING,
        restval=self._MAGIC_TEST_STRING)

  def _CreateLineReader(self, file_object, encoding=None):
    """Creates an object that reads lines from a text file.

    The line reader is advanced to the beginning of the DSV content, skipping
    any header lines.

    Args:
      file_object (dfvfs.FileIO): file-like object.
      encoding (Optional[str]): encoding used in the DSV file, where None
          indicates the codepage of the parser mediator should be used.

    Returns:
      TextFile: an object that implements an iterator over lines in a text
          file.

    Raises:
      UnicodeDecodeError: if the file cannot be read with the specified
          encoding.
    """
    line_reader = text_file.TextFile(
        file_object, encoding=encoding, end_of_line=self._end_of_line)

    # pylint: disable=protected-access
    maximum_read_buffer_size = line_reader._MAXIMUM_READ_BUFFER_SIZE

    # The maximum line length is kept one character smaller than the maximum
    # read buffer size so that a line that does not end within the read
    # buffer can be detected.
    if self._maximum_line_length > maximum_read_buffer_size:
      self._maximum_line_length = maximum_read_buffer_size - 1

    # If a specific number of header lines is defined, skip them here.
    for _ in range(0, self.NUMBER_OF_HEADER_LINES):
      line_reader.readline(self._maximum_line_length)
    return line_reader

  def _HasExpectedLineLength(self, file_object, encoding=None):
    """Determines if a file begins with lines of the expected length.

    Since the maximum length of a valid line in the DSV file is known, the
    presence of a longer line indicates that the file will not be parsed
    successfully. This check avoids reading excessive data from a large file.

    Args:
      file_object (dfvfs.FileIO): file-like object.
      encoding (Optional[str]): encoding used in the DSV file, where None
          indicates the codepage of the parser mediator should be used.

    Returns:
      bool: True if the file has lines of the expected length.
    """
    original_file_position = file_object.tell()

    result = True

    # Attempt to read a line that is longer than any line that should be in
    # the file.
    line_reader = self._CreateLineReader(file_object, encoding=encoding)
    for _ in range(0, 20):
      sample_line = line_reader.readline(self._maximum_line_length + 1)
      if len(sample_line) > self._maximum_line_length:
        result = False
        break

    file_object.seek(original_file_position, os.SEEK_SET)
    return result
  @classmethod
  def GetFormatSpecification(cls):
    """Retrieves the format specification.

    Returns:
      FormatSpecification: format specification.
    """
    return specification.FormatSpecification(cls.NAME, text_format=True)
  def _CheckForByteOrderMark(self, file_object):
    """Checks if the file contains a byte-order-mark (BOM).

    Also see:
      https://en.wikipedia.org/wiki/Byte_order_mark#Byte_order_marks_by_encoding

    Args:
      file_object (dfvfs.FileIO): file-like object.

    Returns:
      tuple: contains:

        str: encoding determined based on the BOM or None if no BOM was found.
        int: offset of the text after the BOM or 0 if no BOM was found.
    """
    file_object.seek(0, os.SEEK_SET)

    file_data = file_object.read(4)

    # Look for a match with the longest byte-order-mark first.
    if file_data[0:4] == b'\x00\x00\xfe\xff':
      return 'utf-32-be', 4

    if file_data[0:4] == b'\xff\xfe\x00\x00':
      return 'utf-32-le', 4

    if file_data[0:3] == b'\xef\xbb\xbf':
      return 'utf-8', 3

    if file_data[0:2] == b'\xfe\xff':
      return 'utf-16-be', 2

    if file_data[0:2] == b'\xff\xfe':
      return 'utf-16-le', 2

    return None, 0
  def ParseFileObject(self, parser_mediator, file_object):
    """Parses a DSV text file-like object.

    Args:
      parser_mediator (ParserMediator): mediates interactions between parsers
          and other components, such as storage and dfvfs.
      file_object (dfvfs.FileIO): file-like object.

    Raises:
      UnableToParseFile: when the file cannot be parsed.
    """
    encoding, text_offset = self._CheckForByteOrderMark(file_object)

    if encoding and self._encoding and encoding != self._encoding:
      display_name = parser_mediator.GetDisplayName()
      raise errors.UnableToParseFile((
          '[{0:s}] Unable to parse DSV file: {1:s} encoding does not match '
          'the one required by the parser.').format(self.NAME, display_name))

    if not encoding:
      # Fall back to UTF-8 as a last resort, otherwise the creation of
      # text_file.TextFile will fail if no encoding is set.
      encoding = self._encoding or parser_mediator.codepage or 'utf-8'

    file_object.seek(text_offset, os.SEEK_SET)

    try:
      if not self._HasExpectedLineLength(file_object, encoding=encoding):
        display_name = parser_mediator.GetDisplayName()
        raise errors.UnableToParseFile((
            '[{0:s}] Unable to parse DSV file: {1:s} with error: '
            'unexpected line length.').format(self.NAME, display_name))
    except UnicodeDecodeError as exception:
      display_name = parser_mediator.GetDisplayName()
      raise errors.UnableToParseFile(
          '[{0:s}] Unable to parse DSV file: {1:s} with error: {2!s}.'.format(
              self.NAME, display_name, exception))

    try:
      line_reader = self._CreateLineReader(file_object, encoding=encoding)
      reader = self._CreateDictReader(line_reader)
      row_offset = line_reader.tell()
      row = next(reader)
    except (StopIteration, UnicodeDecodeError, csv.Error) as exception:
      display_name = parser_mediator.GetDisplayName()
      raise errors.UnableToParseFile(
          '[{0:s}] Unable to parse DSV file: {1:s} with error: {2!s}.'.format(
              self.NAME, display_name, exception))

    number_of_columns = len(self.COLUMNS)
    number_of_records = len(row)

    if number_of_records != number_of_columns:
      display_name = parser_mediator.GetDisplayName()
      raise errors.UnableToParseFile((
          '[{0:s}] Unable to parse DSV file: {1:s}. Wrong number of '
          'records (expected: {2:d}, got: {3:d})').format(
              self.NAME, display_name, number_of_columns, number_of_records))

    for key, value in row.items():
      if self._MAGIC_TEST_STRING in (key, value):
        display_name = parser_mediator.GetDisplayName()
        raise errors.UnableToParseFile((
            '[{0:s}] Unable to parse DSV file: {1:s}. Signature '
            'mismatch.').format(self.NAME, display_name))

    if not self.VerifyRow(parser_mediator, row):
      display_name = parser_mediator.GetDisplayName()
      raise errors.UnableToParseFile((
          '[{0:s}] Unable to parse DSV file: {1:s}. Verification '
          'failed.').format(self.NAME, display_name))

    self.ParseRow(parser_mediator, row_offset, row)
    row_offset = line_reader.tell()

    for row in reader:
      if parser_mediator.abort:
        break
      self.ParseRow(parser_mediator, row_offset, row)
      row_offset = line_reader.tell()
  @abc.abstractmethod
  def ParseRow(self, parser_mediator, row_offset, row):
    """Parses a line of the log file and produces events.

    Args:
      parser_mediator (ParserMediator): mediates interactions between parsers
          and other components, such as storage and dfvfs.
      row_offset (int): offset of the row.
      row (dict[str, str]): fields of a single row, as specified in COLUMNS.
    """
  # pylint: disable=redundant-returns-doc
  @abc.abstractmethod
  def VerifyRow(self, parser_mediator, row):
    """Verifies if a line of the file is in the expected format.

    Args:
      parser_mediator (ParserMediator): mediates interactions between parsers
          and other components, such as storage and dfvfs.
      row (dict[str, str]): fields of a single row, as specified in COLUMNS.

    Returns:
      bool: True if this is the correct parser, False otherwise.
    """
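

# ----------------------------------------------------------------------------
# Illustrative sketch (not part of the plaso module): a minimal concrete
# subclass showing how the DSVParser interface above is typically filled in.
# The parser name, column names and field checks below are hypothetical
# examples; only COLUMNS, DELIMITER, NAME, VerifyRow and ParseRow mirror the
# interface defined above.

class ExampleCommentLogParser(DSVParser):
  """Hypothetical parser for a comma separated "timestamp,user,message" log."""

  NAME = 'example_comment_log'

  COLUMNS = ['timestamp', 'user', 'message']
  DELIMITER = ','

  def VerifyRow(self, parser_mediator, row):
    """Verifies that the first row matches the expected format."""
    # A cheap sanity check on the first row: the timestamp field must be
    # numeric, otherwise the file most likely belongs to a different parser
    # and ParseFileObject raises UnableToParseFile.
    return row['timestamp'].isdigit()

  def ParseRow(self, parser_mediator, row_offset, row):
    """Parses a single row of the log file."""
    # A real parser would build an event data attribute container from the
    # row values and hand it to the parser mediator; those calls depend on
    # the plaso version, so they are omitted from this sketch.
    _ = (row_offset, row['timestamp'], row['user'], row['message'])

# A real parser module would also register the concrete class with plaso's
# parser manager so it is picked up during extraction.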