# Source code for plaso.parsers.chrome_cache

"""Parser for Google Chrome and Chromium Cache files."""

import os

from dfdatetime import webkit_time as dfdatetime_webkit_time
from dfvfs.resolver import resolver as path_spec_resolver
from dfvfs.path import factory as path_spec_factory

from plaso.containers import events
from plaso.lib import dtfabric_helper
from plaso.lib import errors
from plaso.lib import specification
from plaso.parsers import interface
from plaso.parsers import manager


class CacheAddress:
  """Chrome cache address.

  A cache address is a 32-bit value. This class decodes it as follows:

  * bit 31 (0x80000000): initialized flag;
  * bits 28-30 (0x70000000): file type;
  * for a separate file: bits 0-27 (0x0fffffff) are the file number;
  * for a block data file: bits 24-25 (0x03000000) hold the number of
    contiguous blocks minus one, bits 16-23 (0x00ff0000) the block data file
    number and bits 0-15 (0x0000ffff) the number of the first block.

  Attributes:
    block_number (int): number of the first block within the block data
        file, or None if the address does not refer to a block data file.
    block_offset (int): offset within the block data file, or None if the
        address does not refer to a block data file.
    block_size (int): total size of the contiguous blocks the address refers
        to, or None if the address does not refer to a block data file.
    file_type (int): file type, one of the FILE_TYPE_* values or a value
        that this class does not decode any further.
    filename (str): name of the separate file or block data file, or None
        if the address is 0 or has a file type that is not decoded.
    is_initialized (str): 'True' if the initialized flag is set, 'False'
        otherwise. Note that this is a string and not a bool.
    value (int): cache address.
  """

  FILE_TYPE_SEPARATE = 0
  FILE_TYPE_BLOCK_RANKINGS = 1
  FILE_TYPE_BLOCK_256 = 2
  FILE_TYPE_BLOCK_1024 = 3
  FILE_TYPE_BLOCK_4096 = 4

  _BLOCK_DATA_FILE_TYPES = [
      FILE_TYPE_BLOCK_RANKINGS,
      FILE_TYPE_BLOCK_256,
      FILE_TYPE_BLOCK_1024,
      FILE_TYPE_BLOCK_4096]

  # Size of a single block, indexed by file type.
  _FILE_TYPE_BLOCK_SIZES = [0, 36, 256, 1024, 4096]

  def __init__(self, cache_address):
    """Initializes a cache address.

    Args:
      cache_address (int): cache address.
    """
    super().__init__()
    self.block_number = None
    self.block_offset = None
    self.block_size = None
    self.filename = None
    self.value = cache_address

    # The flag is stored as a string, which is kept for backward
    # compatibility with existing consumers of this attribute.
    if cache_address & 0x80000000:
      self.is_initialized = 'True'
    else:
      self.is_initialized = 'False'

    self.file_type = (cache_address & 0x70000000) >> 28

    if cache_address != 0x00000000:
      if self.file_type == self.FILE_TYPE_SEPARATE:
        file_selector = cache_address & 0x0fffffff
        self.filename = f'f_{file_selector:06x}'

      elif self.file_type in self._BLOCK_DATA_FILE_TYPES:
        file_selector = (cache_address & 0x00ff0000) >> 16
        self.filename = f'data_{file_selector:d}'

        file_block_size = self._FILE_TYPE_BLOCK_SIZES[self.file_type]
        self.block_number = cache_address & 0x0000ffff

        # The 2-bit field stores the number of contiguous blocks minus one,
        # hence an address always refers to at least one block.
        number_of_blocks = ((cache_address & 0x03000000) >> 24) + 1
        self.block_size = number_of_blocks * file_block_size

        # The blocks are stored after the first 8192 bytes of the file.
        self.block_offset = 8192 + (self.block_number * file_block_size)
