Source code for plaso.filters.expression_parser

"""Event filter expression parser."""

import binascii
import codecs
import re

from dfdatetime import posix_time as dfdatetime_posix_time
from dfdatetime import time_elements as dfdatetime_time_elements

from plaso.containers import artifacts
from plaso.filters import expressions
from plaso.filters import logger
from plaso.lib import errors


class Token:
    """Single lexer rule of the event filter expression parser.

    Attributes:
      actions (list[str]): names of the EventFilterExpressionParser methods to
          call when the token matches.
      next_state (str): parser state to transition to when the token matches.
      state (str): parser state in which the token applies or None if the token
          applies regardless of the parser state.
    """

    _ACTION_SEPARATOR = ","

    def __init__(self, state, regex, actions, next_state):
        """Initializes an event filter expression parser token.

        Args:
          state (str): parser state in which the token applies or None if the
              token applies regardless of the parser state.
          regex (str): regular expression to match at the current position.
          actions (str): separator-delimited names of the
              EventFilterExpressionParser methods to call, or None/empty when
              there is nothing to call.
          next_state (str): parser state to transition to when the token matches.
        """
        super().__init__()
        # re.DOTALL and re.S are aliases, so this is the same flag set as before.
        regex_flags = re.IGNORECASE | re.MULTILINE | re.DOTALL | re.UNICODE
        self._regex = re.compile(regex, regex_flags)
        self.actions = actions.split(self._ACTION_SEPARATOR) if actions else []
        self.next_state = next_state
        self.state = state

    def CompareExpression(self, expression):
        """Matches the token against the start of an expression string.

        Args:
          expression (str): expression string.

        Returns:
          re.Match: regular expression match object or None if the start of the
              expression string does not match the token.
        """
        match = self._regex.match(expression)
        return match
