Source code for plaso.storage.fake.fake_store

# -*- coding: utf-8 -*-
"""Fake (in-memory only) store for testing."""

from acstore import fake_store as acstore_fake_store

from plaso.containers import events
from plaso.storage.fake import event_heap


[docs] class FakeStore(acstore_fake_store.FakeAttributeContainerStore): """Fake (in-memory only) store for testing. Attributes: serialization_format (str): serialization format. """ _CONTAINER_TYPE_EVENT = events.EventObject.CONTAINER_TYPE
[docs] def __init__(self): """Initializes a fake (in-memory only) store.""" super(FakeStore, self).__init__() self._serializers_profiler = None self.serialization_format = None
[docs] def GetSortedEvents(self, time_range=None): """Retrieves the events in increasing chronological order. Args: time_range (Optional[TimeRange]): time range used to filter events that fall in a specific period. Returns: generator(EventObject): event generator. Raises: IOError: when the storage writer is closed. OSError: when the storage writer is closed. """ if not self._is_open: raise IOError('Unable to read from closed storage writer.') generator = self.GetAttributeContainers(self._CONTAINER_TYPE_EVENT) sorted_events = event_heap.EventHeap() for event_index, event in enumerate(generator): if (time_range and ( event.timestamp < time_range.start_timestamp or event.timestamp > time_range.end_timestamp)): continue # The event index is used to ensure to sort events with the same date and # time and description in the order they were added to the store. sorted_events.PushEvent(event, event_index) return iter(sorted_events.PopEvents())
[docs] def SetSerializersProfiler(self, serializers_profiler): """Sets the serializers profiler. Args: serializers_profiler (SerializersProfiler): serializers profiler. """ self._serializers_profiler = serializers_profiler