Source code for plaso.parsers.chrome_cache

"""Parser for Google Chrome and Chromium Cache files."""

import os

from dfdatetime import webkit_time as dfdatetime_webkit_time
from dfvfs.resolver import resolver as path_spec_resolver
from dfvfs.path import factory as path_spec_factory

from plaso.containers import events
from plaso.lib import dtfabric_helper
from plaso.lib import errors
from plaso.lib import specification
from plaso.parsers import interface
from plaso.parsers import manager


class CacheAddress:
    """Chrome cache address.

    A cache address is a packed integer. This class unpacks it into the name
    of the file that holds the data and, for block data files, the location
    of the block inside that file.

    Attributes:
        block_number (int): number of the block within the block data file,
            or None if the address does not refer to a block data file.
        block_offset (int): offset within the block data file, or None if
            the address does not refer to a block data file.
        block_size (int): block size, or None if the address does not refer
            to a block data file.
        file_type (int): file type stored in bits 28-30 of the cache address.
        filename (str): name of the separate file ("f_XXXXXX") or block data
            file ("data_N"), or None if the address is 0 or the file type is
            not one of the FILE_TYPE_* values.
        is_initialized (str): the string "True" if bit 31 of the cache
            address is set, otherwise the string "False".
        value (int): cache address.
    """

    FILE_TYPE_SEPARATE = 0
    FILE_TYPE_BLOCK_RANKINGS = 1
    FILE_TYPE_BLOCK_256 = 2
    FILE_TYPE_BLOCK_1024 = 3
    FILE_TYPE_BLOCK_4096 = 4

    _BLOCK_DATA_FILE_TYPES = [
        FILE_TYPE_BLOCK_RANKINGS,
        FILE_TYPE_BLOCK_256,
        FILE_TYPE_BLOCK_1024,
        FILE_TYPE_BLOCK_4096,
    ]

    # Size of a single block, indexed by file type.
    _FILE_TYPE_BLOCK_SIZES = [0, 36, 256, 1024, 4096]

    def __init__(self, cache_address):
        """Initializes a cache address.

        Args:
            cache_address (int): cache address.
        """
        super().__init__()
        self.value = cache_address

        # Note that is_initialized holds a string, not a bool.
        self.is_initialized = "True" if cache_address & 0x80000000 else "False"
        self.file_type = (cache_address & 0x70000000) >> 28

        self.block_number = None
        self.block_offset = None
        self.block_size = None
        self.filename = None

        # An all-zero address does not point anywhere.
        if cache_address == 0x00000000:
            return

        if self.file_type == self.FILE_TYPE_SEPARATE:
            separate_file_number = cache_address & 0x0FFFFFFF
            self.filename = f"f_{separate_file_number:06x}"
            return

        # Unrecognized file types leave the location attributes unset.
        if self.file_type not in self._BLOCK_DATA_FILE_TYPES:
            return

        single_block_size = self._FILE_TYPE_BLOCK_SIZES[self.file_type]
        data_file_number = (cache_address & 0x00FF0000) >> 16
        # NOTE(review): Chromium's Addr::num_blocks() appears to add 1 to this
        # 2-bit field; confirm whether block_size should account for that.
        block_count = (cache_address & 0x03000000) >> 24

        self.filename = f"data_{data_file_number:d}"
        self.block_number = cache_address & 0x0000FFFF
        self.block_size = block_count * single_block_size
        # Blocks start after 8192 bytes at the start of the block data file.
        self.block_offset = 8192 + (self.block_number * single_block_size)
class CacheEntry:
    """Chrome cache entry.

    Plain container for the values read from a cache entry. All attributes
    start out as None and are filled in by the code that parses the entry.

    Attributes:
        creation_time (int): creation time, in number of microseconds since
            January 1, 1601, 00:00:00 UTC.
        hash (int): super fast hash of the key.
        key (bytes): key.
        next (CacheAddress): cache address of the next cache entry.
        original_url (str): original URL derived from the key.
        payloads ([str]): filenames (and offsets) of the cache payloads.
        rankings_node (CacheAddress): cache address of the rankings node.
    """

    def __init__(self):
        """Initializes a cache entry with every attribute unset."""
        super().__init__()

        # Identity of the entry.
        self.hash = None
        self.key = None
        self.original_url = None

        # Metadata and payload locations.
        self.creation_time = None
        self.payloads = None

        # Links to related cache addresses.
        self.next = None
        self.rankings_node = None
