# -*- coding: utf-8 -*-
"""Analysis plugin to look up files in VirusTotal and tag events.
Also see:
https://docs.virustotal.com/reference/overview
"""
from plaso.analysis import hash_tagging
from plaso.analysis import logger
from plaso.analysis import manager
from plaso.lib import errors
[docs]
class VirusTotalAnalysisPlugin(hash_tagging.HashTaggingAnalysisPlugin):
    """An analysis plugin for looking up hashes in VirusTotal.

    Hashes are submitted to the VirusTotal file report URL and the JSON
    decoded reports are translated into labels for event tags.
    """

    # TODO: Check if there are other file types worth checking VirusTotal for.
    DATA_TYPES = frozenset(['pe', 'pe:compilation:compilation_time'])

    NAME = 'virustotal'

    SUPPORTED_HASHES = frozenset(['md5', 'sha1', 'sha256'])

    # Hash that TestConnection() submits to check that VirusTotal responds.
    _EICAR_SHA256 = (
        '275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f')

    _VIRUSTOTAL_API_REPORT_URL = (
        'https://www.virustotal.com/vtapi/v2/file/report')

    # Values of the "response_code" field that _GenerateLabels() recognizes.
    _VIRUSTOTAL_RESPONSE_CODE_NOT_PRESENT = 0
    _VIRUSTOTAL_RESPONSE_CODE_PRESENT = 1
    _VIRUSTOTAL_RESPONSE_CODE_ANALYSIS_PENDING = -2

    def __init__(self):
        """Initializes a VirusTotal analysis plugin."""
        super().__init__()
        # Remains None until SetAPIKey() is called; _Analyze() refuses to run
        # without it.
        self._api_key = None

    def _Analyze(self, hashes):
        """Looks up hashes in VirusTotal using the VirusTotal HTTP API.

        Args:
            hashes (list[str]): hashes to look up.

        Returns:
            list[HashAnalysis]: analysis results, empty if the query failed.

        Raises:
            RuntimeError: If the VirusTotal API key has not been set.
        """
        if not self._api_key:
            raise RuntimeError('No API key specified for VirusTotal lookup.')

        # _QueryHashes() returns None on a connection error, which is treated
        # the same as a response without results.
        json_response = self._QueryHashes(hashes) or []

        # VirusTotal returns a dictionary when a single hash is queried
        # and a list when multiple hashes are queried.
        if isinstance(json_response, dict):
            json_response = [json_response]

        return [
            hash_tagging.HashAnalysis(result['resource'], result)
            for result in json_response]

    def _GenerateLabels(self, hash_information):
        """Generates a list of strings that will be used in the event tag.

        Args:
            hash_information (dict[str, object]): the JSON decoded contents of
                the result of a VirusTotal lookup.

        Returns:
            list[str]: strings describing the results from VirusTotal.
        """
        response_code = hash_information['response_code']

        if response_code == self._VIRUSTOTAL_RESPONSE_CODE_NOT_PRESENT:
            return ['virustotal_not_present']

        if response_code == self._VIRUSTOTAL_RESPONSE_CODE_PRESENT:
            positives = hash_information['positives']
            if positives > 0:
                return [f'virustotal_detections_{positives:d}']

            return ['virustotal_no_detections']

        if response_code == self._VIRUSTOTAL_RESPONSE_CODE_ANALYSIS_PENDING:
            return ['virustotal_analysis_pending']

        logger.error(
            f'VirusTotal returned unknown response code {response_code!s}')
        # Format with !s instead of :d so that an unexpected non-integer code
        # (for example None or a string) still produces a label rather than
        # raising while the problem is being reported. The output is
        # unchanged for integer codes.
        return [f'virustotal_unknown_response_code_{response_code!s}']

    def _QueryHashes(self, hashes):
        """Queries VirusTotal for specific hashes.

        Args:
            hashes (list[str]): hashes to look up.

        Returns:
            dict[str, object]|list[dict[str, object]]: JSON response, which
                is a dictionary for a single hash and a list for multiple
                hashes, or None on error.
        """
        url_parameters = {
            'apikey': self._api_key,
            'resource': ', '.join(hashes),
        }

        try:
            return self._MakeRequestAndDecodeJSON(
                self._VIRUSTOTAL_API_REPORT_URL, 'GET', params=url_parameters)
        except errors.ConnectionError as exception:
            logger.error(
                f'Unable to query VirusTotal with error: {exception!s}.')
            return None

    def EnableFreeAPIKeyRateLimit(self):
        """Configures rate limiting for queries to VirusTotal.

        The default rate limit for free VirusTotal API keys is 4 requests per
        minute.
        """
        # NOTE(review): neither attribute is defined in this class; presumably
        # both are read by the base class to size batches and to pause between
        # them - confirm against HashTaggingAnalysisPlugin.
        self._hashes_per_batch = 4
        self._wait_after_analysis = 60.0

    def SetAPIKey(self, api_key):
        """Sets the VirusTotal API key to use in queries.

        Args:
            api_key (str): VirusTotal API key
        """
        self._api_key = api_key

    def TestConnection(self):
        """Tests the connection to VirusTotal.

        Returns:
            bool: True if VirusTotal is reachable.
        """
        # Any decoded response counts as reachable; only a connection error
        # makes _QueryHashes() return None.
        json_response = self._QueryHashes([self._EICAR_SHA256])
        return json_response is not None
# Register the plugin class with the analysis plugin manager when this module
# is imported.
manager.AnalysisPluginManager.RegisterPlugin(VirusTotalAnalysisPlugin)