Source code for plaso.analysis.tagging

# -*- coding: utf-8 -*-
"""Analysis plugin that labels events according to rules in a tagging file."""

from plaso.analysis import interface
from plaso.analysis import manager
from plaso.engine import tagging_file


[docs] class TaggingAnalysisPlugin(interface.AnalysisPlugin): """Analysis plugin that labels events according to rules in a tagging file.""" NAME = 'tagging'
[docs] def __init__(self): """Initializes a tagging analysis plugin.""" super(TaggingAnalysisPlugin, self).__init__() self._tagging_rules = None
[docs] def ExamineEvent( self, analysis_mediator, event, event_data, event_data_stream): """Labels events according to the rules in a tagging file. Args: analysis_mediator (AnalysisMediator): mediates interactions between analysis plugins and other components, such as storage and dfVFS. event (EventObject): event to examine. event_data (EventData): event data. event_data_stream (EventDataStream): event data stream. """ matched_label_names = [] for label_name, filter_objects in self._tagging_rules.items(): for filter_object in filter_objects: # Note that tagging events based on existing labels is currently # not supported. if filter_object.Match(event, event_data, event_data_stream, None): matched_label_names.append(label_name) break if matched_label_names: event_tag = self._CreateEventTag(event, matched_label_names) analysis_mediator.ProduceEventTag(event_tag) for label_name in matched_label_names: self._analysis_counter[label_name] += 1 self._analysis_counter['event_tags'] += 1
[docs] def SetAndLoadTagFile(self, tagging_file_path): """Sets the tagging file to be used by the plugin. Args: tagging_file_path (str): path of the tagging file. """ tagging_file_object = tagging_file.TaggingFile(tagging_file_path) self._tagging_rules = tagging_file_object.GetEventTaggingRules()
manager.AnalysisPluginManager.RegisterPlugin(TaggingAnalysisPlugin)