Source code for plaso.storage.writer

# -*- coding: utf-8 -*-
"""The storage writer."""

import abc
import collections

from plaso.containers import event_sources
from plaso.containers import events
from plaso.containers import reports
from plaso.containers import warnings
from plaso.lib import definitions
from plaso.storage import reader


[docs] class StorageWriter(reader.StorageReader): """Storage writer interface.""" _CONTAINER_TYPE_ANALYSIS_REPORT = reports.AnalysisReport.CONTAINER_TYPE _CONTAINER_TYPE_ANALYSIS_WARNING = warnings.AnalysisWarning.CONTAINER_TYPE _CONTAINER_TYPE_EVENT = events.EventObject.CONTAINER_TYPE _CONTAINER_TYPE_EVENT_DATA = events.EventData.CONTAINER_TYPE _CONTAINER_TYPE_EVENT_DATA_STREAM = events.EventDataStream.CONTAINER_TYPE _CONTAINER_TYPE_EVENT_SOURCE = event_sources.EventSource.CONTAINER_TYPE _CONTAINER_TYPE_EVENT_TAG = events.EventTag.CONTAINER_TYPE _CONTAINER_TYPE_EXTRACTION_WARNING = warnings.ExtractionWarning.CONTAINER_TYPE _CONTAINER_TYPE_PREPROCESSING_WARNING = ( warnings.PreprocessingWarning.CONTAINER_TYPE) _CONTAINER_TYPE_RECOVERY_WARNING = warnings.RecoveryWarning.CONTAINER_TYPE # The maximum number of cached event tags _MAXIMUM_CACHED_EVENT_TAGS = 32 * 1024
[docs] def __init__(self, storage_type=definitions.STORAGE_TYPE_SESSION): """Initializes a storage writer. Args: storage_type (Optional[str]): storage type. """ super(StorageWriter, self).__init__() self._attribute_containers_counter = collections.Counter() self._event_tag_per_event_identifier = collections.OrderedDict() self._storage_type = storage_type
def _CacheEventTagByEventIdentifier(self, event_tag, event_identifier): """Caches a specific event tag. Args: event_tag (EventTag): event tag. event_identifier (AttributeContainerIdentifier): event identifier. """ if len(self._event_tag_per_event_identifier) >= ( self._MAXIMUM_CACHED_EVENT_TAGS): self._event_tag_per_event_identifier.popitem(last=True) lookup_key = event_identifier.CopyToString() self._event_tag_per_event_identifier[lookup_key] = event_tag self._event_tag_per_event_identifier.move_to_end(lookup_key, last=False) def _GetCachedEventTag(self, event_identifier): """Retrieves a specific cached event tag. Args: event_identifier (AttributeContainerIdentifier): event identifier. Returns: EventTag: event tag or None if not available. Raises: IOError: when there is an error querying the storage file. OSError: when there is an error querying the storage file. """ lookup_key = event_identifier.CopyToString() event_tag = self._event_tag_per_event_identifier.get(lookup_key, None) if not event_tag: generator = self._store.GetAttributeContainers( self._CONTAINER_TYPE_EVENT_TAG, filter_expression=f'_event_identifier == "{lookup_key:s}"') existing_event_tags = list(generator) if len(existing_event_tags) == 1: event_tag = existing_event_tags[0] if len(self._event_tag_per_event_identifier) >= ( self._MAXIMUM_CACHED_EVENT_TAGS): self._event_tag_per_event_identifier.popitem(last=True) self._event_tag_per_event_identifier[lookup_key] = event_tag if event_tag: self._event_tag_per_event_identifier.move_to_end(lookup_key, last=False) return event_tag def _RaiseIfNotWritable(self): """Raises if the storage writer is not writable. Raises: IOError: when the storage writer is closed. OSError: when the storage writer is closed. """ if not self._store: raise IOError('Unable to write to closed storage writer.')
[docs] def AddAttributeContainer(self, container): """Adds an attribute container. Args: container (AttributeContainer): attribute container. Raises: IOError: when the storage writer is closed. OSError: when the storage writer is closed. """ self._RaiseIfNotWritable() self._store.AddAttributeContainer(container) self._attribute_containers_counter[container.CONTAINER_TYPE] += 1
[docs] def AddOrUpdateEventTag(self, event_tag): """Adds a new or updates an existing event tag. Args: event_tag (EventTag): event tag. Raises: IOError: when the storage writer is closed. OSError: when the storage writer is closed. """ self._RaiseIfNotWritable() event_identifier = event_tag.GetEventIdentifier() existing_event_tag = self._GetCachedEventTag(event_identifier) if not existing_event_tag: self.AddAttributeContainer(event_tag) self._CacheEventTagByEventIdentifier(event_tag, event_identifier) else: if not set(existing_event_tag.labels).issubset(event_tag.labels): # No need to update the storage if all the labels are already set. existing_event_tag.AddLabels(event_tag.labels) self._store.UpdateAttributeContainer(existing_event_tag) if self._storage_type == definitions.STORAGE_TYPE_TASK: self._attribute_containers_counter[self._CONTAINER_TYPE_EVENT_TAG] += 1
[docs] def Close(self): """Closes the storage writer. Raises: IOError: when the storage writer is closed. OSError: when the storage writer is closed. """ self._RaiseIfNotWritable() self._store.Close() self._store = None
[docs] @abc.abstractmethod def GetFirstWrittenEventData(self): """Retrieves the first event data that was written after open. Using GetFirstWrittenEventData and GetNextWrittenEventData newly added event data can be retrieved in order of addition. Returns: EventData: event data or None if there are no newly written ones. """
[docs] @abc.abstractmethod def GetNextWrittenEventData(self): """Retrieves the next event data that was written after open. Returns: EventData: event data or None if there are no newly written ones. """
[docs] @abc.abstractmethod def GetFirstWrittenEventSource(self): """Retrieves the first event source that was written after open. Using GetFirstWrittenEventSource and GetNextWrittenEventSource newly added event sources can be retrieved in order of addition. Returns: EventSource: event source or None if there are no newly written ones. """
[docs] @abc.abstractmethod def GetNextWrittenEventSource(self): """Retrieves the next event source that was written after open. Returns: EventSource: event source or None if there are no newly written ones. """
[docs] @abc.abstractmethod def Open(self, **kwargs): """Opens the storage writer."""
[docs] def UpdateAttributeContainer(self, container): """Updates an existing attribute container. Args: container (AttributeContainer): attribute container. Raises: IOError: when the storage writer is closed. OSError: when the storage writer is closed. """ self._RaiseIfNotWritable() self._store.UpdateAttributeContainer(container)