Source code for plaso.parsers.sqlite_plugins.edge_load_statistics

"""Parser for Microsoft Edge load statistics database."""

from dfdatetime import webkit_time as dfdatetime_webkit_time

from plaso.containers import events
from plaso.parsers import sqlite
from plaso.parsers.sqlite_plugins import interface


class EdgeLoadStatisticsResourceEventData(events.EventData):
    """Event data for one resource row of Microsoft Edge load statistics.

    Attributes:
      last_update (dfdatetime.WebKitTime): time the resource entry was last
          updated, whether the resource was cached or not.
      query (str): SQL query that produced the event data.
      resource_hostname (str): external domain the resource was loaded from.
      resource_type (int): integer descriptor of the resource type.
      top_level_hostname (str): source domain that initiated the resource
          load.
    """

    DATA_TYPE = "edge:resources:load_statistics"

    def __init__(self):
        """Creates the event data with every attribute unset."""
        super(EdgeLoadStatisticsResourceEventData, self).__init__(
            data_type=self.DATA_TYPE
        )
        # Every value starts out as None; the parser plugin fills them in
        # from a database row.
        self.last_update = None
        self.query = None
        self.resource_hostname = None
        self.resource_type = None
        self.top_level_hostname = None
class EdgeLoadStatisticsPlugin(interface.SQLitePlugin):
    """SQLite parser plugin for Microsoft Edge load statistics database.

    The plugin reads every row of the load_statistics table and produces one
    EdgeLoadStatisticsResourceEventData per row.
    """

    NAME = "edge_load_statistics"
    DESCRIPTION = "Parser for Microsoft Edge load_statistics.db"

    # Each entry pairs a SQL query with the name of the method that handles
    # the rows the query returns.
    QUERIES = [
        (
            "SELECT top_level_hostname, resource_hostname, resource_type, "
            "last_update FROM load_statistics",
            "ParseResourceRow",
        )
    ]

    # Tables and columns that identify a load statistics database.
    REQUIRED_STRUCTURE = {
        "load_statistics": frozenset(
            [
                "top_level_hostname",
                "resource_hostname",
                "resource_url_hash",
                "resource_type",
                "last_update",
            ]
        ),
        "meta": frozenset(["key", "value"]),
        "redirect_statistics": frozenset(
            [
                "source_hostname",
                "destination_hostname",
                "is_top_level_document",
                "last_update",
            ]
        ),
    }

    # Known CREATE TABLE statements of the database, keyed by table name.
    #
    # Adjacent string literals are concatenated without any separator, so
    # every fragment that ends between two SQL tokens carries its own
    # trailing space. Without it the statements read "resource_typeINTEGER"
    # and "INTEGER NOTNULL", which does not describe the resource_type column
    # listed in REQUIRED_STRUCTURE.
    #
    # NOTE(review): the ", " separators at the former fragment joins were
    # reconstructed from the spacing used inside each fragment - confirm
    # against the schema of a real load_statistics.db.
    SCHEMAS = [
        {
            "load_statistics": (
                "CREATE TABLE load_statistics(top_level_hostname TEXT, "
                "resource_hostname TEXT, resource_url_hash TEXT, "
                "resource_type INTEGER, last_update INTEGER NOT NULL, "
                "UNIQUE(top_level_hostname,resource_url_hash))"
            ),
            "meta": (
                "CREATE TABLE meta(key LONGVARCHAR NOT NULL UNIQUE PRIMARY "
                "KEY, value LONGVARCHAR)"
            ),
            "redirect_statistics": (
                "CREATE TABLE redirect_statistics(source_hostname TEXT, "
                "destination_hostname TEXT, is_top_level_document INTEGER "
                "NOT NULL, last_update INTEGER NOT NULL, "
                "UNIQUE(source_hostname,destination_hostname,"
                "is_top_level_document))"
            ),
        }
    ]

    def _GetWebKitDateTimeRowValue(self, query_hash, row, value_name):
        """Retrieves a WebKit date and time value from the row.

        Args:
          query_hash (int): hash of the query, that uniquely identifies the
              query that produced the row.
          row (sqlite3.Row): row.
          value_name (str): name of the value.

        Returns:
          dfdatetime.WebKitTime: date and time value or None if not available.
        """
        timestamp = self._GetRowValue(query_hash, row, value_name)
        if timestamp is None:
            return None

        return dfdatetime_webkit_time.WebKitTime(timestamp=timestamp)

    def ParseResourceRow(self, parser_mediator, query, row, **unused_kwargs):
        """Parses a row from the database.

        Builds an EdgeLoadStatisticsResourceEventData from the row and hands
        it to the parser mediator.

        Args:
          parser_mediator (ParserMediator): mediates interactions between
              parsers and other components, such as storage and dfVFS.
          query (str): query that created the row.
          row (sqlite3.Row): row resulting from query.
        """
        # Note that pysqlite does not accept a Unicode string in row['string']
        # and will raise "IndexError: Index must be int or string".
        query_hash = hash(query)

        event_data = EdgeLoadStatisticsResourceEventData()
        event_data.last_update = self._GetWebKitDateTimeRowValue(
            query_hash, row, "last_update"
        )
        event_data.query = query
        event_data.resource_hostname = self._GetRowValue(
            query_hash, row, "resource_hostname"
        )
        event_data.resource_type = self._GetRowValue(
            query_hash, row, "resource_type"
        )
        event_data.top_level_hostname = self._GetRowValue(
            query_hash, row, "top_level_hostname"
        )

        parser_mediator.ProduceEventData(event_data)
# Register the plugin class with the SQLite parser when this module is
# imported.
sqlite.SQLiteParser.RegisterPlugin(EdgeLoadStatisticsPlugin)