Source code for plaso.parsers.text_plugins.winfirewall

"""Text parser plugin for Windows Firewall Log files."""

import pyparsing

from dfdatetime import time_elements as dfdatetime_time_elements

from plaso.containers import events
from plaso.lib import errors
from plaso.parsers import text_parser
from plaso.parsers.text_plugins import interface


[docs] class WinFirewallEventData(events.EventData): """Windows Firewall event data. Attributes: action (str): action taken. destination_ip (str): destination IP address. destination_port (int): TCP or UDP destination port. icmp_code (int): ICMP code. icmp_type (int): ICMP type. information (str): additional information. last_written_time (dfdatetime.DateTimeValues): entry last written date and time. packet_size (int): packet size. path (str): direction of the communication, which can be: SEND, RECEIVE, FORWARD, and UNKNOWN. protocol (str): IP protocol. source_ip (str): source IP address. source_port (int): TCP or UDP source port. tcp_ack (int): TCP acknowledgement number. tcp_flags (str): TCP flags. tcp_sequence_number (int): TCP sequence number. tcp_window_size (int): TCP window size. """ DATA_TYPE = "windows:firewall_log:entry"
[docs] def __init__(self): """Initializes event data.""" super().__init__(data_type=self.DATA_TYPE) self.action = None self.destination_ip = None self.destination_port = None self.icmp_code = None self.icmp_type = None self.information = None self.last_written_time = None self.packet_size = None self.path = None self.protocol = None self.source_ip = None self.source_port = None self.tcp_ack = None self.tcp_flags = None self.tcp_sequence_number = None self.tcp_window_size = None
[docs] class WinFirewallLogTextPlugin(interface.TextPlugin): """Text parser plugin for Windows Firewall Log files.""" NAME = "winfirewall" DATA_FORMAT = "Windows Firewall log file" # A Windows Firewall is encoded using the system codepage. ENCODING = None _TWO_DIGITS = pyparsing.Word(pyparsing.nums, exact=2).set_parse_action( lambda tokens: int(tokens[0], 10) ) _FOUR_DIGITS = pyparsing.Word(pyparsing.nums, exact=4).set_parse_action( lambda tokens: int(tokens[0], 10) ) _DATE = pyparsing.Group( _FOUR_DIGITS + pyparsing.Suppress("-") + _TWO_DIGITS + pyparsing.Suppress("-") + _TWO_DIGITS ) _TIME = pyparsing.Group( _TWO_DIGITS + pyparsing.Suppress(":") + _TWO_DIGITS + pyparsing.Suppress(":") + _TWO_DIGITS ) _ACTION = pyparsing.Word(pyparsing.alphanums + "-", min=2) _WORD_OR_BLANK = pyparsing.Word(pyparsing.alphanums) | pyparsing.Suppress("-") _IP_ADDRESS_OR_BLANK = ( pyparsing.pyparsing_common.ipv4_address | pyparsing.pyparsing_common.ipv6_address | pyparsing.Suppress("-") ) _PORT_NUMBER_OR_BLANK = pyparsing.Word(pyparsing.nums, max=6).set_parse_action( lambda tokens: int(tokens[0], 10) ) | pyparsing.Suppress("-") _INTEGER_OR_BLANK = pyparsing.Word(pyparsing.nums).set_parse_action( lambda tokens: int(tokens[0], 10) ) | pyparsing.Suppress("-") _END_OF_LINE = pyparsing.Suppress(pyparsing.LineEnd()) _FIELDS_METADATA = pyparsing.Suppress( "Fields: " ) + pyparsing.restOfLine().set_results_name("fields") _TIME_FORMAT_METADATA = pyparsing.Suppress( "Time Format: " ) + pyparsing.restOfLine().set_results_name("time_format") _METADATA = _FIELDS_METADATA | _TIME_FORMAT_METADATA | pyparsing.restOfLine() _COMMENT_LOG_LINE = pyparsing.Suppress("#") + _METADATA + _END_OF_LINE # Version 1.5 fields: # date time action protocol src-ip dst-ip src-port dst-port size tcpflags # tcpsyn tcpack tcpwin icmptype icmpcode info path _LOG_LINE_1_5 = ( _DATE.set_results_name("date") + _TIME.set_results_name("time") + _ACTION.set_results_name("action") + _WORD_OR_BLANK.set_results_name("protocol") + _IP_ADDRESS_OR_BLANK.set_results_name("source_ip") + _IP_ADDRESS_OR_BLANK.set_results_name("destination_ip") + _PORT_NUMBER_OR_BLANK.set_results_name("source_port") + _PORT_NUMBER_OR_BLANK.set_results_name("destination_port") + _INTEGER_OR_BLANK.set_results_name("packet_size") + _WORD_OR_BLANK.set_results_name("tcp_flags") + _INTEGER_OR_BLANK.set_results_name("tcp_sequence_number") + _INTEGER_OR_BLANK.set_results_name("tcp_ack") + _INTEGER_OR_BLANK.set_results_name("tcp_window_size") + _INTEGER_OR_BLANK.set_results_name("icmp_type") + _INTEGER_OR_BLANK.set_results_name("icmp_code") + _WORD_OR_BLANK.set_results_name("information") + _WORD_OR_BLANK.set_results_name("path") + _END_OF_LINE ) # Common fields. Set results name with underscores, not hyphens because regex # will not pick them up. _LOG_LINE_STRUCTURES = { "action": _ACTION.set_results_name("action"), "date": _DATE.set_results_name("date"), "dst-ip": _IP_ADDRESS_OR_BLANK.set_results_name("destination_ip"), "dst-port": _PORT_NUMBER_OR_BLANK.set_results_name("destination_port"), "icmpcode": _INTEGER_OR_BLANK.set_results_name("icmp_code"), "icmptype": _INTEGER_OR_BLANK.set_results_name("icmp_type"), "info": _WORD_OR_BLANK.set_results_name("information"), "path": _WORD_OR_BLANK.set_results_name("path"), "protocol": _WORD_OR_BLANK.set_results_name("protocol"), "size": _INTEGER_OR_BLANK.set_results_name("packet_size"), "src-ip": _IP_ADDRESS_OR_BLANK.set_results_name("source_ip"), "src-port": _PORT_NUMBER_OR_BLANK.set_results_name("source_port"), "tcpack": _INTEGER_OR_BLANK.set_results_name("tcp_ack"), "tcpflags": _WORD_OR_BLANK.set_results_name("tcp_flags"), "tcpsyn": _INTEGER_OR_BLANK.set_results_name("tcp_sequence_number"), "tcpwin": _INTEGER_OR_BLANK.set_results_name("tcp_window_size"), "time": _TIME.set_results_name("time"), } _HEADER_GRAMMAR = pyparsing.OneOrMore(_COMMENT_LOG_LINE) _LINE_STRUCTURES = [("log_line", _LOG_LINE_1_5)] VERIFICATION_GRAMMAR = ( pyparsing.ZeroOrMore( pyparsing.Regex("#(Fields|Time Format|Version): .*") + _END_OF_LINE ) + pyparsing.Regex("#Software: Microsoft Windows Firewall") + _END_OF_LINE ) VERIFICATION_LITERALS = ["#Software: Microsoft Windows Firewall "]
[docs] def __init__(self): """Initializes a text parser plugin.""" super().__init__() self._use_local_time = False
def _ParseFieldsMetadata(self, parser_mediator, fields): """Parses the fields metadata and updates the log line definition to match. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. fields (str): field definitions. """ log_line_structure = pyparsing.Empty() for member in fields.split(" "): if not member: continue field_structure = self._LOG_LINE_STRUCTURES.get(member) if not field_structure: field_structure = self._WORD_OR_BLANK parser_mediator.ProduceExtractionWarning( ( f"missing definition for field: {member:s} defaulting to " f"WORD_OR_BLANK" ) ) log_line_structure += field_structure log_line_structure += self._END_OF_LINE self._SetLineStructures([("log_line", log_line_structure)]) def _ParseHeader(self, parser_mediator, text_reader): """Parses a text-log file header. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. text_reader (EncodedTextReader): text reader. Raises: ParseError: when the header cannot be parsed. """ try: structure_generator = self._HEADER_GRAMMAR.scan_string( text_reader.lines, max_matches=1 ) structure, start, end = next(structure_generator) except StopIteration: structure = None except pyparsing.ParseException as exception: raise errors.ParseError(exception) if not structure or start != 0: raise errors.ParseError("No match found.") fields = self._GetValueFromStructure(structure, "fields", default_value="") fields = fields.strip() if fields: self._ParseFieldsMetadata(parser_mediator, fields) time_format = self._GetValueFromStructure( structure, "time_format", default_value="" ) self._use_local_time = time_format.lower() == "local" text_reader.SkipAhead(end) def _ParseLogLine(self, parser_mediator, structure): """Parse a single log line. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. structure (pyparsing.ParseResults): tokens from a parsed log line. """ event_data = WinFirewallEventData() event_data.action = self._GetValueFromStructure(structure, "action") event_data.destination_ip = self._GetValueFromStructure( structure, "destination_ip" ) event_data.destination_port = self._GetValueFromStructure( structure, "destination_port" ) event_data.icmp_code = self._GetValueFromStructure(structure, "icmp_code") event_data.icmp_type = self._GetValueFromStructure(structure, "icmp_type") event_data.information = self._GetValueFromStructure(structure, "information") event_data.last_written_time = self._ParseTimeElements(structure) event_data.path = self._GetValueFromStructure(structure, "path") event_data.protocol = self._GetValueFromStructure(structure, "protocol") event_data.packet_size = self._GetValueFromStructure(structure, "packet_size") event_data.source_ip = self._GetValueFromStructure(structure, "source_ip") event_data.source_port = self._GetValueFromStructure(structure, "source_port") event_data.tcp_ack = self._GetValueFromStructure(structure, "tcp_ack") event_data.tcp_flags = self._GetValueFromStructure(structure, "tcp_flags") event_data.tcp_sequence_number = self._GetValueFromStructure( structure, "tcp_sequence_number" ) event_data.tcp_window_size = self._GetValueFromStructure( structure, "tcp_window_size" ) parser_mediator.ProduceEventData(event_data) def _ParseRecord(self, parser_mediator, key, structure): """Parses a pyparsing structure. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. key (str): name of the parsed structure. structure (pyparsing.ParseResults): tokens from a parsed log line. Raises: ParseError: if the structure cannot be parsed. """ self._ParseLogLine(parser_mediator, structure) def _ParseTimeElements(self, structure): """Parses date and time elements of a log line. Args: structure (pyparsing.ParseResults): tokens from a parsed log line. Returns: dfdatetime.TimeElements: date and time value. Raises: ParseError: if a valid date and time value cannot be derived from the time elements. """ try: date_elements_structure = self._GetValueFromStructure(structure, "date") time_elements_structure = self._GetValueFromStructure(structure, "time") year, month, day_of_month = date_elements_structure hours, minutes, seconds = time_elements_structure time_elements_tuple = (year, month, day_of_month, hours, minutes, seconds) date_time = dfdatetime_time_elements.TimeElements( time_elements_tuple=time_elements_tuple ) date_time.is_local_time = self._use_local_time return date_time except (IndexError, TypeError, ValueError) as exception: raise errors.ParseError( f"Unable to parse time elements with error: {exception!s}" ) def _ResetState(self): """Resets stored values.""" self._use_local_time = False self._SetLineStructures(self._LINE_STRUCTURES)
[docs] def CheckRequiredFormat(self, parser_mediator, text_reader): """Check if the log record has the minimal structure required by the plugin. Args: parser_mediator (ParserMediator): mediates interactions between parsers and other components, such as storage and dfVFS. text_reader (EncodedTextReader): text reader. Returns: bool: True if this is the correct plugin, False otherwise. """ try: self._VerifyString(text_reader.lines) except errors.ParseError: return False self._ResetState() return True
text_parser.TextLogParser.RegisterPlugin(WinFirewallLogTextPlugin)