Source code for plaso.engine.tagging_file

# -*- coding: utf-8 -*-
"""Tagging file."""

from __future__ import unicode_literals

import io
import re

from efilter import ast as efilter_ast
from efilter import errors as efilter_errors
from efilter import query as efilter_query

from plaso.lib import errors

[docs]class TaggingFile(object): """Tagging file. A tagging file contains one or more event tagging rules. """ # A line with no indent is a tag name. _TAG_LABEL_LINE = re.compile(r'^(\w+)') # A line with leading indent is one of the rules for the preceding tag. _TAG_RULE_LINE = re.compile(r'^\s+(.+)') # If any of these words are in the query then it's probably objectfilter. _OBJECTFILTER_WORDS = re.compile( r'\s(is|isnot|equals|notequals|inset|notinset|contains|notcontains)\s') def __init__(self, path): """Initializes a tagging file. Args: path (str): path to a file that contains one or more event tagging rules. """ super(TaggingFile, self).__init__() self._path = path def _ParseDefinitions(self, tagging_file_path): """Parses the tag file and yields tuples of label name, list of rule ASTs. Args: tagging_file_path (str): path to the tagging file. Yields: tuple: containing: str: label name. list[efilter.query.Query]: efilter queries. """ queries = None label_name = None with, 'r', encoding='utf-8') as tagging_file: for line in tagging_file.readlines(): label_match = self._TAG_LABEL_LINE.match(line) if label_match: if label_name and queries: yield label_name, queries queries = [] label_name = continue event_tagging_expression = self._TAG_RULE_LINE.match(line) if not event_tagging_expression: continue tagging_rule = self._ParseEventTaggingRule( queries.append(tagging_rule) # Yield any remaining tags once we reach the end of the file. if label_name and queries: yield label_name, queries def _ParseEventTaggingRule(self, event_tagging_expression): """Parses an event tagging expression. This method attempts to detect whether the event tagging expression is valid objectfilter or dottysql syntax. Example: _ParseEventTaggingRule('5 + 5') # Returns Sum(Literal(5), Literal(5)) Args: event_tagging_expression (str): event tagging expression either in objectfilter or dottysql syntax. Returns: efilter.query.Query: efilter query of the event tagging expression. Raises: TaggingFileError: when the tagging file cannot be correctly parsed. """ if syntax = 'objectfilter' else: syntax = 'dottysql' try: return efilter_query.Query(event_tagging_expression, syntax=syntax) except efilter_errors.EfilterParseError as exception: stripped_expression = event_tagging_expression.rstrip() raise errors.TaggingFileError(( 'Unable to parse event tagging expressoin: "{0:s}" with error: ' '{1!s}').format(stripped_expression, exception))
[docs] def GetEventTaggingRules(self): """Retrieves the event tagging rules from the tagging file. Returns: efilter.ast.Expression: efilter abstract syntax tree (AST), containing the tagging rules. """ tags = [] for label_name, rules in self._ParseDefinitions(self._path): if not rules: continue tag = efilter_ast.IfElse( # Union will be true if any of the 'rules' match. efilter_ast.Union(*[rule.root for rule in rules]), # If so then evaluate to a string with the name of the tag. efilter_ast.Literal(label_name), # Otherwise don't return anything. efilter_ast.Literal(None)) tags.append(tag) # Generate a repeated value with all the tags (None will be skipped).
return efilter_ast.Repeat(*tags)