class CacheEntry:
  """Chrome cache entry.

  Plain value holder; every attribute starts out as None and is expected to
  be filled in by the code that reads the cache entry.

  Attributes:
    creation_time (int): creation time, in number of microseconds since
        January 1, 1601, 00:00:00 UTC.
    hash (int): super fast hash of the key.
    key (bytes): key.
    next (int): cache address of the next cache entry.
    original_url (str): original URL derived from the key.
    payloads ([str]): filenames (and offsets) of the cache payloads.
    rankings_node (int): cache address of the rankings node.
  """

  def __init__(self):
    """Initializes a cache entry with all attributes unset."""
    super().__init__()
    # Identification of the entry.
    self.hash = None
    self.key = None
    self.original_url = None
    # Metadata and payload references.
    self.creation_time = None
    self.payloads = None
    # Links to other cache structures.
    self.next = None
    self.rankings_node = None
class ChromeCacheIndexFileParser(
    interface.FileObjectParser, dtfabric_helper.DtFabricHelper):
  """Chrome cache index file parser.

  Attributes:
    creation_time (int): creation time, in number of microseconds since
        January 1, 1601, 00:00:00 UTC.
    index_table (list[CacheAddress]): the cache addresses which are stored
        in the index file.
  """

  _DEFINITION_FILE = os.path.join(
      os.path.dirname(__file__), 'chrome_cache.yaml')

  def __init__(self):
    """Initializes an index file."""
    super().__init__()
    self.creation_time = None
    self.index_table = []

  def _ParseFileHeader(self, file_object):
    """Parses the file header.

    On success the creation time from the header is stored in
    self.creation_time.

    Args:
      file_object (dfvfs.FileIO): a file-like object to parse.

    Raises:
      ParseError: if the file header cannot be read or has an unsupported
          format version.
    """
    file_header_map = self._GetDataTypeMap('chrome_cache_index_file_header')

    try:
      file_header, _ = self._ReadStructureFromFileObject(
          file_object, 0, file_header_map)
    except (ValueError, errors.ParseError) as exception:
      raise errors.ParseError(
          f'Unable to parse index file header with error: '
          f'{exception!s}') from exception

    format_version = (
        f'{file_header.major_version:d}.{file_header.minor_version:d}')
    if format_version not in ('2.0', '2.1', '3.0'):
      raise errors.ParseError(
          f'Unsupported index file format version: {format_version:s}')

    self.creation_time = file_header.creation_time

  def _ParseIndexTable(self, file_object):
    """Parses the index table.

    Reads 32-bit little-endian cache addresses from the current position of
    the file-like object until fewer than 4 bytes remain. Every non-zero
    value is appended to self.index_table as a CacheAddress.

    Args:
      file_object (dfvfs.FileIO): a file-like object to parse.

    Raises:
      ParseError: if the index table cannot be read.
    """
    cache_address_map = self._GetDataTypeMap('uint32le')
    file_offset = file_object.get_offset()

    cache_address_data = file_object.read(4)

    while len(cache_address_data) == 4:
      try:
        value = self._ReadStructureFromByteStream(
            cache_address_data, file_offset, cache_address_map)
      except (ValueError, errors.ParseError) as exception:
        raise errors.ParseError((
            f'Unable to map cache address at offset: 0x{file_offset:08x} '
            f'with error: {exception!s}')) from exception

      # A value of 0 is not added to the index table.
      if value:
        cache_address = CacheAddress(value)
        self.index_table.append(cache_address)

      file_offset += 4

      cache_address_data = file_object.read(4)

  def ParseFileObject(self, parser_mediator, file_object):
    """Parses a file-like object.

    The results are stored in self.creation_time and self.index_table. Both
    are reset first, so that parsing another file with the same instance does
    not mix in results of a previous file.

    Args:
      parser_mediator (ParserMediator): a parser mediator.
      file_object (dfvfs.FileIO): a file-like object to parse.

    Raises:
      ParseError: when the file cannot be parsed.
    """
    self.creation_time = None
    self.index_table = []

    try:
      self._ParseFileHeader(file_object)
    except errors.ParseError as exception:
      raise errors.ParseError(
          f'Unable to parse index file header with error: '
          f'{exception!s}') from exception

    # Skip over the LRU data, which is 112 bytes in size. The seek is
    # relative, hence this relies on the file position being directly after
    # the file header at this point.
    file_object.seek(112, os.SEEK_CUR)
    self._ParseIndexTable(file_object)
