Source code for plaso.parsers.sqlite_plugins.edge_load_statistics

# -*- coding: utf-8 -*-
"""Parser for Microsoft Edge load statistics database."""

from dfdatetime import webkit_time as dfdatetime_webkit_time

from plaso.containers import events
from plaso.parsers import sqlite
from plaso.parsers.sqlite_plugins import interface


class EdgeLoadStatisticsResourceEventData(events.EventData):
  """Event data for one resource entry of Microsoft Edge load statistics.

  Attributes:
    last_update (dfdatetime.WebKitTime): date and time the resource was last
        updated, cached or not.
    query (str): query that created the event data.
    resource_hostname (str): external domain of the resource that was loaded.
    resource_type (int): integer descriptor of the resource type.
    top_level_hostname (str): source domain that initiated the resource load.
  """

  DATA_TYPE = 'edge:resources:load_statistics'

  def __init__(self):
    """Initializes event data with every attribute unset."""
    super().__init__(data_type=self.DATA_TYPE)
    # Attributes are kept in alphabetical order and start out as None; they
    # are filled in by the parser plugin for each row.
    self.last_update = None
    self.query = None
    self.resource_hostname = None
    self.resource_type = None
    self.top_level_hostname = None
class EdgeLoadStatisticsPlugin(interface.SQLitePlugin):
  """SQLite parser plugin for Microsoft Edge load statistics database.

  Every row of the load_statistics table is converted into one
  EdgeLoadStatisticsResourceEventData container.
  """

  NAME = 'edge_load_statistics'
  DESCRIPTION = 'Parser for Microsoft Edge load_statistics.db'

  # Each entry pairs a SQL query with the name of the method of this class
  # that handles a resulting row.
  QUERIES = [
      ('SELECT top_level_hostname, resource_hostname, resource_type, '
       'last_update FROM load_statistics', 'ParseResourceRow')]

  # Table names mapped to the column names expected in each table.
  REQUIRED_STRUCTURE = {
      'load_statistics': frozenset([
          'top_level_hostname', 'resource_hostname', 'resource_url_hash',
          'resource_type', 'last_update']),
      'meta': frozenset([
          'key', 'value']),
      'redirect_statistics': frozenset([
          'source_hostname', 'destination_hostname',
          'is_top_level_document', 'last_update'])}

  # Note that adjacent string literals are concatenated without a separator,
  # hence every fragment that ends in the middle of a column definition must
  # carry its own trailing space. Fragments are split on token boundaries so
  # that "resource_type INTEGER" and "NOT NULL" are not fused together.
  SCHEMAS = [{
      'load_statistics': (
          'CREATE TABLE load_statistics(top_level_hostname TEXT,'
          'resource_hostname TEXT, resource_url_hash TEXT, '
          'resource_type INTEGER, last_update INTEGER NOT NULL,'
          'UNIQUE(top_level_hostname,resource_url_hash))'),
      'meta': (
          'CREATE TABLE meta(key LONGVARCHAR NOT NULL UNIQUE PRIMARY KEY,'
          'value LONGVARCHAR)'),
      'redirect_statistics': (
          'CREATE TABLE redirect_statistics(source_hostname TEXT,'
          'destination_hostname TEXT, is_top_level_document INTEGER NOT NULL, '
          'last_update INTEGER NOT NULL, '
          'UNIQUE(source_hostname,destination_hostname,'
          'is_top_level_document))')}]

  def _GetWebKitDateTimeRowValue(self, query_hash, row, value_name):
    """Retrieves a WebKit date and time value from the row.

    Args:
      query_hash (int): hash of the query, that uniquely identifies the query
          that produced the row.
      row (sqlite3.Row): row.
      value_name (str): name of the value.

    Returns:
      dfdatetime.WebKitTime: date and time value or None if not available.
    """
    timestamp = self._GetRowValue(query_hash, row, value_name)
    if timestamp is None:
      return None

    return dfdatetime_webkit_time.WebKitTime(timestamp=timestamp)

  def ParseResourceRow(self, parser_mediator, query, row, **unused_kwargs):
    """Parses a row from the database.

    Builds an EdgeLoadStatisticsResourceEventData from the row values and
    hands it to the parser mediator.

    Args:
      parser_mediator (ParserMediator): mediates interactions between parsers
          and other components, such as storage and dfvfs.
      query (str): query that created the row.
      row (sqlite3.Row): row resulting from query.
    """
    # Note that pysqlite does not accept a Unicode string in row['string'] and
    # will raise "IndexError: Index must be int or string".
    query_hash = hash(query)

    event_data = EdgeLoadStatisticsResourceEventData()
    event_data.last_update = self._GetWebKitDateTimeRowValue(
        query_hash, row, 'last_update')
    event_data.query = query
    event_data.resource_hostname = self._GetRowValue(
        query_hash, row, 'resource_hostname')
    event_data.resource_type = self._GetRowValue(
        query_hash, row, 'resource_type')
    event_data.top_level_hostname = self._GetRowValue(
        query_hash, row, 'top_level_hostname')

    parser_mediator.ProduceEventData(event_data)
# Register the plugin class with the SQLite parser. This is a module-level
# statement, so it runs when this module is imported.
sqlite.SQLiteParser.RegisterPlugin(EdgeLoadStatisticsPlugin)