"""Text parser plugin for Windows Firewall Log files."""
import pyparsing
from dfdatetime import time_elements as dfdatetime_time_elements
from plaso.containers import events
from plaso.lib import errors
from plaso.parsers import text_parser
from plaso.parsers.text_plugins import interface
[docs]
class WinFirewallEventData(events.EventData):
"""Windows Firewall event data.
Attributes:
action (str): action taken.
destination_ip (str): destination IP address.
destination_port (int): TCP or UDP destination port.
icmp_code (int): ICMP code.
icmp_type (int): ICMP type.
information (str): additional information.
last_written_time (dfdatetime.DateTimeValues): entry last written date and
time.
packet_size (int): packet size.
path (str): direction of the communication, which can be: SEND, RECEIVE,
FORWARD, and UNKNOWN.
protocol (str): IP protocol.
source_ip (str): source IP address.
source_port (int): TCP or UDP source port.
tcp_ack (int): TCP acknowledgement number.
tcp_flags (str): TCP flags.
tcp_sequence_number (int): TCP sequence number.
tcp_window_size (int): TCP window size.
"""
DATA_TYPE = "windows:firewall_log:entry"
[docs]
def __init__(self):
"""Initializes event data."""
super().__init__(data_type=self.DATA_TYPE)
self.action = None
self.destination_ip = None
self.destination_port = None
self.icmp_code = None
self.icmp_type = None
self.information = None
self.last_written_time = None
self.packet_size = None
self.path = None
self.protocol = None
self.source_ip = None
self.source_port = None
self.tcp_ack = None
self.tcp_flags = None
self.tcp_sequence_number = None
self.tcp_window_size = None
[docs]
class WinFirewallLogTextPlugin(interface.TextPlugin):
"""Text parser plugin for Windows Firewall Log files."""
NAME = "winfirewall"
DATA_FORMAT = "Windows Firewall log file"
# A Windows Firewall is encoded using the system codepage.
ENCODING = None
_TWO_DIGITS = pyparsing.Word(pyparsing.nums, exact=2).set_parse_action(
lambda tokens: int(tokens[0], 10)
)
_FOUR_DIGITS = pyparsing.Word(pyparsing.nums, exact=4).set_parse_action(
lambda tokens: int(tokens[0], 10)
)
_DATE = pyparsing.Group(
_FOUR_DIGITS
+ pyparsing.Suppress("-")
+ _TWO_DIGITS
+ pyparsing.Suppress("-")
+ _TWO_DIGITS
)
_TIME = pyparsing.Group(
_TWO_DIGITS
+ pyparsing.Suppress(":")
+ _TWO_DIGITS
+ pyparsing.Suppress(":")
+ _TWO_DIGITS
)
_ACTION = pyparsing.Word(pyparsing.alphanums + "-", min=2)
_WORD_OR_BLANK = pyparsing.Word(pyparsing.alphanums) | pyparsing.Suppress("-")
_IP_ADDRESS_OR_BLANK = (
pyparsing.pyparsing_common.ipv4_address
| pyparsing.pyparsing_common.ipv6_address
| pyparsing.Suppress("-")
)
_PORT_NUMBER_OR_BLANK = pyparsing.Word(pyparsing.nums, max=6).set_parse_action(
lambda tokens: int(tokens[0], 10)
) | pyparsing.Suppress("-")
_INTEGER_OR_BLANK = pyparsing.Word(pyparsing.nums).set_parse_action(
lambda tokens: int(tokens[0], 10)
) | pyparsing.Suppress("-")
_END_OF_LINE = pyparsing.Suppress(pyparsing.LineEnd())
_FIELDS_METADATA = pyparsing.Suppress(
"Fields: "
) + pyparsing.restOfLine().set_results_name("fields")
_TIME_FORMAT_METADATA = pyparsing.Suppress(
"Time Format: "
) + pyparsing.restOfLine().set_results_name("time_format")
_METADATA = _FIELDS_METADATA | _TIME_FORMAT_METADATA | pyparsing.restOfLine()
_COMMENT_LOG_LINE = pyparsing.Suppress("#") + _METADATA + _END_OF_LINE
# Version 1.5 fields:
# date time action protocol src-ip dst-ip src-port dst-port size tcpflags
# tcpsyn tcpack tcpwin icmptype icmpcode info path
_LOG_LINE_1_5 = (
_DATE.set_results_name("date")
+ _TIME.set_results_name("time")
+ _ACTION.set_results_name("action")
+ _WORD_OR_BLANK.set_results_name("protocol")
+ _IP_ADDRESS_OR_BLANK.set_results_name("source_ip")
+ _IP_ADDRESS_OR_BLANK.set_results_name("destination_ip")
+ _PORT_NUMBER_OR_BLANK.set_results_name("source_port")
+ _PORT_NUMBER_OR_BLANK.set_results_name("destination_port")
+ _INTEGER_OR_BLANK.set_results_name("packet_size")
+ _WORD_OR_BLANK.set_results_name("tcp_flags")
+ _INTEGER_OR_BLANK.set_results_name("tcp_sequence_number")
+ _INTEGER_OR_BLANK.set_results_name("tcp_ack")
+ _INTEGER_OR_BLANK.set_results_name("tcp_window_size")
+ _INTEGER_OR_BLANK.set_results_name("icmp_type")
+ _INTEGER_OR_BLANK.set_results_name("icmp_code")
+ _WORD_OR_BLANK.set_results_name("information")
+ _WORD_OR_BLANK.set_results_name("path")
+ _END_OF_LINE
)
# Common fields. Set results name with underscores, not hyphens because regex
# will not pick them up.
_LOG_LINE_STRUCTURES = {
"action": _ACTION.set_results_name("action"),
"date": _DATE.set_results_name("date"),
"dst-ip": _IP_ADDRESS_OR_BLANK.set_results_name("destination_ip"),
"dst-port": _PORT_NUMBER_OR_BLANK.set_results_name("destination_port"),
"icmpcode": _INTEGER_OR_BLANK.set_results_name("icmp_code"),
"icmptype": _INTEGER_OR_BLANK.set_results_name("icmp_type"),
"info": _WORD_OR_BLANK.set_results_name("information"),
"path": _WORD_OR_BLANK.set_results_name("path"),
"protocol": _WORD_OR_BLANK.set_results_name("protocol"),
"size": _INTEGER_OR_BLANK.set_results_name("packet_size"),
"src-ip": _IP_ADDRESS_OR_BLANK.set_results_name("source_ip"),
"src-port": _PORT_NUMBER_OR_BLANK.set_results_name("source_port"),
"tcpack": _INTEGER_OR_BLANK.set_results_name("tcp_ack"),
"tcpflags": _WORD_OR_BLANK.set_results_name("tcp_flags"),
"tcpsyn": _INTEGER_OR_BLANK.set_results_name("tcp_sequence_number"),
"tcpwin": _INTEGER_OR_BLANK.set_results_name("tcp_window_size"),
"time": _TIME.set_results_name("time"),
}
_HEADER_GRAMMAR = pyparsing.OneOrMore(_COMMENT_LOG_LINE)
_LINE_STRUCTURES = [("log_line", _LOG_LINE_1_5)]
VERIFICATION_GRAMMAR = (
pyparsing.ZeroOrMore(
pyparsing.Regex("#(Fields|Time Format|Version): .*") + _END_OF_LINE
)
+ pyparsing.Regex("#Software: Microsoft Windows Firewall")
+ _END_OF_LINE
)
VERIFICATION_LITERALS = ["#Software: Microsoft Windows Firewall "]
[docs]
def __init__(self):
"""Initializes a text parser plugin."""
super().__init__()
self._use_local_time = False
def _ParseFieldsMetadata(self, parser_mediator, fields):
"""Parses the fields metadata and updates the log line definition to match.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
fields (str): field definitions.
"""
log_line_structure = pyparsing.Empty()
for member in fields.split(" "):
if not member:
continue
field_structure = self._LOG_LINE_STRUCTURES.get(member)
if not field_structure:
field_structure = self._WORD_OR_BLANK
parser_mediator.ProduceExtractionWarning(
(
f"missing definition for field: {member:s} defaulting to "
f"WORD_OR_BLANK"
)
)
log_line_structure += field_structure
log_line_structure += self._END_OF_LINE
self._SetLineStructures([("log_line", log_line_structure)])
def _ParseHeader(self, parser_mediator, text_reader):
"""Parses a text-log file header.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
text_reader (EncodedTextReader): text reader.
Raises:
ParseError: when the header cannot be parsed.
"""
try:
structure_generator = self._HEADER_GRAMMAR.scan_string(
text_reader.lines, max_matches=1
)
structure, start, end = next(structure_generator)
except StopIteration:
structure = None
except pyparsing.ParseException as exception:
raise errors.ParseError(exception)
if not structure or start != 0:
raise errors.ParseError("No match found.")
fields = self._GetValueFromStructure(structure, "fields", default_value="")
fields = fields.strip()
if fields:
self._ParseFieldsMetadata(parser_mediator, fields)
time_format = self._GetValueFromStructure(
structure, "time_format", default_value=""
)
self._use_local_time = time_format.lower() == "local"
text_reader.SkipAhead(end)
def _ParseLogLine(self, parser_mediator, structure):
"""Parse a single log line.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
structure (pyparsing.ParseResults): tokens from a parsed log line.
"""
event_data = WinFirewallEventData()
event_data.action = self._GetValueFromStructure(structure, "action")
event_data.destination_ip = self._GetValueFromStructure(
structure, "destination_ip"
)
event_data.destination_port = self._GetValueFromStructure(
structure, "destination_port"
)
event_data.icmp_code = self._GetValueFromStructure(structure, "icmp_code")
event_data.icmp_type = self._GetValueFromStructure(structure, "icmp_type")
event_data.information = self._GetValueFromStructure(structure, "information")
event_data.last_written_time = self._ParseTimeElements(structure)
event_data.path = self._GetValueFromStructure(structure, "path")
event_data.protocol = self._GetValueFromStructure(structure, "protocol")
event_data.packet_size = self._GetValueFromStructure(structure, "packet_size")
event_data.source_ip = self._GetValueFromStructure(structure, "source_ip")
event_data.source_port = self._GetValueFromStructure(structure, "source_port")
event_data.tcp_ack = self._GetValueFromStructure(structure, "tcp_ack")
event_data.tcp_flags = self._GetValueFromStructure(structure, "tcp_flags")
event_data.tcp_sequence_number = self._GetValueFromStructure(
structure, "tcp_sequence_number"
)
event_data.tcp_window_size = self._GetValueFromStructure(
structure, "tcp_window_size"
)
parser_mediator.ProduceEventData(event_data)
def _ParseRecord(self, parser_mediator, key, structure):
"""Parses a pyparsing structure.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
key (str): name of the parsed structure.
structure (pyparsing.ParseResults): tokens from a parsed log line.
Raises:
ParseError: if the structure cannot be parsed.
"""
self._ParseLogLine(parser_mediator, structure)
def _ParseTimeElements(self, structure):
"""Parses date and time elements of a log line.
Args:
structure (pyparsing.ParseResults): tokens from a parsed log line.
Returns:
dfdatetime.TimeElements: date and time value.
Raises:
ParseError: if a valid date and time value cannot be derived from
the time elements.
"""
try:
date_elements_structure = self._GetValueFromStructure(structure, "date")
time_elements_structure = self._GetValueFromStructure(structure, "time")
year, month, day_of_month = date_elements_structure
hours, minutes, seconds = time_elements_structure
time_elements_tuple = (year, month, day_of_month, hours, minutes, seconds)
date_time = dfdatetime_time_elements.TimeElements(
time_elements_tuple=time_elements_tuple
)
date_time.is_local_time = self._use_local_time
return date_time
except (IndexError, TypeError, ValueError) as exception:
raise errors.ParseError(
f"Unable to parse time elements with error: {exception!s}"
)
def _ResetState(self):
"""Resets stored values."""
self._use_local_time = False
self._SetLineStructures(self._LINE_STRUCTURES)
[docs]
def CheckRequiredFormat(self, parser_mediator, text_reader):
"""Check if the log record has the minimal structure required by the plugin.
Args:
parser_mediator (ParserMediator): mediates interactions between parsers
and other components, such as storage and dfVFS.
text_reader (EncodedTextReader): text reader.
Returns:
bool: True if this is the correct plugin, False otherwise.
"""
try:
self._VerifyString(text_reader.lines)
except errors.ParseError:
return False
self._ResetState()
return True
text_parser.TextLogParser.RegisterPlugin(WinFirewallLogTextPlugin)