Source code for plaso.filters.expression_parser

# -*- coding: utf-8 -*-
"""Event filter expression parser."""

import binascii
import codecs
import re

from dfdatetime import posix_time as dfdatetime_posix_time
from dfdatetime import time_elements as dfdatetime_time_elements

from plaso.containers import artifacts
from plaso.filters import expressions
from plaso.filters import logger
from plaso.lib import errors


[docs] class Token(object): """An event filter expression parser token. Attributes: actions (list[str]): list of method names in the EventFilterExpressionParser to call. next_state (str): next state we transition to if this Token matches. state (str): parser state within the token should be applied or None if the token should be applied regardless of the parser state. """ _ACTION_SEPARATOR = ','
[docs] def __init__(self, state, regex, actions, next_state): """Initializes an event filter expressions parser token. Args: state (str): parser state within the token should be applied or None if the token should be applied regardless of the parser state. regex (str): regular expression to try and match from the current point. actions (list[str]): list of method names in the EventFilterExpressionParser to call. next_state (str): next state we transition to if this Token matches. """ super(Token, self).__init__() self._regex = re.compile(regex, re.DOTALL | re.I | re.M | re.S | re.U) self.actions = [] self.next_state = next_state self.state = state if actions: self.actions = actions.split(self._ACTION_SEPARATOR)
[docs] def CompareExpression(self, expression): """Compares the token against an expression string. Args: expression (str): expression string. Returns: re.Match: the regular expression match object if the expression string matches the token or None if no match. """ return self._regex.match(expression)
[docs] class EventFilterExpressionParser(object): """Event filter expression parser. Examples of valid syntax: size is 40 (name contains "Program Files" AND hash.md5 is "123abc") @imported_modules (num_symbols = 14 AND symbol.name is "FindWindow") """ _OPERATORS_WITH_NEGATION = frozenset(['contains', 'equals', 'inset', 'is']) _STATE_ARGUMENT = 'ARGUMENT' _STATE_ATTRIBUTE = 'ATTRIBUTE' _STATE_BINARY_OPERATOR = 'BINARY' _STATE_DATETIME = 'DATETIME' _STATE_NEGATION_OPERATOR = 'CHECKNOT' _STATE_CONTINUE = 'CONTINUE' _STATE_INITIAL = 'INITIAL' _STATE_OPERATOR = 'OPERATOR' _STATE_PATH = 'PATH' _STATE_STRING_DOUBLE_QUOTE = 'STRING_DOUBLE_QUOTE' _STATE_STRING_SINGLE_QUOTE = 'STRING_SINGLE_QUOTE' _TOKENS = [ # Operators and related tokens Token(_STATE_INITIAL, r'[^\s\(\)]', '_PushState,_PushBack', _STATE_ATTRIBUTE), Token(_STATE_INITIAL, r'\(', '_PushState,_AddBracketOpen', None), Token(_STATE_INITIAL, r'\)', '_AddBracketClose', _STATE_BINARY_OPERATOR), # Double quoted string Token(_STATE_STRING_DOUBLE_QUOTE, '"', '_PopState,_StringFinish', None), Token(_STATE_STRING_DOUBLE_QUOTE, r'\\x(..)', 'HexEscape', None), Token(_STATE_STRING_DOUBLE_QUOTE, r'\\(.)', '_StringEscape', None), Token(_STATE_STRING_DOUBLE_QUOTE, r'[^\\"]+', '_StringExpand', None), # Single quoted string Token(_STATE_STRING_SINGLE_QUOTE, '\'', '_PopState,_StringFinish', None), Token(_STATE_STRING_SINGLE_QUOTE, r'\\x(..)', 'HexEscape', None), Token(_STATE_STRING_SINGLE_QUOTE, r'\\(.)', '_StringEscape', None), Token(_STATE_STRING_SINGLE_QUOTE, r'[^\\\']+', '_StringExpand', None), # Date and time definition Token(_STATE_DATETIME, r'\)', '_PopState,_AddArgumentDateTime', None), Token(_STATE_DATETIME, r'(\d+)', '_SetDateTimeDecimalInteger', _STATE_DATETIME), Token(_STATE_DATETIME, '"', '_PushState,_StringStart', _STATE_STRING_DOUBLE_QUOTE), Token(_STATE_DATETIME, '\'', '_PushState,_StringStart', _STATE_STRING_SINGLE_QUOTE), # Path definition Token(_STATE_PATH, r'\)', '_PopState,_AddArgumentPath', None), Token(_STATE_PATH, '"', '_PushState,_StringStart', _STATE_STRING_DOUBLE_QUOTE), Token(_STATE_PATH, '\'', '_PushState,_StringStart', _STATE_STRING_SINGLE_QUOTE), # Basic expression Token(_STATE_ATTRIBUTE, r'[\w._0-9]+', '_SetAttribute', _STATE_OPERATOR), Token(_STATE_OPERATOR, 'not ', '_NegateExpression', None), Token(_STATE_OPERATOR, r'(\w+|[<>!=]=?)', '_SetOperator', _STATE_NEGATION_OPERATOR), Token(_STATE_NEGATION_OPERATOR, 'not', '_NegateExpression', _STATE_ARGUMENT), Token(_STATE_NEGATION_OPERATOR, r'\s+', None, None), Token(_STATE_NEGATION_OPERATOR, r'([^not])', '_PushBack', _STATE_ARGUMENT), Token(_STATE_ARGUMENT, r'(\d+\.\d+)', '_AddArgumentFloatingPoint', _STATE_ARGUMENT), Token(_STATE_ARGUMENT, r'(0x\d+)', '_AddArgumentHexadecimalInteger', _STATE_ARGUMENT), Token(_STATE_ARGUMENT, r'(\d+)', '_AddArgumentDecimalInteger', _STATE_ARGUMENT), Token(_STATE_ARGUMENT, '"', '_PushState,_StringStart', _STATE_STRING_DOUBLE_QUOTE), Token(_STATE_ARGUMENT, '\'', '_PushState,_StringStart', _STATE_STRING_SINGLE_QUOTE), Token(_STATE_ARGUMENT, r'DATETIME\(', '_PushState', _STATE_DATETIME), Token(_STATE_ARGUMENT, r'PATH\(', '_PushState', _STATE_PATH), # When the last parameter from arg_list has been pushed # State where binary operators are supported (AND, OR) Token(_STATE_BINARY_OPERATOR, r'(?i)(and|or|\&\&|\|\|)', '_AddBinaryOperator', _STATE_INITIAL), # - We can also skip spaces Token(_STATE_BINARY_OPERATOR, r'\s+', None, None), # - But if it's not "and" or just spaces we have to go back Token(_STATE_BINARY_OPERATOR, '.', '_PushBack,_PopState', None), # Skip whitespace. Token(None, r'\s+', None, None)]
[docs] def __init__(self): """Initializes an event filter expression parser.""" super(EventFilterExpressionParser, self).__init__() self._buffer = '' self._datetime_value = None self._current_expression = None self._error = 0 self._flags = 0 self._have_negate_keyword = False self._processed_buffer = '' self._stack = [] self._state = self._STATE_INITIAL self._state_stack = [] self._string = None
# The parser token callback methods use a specific function interface. # pylint: disable=redundant-returns-doc,useless-return def _AddArgument(self, value): """Adds an argument to the current expression. Args: value (object): argument value. Returns: str: state or None if the argument could not be added to the current expression. Raises: ParseError: if the operator does not support negation. """ logger.debug('Storing argument: {0!s}'.format(value)) if self._have_negate_keyword: operator = self._current_expression.operator if operator and operator.lower() not in self._OPERATORS_WITH_NEGATION: raise errors.ParseError( 'Operator: {0:s} does not support negation (not).'.format(operator)) # This expression is complete if self._current_expression.AddArgument(value): self._stack.append(self._current_expression) self._current_expression = expressions.EventExpression() # We go to the BINARY state, to find if there's an AND or OR operator return self._STATE_BINARY_OPERATOR return None def _AddArgumentDateTime(self, **unused_kwargs): """Adds a date and time argument to the current expression. Note that this function is used as a callback by _GetNextToken. Returns: str: state or None if the argument could not be added to the current expression. Raises: ParseError: if datetime value does not contain a valid POSIX timestamp in microseconds or ISO 8601 date and time string. """ if isinstance(self._datetime_value, int): date_time = dfdatetime_posix_time.PosixTimeInMicroseconds( timestamp=self._datetime_value) elif isinstance(self._datetime_value, str): try: date_time = dfdatetime_time_elements.TimeElementsInMicroseconds() date_time.CopyFromStringISO8601(self._datetime_value) except ValueError: raise errors.ParseError('unsupported ISO 8601 string: {0:s}.'.format( self._datetime_value)) else: raise errors.ParseError('unsupported datetime value: {0!s}.'.format( self._datetime_value)) self._datetime_value = None return self._AddArgument(date_time) def _AddArgumentDecimalInteger(self, string='', **unused_kwargs): """Adds a decimal integer argument to the current expression. Note that this function is used as a callback by _GetNextToken. Args: string (Optional[str]): argument string that contains an integer value formatted in decimal. Returns: str: state or None if the argument could not be added to the current expression. Raises: ParseError: if string does not contain a valid integer. """ try: int_value = int(string) except (TypeError, ValueError): raise errors.ParseError('{0:s} is not a valid integer.'.format(string)) return self._AddArgument(int_value) def _AddArgumentFloatingPoint(self, string='', **unused_kwargs): """Adds a floating-point argument to the current expression. Note that this function is used as a callback by _GetNextToken. Args: string (Optional[str]): argument string that contains a floating-point value. Returns: str: state or None if the argument could not be added to the current expression. Raises: ParseError: if string does not contain a valid floating-point number. """ try: float_value = float(string) except (TypeError, ValueError): raise errors.ParseError('{0:s} is not a valid float.'.format(string)) return self._AddArgument(float_value) def _AddArgumentHexadecimalInteger(self, string='', **unused_kwargs): """Adds a hexadecimal integer argument to the current expression. Note that this function is used as a callback by _GetNextToken. Args: string (Optional[str]): argument string that contains an integer value formatted in hexadecimal. Returns: str: state or None if the argument could not be added to the current expression. Raises: ParseError: if string does not contain a valid base16 formatted integer. """ try: int_value = int(string, 16) except (TypeError, ValueError): raise errors.ParseError( '{0:s} is not a valid base16 integer.'.format(string)) return self._AddArgument(int_value) def _AddArgumentPath(self, **unused_kwargs): """Adds a path argument to the current expression. Note that this function is used as a callback by _GetNextToken. Returns: str: state or None if the argument could not be added to the current expression. """ value = artifacts.PathArtifact(path=self._string) return self._AddArgument(value) def _AddBinaryOperator(self, string=None, **unused_kwargs): """Adds a binary operator to the stack. Note that this function is used as a callback by _GetNextToken. Args: string (str): operator, such as "and", "or", "&&" or "||". Returns: str: next state, which is None. """ expression = expressions.BinaryExpression(operator=string) self._stack.append(expression) return None def _AddBracketClose(self, **unused_kwargs): """Adds a closing bracket to the stack. Note that this function is used as a callback by _GetNextToken. Returns: str: next state, which is None. """ self._stack.append(')') return None def _AddBracketOpen(self, **unused_kwargs): """Adds an opening bracket to the stack. Note that this function is used as a callback by _GetNextToken. Returns: str: next state, which is None. """ self._stack.append('(') return None def _CombineBinaryExpressions(self, operator): """Combines the binary expressions on the stack. Args: operator (str): operator, such as "and" or "or". """ operator_lower = operator.lower() item_index = 1 number_of_items = len(self._stack) - 1 while item_index < number_of_items: item = self._stack[item_index] if (isinstance(item, expressions.BinaryExpression) and item.operator.lower() == operator_lower and not item.args): previous_item = self._stack[item_index - 1] next_item = self._stack[item_index + 1] if (isinstance(previous_item, expressions.Expression) and isinstance(next_item, expressions.Expression)): item.AddOperands(previous_item, next_item) self._stack.pop(item_index + 1) self._stack.pop(item_index - 1) item_index -= 2 number_of_items -= 2 item_index += 1 if item_index == 0: item_index += 1 def _CombineParenthesis(self): """Combines parenthesis (braces) expressions on the stack.""" item_index = 1 number_of_items = len(self._stack) - 1 while item_index < number_of_items: item = self._stack[item_index] previous_item = self._stack[item_index - 1] next_item = self._stack[item_index + 1] if (previous_item == '(' and next_item == ')' and isinstance(item, expressions.Expression)): self._stack.pop(item_index + 1) self._stack.pop(item_index - 1) item_index -= 2 number_of_items -= 2 item_index += 1 if item_index == 0: item_index += 1 def _GetNextToken(self): """Determines the next parser token based on the expression. Returns: Token: parser token or None if the buffer is empty. Raises: ParseError: if no token matched the expression. """ if not self._buffer: return None supported_states = (None, self._state) for token in self._TOKENS: if token.state not in supported_states: continue match = token.CompareExpression(self._buffer) if not match: continue # The match consumes the data off the buffer (the handler can put it back # if it likes) match_end_offset = match.end() match_buffer = self._buffer[:match_end_offset] self._buffer = self._buffer[match_end_offset:] self._processed_buffer = ''.join([self._processed_buffer, match_buffer]) next_state = token.next_state for action in token.actions: callback = getattr(self, action, self._NoOperation) # Allow a callback to skip other callbacks. possible_next_state = callback(string=match.group(0), match=match) if possible_next_state == self._STATE_CONTINUE: continue # Override the state from the Token if possible_next_state: next_state = possible_next_state # Update the next state if next_state: self._state = next_state return token raise errors.ParseError(( 'No token match for parser state: {0:s} at position {1!s}: {2!s} ' '<---> {3!s} )').format( self._state, len(self._processed_buffer), self._processed_buffer, self._buffer)) def _NegateExpression(self, **unused_kwargs): """Reverses the logic of (negates) the current expression. Raises: ParseError: when the negation keyword (not) is expressed more than once, used after an argument or before an operator that does not support negation. """ if self._have_negate_keyword: raise errors.ParseError( 'Negation keyword (not) can only be expressed once.') if self._current_expression.args: raise errors.ParseError( 'Negation keyword (not) cannot be used after an argument.') operator = self._current_expression.operator if operator and operator.lower() not in self._OPERATORS_WITH_NEGATION: raise errors.ParseError( 'Operator: {0:s} does not support negation.'.format(operator)) self._have_negate_keyword = True logger.debug('Negating expression') self._current_expression.Negate() def _NoOperation(self, **kwarg): """No operation. Note that this function is used as a callback by _GetNextToken. """ logger.debug('Default handler: {0!s}'.format(kwarg)) def _PopState(self, **unused_kwargs): """Pops the previous state from the stack. Returns: str: next state, which is the previous state on the stack. Raises: ParseError: if the stack is empty. """ try: self._state = self._state_stack.pop() except IndexError: raise errors.ParseError(( 'Tried to pop state from an empty stack - possible recursion error ' 'at position {0!s}: {1!s} <---> {2!s} )').format( len(self._processed_buffer), self._processed_buffer, self._buffer)) logger.debug('Returned state to {0:s}'.format(self._state)) return self._state def _PushBack(self, string='', **unused_kwargs): """Pushes the string from processed buffer back onto the buffer. Note that this function is used as a callback by _GetNextToken. Args: string (Optional[str]): string. Returns: str: next state, which is None. """ self._buffer = ''.join([string, self._buffer]) self._processed_buffer = self._processed_buffer[:-len(string)] return None def _PushState(self, **unused_kwargs): """Pushes the current state on the state stack. Note that this function is used as a callback by _GetNextToken. Returns: str: next state, which is None. """ logger.debug('Storing state {0:s}'.format(repr(self._state))) self._state_stack.append(self._state) return None def _Reduce(self): """Reduces the expression stack into a single expression. Returns: Expression: remaining expression on the stack. Raises: ParseError: if the current state is unsupported or the remaining number of items on the stack is not 1. """ if self._state not in (self._STATE_BINARY_OPERATOR, self._STATE_INITIAL): raise errors.ParseError(( 'Unsupported initial state: {0:s} - premature end of expression ' 'at position {1!s}: {2!s} <---> {3!s} )').format( self._state, len(self._processed_buffer), self._processed_buffer, self._buffer)) number_of_items = len(self._stack) while number_of_items > 1: # Precedence order self._CombineParenthesis() self._CombineBinaryExpressions('and') self._CombineBinaryExpressions('or') # No change if len(self._stack) == number_of_items: break number_of_items = len(self._stack) if number_of_items != 1: raise errors.ParseError(( 'Unsupported event filter expression at position {0!s}: {1!s} <---> ' '{2!s} )').format( len(self._processed_buffer), self._processed_buffer, self._buffer)) return self._stack[0] def _Reset(self): """Resets the parser.""" self._buffer = '' self._current_expression = expressions.EventExpression() self._error = 0 self._flags = 0 self._have_negate_keyword = False self._processed_buffer = '' self._stack = [] self._state = self._STATE_INITIAL self._state_stack = [] self._string = None def _SetAttribute(self, string='', **unused_kwargs): """Sets the attribute in the current expression. Note that this function is used as a callback by _GetNextToken. Args: string (Optional[str]): attribute. Returns: str: next state, which is the operator state. """ logger.debug('Storing attribute {0:s}'.format(repr(string))) self._current_expression.SetAttribute(string) self._have_negate_keyword = False return self._STATE_OPERATOR def _SetDateTimeDecimalInteger(self, string='', **unused_kwargs): """Sets a decimal integer argument to the datetime value. Note that this function is used as a callback by _GetNextToken. Args: string (Optional[str]): argument string that contains an integer value formatted in decimal. Returns: str: state. Raises: ParseError: if string does not contain a valid integer. """ try: self._datetime_value = int(string) except (TypeError, ValueError): raise errors.ParseError('{0:s} is not a valid integer.'.format(string)) return self._STATE_DATETIME def _SetOperator(self, string='', **unused_kwargs): """Sets the operator in the current expression. Note that this function is used as a callback by _GetNextToken. Args: string (Optional[str]): operator. Returns: str: next state, which is None. """ logger.debug('Storing operator {0:s}'.format(repr(string))) self._current_expression.SetOperator(string) return None def _StringEscape(self, string='', match='', **unused_kwargs): """Escapes backslashes found inside an expression string. Backslashes followed by anything other than [\\'"rnbt.ws] will raise an Error. Note that this function is used as a callback by _GetNextToken. Args: string (Optional[str]): expression string. match (Optional[re.MatchObject]): the regular expression match object, where match.group(1) contains the escaped code. Returns: str: next state, which is None. Raises: ParseError: when the escaped string is not one of [\\'"rnbt]. """ if match.group(1) not in '\\\'"rnbt\\.ws': raise errors.ParseError('Invalid escape character {0:s}.'.format(string)) decoded_string = codecs.decode(string, 'unicode_escape') return self._StringExpand(string=decoded_string) def _StringExpand(self, string='', **unused_kwargs): """Expands the internal string with the expression string. Note that this function is used as a callback by _GetNextToken. Args: string (Optional[str]): expression string. Returns: str: next state, which is None. """ self._string = ''.join([self._string, string]) return None def _StringFinish(self, **unused_kwargs): """Finishes parsing a string. Note that this function is used as a callback by _GetNextToken. Returns: str: next state, or None when the state is not the attribute or argument state. """ if self._state == self._STATE_ATTRIBUTE: return self._SetAttribute(string=self._string) if self._state == self._STATE_ARGUMENT: return self._AddArgument(self._string) if self._state == self._STATE_DATETIME: self._datetime_value = self._string return self._STATE_DATETIME if self._state == self._STATE_PATH: return self._STATE_PATH return None def _StringStart(self, **unused_kwargs): """Initializes parsing a string. Note that this function is used as a callback by _GetNextToken. Returns: str: next state, which is None. """ self._string = '' return None
[docs] def HexEscape(self, string, match, **unused_kwargs): """Converts a hex escaped string. Note that this function is used as a callback by _GetNextToken. Returns: str: next state, which is None. Raises: ParseError: if the string is not hex escaped. """ logger.debug('HexEscape matched {0:s}.'.format(string)) hex_string = match.group(1) try: hex_string = binascii.unhexlify(hex_string) hex_string = codecs.decode(hex_string, 'utf-8') self._string += hex_string except (TypeError, binascii.Error): raise errors.ParseError('Invalid hex escape {0!s}.'.format(hex_string)) return None
[docs] def Parse(self, expression): """Parses an event filter expression. Args: expression (str): event filter expression. Returns: Expression: expression. """ if not expression: return expressions.IdentityExpression() self._Reset() self._buffer = expression token = self._GetNextToken() while token: token = self._GetNextToken() return self._Reduce()