Source code for plaso.analysis.hash_tagging

# -*- coding: utf-8 -*-
"""This file contains the interface for analysis plugins."""

import abc
import collections
import time

import requests

from plaso.analysis import interface
from plaso.analysis import logger
from plaso.containers import events
from plaso.lib import errors

[docs]class HashAnalysis(object): """Analysis information about a hash. Attributes: hash_information (object): object containing information about the hash. subject_hash (str): hash that was analyzed. """ def __init__(self, subject_hash, hash_information): """Initializes analysis information about a hash. Args: subject_hash (str): hash that the hash_information relates to. hash_information (object): information about the hash. This object will be used by the GenerateLabels method in the HashTaggingAnalysisPlugin to tag events that relate to the hash. """ self.hash_information = hash_information self.subject_hash = subject_hash
[docs]class HashTaggingAnalysisPlugin(interface.AnalysisPlugin): """An interface for plugins that tag events based on the source file hash.""" # The event data types the plugin will collect hashes from. Subclasses # must override this attribute. DATA_TYPES = [] # Lookup hashes supported by the hash tagging analysis plugin. SUPPORTED_HASHES = frozenset([]) _DEFAULT_HASHES_PER_BATCH = 1 _DEFAULT_LOOKUP_HASH = 'sha256' _DEFAULT_WAIT_AFTER_ANALYSIS = 0.0 _REQUEST_TIMEOUT = 60 def __init__(self): """Initializes a hash tagging analysis plugin.""" super(HashTaggingAnalysisPlugin, self).__init__() self._batch_of_lookup_hashes = [] self._data_stream_identifiers = set() self._data_streams_by_hash = collections.defaultdict(set) self._event_identifiers_by_data_stream = collections.defaultdict(set) self._hashes_per_batch = self._DEFAULT_HASHES_PER_BATCH self._lookup_hash = self._DEFAULT_LOOKUP_HASH self._wait_after_analysis = self._DEFAULT_WAIT_AFTER_ANALYSIS @abc.abstractmethod def _Analyze(self, hashes): """Analyzes a list of hashes. Args: hashes (list[str]): list of hashes to look up. Returns: list[HashAnalysis]: list of results of analyzing the hashes. """ @abc.abstractmethod def _GenerateLabels(self, hash_information): """Generates a list of strings to tag events with. Args: hash_information (bool): response from the hash tagging analyzer that indicates that the file hash was present or not. Returns: list[str]: list of labels to apply to event. """ def _MakeRequestAndDecodeJSON(self, url, method, **kwargs): """Make a HTTP request and decode the results as JSON. Args: url (str): URL to make a request to. method (str): HTTP method to used to make the request. GET and POST are supported. kwargs: parameters to the requests .get() or post() methods, depending on the value of the method parameter. Returns: dict[str, object]: body of the HTTP response, decoded from JSON. Raises: ConnectionError: If it is not possible to connect to the given URL, or it the request returns a HTTP error. ValueError: If an invalid HTTP method is specified. """ method_upper = method.upper() if method_upper not in ('GET', 'POST'): raise ValueError('Method {0:s} is not supported') try: if method_upper == 'GET': response = requests.get(url, timeout=self._REQUEST_TIMEOUT, **kwargs) elif method_upper == 'POST': response =, timeout=self._REQUEST_TIMEOUT, **kwargs) response.raise_for_status() except requests.ConnectionError as exception: raise errors.ConnectionError( f'Unable to connect to: {url:s} with error: {exception!s}') except requests.HTTPError as exception: raise errors.ConnectionError( f'Connect to: {url:s} returned a HTTP error: {exception!s}') return response.json() def _ProcessHashAnalysis(self, analysis_mediator, hash_analysis): """Processes the results of the analysis of a hash. This method ensures that labels are generated for the hash, then tags all events derived from files with that hash. Args: analysis_mediator (AnalysisMediator): mediates interactions between analysis plugins and other components, such as storage and dfVFS. hash_analysis (HashAnalysis): hash analysis plugin's results for a given hash. """ labels = self._GenerateLabels(hash_analysis.hash_information) try: data_stream_identifiers = self._data_streams_by_hash.pop( hash_analysis.subject_hash) except KeyError: data_stream_identifiers = [] logger.error(( f'unable to retrieve data streams for digest hash: ' f'{hash_analysis.subject_hash:s}')) for data_stream_identifier in data_stream_identifiers: event_identifiers = self._event_identifiers_by_data_stream.pop( data_stream_identifier) # Do no bail out earlier to maintain the state of # self._data_streams_by_hash and self._event_identifiers_by_data_stream. if not labels: continue for event_identifier in event_identifiers: event_tag = events.EventTag() event_tag.SetEventIdentifier(event_identifier) try: event_tag.AddLabels(labels) except (TypeError, ValueError): error_label = f'error_{self.NAME:s}' labels_string = ', '.join(labels) logger.error(( f'unable to add labels: {labels_string!s} for digest hash: ' f'{hash_analysis.subject_hash:s} defaulting to: {error_label:s}')) labels = [error_label] event_tag.AddLabels(labels) analysis_mediator.ProduceEventTag(event_tag) for label in labels: self._analysis_counter[label] += 1
[docs] def CompileReport(self, analysis_mediator): """Compiles an analysis report. Args: analysis_mediator (AnalysisMediator): mediates interactions between analysis plugins and other components, such as storage and dfVFS. Returns: AnalysisReport: report. """ if self._batch_of_lookup_hashes: for hash_analysis in self._Analyze(self._batch_of_lookup_hashes): self._ProcessHashAnalysis(analysis_mediator, hash_analysis) self._batch_of_lookup_hashes = [] return super(HashTaggingAnalysisPlugin, self).CompileReport( analysis_mediator)
[docs] def ExamineEvent( self, analysis_mediator, event, event_data, event_data_stream): """Evaluates whether an event contains the right data for a hash lookup. Args: analysis_mediator (AnalysisMediator): mediates interactions between analysis plugins and other components, such as storage and dfVFS. event (EventObject): event. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. """ if (not self._lookup_hash or not event_data_stream or event_data.data_type not in self.DATA_TYPES): return data_stream_identifier = event_data_stream.GetIdentifier() if data_stream_identifier not in self._data_stream_identifiers: self._data_stream_identifiers.add(data_stream_identifier) lookup_hash = f'{self._lookup_hash:s}_hash' lookup_hash = getattr(event_data_stream, lookup_hash, None) if not lookup_hash: path_specification = getattr(event_data_stream, 'path_spec', None) display_name = analysis_mediator.GetDisplayNameForPathSpec( path_specification) logger.warning(( f'Lookup hash attribute: {self._lookup_hash:s}_hash missing from ' f'event data stream: {display_name:s}.')) else: self._data_streams_by_hash[lookup_hash].add(data_stream_identifier) self._batch_of_lookup_hashes.append(lookup_hash) event_identifier = event.GetIdentifier() self._event_identifiers_by_data_stream[data_stream_identifier].add( event_identifier) if len(self._batch_of_lookup_hashes) >= self._hashes_per_batch: for hash_analysis in self._Analyze(self._batch_of_lookup_hashes): self._ProcessHashAnalysis(analysis_mediator, hash_analysis) self._batch_of_lookup_hashes = [] time.sleep(self._wait_after_analysis)
[docs] def SetLookupHash(self, lookup_hash): """Sets the hash to query. Args: lookup_hash (str): name of the hash attribute to look up. Raises: ValueError: if the lookup hash is not supported. """ if lookup_hash not in self.SUPPORTED_HASHES: raise ValueError(f'Unsupported lookup hash: {lookup_hash!s}') self._lookup_hash = lookup_hash