Source code for plaso.storage.redis.redis_store

# -*- coding: utf-8 -*-
"""Redis store.

Only supports task storage at the moment.
"""

import uuid

import redis  # pylint: disable=import-error

from acstore import interface
from acstore.containers import interface as containers_interface

from plaso.containers import events
from plaso.lib import definitions
from plaso.serializer import json_serializer
from plaso.storage import logger


[docs]class RedisStore(interface.AttributeContainerStore): """Redis store. Attribute containers are stored as Redis Hashes. All keys are prefixed with the session identifier to avoid collisions. Event identifiers are also stored in an index to enable sorting. """ _CONTAINER_TYPE_EVENT = events.EventObject.CONTAINER_TYPE _CONTAINER_TYPE_EVENT_DATA = events.EventData.CONTAINER_TYPE _CONTAINER_TYPE_EVENT_DATA_STREAM = events.EventDataStream.CONTAINER_TYPE _FORMAT_VERSION = '20181013' _EVENT_INDEX_NAME = 'sorted_event_identifier' DEFAULT_REDIS_URL = 'redis://127.0.0.1/0' def __init__(self): """Initializes a Redis store.""" super(RedisStore, self).__init__() self._redis_client = None self._session_identifier = None self._serializer = json_serializer.JSONAttributeContainerSerializer self._serializers_profiler = None self._task_identifier = None self.serialization_format = definitions.SERIALIZER_FORMAT_JSON def _DeserializeAttributeContainer(self, container_type, serialized_data): """Deserializes an attribute container. Args: container_type (str): attribute container type. serialized_data (bytes): serialized attribute container data. Returns: AttributeContainer: attribute container or None. Raises: IOError: if the serialized data cannot be decoded. OSError: if the serialized data cannot be decoded. """ if not serialized_data: return None if self._serializers_profiler: self._serializers_profiler.StartTiming(container_type) try: serialized_string = serialized_data.decode('utf-8') attribute_container = self._serializer.ReadSerialized(serialized_string) except UnicodeDecodeError as exception: raise IOError('Unable to decode serialized data: {0!s}'.format(exception)) except (TypeError, ValueError) as exception: # TODO: consider re-reading attribute container with error correction. raise IOError('Unable to read serialized data: {0!s}'.format(exception)) finally: if self._serializers_profiler: self._serializers_profiler.StopTiming(container_type) return attribute_container def _GetRedisHashName(self, container_type): """Retrieves the Redis hash name of the attribute container type. Args: container_type (str): container type. Returns: str: a Redis key name. """ return '{0:s}-{1:s}-{2:s}'.format( self._session_identifier, self._task_identifier, container_type) def _RaiseIfNotReadable(self): """Checks that the store is ready to for reading. Raises: IOError: if the store cannot be read from. OSError: if the store cannot be read from. """ if not self._redis_client: raise IOError('Unable to read, client not connected.') def _RaiseIfNotWritable(self): """Checks that the store is ready to for writing. Raises: IOError: if the store cannot be written to. OSError: if the store cannot be written to. """ if not self._redis_client: raise IOError('Unable to write, client not connected.') def _SerializeAttributeContainer(self, attribute_container): """Serializes an attribute container. Args: attribute_container (AttributeContainer): attribute container. Returns: bytes: serialized attribute container. Raises: IOError: if the attribute container cannot be serialized. OSError: if the attribute container cannot be serialized. """ if self._serializers_profiler: self._serializers_profiler.StartTiming( attribute_container.CONTAINER_TYPE) try: attribute_container_data = self._serializer.WriteSerialized( attribute_container) if not attribute_container_data: raise IOError( 'Unable to serialize attribute container: {0:s}.'.format( attribute_container.CONTAINER_TYPE)) finally: if self._serializers_profiler: self._serializers_profiler.StopTiming( attribute_container.CONTAINER_TYPE) return attribute_container_data @classmethod def _SetClientName(cls, redis_client, name): """Attempts to sets a Redis client name. This method ignores errors from the Redis server or exceptions indicating the method is missing, as setting the name is not a critical function, and it is not currently supported by the fakeredis test library. Args: redis_client (Redis): an open Redis client. name (str): name to set. """ try: redis_client.client_setname(name) except redis.ResponseError as exception: logger.debug( 'Unable to set redis client name: {0:s} with error: {1!s}'.format( name, exception)) def _UpdateAttributeContainerAfterDeserialize(self, container): """Updates an attribute container after deserialization. Args: container (AttributeContainer): attribute container. Raises: ValueError: if an attribute container identifier is missing. """ if container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT: identifier = getattr(container, '_event_data_identifier', None) if identifier: event_data_identifier = ( containers_interface.AttributeContainerIdentifier( name=self._CONTAINER_TYPE_EVENT_DATA, sequence_number=identifier)) container.SetEventDataIdentifier(event_data_identifier) elif container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT_DATA: identifier = getattr(container, '_event_data_stream_identifier', None) if identifier: event_data_stream_identifier = ( containers_interface.AttributeContainerIdentifier( name=self._CONTAINER_TYPE_EVENT_DATA_STREAM, sequence_number=identifier)) container.SetEventDataStreamIdentifier(event_data_stream_identifier) def _UpdateAttributeContainerBeforeSerialize(self, container): """Updates an attribute container before serialization. Args: container (AttributeContainer): attribute container. Raises: IOError: if the attribute container identifier type is not supported. OSError: if the attribute container identifier type is not supported. """ if container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT: event_data_identifier = container.GetEventDataIdentifier() if event_data_identifier: setattr(container, '_event_data_identifier', event_data_identifier.sequence_number) elif container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT_DATA: event_data_stream_identifier = container.GetEventDataStreamIdentifier() if event_data_stream_identifier: setattr(container, '_event_data_stream_identifier', event_data_stream_identifier.sequence_number) def _WriteExistingAttributeContainer(self, container): """Writes an existing attribute container to the store. Args: container (AttributeContainer): attribute container. Raises: IOError: if an unsupported identifier is provided or if the attribute container does not exist. RuntimeError: since this method is not implemented. OSError: if an unsupported identifier is provided or if the attribute container does not exist. """ identifier = container.GetIdentifier() redis_hash_name = self._GetRedisHashName(container.CONTAINER_TYPE) redis_key = identifier.CopyToString() self._UpdateAttributeContainerBeforeSerialize(container) serialized_data = self._SerializeAttributeContainer(container) self._redis_client.hset( redis_hash_name, key=redis_key, value=serialized_data) def _WriteNewAttributeContainer(self, container): """Writes a new attribute container to the store. Args: container (AttributeContainer): attribute container. """ next_sequence_number = self._GetAttributeContainerNextSequenceNumber( container.CONTAINER_TYPE) identifier = containers_interface.AttributeContainerIdentifier( name=container.CONTAINER_TYPE, sequence_number=next_sequence_number) container.SetIdentifier(identifier) redis_hash_name = self._GetRedisHashName(container.CONTAINER_TYPE) redis_key = identifier.CopyToString() self._UpdateAttributeContainerBeforeSerialize(container) serialized_data = self._SerializeAttributeContainer(container) self._redis_client.hsetnx(redis_hash_name, redis_key, serialized_data) if container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT: index_name = self._GetRedisHashName(self._EVENT_INDEX_NAME) self._redis_client.zincrby(index_name, container.timestamp, redis_key) def _WriteStorageMetadata(self): """Writes the storage metadata.""" metadata = { 'format_version': self._FORMAT_VERSION, 'serialization_format': self.serialization_format} metadata_key = self._GetRedisHashName('metadata') for key, value in metadata.items(): self._redis_client.hset(metadata_key, key=key, value=value)
[docs] def Close(self): """Closes the store. Raises: IOError: if the store is already closed. OSError: if the store is already closed. """ if not self._redis_client: raise IOError('Store already closed.') self._redis_client = None
[docs] def GetAttributeContainerByIdentifier(self, container_type, identifier): """Retrieves a specific type of container with a specific identifier. Args: container_type (str): container type. identifier (AttributeContainerIdentifier): attribute container identifier. Returns: AttributeContainer: attribute container or None if not available. Raises: IOError: when the store is closed or if an unsupported identifier is provided. OSError: when the store is closed or if an unsupported identifier is provided. """ redis_hash_name = self._GetRedisHashName(container_type) redis_key = identifier.CopyToString() serialized_data = self._redis_client.hget(redis_hash_name, redis_key) if not serialized_data: return None attribute_container = self._DeserializeAttributeContainer( container_type, serialized_data) attribute_container.SetIdentifier(identifier) self._UpdateAttributeContainerAfterDeserialize(attribute_container) return attribute_container
[docs] def GetAttributeContainerByIndex(self, container_type, index): """Retrieves a specific attribute container. Args: container_type (str): attribute container type. index (int): attribute container index. Returns: AttributeContainer: attribute container or None if not available. """ sequence_number = index + 1 redis_hash_name = self._GetRedisHashName(container_type) redis_key = '{0:s}.{1:d}'.format(container_type, sequence_number) serialized_data = self._redis_client.hget(redis_hash_name, redis_key) if not serialized_data: return None attribute_container = self._DeserializeAttributeContainer( container_type, serialized_data) identifier = containers_interface.AttributeContainerIdentifier( name=container_type, sequence_number=sequence_number) attribute_container.SetIdentifier(identifier) self._UpdateAttributeContainerAfterDeserialize(attribute_container) return attribute_container
[docs] def GetAttributeContainers(self, container_type, filter_expression=None): """Retrieves attribute containers Args: container_type (str): container type attribute of the container being added. filter_expression (Optional[str]): expression to filter the resulting attribute containers by. Yields: AttributeContainer: attribute container. """ redis_hash_name = self._GetRedisHashName(container_type) for redis_key, serialized_data in self._redis_client.hscan_iter( redis_hash_name): redis_key = redis_key.decode('utf-8') attribute_container = self._DeserializeAttributeContainer( container_type, serialized_data) _, sequence_number = redis_key.split('.') sequence_number = int(sequence_number, 10) identifier = containers_interface.AttributeContainerIdentifier( name=container_type, sequence_number=sequence_number) attribute_container.SetIdentifier(identifier) self._UpdateAttributeContainerAfterDeserialize(attribute_container) # TODO: map filter expression to Redis native filter. if attribute_container.MatchesExpression(filter_expression): yield attribute_container
[docs] def GetNumberOfAttributeContainers(self, container_type): """Retrieves the number of a specific type of attribute containers. Args: container_type (str): attribute container type. Returns: int: the number of containers of a specified type. """ redis_hash_name = self._GetRedisHashName(container_type) return self._redis_client.hlen(redis_hash_name)
[docs] def GetSerializedAttributeContainers( self, container_type, cursor, maximum_number_of_items): """Fetches serialized attribute containers. Args: container_type (str): attribute container type. cursor (int): Redis cursor. maximum_number_of_items (int): maximum number of containers to retrieve, where 0 represent no limit. Returns: tuple: containing: int: Redis cursor. list[bytes]: serialized attribute containers. """ name = self._GetRedisHashName(container_type) # Redis treats None as meaning "no limit", not 0. if maximum_number_of_items == 0: maximum_number_of_items = None cursor, items = self._redis_client.hscan( name, cursor=cursor, count=maximum_number_of_items) return cursor, items
[docs] def GetSortedEvents(self, time_range=None): """Retrieves the events in increasing chronological order. Args: time_range (Optional[TimeRange]): This argument is not supported by the Redis store. Yields: EventObject: event. Raises: RuntimeError: if a time_range argument is specified. """ event_index_name = self._GetRedisHashName(self._EVENT_INDEX_NAME) if time_range: raise RuntimeError('Not supported') for redis_key, _ in self._redis_client.zscan_iter(event_index_name): redis_key = redis_key.decode('utf-8') container_type, sequence_number = redis_key.split('.') sequence_number = int(sequence_number, 10) identifier = containers_interface.AttributeContainerIdentifier( name=container_type, sequence_number=sequence_number) yield self.GetAttributeContainerByIdentifier( self._CONTAINER_TYPE_EVENT, identifier)
[docs] def HasAttributeContainers(self, container_type): """Determines if the store contains a specific type of attribute container. Args: container_type (str): attribute container type. Returns: bool: True if the store contains the specified type of attribute containers. """ redis_hash_name = self._GetRedisHashName(container_type) number_of_containers = self._redis_client.hlen(redis_hash_name) return number_of_containers > 0
# pylint: disable=arguments-differ
[docs] def Open( self, redis_client=None, session_identifier=None, task_identifier=None, url=None, **unused_kwargs): """Opens the store. Args: redis_client (Optional[Redis]): Redis client to query. If specified, no new client will be created. If no client is specified a new client will be opened connected to the Redis instance specified by 'url'. session_identifier (Optional[str]): identifier of the session. task_identifier (Optional[str]): unique identifier of the task the store will store containers for. If not specified, an identifier will be generated. url (Optional[str]): URL for a Redis database. If not specified, the DEFAULT_REDIS_URL will be used. Raises: IOError: if the store is already connected to a Redis instance. OSError: if the store is already connected to a Redis instance. """ if self._redis_client: raise IOError('Redis client already connected') if not redis_client: if not url: url = self.DEFAULT_REDIS_URL redis_client = redis.from_url(url=url, socket_timeout=60) self._redis_client = redis_client self._session_identifier = session_identifier or str(uuid.uuid4()) self._task_identifier = task_identifier or str(uuid.uuid4()) client_name = self._GetRedisHashName('') self._SetClientName(self._redis_client, client_name) metadata_key = self._GetRedisHashName('metadata') if not self._redis_client.exists(metadata_key): self._WriteStorageMetadata()
[docs] def SetSerializersProfiler(self, serializers_profiler): """Sets the serializers profiler. Args: serializers_profiler (SerializersProfiler): serializers profiler. """ self._serializers_profiler = serializers_profiler