Source code for plaso.analysis.virustotal

"""Analysis plugin to look up files in VirusTotal and tag events.

Also see:
  https://docs.virustotal.com/reference/overview
"""

from plaso.analysis import hash_tagging
from plaso.analysis import logger
from plaso.analysis import manager
from plaso.lib import errors


[docs] class VirusTotalAnalysisPlugin(hash_tagging.HashTaggingAnalysisPlugin): """An analysis plugin for looking up hashes in VirusTotal.""" # TODO: Check if there are other file types worth checking VirusTotal for. DATA_TYPES = frozenset(["pe", "pe:compilation:compilation_time"]) NAME = "virustotal" SUPPORTED_HASHES = frozenset(["md5", "sha1", "sha256"]) _EICAR_SHA256 = "275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f" _VIRUSTOTAL_API_REPORT_URL = "https://www.virustotal.com/vtapi/v2/file/report" _VIRUSTOTAL_RESPONSE_CODE_NOT_PRESENT = 0 _VIRUSTOTAL_RESPONSE_CODE_PRESENT = 1 _VIRUSTOTAL_RESPONSE_CODE_ANALYSIS_PENDING = -2
[docs] def __init__(self): """Initializes a VirusTotal analysis plugin.""" super().__init__() self._api_key = None
def _Analyze(self, hashes): """Looks up hashes in VirusTotal using the VirusTotal HTTP API. Args: hashes (list[str]): hashes to look up. Returns: list[HashAnalysis]: analysis results. Raises: RuntimeError: If the VirusTotal API key has not been set. """ if not self._api_key: raise RuntimeError("No API key specified for VirusTotal lookup.") hash_analyses = [] json_response = self._QueryHashes(hashes) or [] # VirusTotal returns a dictionary when a single hash is queried # and a list when multiple hashes are queried. if isinstance(json_response, dict): json_response = [json_response] for result in json_response: resource = result["resource"] hash_analysis = hash_tagging.HashAnalysis(resource, result) hash_analyses.append(hash_analysis) return hash_analyses def _GenerateLabels(self, hash_information): """Generates a list of strings that will be used in the event tag. Args: hash_information (dict[str, object]): the JSON decoded contents of the result of a VirusTotal lookup, as produced by the VirusTotalAnalyzer. Returns: list[str]: strings describing the results from VirusTotal. """ response_code = hash_information["response_code"] if response_code == self._VIRUSTOTAL_RESPONSE_CODE_NOT_PRESENT: return ["virustotal_not_present"] if response_code == self._VIRUSTOTAL_RESPONSE_CODE_PRESENT: positives = hash_information["positives"] if positives > 0: return [f"virustotal_detections_{positives:d}"] return ["virustotal_no_detections"] if response_code == self._VIRUSTOTAL_RESPONSE_CODE_ANALYSIS_PENDING: return ["virustotal_analysis_pending"] logger.error(f"VirusTotal returned unknown response code {response_code!s}") return [f"virustotal_unknown_response_code_{response_code:d}"] def _QueryHashes(self, hashes): """Queries VirusTotal for a specific hashes. Args: hashes (list[str]): hashes to look up. Returns: dict[str, object]: JSON response or None on error. """ url_parameters = {"apikey": self._api_key, "resource": ", ".join(hashes)} try: json_response = self._MakeRequestAndDecodeJSON( self._VIRUSTOTAL_API_REPORT_URL, "GET", params=url_parameters ) except errors.ConnectionError as exception: json_response = None logger.error(f"Unable to query VirusTotal with error: {exception!s}.") return json_response
[docs] def EnableFreeAPIKeyRateLimit(self): """Configures Rate limiting for queries to VirusTotal. The default rate limit for free VirusTotal API keys is 4 requests per minute. """ self._hashes_per_batch = 4 self._wait_after_analysis = 60.0
[docs] def SetAPIKey(self, api_key): """Sets the VirusTotal API key to use in queries. Args: api_key (str): VirusTotal API key """ self._api_key = api_key
[docs] def TestConnection(self): """Tests the connection to VirusTotal. Returns: bool: True if VirusTotal is reachable. """ json_response = self._QueryHashes([self._EICAR_SHA256]) return json_response is not None
manager.AnalysisPluginManager.RegisterPlugin(VirusTotalAnalysisPlugin)