Source code for

# -*- coding: utf-8 -*-
"""Redis merge reader."""
from __future__ import unicode_literals

import codecs

from plaso.containers import event_sources
from plaso.containers import events
from plaso.containers import reports
from plaso.containers import tasks
from plaso.containers import warnings
from plaso.lib import definitions
from import identifiers
from import interface
from import logger
from import redis_store

[docs]class RedisMergeReader(interface.StorageMergeReader): """Redis store reader for merging.""" # TODO: move definitions into parent class. _CONTAINER_TYPE_ANALYSIS_REPORT = reports.AnalysisReport.CONTAINER_TYPE _CONTAINER_TYPE_EVENT = events.EventObject.CONTAINER_TYPE _CONTAINER_TYPE_EVENT_DATA = events.EventData.CONTAINER_TYPE _CONTAINER_TYPE_EVENT_SOURCE = event_sources.EventSource.CONTAINER_TYPE _CONTAINER_TYPE_EVENT_TAG = events.EventTag.CONTAINER_TYPE _CONTAINER_TYPE_EXTRACTION_ERROR = warnings.ExtractionError.CONTAINER_TYPE _CONTAINER_TYPE_TASK_COMPLETION = tasks.TaskCompletion.CONTAINER_TYPE _CONTAINER_TYPE_TASK_START = tasks.TaskStart.CONTAINER_TYPE # Some container types reference other container types, such as event # referencing event_data. Container types in this tuple must be ordered after # all the container types they reference. _CONTAINER_TYPES = ( _CONTAINER_TYPE_EVENT_SOURCE, _CONTAINER_TYPE_EVENT_DATA, _CONTAINER_TYPE_EVENT, _CONTAINER_TYPE_EVENT_TAG, _CONTAINER_TYPE_EXTRACTION_ERROR, _CONTAINER_TYPE_ANALYSIS_REPORT) _ADD_CONTAINER_TYPE_METHODS = { _CONTAINER_TYPE_ANALYSIS_REPORT: '_AddAnalysisReport', _CONTAINER_TYPE_EVENT: '_AddEvent', _CONTAINER_TYPE_EVENT_DATA: '_AddEventData', _CONTAINER_TYPE_EVENT_SOURCE: '_AddEventSource', _CONTAINER_TYPE_EVENT_TAG: '_AddEventTag', _CONTAINER_TYPE_EXTRACTION_ERROR: '_AddError', } def __init__(self, storage_writer, task, redis_client=None): """Initializes a Redis storage merge reader. Args: storage_writer (StorageWriter): storage writer. task (Task): the task whose store is being merged. redis_client (Optional[Redis]): Redis client to query. If specified, no new client will be created. Raises: RuntimeError: if an add container method is missing. """ super(RedisMergeReader, self).__init__(storage_writer) self._active_container_type = None self._container_types = [] self._active_cursor = 0 self._add_active_container_method = None self._store = redis_store.RedisStore( definitions.STORAGE_TYPE_TASK, session_identifier=task.session_identifier, task_identifier=task.identifier) self._store.Open(redis_client=redis_client) self._event_data_identifier_mappings = {} self._add_container_type_methods = {} self._active_extra_containers = [] # Create a runtime lookup table for the add container type method. This # prevents having to create a series of if-else checks for container types. # The table is generated at runtime as there are no forward function # declarations in Python. for container_type, method_name in self._ADD_CONTAINER_TYPE_METHODS.items(): method = getattr(self, method_name, None) if not method: raise RuntimeError( 'Add method missing for container type: {0:s}'.format( container_type)) self._add_container_type_methods[container_type] = method def _AddAnalysisReport(self, analysis_report): """Adds an analysis report. Args: analysis_report (AnalysisReport): analysis report. """ self._storage_writer.AddAnalysisReport(analysis_report) def _AddError(self, error): """Adds an error. Args: error (ExtractionError): error. """ self._storage_writer.AddError(error) def _AddEventSource(self, event_source): """Adds an event source. Args: event_source (EventSource): event source.F """ self._storage_writer.AddEventSource(event_source) def _AddEventTag(self, event_tag): """Adds an event tag. Args: event_tag (EventTag): event tag. """ self._storage_writer.AddEventTag(event_tag) def _AddEvent(self, event): """Adds an event. Args: event (EventObject): event. """ if hasattr(event, 'event_data_row_identifier'): event_data_identifier = identifiers.SQLTableIdentifier( self._CONTAINER_TYPE_EVENT_DATA, event.event_data_row_identifier) lookup_key = event_data_identifier.CopyToString() event_data_identifier = self._event_data_identifier_mappings[lookup_key] event.SetEventDataIdentifier(event_data_identifier) # TODO: add event identifier mappings for event tags. self._storage_writer.AddEvent(event) def _AddEventData(self, event_data): """Adds event data. Args: event_data (EventData): event data. """ identifier = event_data.GetIdentifier() lookup_key = identifier.CopyToString() self._storage_writer.AddEventData(event_data) post_write_identifier = event_data.GetIdentifier() self._event_data_identifier_mappings[lookup_key] = post_write_identifier def _PrepareForNextContainerType(self): """Prepares for the next container type. This method prepares the task storage for merging the next container type. It sets the active container type, its add method and active cursor accordingly. """ self._active_container_type = self._container_types.pop(0) self._add_active_container_method = self._add_container_type_methods.get( self._active_container_type) self._active_cursor = 0 def _GetContainerTypes(self): """Retrieves the container types to merge. Container types not defined in _CONTAINER_TYPES are ignored and not merged. Specific container types reference other container types, such as event referencing event data. The names are ordered to ensure the attribute containers are merged in the correct order. Returns: list[str]: names of the container types to merge. """ container_types = [] for container_type in self._CONTAINER_TYPES: # pylint: disable=protected-access if self._store._HasAttributeContainers(container_type): container_types.append(container_type) return container_types def _GetAttributeContainers( self, container_type, callback=None, cursor=0, maximum_number_of_items=0): """Retrieves attribute containers of the specified type. Args: container_type (str): attribute container type. callback (function[StorageWriter, AttributeContainer]): function to call after each attribute container is deserialized. cursor (int): Redis cursor for scanning items. maximum_number_of_items (Optional[int]): maximum number of containers to retrieve, where 0 represent no limit. Returns: list(AttributeContainer): attribute containers from Redis. """ if not cursor: cursor = 0 cursor, items = self._store.GetSerializedAttributeContainers( container_type, cursor, maximum_number_of_items) containers = [] identifiers_to_delete = [] for identifier_bytes, serialized_container in items.items(): identifier_string = codecs.decode(identifier_bytes, 'utf-8') identifier = identifiers.RedisKeyIdentifier(identifier_string) identifiers_to_delete.append(identifier) container = self._DeserializeAttributeContainer( self._active_container_type, serialized_container) container.SetIdentifier(identifier) if callback: callback(self._storage_writer, container) containers.append(container) self._store.RemoveAttributeContainers(container_type, identifiers_to_delete) self._active_cursor = cursor containers = self._active_extra_containers + containers if maximum_number_of_items: self._active_extra_containers = containers[maximum_number_of_items:] return containers[:maximum_number_of_items]
[docs] def MergeAttributeContainers( self, callback=None, maximum_number_of_containers=0): """Reads attribute containers from a task store into the writer. Args: callback (Optional[function[StorageWriter, AttributeContainer]]): function to call after each attribute container is deserialized. maximum_number_of_containers (Optional[int]): maximum number of containers to merge, where 0 represent no limit. Returns: bool: True if the entire task storage file has been merged. Raises: RuntimeError: if the add method for the active attribute container type is missing. """ if not self._container_types: self._container_types = self._GetContainerTypes() number_of_containers = 0 while (self._active_cursor or self._container_types or self._active_extra_containers): if not self._active_cursor and not self._active_extra_containers: self._PrepareForNextContainerType() containers = self._GetAttributeContainers( self._active_container_type, callback=callback, cursor=self._active_cursor, maximum_number_of_items=maximum_number_of_containers) if not containers: self._active_cursor = 0 continue for container in containers: self._add_active_container_method(container) number_of_containers += 1 if 0 < maximum_number_of_containers <= number_of_containers: logger.debug( 'Only merged {0:d} containers'.format(number_of_containers)) return False logger.debug('Merged {0:d} containers'.format(number_of_containers)) # While all the containers have been merged, the 'merging' key is still # present, so we still need to remove the store. self._store.Remove() return True