"""Parser for Windows Defender scan DetectionHistory files."""
import os
from dfdatetime import filetime as dfdatetime_filetime
from dtfabric.runtime import data_maps as dtfabric_data_maps
from plaso.containers import events
from plaso.lib import dtfabric_helper
from plaso.lib import specification
from plaso.parsers import interface
from plaso.parsers import manager
[docs]
class WindowsDefenderHistoryEventData(events.EventData):
"""Windows Defender scan DetectionHistory event data.
Attributes:
additional_filenames (list[str]): locations of additional detected files.
container_filenames (list[str]): location of files detected inside a
container.
filename (str): name of the file that the threat was detected in.
host_and_user (str): name of the host and user in "DOMAIN\\USER" format.
process (str): name of the process that caused the detection.
recorded_time (dfdatetime.DateTimeValues): date and time the log entry
was recorded.
sha256 (str): SHA-256 hash of the file.
threat_name (str): name of the threat that was detected.
web_filenames (list[str]): URI of files detected as downloaded from the web.
"""
DATA_TYPE = "av:defender:detection_history"
[docs]
def __init__(self):
"""Initializes event data."""
super().__init__(data_type=self.DATA_TYPE)
self.additional_filenames = None
self.container_filenames = None
self.filename = None
self.host_and_user = None
self.recorded_time = None
self.process = None
self.sha256 = None
self.threat_name = None
self.web_filenames = None
[docs]
class WinDefenderHistoryParser(
interface.FileObjectParser, dtfabric_helper.DtFabricHelper
):
"""Parses a Windows Defender scan DetectionHistory file."""
NAME = "windefender_history"
DATA_FORMAT = "Windows Defender scan DetectionHistory file"
_FILE_SIGNATURE = "Magic.Version:1.2"
_VALUE_DESCRIPTIONS = [
{0: "Threat identifier", 1: "Identifier"},
{0: "UnknownMagic1", 1: "Threat name", 4: "Category"},
{
0: "UnknownMagic2",
1: "Resource type",
2: "Resource location",
4: "Threat tracking data size",
5: "Threat tracking data",
6: "Last threat status change time",
12: "Domain user1",
14: "Process name",
18: "Initial detection time",
20: "Remediation time",
24: "Domain user2",
},
]
_DEFINITION_FILE = os.path.join(
os.path.dirname(__file__), "windefender_history.yaml"
)
_VALUE_DATA_TYPE_BINARY_DATA = 0x00000028
_VALUE_DATA_TYPE_FILETIME = 0x0000000A
_VALUE_DATA_TYPE_GUID = 0x0000001E
_VALUE_DATA_TYPE_STRING = 0x00000015
_VALUE_DATA_TYPES_INTEGER = frozenset(
[0x00000000, 0x00000005, 0x00000006, 0x00000008]
)
[docs]
@classmethod
def GetFormatSpecification(cls):
"""Retrieves the format specification.
Returns:
FormatSpecification: format specification.
"""
format_specification = specification.FormatSpecification(cls.NAME)
format_specification.AddNewSignature(
cls._FILE_SIGNATURE.encode("utf-16-le"), offset=0x30
)
return format_specification
def _ReadThreatTrackingData(self, threat_tracking_data, file_offset):
"""Reads the threat tracking data.
Args:
threat_tracking_data (bytes): threat tracking data.
file_offset (int): offset of the threat tracking data relative to
the start of the file.
Returns:
dict[str, str]: Mapping of threat tracking keys to values.
Raises:
OSError: if the threat tracking data cannot be read.
"""
threat_tracking = {}
data_type_map = self._GetDataTypeMap("uint32le")
values_data_size = self._ReadStructureFromByteStream(
threat_tracking_data, 0, data_type_map
)
if values_data_size != 1:
values_data_offset = 4
values_data_end_offset = values_data_size
else:
header = self._ReadThreatTrackingHeader(threat_tracking_data)
values_data_offset = header.header_size + 4
values_data_end_offset = header.total_data_size
while values_data_offset < values_data_end_offset:
threat_value, data_size = self._ReadThreatTrackingValue(
threat_tracking_data[values_data_offset:],
file_offset + values_data_offset,
)
if hasattr(threat_value, "value_string"):
threat_tracking[threat_value.key_string] = threat_value.value_string
if hasattr(threat_value, "value_integer"):
threat_tracking[threat_value.key_string] = threat_value.value_integer
values_data_offset += data_size
return threat_tracking
def _ReadThreatTrackingHeader(self, threat_tracking_data):
"""Reads the threat tracking header.
Args:
threat_tracking_data (bytes): threat tracking data.
Returns:
threat_tracking_header: threat tracking header.
Raises:
OSError: if the threat tracking header cannot be read.
"""
data_type_map = self._GetDataTypeMap("threat_tracking_header")
return self._ReadStructureFromByteStream(threat_tracking_data, 0, data_type_map)
def _ReadThreatTrackingValue(self, threat_tracking_data, file_offset):
"""Reads the threat tracking value.
Args:
threat_tracking_data (bytes): threat tracking data.
file_offset (int): offset of the threat tracking data relative to
the start of the file.
Returns:
tuple[threat_tracking_value, int]: threat tracking value and
data size.
Raises:
OSError: if the threat tracking value cannot be read.
"""
data_type_map = self._GetDataTypeMap("threat_tracking_value")
context = dtfabric_data_maps.DataTypeMapContext()
threat_tracking_value = self._ReadStructureFromByteStream(
threat_tracking_data, file_offset, data_type_map, context=context
)
return threat_tracking_value, context.byte_size
def _ReadValue(self, file_object, file_offset, parser_mediator):
"""Reads the value.
Args:
file_object (file): file-like object.
file_offset (int): offset of the value relative to the start of the file.
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
Returns:
object: value.
Raises:
OSError: if the value cannot be read.
ParseError: when the value data type is unknown.
"""
data_type_map = self._GetDataTypeMap("detection_history_value")
value, _ = self._ReadStructureFromFileObject(
file_object, file_offset, data_type_map
)
value_object = None
if value.data_type in self._VALUE_DATA_TYPES_INTEGER:
value_object = value.value_integer
elif value.data_type == self._VALUE_DATA_TYPE_FILETIME:
value_object = value.value_filetime
elif value.data_type == self._VALUE_DATA_TYPE_STRING:
value_object = value.value_string
elif value.data_type == self._VALUE_DATA_TYPE_GUID:
value_object = value.value_guid
elif value.data_type == self._VALUE_DATA_TYPE_BINARY_DATA:
value_object = value.data
else:
parser_mediator.ProduceExtractionWarning(
f"unknown value data type: {value.data_type!s}"
)
value_object = value.data
return value_object
[docs]
def ParseFileObject(self, parser_mediator, file_object):
"""Parses a Windows Defender History file-like object.
Args:
parser_mediator (ParserMediator): mediates interactions between
parsers and other components, such as storage and dfVFS.
file_object (dfvfs.FileIO): file-like object.
Raises:
WrongParser: when the file cannot be parsed.
"""
value_tuples = []
file_offset = 0
file_size = file_object.get_size()
while file_offset < file_size:
value_object = self._ReadValue(file_object, file_offset, parser_mediator)
value_tuples.append((file_offset, value_object))
file_offset = file_object.tell()
resource_name = ""
threat_attributes = {"Resources": []}
value_index_set = 0
value_index = 0
for file_offset, value_object in value_tuples:
if value_object == self._FILE_SIGNATURE:
if value_index_set < 2:
value_index_set += 1
value_index = 0
if (value_index_set, value_index) == (2, 5):
threat_attributes.update(
self._ReadThreatTrackingData(value_object, file_offset + 8)
)
else:
description = self._VALUE_DESCRIPTIONS[value_index_set].get(
value_index, f"UNKNOWN_{value_index_set:d}_{value_index:d}"
)
value_string = f"{value_object!s}"
if description == "Resource type":
resource_name = value_string
elif description == "Resource location" and resource_name is not None:
threat_attributes["Resources"].append(
{"Type": resource_name, "Location": value_string}
)
resource_name = None
else:
threat_attributes[description] = value_string
value_index += 1
filenames = [
threat_attribute["Location"]
for threat_attribute in threat_attributes["Resources"]
if threat_attribute["Type"] == "file"
]
if not filenames:
filename = threat_attributes.get("CONTEXT_DATA_FILENAME")
process_ppid = threat_attributes.get("CONTEXT_DATA_PROCESS_PPID")
filenames = [",".join(list(filter(None, [filename, process_ppid])))]
web_files = [
threat_attribute["Location"]
for threat_attribute in threat_attributes["Resources"]
if threat_attribute["Type"] == "webfile"
]
container_files = [
threat_attribute["Location"]
for threat_attribute in threat_attributes["Resources"]
if threat_attribute["Type"] == "containerfile"
]
additional_filenames = [
threat_attribute["Location"]
for threat_attribute in threat_attributes["Resources"]
if "file" not in threat_attribute["Type"]
]
timestamp = threat_attributes.get("ThreatTrackingStartTime", 0)
event_data = WindowsDefenderHistoryEventData()
event_data.additional_filenames = additional_filenames
event_data.container_filenames = container_files
event_data.filename = filenames[0]
event_data.host_and_user = threat_attributes.get("Domain user1")
event_data.process = threat_attributes.get("Process name")
event_data.recorded_time = dfdatetime_filetime.Filetime(timestamp=timestamp)
event_data.sha256 = threat_attributes.get("ThreatTrackingSha256")
event_data.threat_name = threat_attributes.get("Threat name")
event_data.web_filenames = web_files
parser_mediator.ProduceEventData(event_data)
manager.ParsersManager.RegisterParser(WinDefenderHistoryParser)