Source code for plaso.parsers.sqlite_plugins.macos_document_versions

"""SQLite parser plugin for MacOS document revision database files."""

from dfdatetime import posix_time as dfdatetime_posix_time

from plaso.containers import events
from plaso.parsers import sqlite
from plaso.parsers.sqlite_plugins import interface


[docs] class MacOSDocumentVersionsEventData(events.EventData): """MacOS document revision event data. Attributes: creation_time (dfdatetime.DateTimeValues): date and time the version information was created. last_seen_time (dfdatetime.DateTimeValues): date and time and the original file was last seen (replicated). name (str): name of the original file. path (str): path from the original file. query (str): SQL query that was used to obtain the event data. user_sid (str): user identifier that open the file. version_path (str): path to the version copy of the original file. """ DATA_TYPE = "macos:document_versions:file"
[docs] def __init__(self): """Initializes event data.""" super().__init__(data_type=self.DATA_TYPE) self.creation_time = None self.last_seen_time = None self.name = None self.path = None self.query = None self.user_sid = None self.version_path = None
[docs] class MacOSDocumentVersionsPlugin(interface.SQLitePlugin): """SQLite parser plugin for MacOS document revision database files.""" NAME = "mac_document_versions" DATA_FORMAT = "MacOS document revisions SQLite database file" REQUIRED_STRUCTURE = { "files": frozenset( ["file_name", "file_path", "file_last_seen", "file_storage_id"] ), "generations": frozenset( ["generation_path", "generation_add_time", "generation_storage_id"] ), } # Define the needed queries. # name: name from the original file. # path: path from the original file (include the file) # last_time: last time when the file was replicated. # version_path: path where the version is stored. # version_time: the timestamp when the version was created. QUERIES = [ ( ( "SELECT f.file_name AS name, f.file_path AS path, " "f.file_last_seen AS last_time, g.generation_path AS version_path, " "g.generation_add_time AS version_time FROM files f, generations g " "WHERE f.file_storage_id = g.generation_storage_id;" ), "DocumentVersionsRow", ) ] SCHEMAS = [ { "files": ( "CREATE TABLE files (file_row_id INTEGER PRIMARY KEY ASC, file_name " "TEXT, file_parent_id INTEGER, file_path TEXT, file_inode INTEGER, " "file_last_seen INTEGER NOT NULL DEFAULT 0, file_status INTEGER NOT " "NULL DEFAULT 1, file_storage_id INTEGER NOT NULL)" ), "generations": ( "CREATE TABLE generations (generation_id INTEGER PRIMARY KEY ASC, " "generation_storage_id INTEGER NOT NULL, generation_name TEXT NOT " "NULL, generation_client_id TEXT NOT NULL, generation_path TEXT " "UNIQUE, generation_options INTEGER NOT NULL DEFAULT 1, " "generation_status INTEGER NOT NULL DEFAULT 1, generation_add_time " "INTEGER NOT NULL DEFAULT 0, generation_size INTEGER NOT NULL " "DEFAULT 0, generation_prunable INTEGER NOT NULL DEFAULT 0)" ), "storage": ( "CREATE TABLE storage (storage_id INTEGER PRIMARY KEY ASC " "AUTOINCREMENT, storage_options INTEGER NOT NULL DEFAULT 1, " "storage_status INTEGER NOT NULL DEFAULT 1)" ), } ] # The SQL field path is the relative path from DocumentRevisions. # For this reason the Path to the program has to be added at the beginning. ROOT_VERSION_PATH = "/.DocumentRevisions-V100/" def _GetDateTimeRowValue(self, query_hash, row, value_name): """Retrieves a date and time value from the row. Args: query_hash (int): hash of the query, that uniquely identifies the query that produced the row. row (sqlite3.Row): row. value_name (str): name of the value. Returns: dfdatetime.PosixTime: date and time value or None if not available. """ timestamp = self._GetRowValue(query_hash, row, value_name) if timestamp is None: return None return dfdatetime_posix_time.PosixTime(timestamp=timestamp)
[docs] def DocumentVersionsRow(self, parser_mediator, query, row, **unused_kwargs): """Parses a document versions row. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. query (str): query that created the row. row (sqlite3.Row): row. """ query_hash = hash(query) # version_path = "PerUser/UserID/xx/client_id/version_file" # where PerUser and UserID are a real directories. version_path = self._GetRowValue(query_hash, row, "version_path") path = self._GetRowValue(query_hash, row, "path") paths = version_path.split("/") if len(paths) < 2 or not paths[1].isdigit(): user_sid = "" else: user_sid = paths[1] version_path = self.ROOT_VERSION_PATH + version_path path, _, _ = path.rpartition("/") event_data = MacOSDocumentVersionsEventData() event_data.creation_time = self._GetDateTimeRowValue( query_hash, row, "version_time" ) event_data.last_seen_time = self._GetDateTimeRowValue( query_hash, row, "last_time" ) event_data.name = self._GetRowValue(query_hash, row, "name") event_data.path = path event_data.query = query # Note that the user_sid value is expected to be a string. event_data.user_sid = f"{user_sid!s}" event_data.version_path = version_path parser_mediator.ProduceEventData(event_data)
sqlite.SQLiteParser.RegisterPlugin(MacOSDocumentVersionsPlugin)