Source code for plaso.containers.tasks

"""Task attribute container definition."""

import time
import uuid

from acstore.containers import interface
from acstore.containers import manager

from plaso.lib import definitions


[docs] class Task(interface.AttributeContainer): """Task attribute container. A task describes a piece of work for a multi processing worker process for example a task to process a path specification or to analyze an event. Attributes: aborted (bool): True if the task was aborted. completion_time (int): time that the task was completed. Contains the number of micro seconds since January 1, 1970, 00:00:00 UTC. file_entry_type (str): dfVFS type of the file entry the path specification is referencing. has_retry (bool): True if the task was previously abandoned and a retry task was created, False otherwise. identifier (str): unique identifier of the task. last_processing_time (int): the last time the task was marked as being processed as number of milliseconds since January 1, 1970, 00:00:00 UTC. merge_priority (int): priority used for the task storage file merge, where a lower value indicates a higher priority to merge. path_spec (dfvfs.PathSpec): path specification. session_identifier (str): the identifier of the session the task is part of. start_time (int): time that the task was started. Contains the number of micro seconds since January 1, 1970, 00:00:00 UTC. storage_file_size (int): size of the storage file in bytes. storage_format (str): the format the task results are to be stored in. """ CONTAINER_TYPE = "task" SCHEMA = { "aborted": "bool", "completion_time": "int", "file_entry_type": "str", "has_retry": "bool", "identifier": "str", "last_processing_time": "int", "merge_priority": "int", "path_spec": "dfvfs.PathSpec", "session_identifier": "str", "start_time": "int", "storage_file_size": "int", "storage_format": "str", }
[docs] def __init__(self, session_identifier=None): """Initializes a task attribute container. Args: session_identifier (Optional[str]): identifier of the session the task is part of. """ super().__init__() self.aborted = False self.completion_time = None self.file_entry_type = None self.has_retry = False self.identifier = f"{uuid.uuid4().hex:s}" self.last_processing_time = None self.merge_priority = None self.path_spec = None self.session_identifier = session_identifier self.start_time = int(time.time() * definitions.MICROSECONDS_PER_SECOND) self.storage_file_size = None self.storage_format = None
# This method is necessary for heap sort.
[docs] def __lt__(self, other): """Compares if the task attribute container is less than the other. Args: other (Task): task attribute container to compare to. Returns: bool: True if the task attribute container is less than the other. """ return self.identifier < other.identifier
[docs] def CreateRetryTask(self): """Creates a new task to retry a previously abandoned task. The retry task will have a new identifier but most of the attributes will be a copy of the previously abandoned task. Returns: Task: a task to retry a previously abandoned task. """ retry_task = Task(session_identifier=self.session_identifier) retry_task.file_entry_type = self.file_entry_type retry_task.merge_priority = self.merge_priority retry_task.path_spec = self.path_spec retry_task.storage_file_size = self.storage_file_size retry_task.storage_format = self.storage_format self.has_retry = True return retry_task
[docs] def UpdateProcessingTime(self): """Updates the processing time to now.""" self.last_processing_time = int( time.time() * definitions.MICROSECONDS_PER_SECOND )
manager.AttributeContainersManager.RegisterAttributeContainer(Task)