class ChromeCacheIndexFileParser(
    interface.FileObjectParser, dtfabric_helper.DtFabricHelper
):
    """Chrome cache index file parser.

    Attributes:
        creation_time (int): creation time, in number of microseconds since
            January 1, 1601, 00:00:00 UTC.
        index_table (list[CacheAddress]): the cache addresses which are stored
            in the index file.
    """

    _DEFINITION_FILE = os.path.join(os.path.dirname(__file__), "chrome_cache.yaml")

    # Size, in bytes, of the LRU data stored between the file header and the
    # index table.
    _LRU_DATA_SIZE = 112

    _SUPPORTED_FORMAT_VERSIONS = ("2.0", "2.1", "3.0")

    def __init__(self):
        """Initializes an index file."""
        super().__init__()
        self.creation_time = None
        self.index_table = []

    def _ParseFileHeader(self, file_object):
        """Parses the file header and stores the creation time.

        Args:
            file_object (dfvfs.FileIO): a file-like object to parse.

        Raises:
            ParseError: if the file header cannot be read or the format
                version is not supported.
        """
        file_header_map = self._GetDataTypeMap("chrome_cache_index_file_header")

        try:
            file_header, _ = self._ReadStructureFromFileObject(
                file_object, 0, file_header_map
            )
        except (ValueError, errors.ParseError) as exception:
            raise errors.ParseError(
                f"Unable to parse index file header with error: {exception!s}"
            ) from exception

        format_version = f"{file_header.major_version:d}.{file_header.minor_version:d}"
        if format_version not in self._SUPPORTED_FORMAT_VERSIONS:
            raise errors.ParseError(
                f"Unsupported index file format version: {format_version:s}"
            )

        self.creation_time = file_header.creation_time

    def _ParseIndexTable(self, file_object):
        """Parses the index table.

        Reads 4-byte values from the current offset until fewer than 4 bytes
        are left and appends every non-zero value to index_table as a
        CacheAddress.

        Args:
            file_object (dfvfs.FileIO): a file-like object to parse.

        Raises:
            ParseError: if the index table cannot be read.
        """
        cache_address_map = self._GetDataTypeMap("uint32le")
        file_offset = file_object.get_offset()

        cache_address_data = file_object.read(4)
        while len(cache_address_data) == 4:
            try:
                value = self._ReadStructureFromByteStream(
                    cache_address_data, file_offset, cache_address_map
                )
            except (ValueError, errors.ParseError) as exception:
                raise errors.ParseError(
                    f"Unable to map cache address at offset: 0x{file_offset:08x} "
                    f"with error: {exception!s}"
                ) from exception

            # A value of 0 marks an unused slot in the table.
            if value:
                self.index_table.append(CacheAddress(value))

            file_offset += 4
            cache_address_data = file_object.read(4)

    def ParseFileObject(self, parser_mediator, file_object):
        """Parses a file-like object.

        Args:
            parser_mediator (ParserMediator): a parser mediator.
            file_object (dfvfs.FileIO): a file-like object to parse.

        Raises:
            ParseError: when the file cannot be parsed.
        """
        # Start from a clean state so that reusing this parser for another
        # index file does not mix in the results of a previous file.
        self.creation_time = None
        self.index_table = []

        # _ParseFileHeader already raises a descriptive ParseError; wrapping it
        # again would only repeat the same prefix in the message.
        self._ParseFileHeader(file_object)

        # Skip over the LRU data that follows the file header.
        file_object.seek(self._LRU_DATA_SIZE, os.SEEK_CUR)

        self._ParseIndexTable(file_object)
