Source code for plaso.multi_process.task_engine

# -*- coding: utf-8 -*-
"""The task-based multi-process processing engine."""

import os
import shutil
import tempfile

from plaso.lib import definitions
from plaso.multi_process import engine
from plaso.storage import factory as storage_factory

try:
  # pylint: disable=ungrouped-imports
  import redis
  from plaso.storage.redis import redis_store
except ModuleNotFoundError:
  redis = None
  redis_store = None


[docs] class TaskMultiProcessEngine(engine.MultiProcessEngine): """Task-based multi-process engine base. This class contains functionality to: * manage task storage used to store task results. """ # pylint: disable=abstract-method
[docs] def __init__(self): """Initializes a task-based multi-process engine.""" super(TaskMultiProcessEngine, self).__init__() self._merge_task_storage_path = None self._processing_configuration = None self._processed_task_storage_path = None self._redis_client = None self._storage_file_path = None self._task_storage_path = None
# TODO: remove, currently only used by psort. def _CheckTaskReadyForMerge(self, task_storage_format, task): """Checks if a task is ready for merging with this session storage. Args: task_storage_format (str): storage format used to store task results. task (Task): task the storage changes are part of. Returns: bool: True if the task is ready to be merged. Raises: IOError: if the size of the SQLite task storage file cannot be determined. OSError: if the size of the SQLite task storage file cannot be determined. """ if task_storage_format == definitions.STORAGE_FORMAT_SQLITE: processed_storage_file_path = self._GetProcessedStorageFilePath(task) try: stat_info = os.stat(processed_storage_file_path) except (IOError, OSError): return False task.storage_file_size = stat_info.st_size return True return False def _GetMergeTaskStorage(self, task_storage_format, task): """Retrieves a task store ready to be merged with the session store. Args: task_storage_format (str): storage format used to store task results. task (Task): task the storage changes are part of. Returns: StorageReader: storage reader of the task storage. Raises: IOError: if the temporary path for the task storage does not exist or if the temporary path for the task storage doe not refers to a file. OSError: if the temporary path for the task storage does not exist or if the temporary path for the task storage doe not refers to a file. """ merge_storage_file_path = self._GetMergeTaskStorageFilePath( task_storage_format, task) if task_storage_format == definitions.STORAGE_FORMAT_SQLITE: if not self._merge_task_storage_path: raise IOError('Missing merge task storage path.') if not os.path.isfile(merge_storage_file_path): raise IOError('Merge task storage path is not a file.') task_storage_reader = ( storage_factory.StorageFactory.CreateTaskStorageReader( task_storage_format, task, merge_storage_file_path)) task_storage_reader.SetStorageProfiler(self._storage_profiler) return task_storage_reader def _GetMergeTaskStorageRedisHashName(self, task): """Retrieves the Redis hash name of a task store that should be merged. Args: task (Task): task the storage changes are part of. Returns: str: Redis hash name of a task store. """ return '{0:s}-merge'.format(task.session_identifier) def _GetMergeTaskStorageFilePath(self, task_storage_format, task): """Retrieves the path of a task storage file in the merge directory. Args: task_storage_format (str): storage format used to store task results. task (Task): task the storage changes are part of. Returns: str: path of a task storage file file in the merge directory or None if not set. """ if task_storage_format == definitions.STORAGE_FORMAT_SQLITE: filename = '{0:s}.plaso'.format(task.identifier) return os.path.join(self._merge_task_storage_path, filename) return None def _GetProcessedRedisHashName(self, session_identifier): """Retrieves the Redis hash name of a processed task store. Args: session_identifier (str): the identifier of the session the tasks are part of. Returns: str: Redis hash name of a task store. """ return '{0:s}-processed'.format(session_identifier) def _GetProcessedStorageFilePath(self, task): """Retrieves the path of a task storage file in the processed directory. Args: task (Task): task the storage changes are part of. Returns: str: path of a task storage file in the processed directory. """ filename = '{0:s}.plaso'.format(task.identifier) return os.path.join(self._processed_task_storage_path, filename) def _GetProcessedTaskIdentifiers( self, task_storage_format, session_identifier): """Identifiers for tasks which have been processed. Args: task_storage_format (str): storage format used to store task results. session_identifier (str): the identifier of the session the tasks are part of. Returns: list[str]: task identifiers that are processed. Raises: IOError: if the temporary path for the task storage does not exist. OSError: if the temporary path for the task storage does not exist. """ if task_storage_format == definitions.STORAGE_FORMAT_REDIS: redis_hash_name = self._GetProcessedRedisHashName(session_identifier) try: task_identifiers = self._redis_client.hkeys(redis_hash_name) task_identifiers = [ identifier.decode('utf-8') for identifier in task_identifiers] except redis.exceptions.TimeoutError: # If there is a timeout fetching identifiers, we assume that there are # no processed tasks. task_identifiers = [] elif task_storage_format == definitions.STORAGE_FORMAT_SQLITE: if not self._processed_task_storage_path: raise IOError('Missing processed task storage path.') task_identifiers = [ path.replace('.plaso', '') for path in os.listdir(self._processed_task_storage_path)] return task_identifiers def _PrepareMergeTaskStorage(self, task_storage_format, task): """Prepares a task storage for merging. Moves the task storage file from the processed directory to the merge directory. Args: task_storage_format (str): storage format used to store task results. task (Task): task the storage changes are part of. Raises: IOError: if the SQLite task storage file cannot be renamed. OSError: if the SQLite task storage file cannot be renamed. """ if task_storage_format == definitions.STORAGE_FORMAT_REDIS: # TODO: use number of attribute containers instead of file size? task.storage_file_size = 1000 redis_hash_name = self._GetProcessedRedisHashName(task.session_identifier) number_of_results = self._redis_client.hdel( redis_hash_name, task.identifier) if number_of_results == 0: raise IOError('Task identifier {0:s} was not processed'.format( task.identifier)) redis_hash_name = self._GetMergeTaskStorageRedisHashName(task) # TODO: set timestamp as value. self._redis_client.hset( redis_hash_name, key=task.identifier, value=b'true') elif task_storage_format == definitions.STORAGE_FORMAT_SQLITE: merge_storage_file_path = self._GetMergeTaskStorageFilePath( task_storage_format, task) processed_storage_file_path = self._GetProcessedStorageFilePath(task) task.storage_file_size = os.path.getsize(processed_storage_file_path) try: os.rename(processed_storage_file_path, merge_storage_file_path) except OSError as exception: raise IOError(( 'Unable to rename task storage file: {0:s} with error: ' '{1!s}').format(processed_storage_file_path, exception)) def _RemoveMergeTaskStorage(self, task_storage_format, task): """Removes a merge task storage. Args: task_storage_format (str): storage format used to store task results. task (Task): task the storage changes are part of. Raises: IOError: if a SQLite task storage file cannot be removed. OSError: if a SQLite task storage file cannot be removed. """ if task_storage_format == definitions.STORAGE_FORMAT_REDIS: redis_hash_pattern = '{0:s}-{1:s}-*'.format( task.session_identifier, task.identifier) for redis_hash_name in self._redis_client.keys(redis_hash_pattern): self._redis_client.delete(redis_hash_name) elif task_storage_format == definitions.STORAGE_FORMAT_SQLITE: merge_storage_file_path = self._GetMergeTaskStorageFilePath( task_storage_format, task) try: os.remove(merge_storage_file_path) except OSError as exception: raise IOError(( 'Unable to remove merge task storage file: {0:s} with error: ' '{1!s}').format(merge_storage_file_path, exception)) def _RemoveProcessedTaskStorage(self, task_storage_format, task): """Removes a processed task storage. Args: task_storage_format (str): storage format used to store task results. task (Task): task the storage changes are part of. Raises: IOError: if a SQLite task storage file cannot be removed. OSError: if a SQLite task storage file cannot be removed. """ if task_storage_format == definitions.STORAGE_FORMAT_REDIS: redis_hash_pattern = '{0:s}-{1:s}-*'.format( task.session_identifier, task.identifier) for redis_hash_name in self._redis_client.keys(redis_hash_pattern): self._redis_client.delete(redis_hash_name) elif task_storage_format == definitions.STORAGE_FORMAT_SQLITE: processed_storage_file_path = self._GetProcessedStorageFilePath(task) try: os.remove(processed_storage_file_path) except OSError as exception: raise IOError(( 'Unable to remove processed task storage file: {0:s} with error: ' '{1!s}').format(processed_storage_file_path, exception)) def _StartTaskStorage(self, task_storage_format): """Starts the task storage. Args: task_storage_format (str): storage format used to store task results. Raises: IOError: if the temporary path for the SQLite task storage already exists. OSError: if the temporary path for the SQLite task storage already exists. """ if task_storage_format == definitions.STORAGE_FORMAT_REDIS and redis_store: url = redis_store.RedisAttributeContainerStore.DEFAULT_REDIS_URL self._redis_client = redis.from_url(url=url, socket_timeout=60) self._redis_client.client_setname('task_engine') elif task_storage_format == definitions.STORAGE_FORMAT_SQLITE: if self._task_storage_path: raise IOError('SQLite task storage path already exists.') output_directory = os.path.dirname(self._storage_file_path) self._task_storage_path = tempfile.mkdtemp(dir=output_directory) self._merge_task_storage_path = os.path.join( self._task_storage_path, 'merge') os.mkdir(self._merge_task_storage_path) self._processed_task_storage_path = os.path.join( self._task_storage_path, 'processed') os.mkdir(self._processed_task_storage_path) self._processing_configuration.task_storage_path = self._task_storage_path def _StopTaskStorage( self, task_storage_format, session_identifier, abort=False): """Stops the task storage. The results of tasks will be lost on abort. Args: task_storage_format (str): storage format used to store task results. session_identifier (str): the identifier of the session the tasks are part of. abort (Optional[bool]): True to indicate the stop is issued on abort. """ if task_storage_format == definitions.STORAGE_FORMAT_REDIS: redis_hash_pattern = '{0:s}-*'.format(session_identifier) for redis_hash_name in self._redis_client.keys(redis_hash_pattern): self._redis_client.delete(redis_hash_name) self._redis_client = None elif task_storage_format == definitions.STORAGE_FORMAT_SQLITE: if os.path.isdir(self._merge_task_storage_path): if abort: shutil.rmtree(self._merge_task_storage_path) else: os.rmdir(self._merge_task_storage_path) if os.path.isdir(self._processed_task_storage_path): if abort: shutil.rmtree(self._processed_task_storage_path) else: os.rmdir(self._processed_task_storage_path) if os.path.isdir(self._task_storage_path): if abort: shutil.rmtree(self._task_storage_path) else: os.rmdir(self._task_storage_path) self._merge_task_storage_path = None self._processed_task_storage_path = None self._task_storage_path = None self._processing_configuration.task_storage_path = None