"""The task-based multi-process processing analysis engine."""
import collections
import os
import time
from plaso.containers import counts
from plaso.containers import events
from plaso.containers import reports
from plaso.containers import tasks
from plaso.engine import processing_status
from plaso.lib import definitions
from plaso.lib import errors
from plaso.multi_process import analysis_process
from plaso.multi_process import logger
from plaso.multi_process import merge_helpers
from plaso.multi_process import plaso_queue
from plaso.multi_process import task_engine
from plaso.multi_process import zeromq_queue
[docs]
class AnalysisMultiProcessEngine(task_engine.TaskMultiProcessEngine):
"""Task-based multi-process analysis engine.
This class contains functionality to:
* monitor and manage analysis tasks;
* merge results returned by analysis worker processes.
"""
# pylint: disable=abstract-method
_CONTAINER_TYPE_ANALYSIS_REPORT = reports.AnalysisReport.CONTAINER_TYPE
_CONTAINER_TYPE_EVENT_TAG = events.EventTag.CONTAINER_TYPE
_PROCESS_JOIN_TIMEOUT = 5.0
_QUEUE_TIMEOUT = 10 * 60
[docs]
def __init__(self, worker_memory_limit=None, worker_timeout=None):
"""Initializes a task-based multi-process analysis engine.
Args:
worker_memory_limit (Optional[int]): maximum amount of memory a worker is
allowed to consume, where None represents the default memory limit
and 0 represents no limit.
worker_timeout (Optional[float]): number of minutes before a worker
process that is not providing status updates is considered inactive,
where None or 0.0 represents the default timeout.
"""
if worker_memory_limit is None:
worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT
super().__init__()
self._analysis_plugins = {}
self._completed_analysis_processes = set()
self._data_location = None
self._event_filter_expression = None
self._event_labels_counter = None
self._event_queues = {}
self._events_status = processing_status.EventsStatus()
self._memory_profiler = None
self._merge_task = None
self._number_of_consumed_analysis_reports = 0
self._number_of_consumed_event_data = 0
self._number_of_consumed_event_tags = 0
self._number_of_consumed_events = 0
self._number_of_consumed_sources = 0
self._number_of_produced_analysis_reports = 0
self._number_of_produced_event_data = 0
self._number_of_produced_event_tags = 0
self._number_of_produced_events = 0
self._number_of_produced_sources = 0
self._processing_profiler = None
self._serializers_profiler = None
self._session = None
self._status = definitions.STATUS_INDICATOR_IDLE
self._status_update_callback = None
self._user_accounts = None
self._worker_memory_limit = worker_memory_limit
self._worker_timeout = worker_timeout or definitions.DEFAULT_WORKER_TIMEOUT
def _AnalyzeEvents(self, storage_writer, analysis_plugins, event_filter=None):
"""Analyzes events in a Plaso storage.
Args:
storage_writer (StorageWriter): storage writer.
analysis_plugins (dict[str, AnalysisPlugin]): analysis plugins that
should be run and their names.
event_filter (Optional[EventObjectFilter]): event filter.
Returns:
collections.Counter: counter containing information about the events
processed and filtered.
Raises:
RuntimeError: if a non-recoverable situation is encountered.
"""
self._status = definitions.STATUS_INDICATOR_RUNNING
self._number_of_consumed_analysis_reports = 0
self._number_of_consumed_event_data = 0
self._number_of_consumed_event_tags = 0
self._number_of_consumed_events = 0
self._number_of_consumed_sources = 0
self._number_of_produced_analysis_reports = 0
self._number_of_produced_event_data = 0
self._number_of_produced_event_tags = 0
self._number_of_produced_events = 0
self._number_of_produced_sources = 0
number_of_filtered_events = 0
logger.debug("Processing events.")
filter_limit = getattr(event_filter, "limit", None)
for event in storage_writer.GetSortedEvents():
event_data_identifier = event.GetEventDataIdentifier()
event_data = storage_writer.GetAttributeContainerByIdentifier(
events.EventData.CONTAINER_TYPE, event_data_identifier
)
event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()
if event_data_stream_identifier:
event_data_stream = storage_writer.GetAttributeContainerByIdentifier(
events.EventDataStream.CONTAINER_TYPE, event_data_stream_identifier
)
else:
event_data_stream = None
event_identifier = event.GetIdentifier()
event_tag = storage_writer.GetEventTagByEventIdentifer(event_identifier)
if event_filter:
filter_match = event_filter.Match(
event, event_data, event_data_stream, event_tag
)
else:
filter_match = None
# pylint: disable=singleton-comparison
if filter_match == False:
number_of_filtered_events += 1
continue
for event_queue in self._event_queues.values():
# TODO: Check for premature exit of analysis plugins.
event_tripple = events.EventTripple()
event_tripple.event = event
event_tripple.event_data = event_data
event_tripple.event_data_stream = event_data_stream
event_queue.PushItem(event_tripple)
self._number_of_consumed_events += 1
if (
event_filter
and filter_limit
and filter_limit == self._number_of_consumed_events
):
break
logger.debug("Finished pushing events to analysis plugins.")
# Signal that we have finished adding events.
for event_queue in self._event_queues.values():
event_queue.PushItem(plaso_queue.QueueAbort(), block=False)
logger.debug("Processing analysis plugin results.")
# TODO: use a task based approach.
plugin_names = list(analysis_plugins.keys())
while plugin_names:
for plugin_name in list(plugin_names):
if self._abort:
break
# TODO: temporary solution.
task = tasks.Task()
task.storage_format = definitions.STORAGE_FORMAT_SQLITE
task.identifier = plugin_name
merge_ready = self._CheckTaskReadyForMerge(
definitions.STORAGE_FORMAT_SQLITE, task
)
if merge_ready:
self._PrepareMergeTaskStorage(
definitions.STORAGE_FORMAT_SQLITE, task
)
self._status = definitions.STATUS_INDICATOR_MERGING
event_queue = self._event_queues[plugin_name]
del self._event_queues[plugin_name]
event_queue.Close()
task_storage_reader = self._GetMergeTaskStorage(
definitions.STORAGE_FORMAT_SQLITE, task
)
try:
merge_helper = merge_helpers.AnalysisTaskMergeHelper(
task_storage_reader, task.identifier
)
logger.debug(
f"Starting merge of task: {merge_helper.task_identifier:s}"
)
number_of_containers = self._MergeAttributeContainers(
storage_writer, merge_helper
)
logger.debug(
(
f"Merged {number_of_containers:d} containers of task: "
f"{merge_helper.task_identifier:s}"
)
)
finally:
task_storage_reader.Close()
self._RemoveMergeTaskStorage(
definitions.STORAGE_FORMAT_SQLITE, task
)
self._status = definitions.STATUS_INDICATOR_RUNNING
# TODO: temporary solution.
plugin_names.remove(plugin_name)
events_counter = collections.Counter()
events_counter["Events filtered"] = number_of_filtered_events
events_counter["Events processed"] = self._number_of_consumed_events
return events_counter
def _CheckStatusAnalysisProcess(self, pid):
"""Checks the status of an analysis process.
Args:
pid (int): process ID (PID) of a registered analysis process.
Raises:
KeyError: if the process is not registered with the engine.
"""
# TODO: Refactor this method, simplify and separate concerns (monitoring
# vs management).
self._RaiseIfNotRegistered(pid)
if pid in self._completed_analysis_processes:
status_indicator = definitions.STATUS_INDICATOR_COMPLETED
process_status = {"processing_status": status_indicator}
used_memory = 0
else:
process = self._processes_per_pid[pid]
process_status = self._QueryProcessStatus(process)
if process_status is None:
process_is_alive = False
else:
process_is_alive = True
process_information = self._process_information_per_pid[pid]
used_memory = process_information.GetUsedMemory() or 0
if self._worker_memory_limit and used_memory > self._worker_memory_limit:
logger.warning(
f"Process: {process.name:s} (PID: {pid:d}) killed because it "
f"exceeded the memory limit: {self._worker_memory_limit:d}"
)
self._KillProcess(pid)
if isinstance(process_status, dict):
self._rpc_errors_per_pid[pid] = 0
status_indicator = process_status.get("processing_status")
if status_indicator == definitions.STATUS_INDICATOR_COMPLETED:
self._completed_analysis_processes.add(pid)
else:
rpc_errors = self._rpc_errors_per_pid.get(pid, 0) + 1
self._rpc_errors_per_pid[pid] = rpc_errors
if rpc_errors > self._MAXIMUM_RPC_ERRORS:
process_is_alive = False
if process_is_alive:
rpc_port = process.rpc_port.value
logger.warning(
f"Unable to retrieve process: {process.name:s} (PID: {pid:d}) "
f"status via RPC socket: http://localhost:{rpc_port:d}"
)
processing_status_string = "RPC error"
status_indicator = definitions.STATUS_INDICATOR_RUNNING
else:
processing_status_string = "killed"
status_indicator = definitions.STATUS_INDICATOR_KILLED
process_status = {"processing_status": processing_status_string}
self._UpdateProcessingStatus(pid, process_status, used_memory)
if status_indicator in definitions.ERROR_STATUS_INDICATORS:
logger.error(
f"Process {process.name:s} (PID: {pid:d}) is not functioning "
f"correctly. Status code: {status_indicator!s}."
)
self._TerminateProcessByPid(pid)
def _MergeAttributeContainers(self, storage_writer, merge_helper):
"""Merges attribute containers from a task store into the storage writer.
Args:
storage_writer (StorageWriter): storage writer.
merge_helper (AnalysisTaskMergeHelper): helper to merge attribute
containers.
Returns:
int: number of containers merged.
"""
number_of_containers = 0
container = merge_helper.GetAttributeContainer()
while container:
number_of_containers += 1
if container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT_TAG:
storage_writer.AddOrUpdateEventTag(container)
else:
storage_writer.AddAttributeContainer(container)
if container.CONTAINER_TYPE == self._CONTAINER_TYPE_ANALYSIS_REPORT:
self._number_of_produced_analysis_reports += 1
elif container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT_TAG:
self._number_of_produced_event_tags += 1
for label in container.labels:
self._event_labels_counter[label] += 1
self._event_labels_counter["total"] += 1
container = merge_helper.GetAttributeContainer()
return number_of_containers
def _StartAnalysisProcesses(self, analysis_plugins):
"""Starts the analysis processes.
Args:
analysis_plugins (dict[str, AnalysisPlugin]): analysis plugins that
should be run and their names.
"""
logger.info("Starting analysis plugins.")
for analysis_plugin in analysis_plugins.values():
self._analysis_plugins[analysis_plugin.NAME] = analysis_plugin
process = self._StartWorkerProcess(analysis_plugin.NAME)
if not process:
logger.error(
f"Unable to create analysis process: {analysis_plugin.NAME:s}"
)
logger.info("Analysis plugins running")
def _StartWorkerProcess(self, process_name):
"""Creates, starts, monitors and registers a worker process.
Args:
process_name (str): process name.
Returns:
MultiProcessWorkerProcess: extraction worker process or None on error.
"""
analysis_plugin = self._analysis_plugins.get(process_name)
if not analysis_plugin:
logger.error(f"Missing analysis plugin: {process_name:s}")
return None
queue_name = f"{process_name:s} output event queue"
output_event_queue = zeromq_queue.ZeroMQPushBindQueue(
name=queue_name, timeout_seconds=self._QUEUE_TIMEOUT
)
# Open the queue so it can bind to a random port, and we can get the
# port number to use in the input queue.
output_event_queue.Open()
self._event_queues[process_name] = output_event_queue
queue_name = f"{process_name:s} input event queue"
input_event_queue = zeromq_queue.ZeroMQPullConnectQueue(
name=queue_name,
delay_open=True,
port=output_event_queue.port,
timeout_seconds=self._QUEUE_TIMEOUT,
)
process = analysis_process.AnalysisProcess(
input_event_queue,
analysis_plugin,
self._processing_configuration,
self._user_accounts,
data_location=self._data_location,
event_filter_expression=self._event_filter_expression,
name=process_name,
)
process.start()
logger.info(
f"Started analysis plugin: {process_name:s} (PID: {process.pid:d})."
)
try:
self._StartMonitoringProcess(process)
except (KeyError, OSError) as exception:
logger.error(
(
f"Unable to monitor analysis plugin: {process_name:s} (PID: "
f"{process.pid:d}) with error: {exception!s}"
)
)
process.terminate()
return None
self._RegisterProcess(process)
return process
def _StopAnalysisProcesses(self, abort=False):
"""Stops the analysis processes.
Args:
abort (bool): True to indicated the stop is issued on abort.
"""
logger.debug("Stopping analysis processes.")
self._StopMonitoringProcesses()
if abort:
# Signal all the processes to abort.
self._AbortTerminate()
# Wake the processes to make sure that they are not blocking
# waiting for the queue new items.
for event_queue in self._event_queues.values():
event_queue.PushItem(plaso_queue.QueueAbort(), block=False)
# Try waiting for the processes to exit normally.
self._AbortJoin(timeout=self._PROCESS_JOIN_TIMEOUT)
for event_queue in self._event_queues.values():
event_queue.Close(abort=abort)
if abort:
# Kill any remaining processes.
self._AbortKill()
else:
# Check if the processes are still alive and terminate them if necessary.
self._AbortTerminate()
self._AbortJoin(timeout=self._PROCESS_JOIN_TIMEOUT)
for event_queue in self._event_queues.values():
event_queue.Close(abort=True)
def _UpdateForemanProcessStatus(self):
"""Update the foreman process status."""
used_memory = self._process_information.GetUsedMemory() or 0
display_name = getattr(self._merge_task, "identifier", "")
self._processing_status.UpdateForemanStatus(
self._name,
self._status,
self._pid,
used_memory,
display_name,
self._number_of_consumed_sources,
self._number_of_produced_sources,
self._number_of_consumed_event_data,
self._number_of_produced_event_data,
self._number_of_consumed_events,
self._number_of_produced_events,
self._number_of_consumed_event_tags,
self._number_of_produced_event_tags,
self._number_of_consumed_analysis_reports,
self._number_of_produced_analysis_reports,
)
self._processing_status.UpdateEventsStatus(self._events_status)
def _UpdateProcessingStatus(self, pid, process_status, used_memory):
"""Updates the processing status.
Args:
pid (int): process identifier (PID) of the worker process.
process_status (dict[str, object]): status values received from
the worker process.
used_memory (int): size of used memory in bytes.
Raises:
KeyError: if the process is not registered with the engine.
"""
self._RaiseIfNotRegistered(pid)
if not process_status:
return
process = self._processes_per_pid[pid]
status_indicator = process_status.get("processing_status")
self._RaiseIfNotMonitored(pid)
display_name = process_status.get("display_name", "")
number_of_consumed_event_data = process_status.get(
"number_of_consumed_event_data", None
)
number_of_produced_event_data = process_status.get(
"number_of_produced_event_data", None
)
number_of_consumed_event_tags = process_status.get(
"number_of_consumed_event_tags", None
)
number_of_produced_event_tags = process_status.get(
"number_of_produced_event_tags", None
)
number_of_consumed_events = process_status.get(
"number_of_consumed_events", None
)
number_of_produced_events = process_status.get(
"number_of_produced_events", None
)
number_of_consumed_reports = process_status.get(
"number_of_consumed_reports", None
)
number_of_produced_reports = process_status.get(
"number_of_produced_reports", None
)
number_of_consumed_sources = process_status.get(
"number_of_consumed_sources", None
)
number_of_produced_sources = process_status.get(
"number_of_produced_sources", None
)
if status_indicator != definitions.STATUS_INDICATOR_IDLE:
last_activity_timestamp = process_status.get("last_activity_timestamp", 0.0)
if last_activity_timestamp:
last_activity_timestamp += self._worker_timeout
current_timestamp = time.time()
if current_timestamp > last_activity_timestamp:
logger.error(
f"Process {process.name:s} (PID: {pid:d}) has not reported "
f"activity within the timeout period."
)
status_indicator = definitions.STATUS_INDICATOR_NOT_RESPONDING
self._processing_status.UpdateWorkerStatus(
process.name,
status_indicator,
pid,
used_memory,
display_name,
number_of_consumed_sources,
number_of_produced_sources,
number_of_consumed_event_data,
number_of_produced_event_data,
number_of_consumed_events,
number_of_produced_events,
number_of_consumed_event_tags,
number_of_produced_event_tags,
number_of_consumed_reports,
number_of_produced_reports,
)
def _UpdateStatus(self):
"""Update the status."""
# Make a local copy of the PIDs in case the dict is changed by
# the main thread.
for pid in list(self._process_information_per_pid.keys()):
self._CheckStatusAnalysisProcess(pid)
self._UpdateForemanProcessStatus()
if self._status_update_callback:
self._status_update_callback(self._processing_status)
# pylint: disable=too-many-arguments
[docs]
def AnalyzeEvents(
self,
session,
storage_writer,
data_location,
analysis_plugins,
processing_configuration,
event_filter=None,
event_filter_expression=None,
status_update_callback=None,
storage_file_path=None,
):
"""Analyzes events in a Plaso storage.
Args:
session (Session): session in which the events are analyzed.
storage_writer (StorageWriter): storage writer.
data_location (str): path to the location that data files should
be loaded from.
analysis_plugins (dict[str, AnalysisPlugin]): analysis plugins that
should be run and their names.
processing_configuration (ProcessingConfiguration): processing
configuration.
event_filter (Optional[EventObjectFilter]): event filter.
event_filter_expression (Optional[str]): event filter expression.
status_update_callback (Optional[function]): callback function for status
updates.
storage_file_path (Optional[str]): path to the session storage file.
Returns:
ProcessingStatus: processing status.
Raises:
KeyboardInterrupt: if a keyboard interrupt was raised.
ValueError: if analysis plugins are missing.
"""
if not analysis_plugins:
raise ValueError("Missing analysis plugins")
abort_kill = False
keyboard_interrupt = False
queue_full = False
self._analysis_plugins = {}
self._data_location = data_location
self._event_filter_expression = event_filter_expression
self._events_status = processing_status.EventsStatus()
self._processing_configuration = processing_configuration
self._session = session
self._status_update_callback = status_update_callback
self._storage_file_path = storage_file_path
self._user_accounts = list(
storage_writer.GetAttributeContainers("user_account")
)
stored_event_labels_counter = {}
if storage_writer.HasAttributeContainers("event_label_count"):
stored_event_labels_counter = {
event_label_count.label: event_label_count
for event_label_count in storage_writer.GetAttributeContainers(
"event_label_count"
)
}
self._event_labels_counter = collections.Counter()
if storage_writer.HasAttributeContainers("parser_count"):
parsers_counter = {
parser_count.name: parser_count.number_of_events
for parser_count in storage_writer.GetAttributeContainers(
"parser_count"
)
}
total_number_of_events = parsers_counter["total"]
else:
total_number_of_events = 0
for stored_session in storage_writer.GetSessions():
total_number_of_events += stored_session.parsers_counter["total"]
self._events_status.total_number_of_events = total_number_of_events
# Set up the storage writer before the analysis processes.
self._StartTaskStorage(definitions.STORAGE_FORMAT_SQLITE)
self._StartAnalysisProcesses(analysis_plugins)
self._StartProfiling(self._processing_configuration.profiling)
# Start the status update thread after open of the storage writer
# so we don't have to clean up the thread if the open fails.
self._StartStatusUpdateThread()
try:
self._AnalyzeEvents(
storage_writer, analysis_plugins, event_filter=event_filter
)
for key, value in self._event_labels_counter.items():
event_label_count = stored_event_labels_counter.get(key)
if event_label_count:
event_label_count.number_of_events += value
storage_writer.UpdateAttributeContainer(event_label_count)
else:
event_label_count = counts.EventLabelCount(
label=key, number_of_events=value
)
storage_writer.AddAttributeContainer(event_label_count)
self._status = definitions.STATUS_INDICATOR_FINALIZING
except errors.QueueFull:
queue_full = True
self._abort = True
except KeyboardInterrupt:
keyboard_interrupt = True
self._abort = True
finally:
self._processing_status.aborted = self._abort
session.aborted = self._abort
# Stop the status update thread after close of the storage writer
# so we include the storage sync to disk in the status updates.
self._StopStatusUpdateThread()
self._StopProfiling()
# Update the status view one last time before the analysis processses are
# stopped.
self._UpdateStatus()
if queue_full:
# TODO: handle abort on queue full more elegant.
abort_kill = True
else:
try:
self._StopAnalysisProcesses(abort=self._abort)
except KeyboardInterrupt:
keyboard_interrupt = True
abort_kill = True
if abort_kill:
self._AbortKill()
# The abort can leave the main process unresponsive
# due to incorrectly finalized IPC.
self._KillProcess(os.getpid())
try:
self._StopTaskStorage(
definitions.STORAGE_FORMAT_SQLITE, session.identifier, abort=self._abort
)
except OSError as exception:
logger.error(f"Unable to stop task storage with error: {exception!s}")
if self._abort:
logger.debug("Analysis aborted.")
else:
logger.debug("Analysis completed.")
# Update the status view one last time.
self._UpdateStatus()
# Reset values.
self._analysis_plugins = {}
self._data_location = None
self._event_filter_expression = None
self._processing_configuration = None
self._session = None
self._status_update_callback = None
self._storage_file_path = None
self._user_accounts = None
if keyboard_interrupt:
raise KeyboardInterrupt
return self._processing_status