class ChromeCacheDataBlockFileParser(
    interface.FileObjectParser, dtfabric_helper.DtFabricHelper
):
    """Chrome cache data block file parser."""

    _DEFINITION_FILE = os.path.join(os.path.dirname(__file__), "chrome_cache.yaml")

    def _ParseFileHeader(self, file_object):
        """Reads the data block file header and checks that it is supported.

        Args:
            file_object (dfvfs.FileIO): a file-like object to parse.

        Raises:
            ParseError: if the file header cannot be read, or if its format
                version or block size is not supported.
        """
        header_map = self._GetDataTypeMap("chrome_cache_data_block_file_header")

        try:
            header, _ = self._ReadStructureFromFileObject(file_object, 0, header_map)
        except (ValueError, errors.ParseError) as exception:
            raise errors.ParseError(
                f"Unable to parse data block file header with error: {exception!s}"
            )

        version = f"{header.major_version:d}.{header.minor_version:d}"
        if version not in ("2.0", "2.1"):
            raise errors.ParseError(
                f"Unsupported data block file format version: {version:s}"
            )

        block_size = header.block_size
        if block_size not in (256, 1024, 4096):
            raise errors.ParseError(
                f"Unsupported data block file block size: {block_size:d}"
            )

    def ParseCacheEntry(self, file_object, block_offset):
        """Reads the cache entry stored at a block offset.

        Args:
            file_object (dfvfs.FileIO): a file-like object to read from.
            block_offset (int): block offset of the cache entry.

        Returns:
            CacheEntry: cache entry.

        Raises:
            ParseError: if the cache entry cannot be read or its key cannot
                be decoded as ASCII.
        """
        entry_map = self._GetDataTypeMap("chrome_cache_entry")

        try:
            raw_entry, _ = self._ReadStructureFromFileObject(
                file_object, block_offset, entry_map
            )
        except (ValueError, errors.ParseError) as exception:
            raise errors.ParseError(
                f"Unable to parse cache entry at offset: 0x{block_offset:08x} with "
                f"error: {exception!s}"
            )

        # Describe where each data stream of the entry is stored. Separate
        # files are listed by name only, blocks by file name and offset.
        payloads = []
        for stream_address in raw_entry.data_stream_addresses:
            location = CacheAddress(stream_address)
            description = location.filename
            if description is None:
                continue

            if not description.startswith("f_"):
                description = (
                    f"{location.filename:s} (offset: "
                    f"0x{location.block_offset:08x})"
                )
            payloads.append(description)

        # The key is the data up to (not including) the first NUL byte.
        key = bytes(bytearray(raw_entry.key)).partition(b"\x00")[0]

        try:
            original_url = key.decode("ascii")
        except UnicodeDecodeError as exception:
            raise errors.ParseError(
                f"Unable to decode original URL in key with error: {exception!s}"
            )

        result = CacheEntry()
        result.creation_time = raw_entry.creation_time
        result.hash = raw_entry.hash
        result.key = key
        result.next = CacheAddress(raw_entry.next_address)
        result.original_url = original_url
        result.payloads = payloads
        result.rankings_node = CacheAddress(raw_entry.rankings_node_address)
        return result

    # pylint: disable=unused-argument
    def ParseFileObject(self, parser_mediator, file_object):
        """Parses a file-like object by validating its file header.

        Args:
            parser_mediator (ParserMediator): a parser mediator.
            file_object (dfvfs.FileIO): a file-like object to parse.

        Raises:
            ParseError: when the file cannot be parsed.
        """
        self._ParseFileHeader(file_object)
class ChromeCacheEntryEventData(events.EventData):
    """Chrome Cache event data.

    Attributes:
        creation_time (dfdatetime.DateTimeValues): creation date and time of
            the cache entry.
        original_url (str): original URL.
        payloads ([str]): filenames (and offsets) of the cache payloads.
    """

    DATA_TYPE = "chrome:cache:entry"

    def __init__(self):
        """Initializes event data with all attributes unset."""
        super().__init__(data_type=self.DATA_TYPE)

        # Values are filled in by the parser once a cache entry was read.
        self.original_url = None
        self.payloads = None
        self.creation_time = None
