"""Analysis plugin to look up file hashes in nsrlsvr and tag events."""
import socket
from plaso.analysis import hash_tagging
from plaso.analysis import logger
from plaso.analysis import manager
[docs]
class NsrlsvrAnalysisPlugin(hash_tagging.HashTaggingAnalysisPlugin):
    """Analysis plugin for looking up hashes in nsrlsvr.

    A batch of hashes is looked up over a single connection to the configured
    nsrlsvr host and port, one "QUERY <digest>" line per hash. Hashes the
    server reports as present yield the configured label.
    """

    # The NSRL contains files of all different types, and can handle a high
    # load so look up all files.
    DATA_TYPES = frozenset(["fs:stat", "fs:stat:ntfs"])

    NAME = "nsrlsvr"

    SUPPORTED_HASHES = frozenset(["md5", "sha1"])

    DEFAULT_LABEL = "nsrl_present"

    # Maximum number of bytes read from the socket for a single response.
    _RECEIVE_BUFFER_SIZE = 4096

    # Timeout handed to socket.create_connection().
    _SOCKET_TIMEOUT = 3

    def __init__(self):
        """Initializes an nsrlsvr analysis plugin."""
        super().__init__()
        self._host = None
        self._label = self.DEFAULT_LABEL
        self._port = None

    def _Analyze(self, hashes):
        """Looks up file hashes in nsrlsvr.

        Args:
          hashes (list[str]): hash values to look up.

        Returns:
          list[HashAnalysis]: analysis results, or an empty list on error.
              Hashes whose query failed are omitted from the results.
        """
        logger.debug(f"Opening connection to {self._host:s}:{self._port:d}")

        nsrl_socket = self._GetSocket()
        if nsrl_socket is None:
            self.SignalAbort()
            return []

        hash_analyses = []
        try:
            for digest in hashes:
                response = self._QueryHash(nsrl_socket, digest)
                # None signals a failed query; only definitive answers
                # (True or False) are recorded.
                if response is not None:
                    hash_analysis = hash_tagging.HashAnalysis(digest, response)
                    hash_analyses.append(hash_analysis)
        finally:
            # Always release the connection, even if a lookup raises.
            nsrl_socket.close()

        logger.debug(f"Closed connection to {self._host:s}:{self._port:d}")
        return hash_analyses

    def _GenerateLabels(self, hash_information):
        """Generates a list of strings that will be used in the event tag.

        Args:
          hash_information (bool): response from the hash tagging that
              indicates that the file hash was present or not.

        Returns:
          list[str]: list of labels to apply to event.
        """
        if hash_information:
            return [self._label]

        # TODO: Re-enable when tagging is removed from the analysis report.
        # return ['nsrl_not_present']
        return []

    def _GetSocket(self):
        """Establishes a connection to an nsrlsvr instance.

        Returns:
          socket.socket: socket connected to an nsrlsvr instance or None if
              a connection cannot be established.
        """
        try:
            return socket.create_connection(
                (self._host, self._port), self._SOCKET_TIMEOUT
            )
        except socket.error as exception:
            logger.error(f"Unable to connect to nsrlsvr with error: {exception!s}.")
            return None

    def _QueryHash(self, nsrl_socket, digest):
        """Queries nsrlsvr for a specific hash.

        Args:
          nsrl_socket (socket.socket): socket of connection to nsrlsvr.
          digest (str): hash to look up.

        Returns:
          bool: True if the hash was found, False if not or None on error.
        """
        try:
            query = f"QUERY {digest:s}\n".encode("ascii")
        except UnicodeEncodeError:
            # str.encode() raises UnicodeEncodeError (not UnicodeDecodeError)
            # when the digest contains non-ASCII characters.
            logger.error(f"Unable to encode digest: {digest!s} to ASCII.")
            return None

        try:
            nsrl_socket.sendall(query)
            response = nsrl_socket.recv(self._RECEIVE_BUFFER_SIZE)
        except socket.error as exception:
            logger.error(f"Unable to query nsrlsvr with error: {exception!s}.")
            return None

        # An empty read means no answer was received for this query, which is
        # an error rather than a "not found" result.
        if not response:
            return None

        # Strip end-of-line characters since they can differ per platform on
        # which nsrlsvr is running.
        response = response.strip()

        # nsrlsvr returns "OK 1" if the hash was found or "OK 0" if not.
        return response == b"OK 1"

    def SetLabel(self, label):
        """Sets the tagging label.

        Args:
          label (str): label to apply to events extracted from files that are
              present in nsrlsvr.
        """
        self._label = label

    def SetHost(self, host):
        """Sets the address or hostname of the server running nsrlsvr.

        Args:
          host (str): IP address or hostname to query.
        """
        self._host = host

    def SetPort(self, port):
        """Sets the port where nsrlsvr is listening.

        Args:
          port (int): port to query.
        """
        self._port = port

    def TestConnection(self):
        """Tests the connection to nsrlsvr.

        Checks if a connection can be set up and queries the server for the
        MD5 of an empty file and expects a response. The value of the response
        is not checked.

        Returns:
          bool: True if nsrlsvr instance is reachable.
        """
        nsrl_socket = self._GetSocket()
        if nsrl_socket is None:
            return False

        # MD5 of an empty file.
        empty_file_md5 = "d41d8cd98f00b204e9800998ecf8427e"
        try:
            response = self._QueryHash(nsrl_socket, empty_file_md5)
        finally:
            nsrl_socket.close()

        return response is not None
# Register the plugin class with the analysis plugin manager at import time.
manager.AnalysisPluginManager.RegisterPlugin(NsrlsvrAnalysisPlugin)