# -*- coding: utf-8 -*-
"""Analysis plugin to look up file hashes in bloom database."""
import flor # pylint: disable=import-error
from plaso.analysis import hash_tagging
from plaso.analysis import logger
from plaso.analysis import manager
[docs]
class BloomAnalysisPlugin(hash_tagging.HashTaggingAnalysisPlugin):
"""Analysis plugin for looking up hashes in bloom database."""
DATA_TYPES = frozenset(['fs:stat', 'fs:stat:ntfs'])
NAME = 'bloom'
SUPPORTED_HASHES = frozenset(['md5', 'sha1', 'sha256'])
DEFAULT_LABEL = 'bloom_present'
[docs]
def __init__(self):
"""Initializes a bloom database analysis plugin."""
super(BloomAnalysisPlugin, self).__init__()
self._bloom_database_path = None
self._bloom_filter_object = None
self._label = self.DEFAULT_LABEL
def _Analyze(self, hashes):
"""Looks up file hashes in a bloom database.
Args:
hashes (list[str]): hash values to look up.
Returns:
list[HashAnalysis]: analysis results, or an empty list on error.
Raises:
RuntimeError: when the analyzer fail to get a bloom filter object.
"""
bloom_filter = self._GetBloomFilterObject(cached=True)
if not bloom_filter:
raise RuntimeError('Failed to open bloom file')
hash_analyses = []
for digest in hashes:
response = self._QueryHash(digest=digest, bloom_filter=bloom_filter)
if response is not None:
hash_analysis = hash_tagging.HashAnalysis(
subject_hash=digest, hash_information=response)
hash_analyses.append(hash_analysis)
return hash_analyses
def _GenerateLabels(self, hash_information):
"""Generates a list of strings that will be used in the event tag.
Args:
hash_information (bool): response from the hash tagging that indicates
that the file hash was present or not.
Returns:
list[str]: list of labels to apply to event.
"""
if hash_information:
return [self._label]
return []
def _GetBloomFilterObject(self, cached=True):
"""Loads a bloom filter file in memory.
Args:
cached (bool): True if the bloom filter should be cached.
Returns:
flor.BloomFilter: bloom filter object or None if not available.
"""
bloom_filter = self._bloom_filter_object
if not bloom_filter:
logger.debug(
f'Opening bloom database file: {self._bloom_database_path:s}.')
if not flor:
logger.warning('Missing optional dependency: flor')
return None
try:
bloom_filter = flor.BloomFilter()
with open(self._bloom_database_path, 'rb') as file_object:
bloom_filter.read(file_object)
except IOError as exception:
bloom_filter = None
logger.warning((
f'Unable to open bloom database file: '
f'{self._bloom_database_path:s} with error: {exception!s}.'))
if cached:
self._bloom_filter_object = bloom_filter
return bloom_filter
def _QueryHash(self, digest, bloom_filter):
"""Queries BloomFilter for a specific hash in upper case.
Args:
digest (str): hash to look up.
bloom_filter (flor.BloomFilter): instanced bloom filter.
Returns:
bool: True if the hash was found, False if not.
"""
value_to_test = digest.upper().encode('utf-8')
return value_to_test in bloom_filter
[docs]
def SetBloomDatabasePath(self, bloom_database_path):
"""Set the path to the bloom file containing hash.
Args:
bloom_database_path (str): Path to the bloom file
"""
self._bloom_database_path = bloom_database_path
[docs]
def SetLabel(self, label):
"""Sets the tagging label.
Args:
label (str): label to apply to events extracted from files that are
present in the bloom database.
"""
self._label = label
[docs]
def TestLoading(self):
"""Checks if the bloom database exist and is valid.
Returns:
bool: True is the bloom database exist and is valid.
"""
return bool(self._GetBloomFilterObject(cached=False))
manager.AnalysisPluginManager.RegisterPlugin(BloomAnalysisPlugin)