class ChromeCacheParser(interface.FileEntryParser):
    """Parses Chrome Cache files."""

    NAME = "chrome_cache"
    DATA_FORMAT = "Google Chrome or Chromium Cache file"

    # Upper bound on the number of cache entries followed through their
    # "next" address, which protects against cycles in corrupt files.
    _MAXIMUM_CACHE_ADDRESS_CHAIN_LENGTH = 64

    def __init__(self):
        """Initializes a Chrome Cache files parser."""
        super().__init__()
        self._data_block_file_parser = ChromeCacheDataBlockFileParser()

    def _ParseCacheEntries(self, parser_mediator, index_table, data_block_files):
        """Parses Chrome Cache file entries.

        Every cache address in the index table is the start of a chain of
        cache entries linked by their "next" address. One event data is
        produced per cache entry.

        Args:
            parser_mediator (ParserMediator): mediates interactions between
                parsers and other components, such as storage and dfVFS.
            index_table (list[CacheAddress]): the cache addresses which are
                stored in the index file.
            data_block_files (dict[str: file]): look up table for the data
                block file-like object handles.
        """
        for cache_address in index_table:
            cache_address_chain_length = 0
            while cache_address.value != 0:
                if (
                    cache_address_chain_length
                    >= self._MAXIMUM_CACHE_ADDRESS_CHAIN_LENGTH
                ):
                    parser_mediator.ProduceExtractionWarning(
                        "Maximum allowed cache address chain length reached."
                    )
                    break

                # Addresses without a filename or pointing to a file that
                # could not be opened or validated resolve to None here.
                data_block_file_object = data_block_files.get(cache_address.filename)
                if not data_block_file_object:
                    parser_mediator.ProduceExtractionWarning(
                        f"Cache address: 0x{cache_address.value:08x} missing data file."
                    )
                    break

                try:
                    cache_entry = self._data_block_file_parser.ParseCacheEntry(
                        data_block_file_object, cache_address.block_offset
                    )
                except (OSError, errors.ParseError) as exception:
                    parser_mediator.ProduceExtractionWarning(
                        f"Unable to parse cache entry with error: {exception!s}"
                    )
                    break

                event_data = ChromeCacheEntryEventData()
                event_data.creation_time = dfdatetime_webkit_time.WebKitTime(
                    timestamp=cache_entry.creation_time
                )
                event_data.payloads = cache_entry.payloads

                # In Chrome Cache v3, doublekey-ing cache entries was introduced
                # This shows up as "_dk_{domain}( {domain})* {url}"
                # https://chromium.googlesource.com/chromium/src/+/
                # 95faad3cfd90169f0a267e979c36e3348476a948/net/http/http_cache.cc#427
                if "_dk_" in cache_entry.original_url[:20]:
                    # The URL is the last space separated part of the key.
                    parsed_url = cache_entry.original_url.strip().rsplit(" ", 1)[-1]
                    event_data.original_url = parsed_url
                else:
                    event_data.original_url = cache_entry.original_url

                parser_mediator.ProduceEventData(event_data)

                cache_address = cache_entry.next
                cache_address_chain_length += 1

    def _OpenDataBlockFile(self, parser_mediator, file_system, file_entry, filename):
        """Opens and validates the data block file next to the index file.

        Args:
            parser_mediator (ParserMediator): mediates interactions between
                parsers and other components, such as storage and dfVFS.
            file_system (dfvfs.FileSystem): file system.
            file_entry (dfvfs.FileEntry): file entry of the index file.
            filename (str): name of the data block file.

        Returns:
            dfvfs.FileIO: file-like object of the data block file or None if
                the file is missing or cannot be parsed.
        """
        # Replace the name of the index file with that of the data block file.
        path_segments = file_system.SplitPath(file_entry.path_spec.location)[:-1]
        path_segments.append(filename)
        location = file_system.JoinPath(path_segments)

        # We need to pass only used arguments to the path specification
        # factory otherwise it will raise.
        kwargs = {"location": location}
        if file_entry.path_spec.parent:
            kwargs["parent"] = file_entry.path_spec.parent

        data_block_file_path_spec = path_spec_factory.Factory.NewPathSpec(
            file_entry.path_spec.TYPE_INDICATOR, **kwargs
        )

        try:
            data_block_file_entry = path_spec_resolver.Resolver.OpenFileEntry(
                data_block_file_path_spec
            )
        except RuntimeError as exception:
            display_location = location or "N/A"
            parser_mediator.ProduceExtractionWarning(
                f"Unable to open data block file: {display_location!s} with error: "
                f"{exception!s}"
            )
            data_block_file_entry = None

        if not data_block_file_entry:
            parser_mediator.ProduceExtractionWarning(
                f"Missing data block file: {filename:s}"
            )
            return None

        data_block_file_object = data_block_file_entry.GetFileObject()

        try:
            self._data_block_file_parser.ParseFileObject(
                parser_mediator, data_block_file_object
            )
        except (OSError, errors.ParseError) as exception:
            parser_mediator.ProduceExtractionWarning(
                f"Unable to parse data block file: "
                f"{filename!s} with error: {exception!s}"
            )
            return None

        return data_block_file_object

    def _ParseIndexTable(self, parser_mediator, file_system, file_entry, index_table):
        """Parses a Chrome Cache index table.

        Args:
            parser_mediator (ParserMediator): mediates interactions between
                parsers and other components, such as storage and dfVFS.
            file_system (dfvfs.FileSystem): file system.
            file_entry (dfvfs.FileEntry): file entry.
            index_table (list[CacheAddress]): the cache addresses which are
                stored in the index file.
        """
        # Build a lookup table for the data block files.
        data_block_files = {}
        for cache_address in index_table:
            filename = cache_address.filename

            # A cache address with an unrecognized file type has no filename.
            # There is no file to open for it; _ParseCacheEntries reports it
            # as a cache address with a missing data file.
            if filename is None or filename in data_block_files:
                continue

            data_block_files[filename] = self._OpenDataBlockFile(
                parser_mediator, file_system, file_entry, filename
            )

        self._ParseCacheEntries(parser_mediator, index_table, data_block_files)

    @classmethod
    def GetFormatSpecification(cls):
        """Retrieves the format specification.

        Returns:
            FormatSpecification: format specification.
        """
        format_specification = specification.FormatSpecification(cls.NAME)
        format_specification.AddNewSignature(b"\xc3\xca\x03\xc1", offset=0)
        return format_specification

    def ParseFileEntry(self, parser_mediator, file_entry):
        """Parses Chrome Cache files.

        The file entry is expected to be the index file; the data block files
        are looked up in the same directory.

        Args:
            parser_mediator (ParserMediator): mediates interactions between
                parsers and other components, such as storage and dfVFS.
            file_entry (dfvfs.FileEntry): file entry.

        Raises:
            WrongParser: when the file cannot be parsed.
        """
        index_file_parser = ChromeCacheIndexFileParser()

        file_object = file_entry.GetFileObject()
        if not file_object:
            display_name = parser_mediator.GetDisplayName()
            raise errors.WrongParser(
                f"[{self.NAME:s}] unable to parse index file {display_name:s}"
            )

        try:
            index_file_parser.ParseFileObject(parser_mediator, file_object)
        except (OSError, errors.ParseError) as exception:
            display_name = parser_mediator.GetDisplayName()
            raise errors.WrongParser(
                f"[{self.NAME:s}] unable to parse index file {display_name:s} with "
                f"error: {exception!s}"
            ) from exception

        # TODO: create event based on index file creation time.

        file_system = file_entry.GetFileSystem()
        self._ParseIndexTable(
            parser_mediator, file_system, file_entry, index_file_parser.index_table
        )
# Register the Chrome Cache parser with the parsers manager when this module
# is imported.
manager.ParsersManager.RegisterParser(ChromeCacheParser)