Source code for plaso.parsers.pls_recall

# -*- coding: utf-8 -*-
"""Parser for PL/SQL Developer Recall files."""

import os

from dfdatetime import delphi_date_time as dfdatetime_delphi_date_time

from plaso.containers import events
from plaso.lib import dtfabric_helper
from plaso.lib import errors
from plaso.parsers import interface
from plaso.parsers import manager

[docs]class PlsRecallEventData(events.EventData): """PL/SQL Recall event data. Attributes: database_name (str): name of the database. offset (int): offset of the PL/SQL Recall record relative to the start of the file, from which the event data was extracted. query (str): PL/SQL query. sequence_number (int): sequence number. username (str): username used to query. written_time (dfdatetime.DateTimeValues): entry written date and time. """ DATA_TYPE = 'pls_recall:entry' def __init__(self): """Initializes event data.""" super(PlsRecallEventData, self).__init__(data_type=self.DATA_TYPE) self.database_name = None self.offset = None self.query = None self.sequence_number = None self.username = None self.written_time = None
[docs]class PlsRecallParser( interface.FileObjectParser, dtfabric_helper.DtFabricHelper): """Parse PL/SQL Recall files. This parser is based on the Delphi definition of the data type: TRecallRecord = packed record Sequence: Integer; TimeStamp: TDateTime; Username: array[0..30] of Char; Database: array[0..80] of Char; Text: array[0..4000] of Char; end; Delphi TDateTime is a little-endian 64-bit floating-point value without time zone information. """ NAME = 'pls_recall' DATA_FORMAT = 'PL SQL cache file (PL-SQL developer recall file) format' _DEFINITION_FILE = os.path.join( os.path.dirname(__file__), 'pls_recall.yaml') _PLS_KEYWORD = frozenset([ 'begin', 'commit', 'create', 'declare', 'drop', 'end', 'exception', 'execute', 'insert', 'replace', 'rollback', 'select', 'set', 'update']) def __init__(self): """Initializes a PL/SQL Recall file parser.""" super(PlsRecallParser, self).__init__() self._record_map = self._GetDataTypeMap('pls_recall_record') def _ParseTDateTimeValue(self, tdatetime_value): """Parses a TDateTime value. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. tdatetime_value (float): a TDateTime value. Returns: dfdatetime.DelphiDateTime: date and time value. Raises: ParseError: if a valid date and time value cannot be derived from the TDateTime value. """ # The maximum date supported by TDateTime values is limited to: # 9999-12-31 23:59:59.999 (approximate 2958465 days since epoch). # The minimum date is unknown hence assuming it is limited to: # 0001-01-01 00:00:00.000 (approximate -693593 days since epoch). if tdatetime_value < -693593.0 or tdatetime_value > 2958465.0: raise errors.ParseError('Invalid TDateTime value out bounds') return dfdatetime_delphi_date_time.DelphiDateTime(timestamp=tdatetime_value) def _VerifyFirstRecord(self, parser_mediator, pls_record): """Verifies the first PLS Recall record. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. pls_record (pls_recall_record): a PLS Recall record to verify. Returns: bool: True if this is a valid PLS Recall record, False otherwise. """ try: date_time = self._ParseTDateTimeValue(pls_record.last_written_time) except errors.ParseError: return False # TDateTime uses milliseconds precision so a timestamp less than # 1 millisecond is likely to be invalid. if (pls_record.last_written_time > -0.0001 and pls_record.last_written_time < 0.0001 and pls_record.last_written_time != 0.0): return False year, _, _ = date_time.GetDate() # Verify that the PLS timestamp is no more than six years into the future. # Six years is an arbitrary time length just to evaluate the timestamp # against some value. There is no guarantee that this will catch everything. # TODO: Add a check for similarly valid value back in time. Maybe if it the # timestamp is before 1980 we are pretty sure it is invalid? # TODO: This is a very flaky assumption, maybe use the # file entry time # range instead? current_year = parser_mediator.GetCurrentYear() if year > current_year + 6: return False # Take the first word from the query field and attempt to match that against # known query keywords. first_word, _, _ = pls_record.query.partition(' ') if first_word.lower() not in self._PLS_KEYWORD: return False return True
[docs] def ParseFileObject(self, parser_mediator, file_object): """Parses a PLSRecall.dat file-like object. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. file_object (dfvfs.FileIO): a file-like object. Raises: WrongParser: when the file cannot be parsed. """ file_offset = 0 file_size = file_object.get_size() while file_offset < file_size: try: pls_record, record_data_size = self._ReadStructureFromFileObject( file_object, file_offset, self._record_map) except (ValueError, errors.ParseError) as exception: if file_offset == 0: raise errors.WrongParser('Unable to parse first record.') parser_mediator.ProduceExtractionWarning(( 'unable to parse record at offset: 0x{0:08x} with error: ' '{1!s}').format(file_offset, exception)) break if file_offset == 0 and not self._VerifyFirstRecord( parser_mediator, pls_record): raise errors.WrongParser('Verification of first record failed.') event_data = PlsRecallEventData() event_data.database_name = pls_record.database_name event_data.sequence_number = pls_record.sequence_number event_data.offset = file_offset event_data.query = pls_record.query event_data.username = pls_record.username try: event_data.written_time = self._ParseTDateTimeValue( pls_record.last_written_time) except errors.ParseError: parser_mediator.ProduceExtractionWarning(( 'unable to parse TDateTime value of record at offset: 0x{0:08x} ' 'with error: {1!s}').format(file_offset, exception)) parser_mediator.ProduceEventData(event_data) file_offset += record_data_size