Source code for plaso.parsers.sqlite_plugins.macos_document_versions

# -*- coding: utf-8 -*-
"""SQLite parser plugin for MacOS document revision database files."""

from dfdatetime import posix_time as dfdatetime_posix_time

from plaso.containers import events
from plaso.parsers import sqlite
from plaso.parsers.sqlite_plugins import interface


[docs] class MacOSDocumentVersionsEventData(events.EventData): """MacOS document revision event data. Attributes: creation_time (dfdatetime.DateTimeValues): date and time the version information was created. last_seen_time (dfdatetime.DateTimeValues): date and time and the original file was last seen (replicated). name (str): name of the original file. path (str): path from the original file. query (str): SQL query that was used to obtain the event data. user_sid (str): user identifier that open the file. version_path (str): path to the version copy of the original file. """ DATA_TYPE = 'macos:document_versions:file'
[docs] def __init__(self): """Initializes event data.""" super(MacOSDocumentVersionsEventData, self).__init__( data_type=self.DATA_TYPE) self.creation_time = None self.last_seen_time = None self.name = None self.path = None self.query = None self.user_sid = None self.version_path = None
[docs] class MacOSDocumentVersionsPlugin(interface.SQLitePlugin): """SQLite parser plugin for MacOS document revision database files.""" NAME = 'mac_document_versions' DATA_FORMAT = 'MacOS document revisions SQLite database file' REQUIRED_STRUCTURE = { 'files': frozenset([ 'file_name', 'file_path', 'file_last_seen', 'file_storage_id']), 'generations': frozenset([ 'generation_path', 'generation_add_time', 'generation_storage_id'])} # Define the needed queries. # name: name from the original file. # path: path from the original file (include the file) # last_time: last time when the file was replicated. # version_path: path where the version is stored. # version_time: the timestamp when the version was created. QUERIES = [ (('SELECT f.file_name AS name, f.file_path AS path, ' 'f.file_last_seen AS last_time, g.generation_path AS version_path, ' 'g.generation_add_time AS version_time FROM files f, generations g ' 'WHERE f.file_storage_id = g.generation_storage_id;'), 'DocumentVersionsRow')] SCHEMAS = [{ 'files': ( 'CREATE TABLE files (file_row_id INTEGER PRIMARY KEY ASC, file_name ' 'TEXT, file_parent_id INTEGER, file_path TEXT, file_inode INTEGER, ' 'file_last_seen INTEGER NOT NULL DEFAULT 0, file_status INTEGER NOT ' 'NULL DEFAULT 1, file_storage_id INTEGER NOT NULL)'), 'generations': ( 'CREATE TABLE generations (generation_id INTEGER PRIMARY KEY ASC, ' 'generation_storage_id INTEGER NOT NULL, generation_name TEXT NOT ' 'NULL, generation_client_id TEXT NOT NULL, generation_path TEXT ' 'UNIQUE, generation_options INTEGER NOT NULL DEFAULT 1, ' 'generation_status INTEGER NOT NULL DEFAULT 1, generation_add_time ' 'INTEGER NOT NULL DEFAULT 0, generation_size INTEGER NOT NULL ' 'DEFAULT 0, generation_prunable INTEGER NOT NULL DEFAULT 0)'), 'storage': ( 'CREATE TABLE storage (storage_id INTEGER PRIMARY KEY ASC ' 'AUTOINCREMENT, storage_options INTEGER NOT NULL DEFAULT 1, ' 'storage_status INTEGER NOT NULL DEFAULT 1)')}] # The SQL field path is the relative path from DocumentRevisions. # For this reason the Path to the program has to be added at the beginning. ROOT_VERSION_PATH = '/.DocumentRevisions-V100/' def _GetDateTimeRowValue(self, query_hash, row, value_name): """Retrieves a date and time value from the row. Args: query_hash (int): hash of the query, that uniquely identifies the query that produced the row. row (sqlite3.Row): row. value_name (str): name of the value. Returns: dfdatetime.PosixTime: date and time value or None if not available. """ timestamp = self._GetRowValue(query_hash, row, value_name) if timestamp is None: return None return dfdatetime_posix_time.PosixTime(timestamp=timestamp)
[docs] def DocumentVersionsRow( self, parser_mediator, query, row, **unused_kwargs): """Parses a document versions row. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. query (str): query that created the row. row (sqlite3.Row): row. """ query_hash = hash(query) # version_path = "PerUser/UserID/xx/client_id/version_file" # where PerUser and UserID are a real directories. version_path = self._GetRowValue(query_hash, row, 'version_path') path = self._GetRowValue(query_hash, row, 'path') paths = version_path.split('/') if len(paths) < 2 or not paths[1].isdigit(): user_sid = '' else: user_sid = paths[1] version_path = self.ROOT_VERSION_PATH + version_path path, _, _ = path.rpartition('/') event_data = MacOSDocumentVersionsEventData() event_data.creation_time = self._GetDateTimeRowValue( query_hash, row, 'version_time') event_data.last_seen_time = self._GetDateTimeRowValue( query_hash, row, 'last_time') event_data.name = self._GetRowValue(query_hash, row, 'name') event_data.path = path event_data.query = query # Note that the user_sid value is expected to be a string. event_data.user_sid = '{0!s}'.format(user_sid) event_data.version_path = version_path parser_mediator.ProduceEventData(event_data)
sqlite.SQLiteParser.RegisterPlugin(MacOSDocumentVersionsPlugin)