class ChromeCacheDataBlockFileParser(
    interface.FileObjectParser, dtfabric_helper.DtFabricHelper):
  """Chrome cache data block file parser."""

  _DEFINITION_FILE = os.path.join(
      os.path.dirname(__file__), 'chrome_cache.yaml')

  def _ParseFileHeader(self, file_object):
    """Parses the file header.

    Args:
      file_object (dfvfs.FileIO): a file-like object to parse.

    Raises:
      ParseError: if the file header cannot be read, or has an unsupported
          format version or block size.
    """
    file_header_map = self._GetDataTypeMap(
        'chrome_cache_data_block_file_header')

    try:
      file_header, _ = self._ReadStructureFromFileObject(
          file_object, 0, file_header_map)
    except (ValueError, errors.ParseError) as exception:
      raise errors.ParseError(
          f'Unable to parse data block file header with error: '
          f'{exception!s}') from exception

    format_version = (
        f'{file_header.major_version:d}.{file_header.minor_version:d}')
    if format_version not in ('2.0', '2.1'):
      raise errors.ParseError(
          f'Unsupported data block file format version: {format_version:s}')

    if file_header.block_size not in (256, 1024, 4096):
      raise errors.ParseError(
          f'Unsupported data block file block size: '
          f'{file_header.block_size:d}')

  def ParseCacheEntry(self, file_object, block_offset):
    """Parses a cache entry.

    Args:
      file_object (dfvfs.FileIO): a file-like object to read from.
      block_offset (int): block offset of the cache entry.

    Returns:
      CacheEntry: cache entry.

    Raises:
      ParseError: if the cache entry cannot be read or its key cannot be
          decoded as ASCII.
    """
    cache_entry_map = self._GetDataTypeMap('chrome_cache_entry')

    try:
      cache_entry, _ = self._ReadStructureFromFileObject(
          file_object, block_offset, cache_entry_map)
    except (ValueError, errors.ParseError) as exception:
      raise errors.ParseError((
          f'Unable to parse cache entry at offset: 0x{block_offset:08x} with '
          f'error: {exception!s}')) from exception

    payloads = []
    for stream in cache_entry.data_stream_addresses:
      data_stream = CacheAddress(stream)
      # Addresses without a filename do not refer to a payload.
      if data_stream.filename is None:
        continue

      if data_stream.filename.startswith('f_'):
        # The payload is stored in a separate file.
        payload = data_stream.filename
      else:
        # The payload is stored in a block data file.
        payload = (
            f'{data_stream.filename:s} (offset: '
            f'0x{data_stream.block_offset:08x})')

      payloads.append(payload)

    cache_entry_object = CacheEntry()
    cache_entry_object.hash = cache_entry.hash
    cache_entry_object.next = CacheAddress(cache_entry.next_address)
    cache_entry_object.payloads = payloads
    cache_entry_object.rankings_node = CacheAddress(
        cache_entry.rankings_node_address)
    cache_entry_object.creation_time = cache_entry.creation_time

    # The key is everything up to, but not including, the first NUL byte.
    byte_array = cache_entry.key
    byte_string = bytes(bytearray(byte_array))
    cache_entry_object.key, _, _ = byte_string.partition(b'\x00')

    try:
      cache_entry_object.original_url = cache_entry_object.key.decode('ascii')
    except UnicodeDecodeError as exception:
      raise errors.ParseError(
          f'Unable to decode original URL in key with error: '
          f'{exception!s}') from exception

    return cache_entry_object

  # pylint: disable=unused-argument
  def ParseFileObject(self, parser_mediator, file_object):
    """Parses a file-like object.

    Only the file header is parsed and validated here; cache entries are
    read on request with ParseCacheEntry.

    Args:
      parser_mediator (ParserMediator): a parser mediator.
      file_object (dfvfs.FileIO): a file-like object to parse.

    Raises:
      ParseError: when the file cannot be parsed.
    """
    self._ParseFileHeader(file_object)
