# -*- coding: utf-8 -*-
"""The storage reader."""
from plaso.containers import events
from plaso.containers import sessions
from plaso.storage import logger
class StorageReader:
    """Storage reader interface.

    Every read operation is delegated to the store object referenced by
    self._store. The initializer sets that reference to None.

    NOTE(review): nothing in this class assigns a store; presumably a
    subclass sets self._store to an opened store - confirm against the
    concrete readers.
    """

    _CONTAINER_TYPE_SESSION = sessions.Session.CONTAINER_TYPE
    _CONTAINER_TYPE_EVENT_TAG = events.EventTag.CONTAINER_TYPE

    def __init__(self):
        """Initializes a storage reader."""
        super().__init__()
        self._serializers_profiler = None
        self._storage_profiler = None
        self._store = None

    def __enter__(self):
        """Make usable with "with" statement."""
        return self

    # pylint: disable=unused-argument
    def __exit__(self, exception_type, value, traceback):
        """Make usable with "with" statement.

        Closes the reader. Nothing is returned, so an exception raised
        inside the "with" block is not suppressed.
        """
        self.Close()

    def Close(self):
        """Closes the storage reader.

        Closes the underlying store and drops the reference to it. When no
        store is set (never assigned, or already closed) this is a no-op,
        so an explicit Close() inside a "with" block does not fail when
        __exit__ closes the reader a second time.
        """
        if self._store is None:
            return

        self._store.Close()
        self._store = None

    def GetAttributeContainerByIdentifier(self, container_type, identifier):
        """Retrieves a specific type of container with a specific identifier.

        Args:
            container_type (str): container type.
            identifier (AttributeContainerIdentifier): attribute container
                identifier.

        Returns:
            AttributeContainer: attribute container or None if not
                available.
        """
        return self._store.GetAttributeContainerByIdentifier(
            container_type, identifier)

    def GetAttributeContainerByIndex(self, container_type, index):
        """Retrieves a specific attribute container.

        Args:
            container_type (str): attribute container type.
            index (int): attribute container index.

        Returns:
            AttributeContainer: attribute container or None if not
                available.
        """
        return self._store.GetAttributeContainerByIndex(container_type, index)

    def GetAttributeContainers(self, container_type, filter_expression=None):
        """Retrieves a specific type of attribute containers.

        Args:
            container_type (str): attribute container type.
            filter_expression (Optional[str]): expression to filter the
                resulting attribute containers by.

        Returns:
            generator(AttributeContainers): attribute container generator.
        """
        return self._store.GetAttributeContainers(
            container_type, filter_expression=filter_expression)

    def GetEventTagByEventIdentifer(self, event_identifier):
        """Retrieves the event tag of a specific event.

        Args:
            event_identifier (AttributeContainerIdentifier): event attribute
                container identifier.

        Returns:
            EventTag: event tag or None if the event has no event tag. When
                the store returns more than one tag a warning is logged and
                the first one is returned.
        """
        lookup_key = event_identifier.CopyToString()
        # The results are materialized because the number of matches is
        # needed to decide between "none", "one" and "more than one".
        event_tags = list(self.GetAttributeContainers(
            self._CONTAINER_TYPE_EVENT_TAG,
            filter_expression=f'_event_identifier == "{lookup_key:s}"'))

        if not event_tags:
            return None

        if len(event_tags) > 1:
            logger.warning('More than 1 event tag returned.')

        return event_tags[0]

    def GetNumberOfAttributeContainers(self, container_type):
        """Retrieves the number of a specific type of attribute containers.

        Args:
            container_type (str): attribute container type.

        Returns:
            int: the number of containers of a specified type.
        """
        return self._store.GetNumberOfAttributeContainers(container_type)

    def GetSessions(self):
        """Retrieves the sessions.

        Yields:
            Session: session attribute container.
        """
        yield from self.GetAttributeContainers(self._CONTAINER_TYPE_SESSION)

    def GetSortedEvents(self, time_range=None):
        """Retrieves the events in increasing chronological order.

        This includes all events written to the storage including those
        pending being flushed (written) to the storage.

        Args:
            time_range (Optional[TimeRange]): time range used to filter
                events that fall in a specific period.

        Returns:
            generator(EventObject): event generator.
        """
        return self._store.GetSortedEvents(time_range=time_range)

    def HasAttributeContainers(self, container_type):
        """Determines if a store contains a specific type of attribute container.

        Args:
            container_type (str): attribute container type.

        Returns:
            bool: True if the store contains the specified type of attribute
                containers.
        """
        return self._store.HasAttributeContainers(container_type)

    def SetSerializersProfiler(self, serializers_profiler):
        """Sets the serializers profiler.

        The profiler is remembered on the reader and, when a store is set,
        also passed on to that store.

        Args:
            serializers_profiler (SerializersProfiler): serializers profiler.
        """
        self._serializers_profiler = serializers_profiler
        # Compare against None explicitly: a store object that happens to
        # evaluate as falsy must still receive the profiler.
        if self._store is not None:
            self._store.SetSerializersProfiler(serializers_profiler)

    def SetStorageProfiler(self, storage_profiler):
        """Sets the storage profiler.

        The profiler is remembered on the reader and, when a store is set,
        also passed on to that store.

        Args:
            storage_profiler (StorageProfiler): storage profiler.
        """
        self._storage_profiler = storage_profiler
        # Compare against None explicitly: a store object that happens to
        # evaluate as falsy must still receive the profiler.
        if self._store is not None:
            self._store.SetStorageProfiler(storage_profiler)