Source code for plaso.analysis.bloom

"""Analysis plugin to look up file hashes in bloom database."""

import flor  # pylint: disable=import-error

from plaso.analysis import hash_tagging
from plaso.analysis import logger
from plaso.analysis import manager


class BloomAnalysisPlugin(hash_tagging.HashTaggingAnalysisPlugin):
    """Analysis plugin for looking up hashes in bloom database.

    The plugin loads a bloom filter from the file set with
    SetBloomDatabasePath() and tags events whose file hash is reported as
    present in the filter with the configured label.
    """

    DATA_TYPES = frozenset(["fs:stat", "fs:stat:ntfs"])

    NAME = "bloom"

    SUPPORTED_HASHES = frozenset(["md5", "sha1", "sha256"])

    DEFAULT_LABEL = "bloom_present"

    def __init__(self):
        """Initializes a bloom database analysis plugin."""
        super().__init__()
        self._bloom_database_path = None
        self._bloom_filter_object = None
        self._label = self.DEFAULT_LABEL

    def _Analyze(self, hashes):
        """Looks up file hashes in a bloom database.

        Args:
          hashes (list[str]): hash values to look up.

        Returns:
          list[HashAnalysis]: analysis results, one per hash that was queried.

        Raises:
          RuntimeError: when the analyzer fails to get a bloom filter object.
        """
        bloom_filter = self._GetBloomFilterObject(cached=True)
        if not bloom_filter:
            raise RuntimeError("Failed to open bloom file")

        hash_analyses = []
        for digest in hashes:
            response = self._QueryHash(digest=digest, bloom_filter=bloom_filter)
            # _QueryHash returns the result of an "in" test, so negative
            # lookups are recorded as well; _GenerateLabels yields no label
            # for them.
            if response is not None:
                hash_analysis = hash_tagging.HashAnalysis(
                    subject_hash=digest, hash_information=response
                )
                hash_analyses.append(hash_analysis)

        return hash_analyses

    def _GenerateLabels(self, hash_information):
        """Generates a list of strings that will be used in the event tag.

        Args:
          hash_information (bool): response from the hash tagging that
              indicates that the file hash was present or not.

        Returns:
          list[str]: list of labels to apply to event.
        """
        if hash_information:
            return [self._label]
        return []

    def _GetBloomFilterObject(self, cached=True):
        """Loads a bloom filter file in memory.

        Args:
          cached (bool): True if the bloom filter should be cached.

        Returns:
          flor.BloomFilter: bloom filter object or None if not available, which
              includes the case where no bloom database path has been set.
        """
        bloom_filter = self._bloom_filter_object
        if not bloom_filter:
            # The path is None until SetBloomDatabasePath() is called. Formatting
            # None with ":s" and passing None to open() both raise TypeError,
            # which the OSError handler below does not catch, so bail out early.
            if not self._bloom_database_path:
                logger.warning("Bloom database path is not set.")
                return None

            logger.debug(f"Opening bloom database file: {self._bloom_database_path:s}.")

            # NOTE(review): flor is imported unconditionally at module level, so
            # this branch is presumably unreachable - confirm before removing.
            if not flor:
                logger.warning("Missing optional dependency: flor")
                return None

            try:
                bloom_filter = flor.BloomFilter()
                with open(self._bloom_database_path, "rb") as file_object:
                    bloom_filter.read(file_object)
            except OSError as exception:
                bloom_filter = None
                logger.warning(
                    (
                        f"Unable to open bloom database file: "
                        f"{self._bloom_database_path:s} with error: {exception!s}."
                    )
                )

            if cached:
                self._bloom_filter_object = bloom_filter

        return bloom_filter

    def _QueryHash(self, digest, bloom_filter):
        """Queries BloomFilter for a specific hash in upper case.

        The digest is upper-cased and UTF-8 encoded before the membership test.

        Args:
          digest (str): hash to look up.
          bloom_filter (flor.BloomFilter): instanced bloom filter.

        Returns:
          bool: True if the hash was found, False if not.
        """
        value_to_test = digest.upper().encode("utf-8")
        return value_to_test in bloom_filter

    def SetBloomDatabasePath(self, bloom_database_path):
        """Sets the path to the bloom file containing hashes.

        Args:
          bloom_database_path (str): path to the bloom file.
        """
        self._bloom_database_path = bloom_database_path

    def SetLabel(self, label):
        """Sets the tagging label.

        Args:
          label (str): label to apply to events extracted from files that are
              present in the bloom database.
        """
        self._label = label

    def TestLoading(self):
        """Checks if the bloom database exists and is valid.

        The filter is loaded without caching it on the plugin.

        Returns:
          bool: True if the bloom database exists and is valid.
        """
        return bool(self._GetBloomFilterObject(cached=False))


# Register the plugin class with the analysis plugin manager when this module
# is imported.
manager.AnalysisPluginManager.RegisterPlugin(BloomAnalysisPlugin)