class EventFilterExpressionParser:
    """Event filter expression parser.

    The parser is a small state machine: _TOKENS lists, per parser state, the
    regular expressions that may match at the current position together with
    the callback methods to run and the state to move to. Completed expressions
    and brackets are collected on a stack that _Reduce folds into one expression.

    Examples of valid syntax:
      size is 40
      (name contains "Program Files" AND hash.md5 is "123abc")
      @imported_modules (num_symbols = 14 AND symbol.name is "FindWindow")
    """

    _OPERATORS_WITH_NEGATION = frozenset(["contains", "equals", "inset", "is"])

    _STATE_ARGUMENT = "ARGUMENT"
    _STATE_ATTRIBUTE = "ATTRIBUTE"
    _STATE_BINARY_OPERATOR = "BINARY"
    _STATE_DATETIME = "DATETIME"
    _STATE_NEGATION_OPERATOR = "CHECKNOT"
    _STATE_CONTINUE = "CONTINUE"
    _STATE_INITIAL = "INITIAL"
    _STATE_OPERATOR = "OPERATOR"
    _STATE_PATH = "PATH"
    _STATE_STRING_DOUBLE_QUOTE = "STRING_DOUBLE_QUOTE"
    _STATE_STRING_SINGLE_QUOTE = "STRING_SINGLE_QUOTE"

    # Tokens are tried in order; the first one that applies to the current
    # state and matches the start of the buffer wins.
    _TOKENS = [
        # Operators and related tokens
        Token(_STATE_INITIAL, r"[^\s\(\)]", "_PushState,_PushBack", _STATE_ATTRIBUTE),
        Token(_STATE_INITIAL, r"\(", "_PushState,_AddBracketOpen", None),
        Token(_STATE_INITIAL, r"\)", "_AddBracketClose", _STATE_BINARY_OPERATOR),
        # Double quoted string
        Token(_STATE_STRING_DOUBLE_QUOTE, '"', "_PopState,_StringFinish", None),
        Token(_STATE_STRING_DOUBLE_QUOTE, r"\\x(..)", "HexEscape", None),
        Token(_STATE_STRING_DOUBLE_QUOTE, r"\\(.)", "_StringEscape", None),
        Token(_STATE_STRING_DOUBLE_QUOTE, r'[^\\"]+', "_StringExpand", None),
        # Single quoted string
        Token(_STATE_STRING_SINGLE_QUOTE, "'", "_PopState,_StringFinish", None),
        Token(_STATE_STRING_SINGLE_QUOTE, r"\\x(..)", "HexEscape", None),
        Token(_STATE_STRING_SINGLE_QUOTE, r"\\(.)", "_StringEscape", None),
        Token(_STATE_STRING_SINGLE_QUOTE, r"[^\\\']+", "_StringExpand", None),
        # Date and time definition
        Token(_STATE_DATETIME, r"\)", "_PopState,_AddArgumentDateTime", None),
        Token(_STATE_DATETIME, r"(\d+)", "_SetDateTimeDecimalInteger", _STATE_DATETIME),
        Token(
            _STATE_DATETIME, '"', "_PushState,_StringStart", _STATE_STRING_DOUBLE_QUOTE
        ),
        Token(
            _STATE_DATETIME, "'", "_PushState,_StringStart", _STATE_STRING_SINGLE_QUOTE
        ),
        # Path definition
        Token(_STATE_PATH, r"\)", "_PopState,_AddArgumentPath", None),
        Token(_STATE_PATH, '"', "_PushState,_StringStart", _STATE_STRING_DOUBLE_QUOTE),
        Token(_STATE_PATH, "'", "_PushState,_StringStart", _STATE_STRING_SINGLE_QUOTE),
        # Basic expression
        Token(_STATE_ATTRIBUTE, r"[\w._0-9]+", "_SetAttribute", _STATE_OPERATOR),
        Token(_STATE_OPERATOR, "not ", "_NegateExpression", None),
        Token(
            _STATE_OPERATOR, r"(\w+|[<>!=]=?)", "_SetOperator", _STATE_NEGATION_OPERATOR
        ),
        Token(_STATE_NEGATION_OPERATOR, "not", "_NegateExpression", _STATE_ARGUMENT),
        Token(_STATE_NEGATION_OPERATOR, r"\s+", None, None),
        # NOTE: [^not] is a character class (any single character other than
        # n, o or t), not "anything but the word not".
        Token(_STATE_NEGATION_OPERATOR, r"([^not])", "_PushBack", _STATE_ARGUMENT),
        Token(
            _STATE_ARGUMENT, r"(\d+\.\d+)", "_AddArgumentFloatingPoint", _STATE_ARGUMENT
        ),
        # Hexadecimal digits include a-f; tokens are compiled case-insensitively.
        Token(
            _STATE_ARGUMENT,
            r"(0x[0-9a-f]+)",
            "_AddArgumentHexadecimalInteger",
            _STATE_ARGUMENT,
        ),
        Token(_STATE_ARGUMENT, r"(\d+)", "_AddArgumentDecimalInteger", _STATE_ARGUMENT),
        Token(
            _STATE_ARGUMENT, '"', "_PushState,_StringStart", _STATE_STRING_DOUBLE_QUOTE
        ),
        Token(
            _STATE_ARGUMENT, "'", "_PushState,_StringStart", _STATE_STRING_SINGLE_QUOTE
        ),
        Token(_STATE_ARGUMENT, r"DATETIME\(", "_PushState", _STATE_DATETIME),
        Token(_STATE_ARGUMENT, r"PATH\(", "_PushState", _STATE_PATH),
        # When the last parameter from arg_list has been pushed
        # State where binary operators are supported (AND, OR)
        Token(
            _STATE_BINARY_OPERATOR,
            r"(?i)(and|or|\&\&|\|\|)",
            "_AddBinaryOperator",
            _STATE_INITIAL,
        ),
        # - We can also skip spaces
        Token(_STATE_BINARY_OPERATOR, r"\s+", None, None),
        # - But if it's not "and" or just spaces we have to go back
        Token(_STATE_BINARY_OPERATOR, ".", "_PushBack,_PopState", None),
        # Skip whitespace.
        Token(None, r"\s+", None, None),
    ]

    def __init__(self):
        """Initializes an event filter expression parser."""
        super().__init__()
        self._buffer = ""
        self._datetime_value = None
        self._current_expression = None
        self._error = 0
        self._flags = 0
        self._have_negate_keyword = False
        self._processed_buffer = ""
        self._stack = []
        self._state = self._STATE_INITIAL
        self._state_stack = []
        self._string = None

    # The parser token callback methods use a specific function interface.
    # pylint: disable=redundant-returns-doc,useless-return

    def _AddArgument(self, value):
        """Adds an argument to the current expression.

        Args:
          value (object): argument value.

        Returns:
          str: state or None if the argument could not be added to the current
              expression.

        Raises:
          ParseError: if the operator does not support negation.
        """
        logger.debug(f"Storing argument: {value!s}")

        if self._have_negate_keyword:
            operator = self._current_expression.operator
            if operator and operator.lower() not in self._OPERATORS_WITH_NEGATION:
                raise errors.ParseError(
                    f"Operator: {operator:s} does not support negation (not)."
                )

        # This expression is complete
        if self._current_expression.AddArgument(value):
            self._stack.append(self._current_expression)
            self._current_expression = expressions.EventExpression()
            # We go to the BINARY state, to find if there's an AND or OR operator
            return self._STATE_BINARY_OPERATOR

        return None

    def _AddArgumentDateTime(self, **unused_kwargs):
        """Adds a date and time argument to the current expression.

        Note that this function is used as a callback by _GetNextToken.

        Returns:
          str: state or None if the argument could not be added to the current
              expression.

        Raises:
          ParseError: if datetime value does not contain a valid POSIX timestamp
              in microseconds or ISO 8601 date and time string.
        """
        if isinstance(self._datetime_value, int):
            date_time = dfdatetime_posix_time.PosixTimeInMicroseconds(
                timestamp=self._datetime_value
            )

        elif isinstance(self._datetime_value, str):
            try:
                date_time = dfdatetime_time_elements.TimeElementsInMicroseconds()
                date_time.CopyFromStringISO8601(self._datetime_value)
            except ValueError as exception:
                raise errors.ParseError(
                    f"unsupported ISO 8601 string: {self._datetime_value:s}."
                ) from exception

        else:
            raise errors.ParseError(
                f"unsupported datetime value: {self._datetime_value!s}."
            )

        # The value has been consumed; do not let it leak into a later DATETIME().
        self._datetime_value = None
        return self._AddArgument(date_time)

    def _AddArgumentDecimalInteger(self, string="", **unused_kwargs):
        """Adds a decimal integer argument to the current expression.

        Note that this function is used as a callback by _GetNextToken.

        Args:
          string (Optional[str]): argument string that contains an integer value
              formatted in decimal.

        Returns:
          str: state or None if the argument could not be added to the current
              expression.

        Raises:
          ParseError: if string does not contain a valid integer.
        """
        try:
            int_value = int(string)
        except (TypeError, ValueError) as exception:
            # !s instead of :s so that a non-string value (the TypeError case)
            # does not fail while formatting the error message.
            raise errors.ParseError(
                f"{string!s} is not a valid integer."
            ) from exception

        return self._AddArgument(int_value)

    def _AddArgumentFloatingPoint(self, string="", **unused_kwargs):
        """Adds a floating-point argument to the current expression.

        Note that this function is used as a callback by _GetNextToken.

        Args:
          string (Optional[str]): argument string that contains a floating-point
              value.

        Returns:
          str: state or None if the argument could not be added to the current
              expression.

        Raises:
          ParseError: if string does not contain a valid floating-point number.
        """
        try:
            float_value = float(string)
        except (TypeError, ValueError) as exception:
            # !s keeps the message formattable for non-string values.
            raise errors.ParseError(f"{string!s} is not a valid float.") from exception

        return self._AddArgument(float_value)

    def _AddArgumentHexadecimalInteger(self, string="", **unused_kwargs):
        """Adds a hexadecimal integer argument to the current expression.

        Note that this function is used as a callback by _GetNextToken.

        Args:
          string (Optional[str]): argument string that contains an integer value
              formatted in hexadecimal.

        Returns:
          str: state or None if the argument could not be added to the current
              expression.

        Raises:
          ParseError: if string does not contain a valid base16 formatted integer.
        """
        try:
            int_value = int(string, 16)
        except (TypeError, ValueError) as exception:
            # !s keeps the message formattable for non-string values.
            raise errors.ParseError(
                f"{string!s} is not a valid base16 integer."
            ) from exception

        return self._AddArgument(int_value)

    def _AddArgumentPath(self, **unused_kwargs):
        """Adds a path argument to the current expression.

        Note that this function is used as a callback by _GetNextToken.

        Returns:
          str: state or None if the argument could not be added to the current
              expression.
        """
        value = artifacts.PathArtifact(path=self._string)
        return self._AddArgument(value)

    def _AddBinaryOperator(self, string=None, **unused_kwargs):
        """Adds a binary operator to the stack.

        Note that this function is used as a callback by _GetNextToken.

        Args:
          string (str): operator, such as "and", "or", "&&" or "||".

        Returns:
          str: next state, which is None.
        """
        expression = expressions.BinaryExpression(operator=string)
        self._stack.append(expression)
        return None

    def _AddBracketClose(self, **unused_kwargs):
        """Adds a closing bracket to the stack.

        Note that this function is used as a callback by _GetNextToken.

        Returns:
          str: next state, which is None.
        """
        self._stack.append(")")
        return None

    def _AddBracketOpen(self, **unused_kwargs):
        """Adds an opening bracket to the stack.

        Note that this function is used as a callback by _GetNextToken.

        Returns:
          str: next state, which is None.
        """
        self._stack.append("(")
        return None

    def _CombineBinaryExpressions(self, operator):
        """Combines the binary expressions on the stack.

        Every binary expression with the given operator that has no operands yet
        and sits between two expressions absorbs both neighbours as operands.

        Args:
          operator (str): operator, such as "and" or "or".
        """
        operator_lower = operator.lower()

        item_index = 1
        number_of_items = len(self._stack) - 1
        while item_index < number_of_items:
            item = self._stack[item_index]
            if (
                isinstance(item, expressions.BinaryExpression)
                and item.operator.lower() == operator_lower
                and not item.args
            ):
                previous_item = self._stack[item_index - 1]
                next_item = self._stack[item_index + 1]

                if isinstance(previous_item, expressions.Expression) and isinstance(
                    next_item, expressions.Expression
                ):
                    item.AddOperands(previous_item, next_item)

                    # Pop the higher index first so the lower one stays valid.
                    self._stack.pop(item_index + 1)
                    self._stack.pop(item_index - 1)

                    # Step back so the combined item is re-examined in its
                    # new position relative to its new neighbours.
                    item_index -= 2
                    number_of_items -= 2

            item_index += 1
            # Index 0 has no previous item, so never inspect it.
            if item_index == 0:
                item_index += 1

    def _CombineParenthesis(self):
        """Combines parenthesis (braces) expressions on the stack.

        An expression directly enclosed by "(" and ")" loses both brackets.
        """
        item_index = 1
        number_of_items = len(self._stack) - 1
        while item_index < number_of_items:
            item = self._stack[item_index]
            previous_item = self._stack[item_index - 1]
            next_item = self._stack[item_index + 1]

            if (
                previous_item == "("
                and next_item == ")"
                and isinstance(item, expressions.Expression)
            ):
                # Pop the higher index first so the lower one stays valid.
                self._stack.pop(item_index + 1)
                self._stack.pop(item_index - 1)

                item_index -= 2
                number_of_items -= 2

            item_index += 1
            # Index 0 has no previous item, so never inspect it.
            if item_index == 0:
                item_index += 1

    def _GetNextToken(self):
        """Determines the next parser token based on the expression.

        Returns:
          Token: parser token or None if the buffer is empty.

        Raises:
          ParseError: if no token matched the expression.
        """
        if not self._buffer:
            return None

        supported_states = (None, self._state)
        for token in self._TOKENS:
            if token.state not in supported_states:
                continue

            match = token.CompareExpression(self._buffer)
            if not match:
                continue

            # The match consumes the data off the buffer (the handler can put it
            # back if it likes)
            match_end_offset = match.end()
            match_buffer = self._buffer[:match_end_offset]
            self._buffer = self._buffer[match_end_offset:]
            self._processed_buffer = "".join([self._processed_buffer, match_buffer])

            next_state = token.next_state
            for action in token.actions:
                callback = getattr(self, action, self._NoOperation)

                possible_next_state = callback(string=match.group(0), match=match)
                # A callback returning the CONTINUE state leaves the pending
                # next state untouched.
                if possible_next_state == self._STATE_CONTINUE:
                    continue

                # Override the state from the Token
                if possible_next_state:
                    next_state = possible_next_state

            # Update the next state
            if next_state:
                self._state = next_state

            return token

        number_of_bytes = len(self._processed_buffer)
        raise errors.ParseError(
            f"No token match for parser state: {self._state:s} at position "
            f"{number_of_bytes:d}: {self._processed_buffer:s} <---> "
            f"{self._buffer:s}"
        )

    def _NegateExpression(self, **unused_kwargs):
        """Reverses the logic of (negates) the current expression.

        Note that this function is used as a callback by _GetNextToken.

        Raises:
          ParseError: when the negation keyword (not) is expressed more than once,
              used after an argument or before an operator that does not support
              negation.
        """
        if self._have_negate_keyword:
            raise errors.ParseError(
                "Negation keyword (not) can only be expressed once."
            )

        if self._current_expression.args:
            raise errors.ParseError(
                "Negation keyword (not) cannot be used after an argument."
            )

        operator = self._current_expression.operator
        if operator and operator.lower() not in self._OPERATORS_WITH_NEGATION:
            raise errors.ParseError(
                f"Operator: {operator:s} does not support negation."
            )

        self._have_negate_keyword = True

        logger.debug("Negating expression")
        self._current_expression.Negate()

    def _NoOperation(self, **kwarg):
        """No operation.

        Note that this function is used as a callback by _GetNextToken, as the
        fallback when a token names an action the parser does not define.
        """
        logger.debug(f"Default handler: {kwarg!s}")

    def _PopState(self, **unused_kwargs):
        """Pops the previous state from the stack.

        Note that this function is used as a callback by _GetNextToken.

        Returns:
          str: next state, which is the previous state on the stack.

        Raises:
          ParseError: if the stack is empty.
        """
        try:
            self._state = self._state_stack.pop()
        except IndexError as exception:
            number_of_bytes = len(self._processed_buffer)
            raise errors.ParseError(
                f"Tried to pop state from an empty stack - possible recursion error "
                f"at position {number_of_bytes:d}: {self._processed_buffer:s} <---> "
                f"{self._buffer:s}"
            ) from exception

        logger.debug(f"Returned state to {self._state:s}")
        return self._state

    def _PushBack(self, string="", **unused_kwargs):
        """Pushes the string from processed buffer back onto the buffer.

        Note that this function is used as a callback by _GetNextToken.

        Args:
          string (Optional[str]): string.

        Returns:
          str: next state, which is None.
        """
        self._buffer = "".join([string, self._buffer])
        # Slicing with [: -0] would wipe the whole processed buffer, so only
        # trim when there is something to push back.
        if string:
            self._processed_buffer = self._processed_buffer[: -len(string)]
        return None

    def _PushState(self, **unused_kwargs):
        """Pushes the current state on the state stack.

        Note that this function is used as a callback by _GetNextToken.

        Returns:
          str: next state, which is None.
        """
        logger.debug(f"Storing state {self._state:s}")
        self._state_stack.append(self._state)
        return None

    def _Reduce(self):
        """Reduces the expression stack into a single expression.

        Returns:
          Expression: remaining expression on the stack.

        Raises:
          ParseError: if the current state is unsupported or the remaining number
              of items on the stack is not 1.
        """
        if self._state not in (self._STATE_BINARY_OPERATOR, self._STATE_INITIAL):
            number_of_bytes = len(self._processed_buffer)
            raise errors.ParseError(
                f"Unsupported initial state: {self._state:s} - premature end of "
                f"expression at position {number_of_bytes:d}: "
                f"{self._processed_buffer:s} <---> {self._buffer:s}"
            )

        number_of_items = len(self._stack)
        while number_of_items > 1:
            # Precedence order
            self._CombineParenthesis()
            self._CombineBinaryExpressions("and")
            self._CombineBinaryExpressions("or")

            # No change
            if len(self._stack) == number_of_items:
                break

            number_of_items = len(self._stack)

        if number_of_items != 1:
            number_of_bytes = len(self._processed_buffer)
            raise errors.ParseError(
                f"Unsupported event filter expression at position "
                f"{number_of_bytes:d}: {self._processed_buffer:s} <---> "
                f"{self._buffer:s}"
            )

        return self._stack[0]

    def _Reset(self):
        """Resets the parser."""
        self._buffer = ""
        self._current_expression = expressions.EventExpression()
        # Clear any value left behind by a previous parse that failed inside
        # DATETIME(...), otherwise it would be reused by the next DATETIME().
        self._datetime_value = None
        self._error = 0
        self._flags = 0
        self._have_negate_keyword = False
        self._processed_buffer = ""
        self._stack = []
        self._state = self._STATE_INITIAL
        self._state_stack = []
        self._string = None

    def _SetAttribute(self, string="", **unused_kwargs):
        """Sets the attribute in the current expression.

        Note that this function is used as a callback by _GetNextToken.

        Args:
          string (Optional[str]): attribute.

        Returns:
          str: next state, which is the operator state.
        """
        logger.debug(f"Storing attribute {string:s}")

        self._current_expression.SetAttribute(string)

        # A new attribute starts a new expression, which is not negated yet.
        self._have_negate_keyword = False

        return self._STATE_OPERATOR

    def _SetDateTimeDecimalInteger(self, string="", **unused_kwargs):
        """Sets a decimal integer argument to the datetime value.

        Note that this function is used as a callback by _GetNextToken.

        Args:
          string (Optional[str]): argument string that contains an integer value
              formatted in decimal.

        Returns:
          str: state.

        Raises:
          ParseError: if string does not contain a valid integer.
        """
        try:
            self._datetime_value = int(string)
        except (TypeError, ValueError) as exception:
            # !s keeps the message formattable for non-string values.
            raise errors.ParseError(
                f"{string!s} is not a valid integer."
            ) from exception

        return self._STATE_DATETIME

    def _SetOperator(self, string="", **unused_kwargs):
        """Sets the operator in the current expression.

        Note that this function is used as a callback by _GetNextToken.

        Args:
          string (Optional[str]): operator.

        Returns:
          str: next state, which is None.
        """
        logger.debug(f"Storing operator {string!r}")
        self._current_expression.SetOperator(string)
        return None

    def _StringEscape(self, string="", match="", **unused_kwargs):
        """Escapes backslashes found inside an expression string.

        Backslashes followed by anything other than [\\'"rnbt.ws] will raise
        an Error.

        Note that this function is used as a callback by _GetNextToken.

        Args:
          string (Optional[str]): expression string.
          match (Optional[re.MatchObject]): the regular expression match object,
              where match.group(1) contains the escaped code.

        Returns:
          str: next state, which is None.

        Raises:
          ParseError: when the escaped string is not one of [\\'"rnbt.ws].
        """
        if match.group(1) not in "\\'\"rnbt\\.ws":
            raise errors.ParseError(f"Invalid escape character {string:s}.")

        decoded_string = codecs.decode(string, "unicode_escape")
        return self._StringExpand(string=decoded_string)

    def _StringExpand(self, string="", **unused_kwargs):
        """Expands the internal string with the expression string.

        Note that this function is used as a callback by _GetNextToken.

        Args:
          string (Optional[str]): expression string.

        Returns:
          str: next state, which is None.
        """
        self._string = "".join([self._string, string])
        return None

    def _StringFinish(self, **unused_kwargs):
        """Finishes parsing a string.

        Note that this function is used as a callback by _GetNextToken. The
        string tokens run _PopState first, so the state inspected here is the
        state the string was started from.

        Returns:
          str: next state, or None when the state is not the attribute, argument,
              datetime or path state.
        """
        if self._state == self._STATE_ATTRIBUTE:
            return self._SetAttribute(string=self._string)

        if self._state == self._STATE_ARGUMENT:
            return self._AddArgument(self._string)

        if self._state == self._STATE_DATETIME:
            self._datetime_value = self._string
            return self._STATE_DATETIME

        if self._state == self._STATE_PATH:
            return self._STATE_PATH

        return None

    def _StringStart(self, **unused_kwargs):
        """Initializes parsing a string.

        Note that this function is used as a callback by _GetNextToken.

        Returns:
          str: next state, which is None.
        """
        self._string = ""
        return None

    def HexEscape(self, string, match, **unused_kwargs):
        """Converts a hex escaped string.

        Note that this function is used as a callback by _GetNextToken.

        Args:
          string (str): matched expression string, used for logging only.
          match (re.Match): the regular expression match object, where
              match.group(1) contains the two characters after "\\x".

        Returns:
          str: next state, which is None.

        Raises:
          ParseError: if the string is not hex escaped or the escaped byte is
              not valid UTF-8.
        """
        logger.debug(f"HexEscape matched {string:s}.")
        hex_digits = match.group(1)

        try:
            decoded_bytes = binascii.unhexlify(hex_digits)
            decoded_string = codecs.decode(decoded_bytes, "utf-8")
            self._string += decoded_string
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses,
        # as is the error unhexlify raises for non-ASCII input.
        except (TypeError, ValueError) as exception:
            raise errors.ParseError(
                f"Invalid hex escape {hex_digits!s}."
            ) from exception

        return None

    def Parse(self, expression):
        """Parses an event filter expression.

        Args:
          expression (str): event filter expression.

        Returns:
          Expression: expression, which is an identity expression when the
              event filter expression is empty.

        Raises:
          ParseError: if the event filter expression cannot be parsed.
        """
        if not expression:
            return expressions.IdentityExpression()

        self._Reset()
        self._buffer = expression

        # _GetNextToken returns None once the buffer has been fully consumed.
        token = self._GetNextToken()
        while token:
            token = self._GetNextToken()

        return self._Reduce()