class ChromeCacheEntryEventData(events.EventData):
  """Chrome Cache event data.

  Attributes:
    creation_time (dfdatetime.DateTimeValues): creation date and time of
        the cache entry.
    original_url (str): original URL.
    payloads ([str]): filenames (and offsets) of the cache payloads.
  """

  DATA_TYPE = 'chrome:cache:entry'

  def __init__(self):
    """Initializes event data with all attributes unset."""
    super().__init__(data_type=self.DATA_TYPE)
    self.original_url = None
    self.payloads = None
    self.creation_time = None
class ChromeCacheParser(interface.FileEntryParser):
  """Parses Chrome Cache files."""

  NAME = 'chrome_cache'
  DATA_FORMAT = 'Google Chrome or Chromium Cache file'

  def __init__(self):
    """Initializes a Chrome Cache files parser."""
    super().__init__()
    self._data_block_file_parser = ChromeCacheDataBlockFileParser()

  def _ParseCacheEntries(self, parser_mediator, index_table, data_block_files):
    """Parses Chrome Cache file entries.

    For every address in the index table the chain of cache entries is
    followed via the next address of each entry, and event data is produced
    for every entry that could be read. A chain is abandoned, with an
    extraction warning, when it exceeds 64 entries, when the data block file
    is not available or when an entry cannot be parsed.

    Args:
      parser_mediator (ParserMediator): mediates interactions between parsers
          and other components, such as storage and dfVFS.
      index_table (list[CacheAddress]): the cache addresses which are stored
          in the index file.
      data_block_files (dict[str: file]): look up table for the data block
          file-like object handles.
    """
    # Parse the cache entries in the data block files.
    for cache_address in index_table:
      cache_address_chain_length = 0
      while cache_address.value != 0:
        # Guard against excessively long or cyclic chains.
        if cache_address_chain_length >= 64:
          parser_mediator.ProduceExtractionWarning(
              'Maximum allowed cache address chain length reached.')
          break

        data_block_file_object = data_block_files.get(
            cache_address.filename, None)
        if not data_block_file_object:
          parser_mediator.ProduceExtractionWarning(
              f'Cache address: 0x{cache_address.value:08x} missing data file.')
          break

        try:
          cache_entry = self._data_block_file_parser.ParseCacheEntry(
              data_block_file_object, cache_address.block_offset)
        except (OSError, errors.ParseError) as exception:
          parser_mediator.ProduceExtractionWarning(
              f'Unable to parse cache entry with error: {exception!s}')
          break

        event_data = ChromeCacheEntryEventData()
        event_data.creation_time = dfdatetime_webkit_time.WebKitTime(
            timestamp=cache_entry.creation_time)
        event_data.payloads = cache_entry.payloads

        # In Chrome Cache v3, doublekey-ing cache entries was introduced
        # This shows up as "_dk_{domain}( {domain})* {url}"
        # https://chromium.googlesource.com/chromium/src/+/
        # 95faad3cfd90169f0a267e979c36e3348476a948/net/http/http_cache.cc#427
        if '_dk_' in cache_entry.original_url[:20]:
          # The URL is the last space separated part of the key.
          parsed_url = cache_entry.original_url.strip().rsplit(' ', 1)[-1]
          event_data.original_url = parsed_url
        else:
          event_data.original_url = cache_entry.original_url

        parser_mediator.ProduceEventData(event_data)

        cache_address = cache_entry.next
        cache_address_chain_length += 1

  def _ParseIndexTable(
      self, parser_mediator, file_system, file_entry, index_table):
    """Parses a Chrome Cache index table.

    Opens and validates every data block file that is referenced by the
    index table, and then parses the cache entries. Data block files are
    looked up next to the index file, by replacing the last segment of the
    location of the index file.

    Args:
      parser_mediator (ParserMediator): mediates interactions between parsers
          and other components, such as storage and dfVFS.
      file_system (dfvfs.FileSystem): file system.
      file_entry (dfvfs.FileEntry): file entry.
      index_table (list[CacheAddress]): the cache addresses which are stored
          in the index file.
    """
    # Build a lookup table for the data block files.
    path_segments = file_system.SplitPath(file_entry.path_spec.location)

    data_block_files = {}
    for cache_address in index_table:
      filename = cache_address.filename

      # A cache address with a file type that is not decoded has no filename
      # and cannot be resolved to a file. It is skipped here and reported as
      # missing its data file when the cache entries are parsed.
      if filename is None or filename in data_block_files:
        continue

      # Remove the previous filename from the path segments list and
      # add one of the data block files.
      path_segments.pop()
      path_segments.append(filename)

      # We need to pass only used arguments to the path specification
      # factory otherwise it will raise.
      kwargs = {}
      if file_entry.path_spec.parent:
        kwargs['parent'] = file_entry.path_spec.parent
      kwargs['location'] = file_system.JoinPath(path_segments)

      data_block_file_path_spec = path_spec_factory.Factory.NewPathSpec(
          file_entry.path_spec.TYPE_INDICATOR, **kwargs)

      try:
        data_block_file_entry = path_spec_resolver.Resolver.OpenFileEntry(
            data_block_file_path_spec)
      except RuntimeError as exception:
        location = kwargs['location'] or 'N/A'
        parser_mediator.ProduceExtractionWarning(
            f'Unable to open data block file: {location!s} with error: '
            f'{exception!s}')
        data_block_file_entry = None

      if not data_block_file_entry:
        parser_mediator.ProduceExtractionWarning(
            f'Missing data block file: {cache_address.filename:s}')
        data_block_file_object = None

      else:
        data_block_file_object = data_block_file_entry.GetFileObject()

        try:
          self._data_block_file_parser.ParseFileObject(
              parser_mediator, data_block_file_object)
        except (OSError, errors.ParseError) as exception:
          message = (
              f'Unable to parse data block file: {cache_address.filename!s} '
              f'with error: {exception!s}')
          parser_mediator.ProduceExtractionWarning(message)
          data_block_file_object = None

      # None marks a data block file that is missing or cannot be parsed.
      data_block_files[filename] = data_block_file_object

    self._ParseCacheEntries(parser_mediator, index_table, data_block_files)

  @classmethod
  def GetFormatSpecification(cls):
    """Retrieves the format specification.

    Returns:
      FormatSpecification: format specification.
    """
    format_specification = specification.FormatSpecification(cls.NAME)
    format_specification.AddNewSignature(b'\xc3\xca\x03\xc1', offset=0)
    return format_specification

  def ParseFileEntry(self, parser_mediator, file_entry):
    """Parses Chrome Cache files.

    The file entry is parsed as the index file; the data block files it
    references are resolved relative to its location.

    Args:
      parser_mediator (ParserMediator): mediates interactions between parsers
          and other components, such as storage and dfVFS.
      file_entry (dfvfs.FileEntry): file entry.

    Raises:
      WrongParser: when the file cannot be parsed.
    """
    index_file_parser = ChromeCacheIndexFileParser()

    file_object = file_entry.GetFileObject()
    if not file_object:
      display_name = parser_mediator.GetDisplayName()
      raise errors.WrongParser(
          f'[{self.NAME:s}] unable to parse index file {display_name:s}')

    try:
      index_file_parser.ParseFileObject(parser_mediator, file_object)
    except (OSError, errors.ParseError) as exception:
      display_name = parser_mediator.GetDisplayName()
      raise errors.WrongParser(
          f'[{self.NAME:s}] unable to parse index file {display_name:s} with '
          f'error: {exception!s}') from exception

    # TODO: create event based on index file creation time.

    file_system = file_entry.GetFileSystem()
    self._ParseIndexTable(
        parser_mediator, file_system, file_entry,
        index_file_parser.index_table)
# Register the Chrome Cache parser with the parsers manager when this module
# is imported.
manager.ParsersManager.RegisterParser(ChromeCacheParser)