"""SQLite parser plugin for MacOS document revision database files."""
from dfdatetime import posix_time as dfdatetime_posix_time
from plaso.containers import events
from plaso.parsers import sqlite
from plaso.parsers.sqlite_plugins import interface
[docs]
class MacOSDocumentVersionsEventData(events.EventData):
"""MacOS document revision event data.
Attributes:
creation_time (dfdatetime.DateTimeValues): date and time the version
information was created.
last_seen_time (dfdatetime.DateTimeValues): date and time and the original
file was last seen (replicated).
name (str): name of the original file.
path (str): path from the original file.
query (str): SQL query that was used to obtain the event data.
user_sid (str): user identifier that open the file.
version_path (str): path to the version copy of the original file.
"""
DATA_TYPE = "macos:document_versions:file"
[docs]
def __init__(self):
"""Initializes event data."""
super().__init__(data_type=self.DATA_TYPE)
self.creation_time = None
self.last_seen_time = None
self.name = None
self.path = None
self.query = None
self.user_sid = None
self.version_path = None
[docs]
class MacOSDocumentVersionsPlugin(interface.SQLitePlugin):
"""SQLite parser plugin for MacOS document revision database files."""
NAME = "mac_document_versions"
DATA_FORMAT = "MacOS document revisions SQLite database file"
REQUIRED_STRUCTURE = {
"files": frozenset(
["file_name", "file_path", "file_last_seen", "file_storage_id"]
),
"generations": frozenset(
["generation_path", "generation_add_time", "generation_storage_id"]
),
}
# Define the needed queries.
# name: name from the original file.
# path: path from the original file (include the file)
# last_time: last time when the file was replicated.
# version_path: path where the version is stored.
# version_time: the timestamp when the version was created.
QUERIES = [
(
(
"SELECT f.file_name AS name, f.file_path AS path, "
"f.file_last_seen AS last_time, g.generation_path AS version_path, "
"g.generation_add_time AS version_time FROM files f, generations g "
"WHERE f.file_storage_id = g.generation_storage_id;"
),
"DocumentVersionsRow",
)
]
SCHEMAS = [
{
"files": (
"CREATE TABLE files (file_row_id INTEGER PRIMARY KEY ASC, file_name "
"TEXT, file_parent_id INTEGER, file_path TEXT, file_inode INTEGER, "
"file_last_seen INTEGER NOT NULL DEFAULT 0, file_status INTEGER NOT "
"NULL DEFAULT 1, file_storage_id INTEGER NOT NULL)"
),
"generations": (
"CREATE TABLE generations (generation_id INTEGER PRIMARY KEY ASC, "
"generation_storage_id INTEGER NOT NULL, generation_name TEXT NOT "
"NULL, generation_client_id TEXT NOT NULL, generation_path TEXT "
"UNIQUE, generation_options INTEGER NOT NULL DEFAULT 1, "
"generation_status INTEGER NOT NULL DEFAULT 1, generation_add_time "
"INTEGER NOT NULL DEFAULT 0, generation_size INTEGER NOT NULL "
"DEFAULT 0, generation_prunable INTEGER NOT NULL DEFAULT 0)"
),
"storage": (
"CREATE TABLE storage (storage_id INTEGER PRIMARY KEY ASC "
"AUTOINCREMENT, storage_options INTEGER NOT NULL DEFAULT 1, "
"storage_status INTEGER NOT NULL DEFAULT 1)"
),
}
]
# The SQL field path is the relative path from DocumentRevisions.
# For this reason the Path to the program has to be added at the beginning.
ROOT_VERSION_PATH = "/.DocumentRevisions-V100/"
def _GetDateTimeRowValue(self, query_hash, row, value_name):
"""Retrieves a date and time value from the row.
Args:
query_hash (int): hash of the query, that uniquely identifies the query
that produced the row.
row (sqlite3.Row): row.
value_name (str): name of the value.
Returns:
dfdatetime.PosixTime: date and time value or None if not available.
"""
timestamp = self._GetRowValue(query_hash, row, value_name)
if timestamp is None:
return None
return dfdatetime_posix_time.PosixTime(timestamp=timestamp)
[docs]
def DocumentVersionsRow(self, parser_mediator, query, row, **unused_kwargs):
"""Parses a document versions row.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
query (str): query that created the row.
row (sqlite3.Row): row.
"""
query_hash = hash(query)
# version_path = "PerUser/UserID/xx/client_id/version_file"
# where PerUser and UserID are a real directories.
version_path = self._GetRowValue(query_hash, row, "version_path")
path = self._GetRowValue(query_hash, row, "path")
paths = version_path.split("/")
if len(paths) < 2 or not paths[1].isdigit():
user_sid = ""
else:
user_sid = paths[1]
version_path = self.ROOT_VERSION_PATH + version_path
path, _, _ = path.rpartition("/")
event_data = MacOSDocumentVersionsEventData()
event_data.creation_time = self._GetDateTimeRowValue(
query_hash, row, "version_time"
)
event_data.last_seen_time = self._GetDateTimeRowValue(
query_hash, row, "last_time"
)
event_data.name = self._GetRowValue(query_hash, row, "name")
event_data.path = path
event_data.query = query
# Note that the user_sid value is expected to be a string.
event_data.user_sid = f"{user_sid!s}"
event_data.version_path = version_path
parser_mediator.ProduceEventData(event_data)
sqlite.SQLiteParser.RegisterPlugin(MacOSDocumentVersionsPlugin)