Source code for plaso.storage.reader

# -*- coding: utf-8 -*-
"""The storage reader."""

from plaso.containers import events
from plaso.containers import sessions
from plaso.storage import logger


class StorageReader(object):
  """Storage reader interface.

  The reader forwards its calls to an underlying store object kept in
  self._store. The store is None after initialization and after Close();
  presumably a subclass assigns it when the storage is opened (not shown
  in this class).
  """

  # Container types used to query the store for sessions and event tags.
  _CONTAINER_TYPE_SESSION = sessions.Session.CONTAINER_TYPE
  _CONTAINER_TYPE_EVENT_TAG = events.EventTag.CONTAINER_TYPE

  def __init__(self):
    """Initializes a storage reader."""
    super(StorageReader, self).__init__()
    self._serializers_profiler = None
    self._storage_profiler = None
    self._store = None

  def __enter__(self):
    """Make usable with "with" statement.

    Returns:
      StorageReader: the storage reader itself.
    """
    return self

  # pylint: disable=unused-argument
  def __exit__(self, exception_type, value, traceback):
    """Make usable with "with" statement.

    Closes the storage reader. The exception arguments are not used and no
    value is returned, so an exception raised inside the "with" body is not
    suppressed.

    Args:
      exception_type (type): type of the exception raised in the "with" body
          or None.
      value (BaseException): exception raised in the "with" body or None.
      traceback (traceback): traceback of the exception or None.
    """
    self.Close()

  def Close(self):
    """Closes the storage reader.

    Closing a storage reader that has no store is a no-op. This makes the
    method safe to call more than once, and safe to call from __exit__ after
    an explicit Close() inside the "with" body, where an AttributeError would
    otherwise replace the exception that is being propagated.
    """
    # An identity check is used instead of truthiness so that every store
    # object that was assigned is closed, regardless of how it evaluates as
    # a boolean.
    if self._store is not None:
      self._store.Close()
      self._store = None

  def GetAttributeContainerByIdentifier(self, container_type, identifier):
    """Retrieves a specific type of container with a specific identifier.

    Args:
      container_type (str): container type.
      identifier (AttributeContainerIdentifier): attribute container
          identifier.

    Returns:
      AttributeContainer: attribute container or None if not available.
    """
    return self._store.GetAttributeContainerByIdentifier(
        container_type, identifier)

  def GetAttributeContainerByIndex(self, container_type, index):
    """Retrieves a specific attribute container.

    Args:
      container_type (str): attribute container type.
      index (int): attribute container index.

    Returns:
      AttributeContainer: attribute container or None if not available.
    """
    return self._store.GetAttributeContainerByIndex(container_type, index)

  def GetAttributeContainers(self, container_type, filter_expression=None):
    """Retrieves a specific type of attribute containers.

    Args:
      container_type (str): attribute container type.
      filter_expression (Optional[str]): expression to filter the resulting
          attribute containers by.

    Returns:
      generator(AttributeContainers): attribute container generator.
    """
    return self._store.GetAttributeContainers(
        container_type, filter_expression=filter_expression)

  # Note that the spelling "Identifer" in the method name is part of the
  # public interface and is therefore kept as is.
  def GetEventTagByEventIdentifer(self, event_identifier):
    """Retrieves the event tag of a specific event.

    If more than one event tag matches the event identifier a warning is
    logged and the first match is returned.

    Args:
      event_identifier (AttributeContainerIdentifier): event attribute
          container identifier.

    Returns:
      EventTag: event tag or None if the event has no event tag.
    """
    lookup_key = event_identifier.CopyToString()

    # All matches are collected so that duplicates can be detected.
    event_tags = list(self.GetAttributeContainers(
        self._CONTAINER_TYPE_EVENT_TAG,
        filter_expression=f'_event_identifier == "{lookup_key:s}"'))

    if not event_tags:
      return None

    if len(event_tags) > 1:
      logger.warning('More than 1 event tag returned.')

    return event_tags[0]

  def GetFormatVersion(self):
    """Retrieves the format version of the underlying storage file.

    Returns:
      int: the format version.
    """
    return self._store.format_version

  def GetNumberOfAttributeContainers(self, container_type):
    """Retrieves the number of a specific type of attribute containers.

    Args:
      container_type (str): attribute container type.

    Returns:
      int: the number of containers of a specified type.
    """
    return self._store.GetNumberOfAttributeContainers(container_type)

  def GetSerializationFormat(self):
    """Retrieves the serialization format of the underlying storage file.

    Returns:
      str: the serialization format.
    """
    return self._store.serialization_format

  def GetSessions(self):
    """Retrieves the sessions.

    Yields:
      Session: session attribute container.
    """
    yield from self.GetAttributeContainers(self._CONTAINER_TYPE_SESSION)

  def GetSortedEvents(self, time_range=None):
    """Retrieves the events in increasing chronological order.

    This includes all events written to the storage including those pending
    being flushed (written) to the storage.

    Args:
      time_range (Optional[TimeRange]): time range used to filter events that
          fall in a specific period.

    Returns:
      generator(EventObject): event generator.
    """
    return self._store.GetSortedEvents(time_range=time_range)

  def HasAttributeContainers(self, container_type):
    """Determines if a store contains a specific type of attribute container.

    Args:
      container_type (str): attribute container type.

    Returns:
      bool: True if the store contains the specified type of attribute
          containers.
    """
    return self._store.HasAttributeContainers(container_type)

  def SetSerializersProfiler(self, serializers_profiler):
    """Sets the serializers profiler.

    The profiler is stored on the reader and, if a store is set, passed on
    to the store as well.

    Args:
      serializers_profiler (SerializersProfiler): serializers profiler.
    """
    self._serializers_profiler = serializers_profiler
    if self._store:
      self._store.SetSerializersProfiler(serializers_profiler)

  def SetStorageProfiler(self, storage_profiler):
    """Sets the storage profiler.

    The profiler is stored on the reader and, if a store is set, passed on
    to the store as well.

    Args:
      storage_profiler (StorageProfiler): storage profiler.
    """
    self._storage_profiler = storage_profiler
    if self._store:
      self._store.SetStorageProfiler(storage_profiler)