Source code for plaso.multi_process.analysis_engine

# -*- coding: utf-8 -*-
"""The task-based multi-process processing analysis engine."""

import collections
import os
import time

from plaso.containers import counts
from plaso.containers import events
from plaso.containers import reports
from plaso.containers import tasks
from plaso.engine import processing_status
from plaso.lib import definitions
from plaso.lib import errors
from plaso.multi_process import analysis_process
from plaso.multi_process import logger
from plaso.multi_process import merge_helpers
from plaso.multi_process import plaso_queue
from plaso.multi_process import task_engine
from plaso.multi_process import zeromq_queue


class AnalysisMultiProcessEngine(task_engine.TaskMultiProcessEngine):
  """Task-based multi-process analysis engine.

  This class contains functionality to:
  * monitor and manage analysis tasks;
  * merge results returned by analysis worker processes.
  """

  # pylint: disable=abstract-method

  _CONTAINER_TYPE_ANALYSIS_REPORT = reports.AnalysisReport.CONTAINER_TYPE
  _CONTAINER_TYPE_EVENT_TAG = events.EventTag.CONTAINER_TYPE

  _PROCESS_JOIN_TIMEOUT = 5.0

  _QUEUE_TIMEOUT = 10 * 60

  def __init__(self, worker_memory_limit=None, worker_timeout=None):
    """Initializes a task-based multi-process analysis engine.

    Args:
      worker_memory_limit (Optional[int]): maximum amount of memory a worker
          is allowed to consume, where None represents the default memory
          limit and 0 represents no limit.
      worker_timeout (Optional[float]): number of minutes before a worker
          process that is not providing status updates is considered
          inactive, where None or 0.0 represents the default timeout.
    """
    if worker_memory_limit is None:
      worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT

    super(AnalysisMultiProcessEngine, self).__init__()
    self._analysis_plugins = {}
    self._completed_analysis_processes = set()
    self._data_location = None
    self._event_filter_expression = None
    self._event_labels_counter = None
    self._event_queues = {}
    self._events_status = processing_status.EventsStatus()
    self._memory_profiler = None
    self._merge_task = None
    self._number_of_consumed_analysis_reports = 0
    self._number_of_consumed_event_data = 0
    self._number_of_consumed_event_tags = 0
    self._number_of_consumed_events = 0
    self._number_of_consumed_sources = 0
    self._number_of_produced_analysis_reports = 0
    self._number_of_produced_event_data = 0
    self._number_of_produced_event_tags = 0
    self._number_of_produced_events = 0
    self._number_of_produced_sources = 0
    self._processing_profiler = None
    self._serializers_profiler = None
    self._session = None
    self._status = definitions.STATUS_INDICATOR_IDLE
    self._status_update_callback = None
    self._user_accounts = None
    self._worker_memory_limit = worker_memory_limit
    self._worker_timeout = worker_timeout or definitions.DEFAULT_WORKER_TIMEOUT

  def _AnalyzeEvents(
      self, storage_writer, analysis_plugins, event_filter=None):
    """Analyzes events in a Plaso storage.

    Args:
      storage_writer (StorageWriter): storage writer.
      analysis_plugins (dict[str, AnalysisPlugin]): analysis plugins that
          should be run and their names.
      event_filter (Optional[EventObjectFilter]): event filter.

    Returns:
      collections.Counter: counter containing information about the events
          processed and filtered.

    Raises:
      RuntimeError: if a non-recoverable situation is encountered.
    """
    self._status = definitions.STATUS_INDICATOR_RUNNING
    self._number_of_consumed_analysis_reports = 0
    self._number_of_consumed_event_data = 0
    self._number_of_consumed_event_tags = 0
    self._number_of_consumed_events = 0
    self._number_of_consumed_sources = 0
    self._number_of_produced_analysis_reports = 0
    self._number_of_produced_event_data = 0
    self._number_of_produced_event_tags = 0
    self._number_of_produced_events = 0
    self._number_of_produced_sources = 0

    number_of_filtered_events = 0

    logger.debug('Processing events.')

    filter_limit = getattr(event_filter, 'limit', None)

    for event in storage_writer.GetSortedEvents():
      event_data_identifier = event.GetEventDataIdentifier()
      event_data = storage_writer.GetAttributeContainerByIdentifier(
          events.EventData.CONTAINER_TYPE, event_data_identifier)

      event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()
      if event_data_stream_identifier:
        event_data_stream = storage_writer.GetAttributeContainerByIdentifier(
            events.EventDataStream.CONTAINER_TYPE,
            event_data_stream_identifier)
      else:
        event_data_stream = None

      event_identifier = event.GetIdentifier()
      event_tag = storage_writer.GetEventTagByEventIdentifer(event_identifier)

      if event_filter:
        filter_match = event_filter.Match(
            event, event_data, event_data_stream, event_tag)
      else:
        filter_match = None

      # pylint: disable=singleton-comparison
      if filter_match == False:
        number_of_filtered_events += 1
        continue

      for event_queue in self._event_queues.values():
        # TODO: Check for premature exit of analysis plugins.
        event_queue.PushItem((event, event_data, event_data_stream))

      self._number_of_consumed_events += 1

      if (event_filter and filter_limit and
          filter_limit == self._number_of_consumed_events):
        break

    logger.debug('Finished pushing events to analysis plugins.')
    # Signal that we have finished adding events.
    for event_queue in self._event_queues.values():
      event_queue.PushItem(plaso_queue.QueueAbort(), block=False)

    logger.debug('Processing analysis plugin results.')

    # TODO: use a task based approach.
    plugin_names = list(analysis_plugins.keys())
    while plugin_names:
      for plugin_name in list(plugin_names):
        if self._abort:
          break

        # TODO: temporary solution.
        task = tasks.Task()
        task.storage_format = definitions.STORAGE_FORMAT_SQLITE
        task.identifier = plugin_name

        merge_ready = self._CheckTaskReadyForMerge(
            definitions.STORAGE_FORMAT_SQLITE, task)
        if merge_ready:
          self._PrepareMergeTaskStorage(
              definitions.STORAGE_FORMAT_SQLITE, task)
          self._status = definitions.STATUS_INDICATOR_MERGING

          event_queue = self._event_queues[plugin_name]
          del self._event_queues[plugin_name]

          event_queue.Close()

          task_storage_reader = self._GetMergeTaskStorage(
              definitions.STORAGE_FORMAT_SQLITE, task)

          try:
            merge_helper = merge_helpers.AnalysisTaskMergeHelper(
                task_storage_reader, task.identifier)

            logger.debug('Starting merge of task: {0:s}'.format(
                merge_helper.task_identifier))

            number_of_containers = self._MergeAttributeContainers(
                storage_writer, merge_helper)

            logger.debug('Merged {0:d} containers of task: {1:s}'.format(
                number_of_containers, merge_helper.task_identifier))

          finally:
            task_storage_reader.Close()

          self._RemoveMergeTaskStorage(
              definitions.STORAGE_FORMAT_SQLITE, task)

          self._status = definitions.STATUS_INDICATOR_RUNNING

          # TODO: temporary solution.
          plugin_names.remove(plugin_name)

    events_counter = collections.Counter()
    events_counter['Events filtered'] = number_of_filtered_events
    events_counter['Events processed'] = self._number_of_consumed_events

    return events_counter

  def _CheckStatusAnalysisProcess(self, pid):
    """Checks the status of an analysis process.

    Args:
      pid (int): process ID (PID) of a registered analysis process.

    Raises:
      KeyError: if the process is not registered with the engine.
    """
    # TODO: Refactor this method, simplify and separate concerns (monitoring
    # vs management).
    self._RaiseIfNotRegistered(pid)

    if pid in self._completed_analysis_processes:
      status_indicator = definitions.STATUS_INDICATOR_COMPLETED
      process_status = {
          'processing_status': status_indicator}
      used_memory = 0

    else:
      process = self._processes_per_pid[pid]

      process_status = self._QueryProcessStatus(process)
      if process_status is None:
        process_is_alive = False
      else:
        process_is_alive = True

      process_information = self._process_information_per_pid[pid]
      used_memory = process_information.GetUsedMemory() or 0

      if self._worker_memory_limit and used_memory > self._worker_memory_limit:
        logger.warning((
            'Process: {0:s} (PID: {1:d}) killed because it exceeded the '
            'memory limit: {2:d}.').format(
                process.name, pid, self._worker_memory_limit))
        self._KillProcess(pid)

      if isinstance(process_status, dict):
        self._rpc_errors_per_pid[pid] = 0
        status_indicator = process_status.get('processing_status', None)

        if status_indicator == definitions.STATUS_INDICATOR_COMPLETED:
          self._completed_analysis_processes.add(pid)

      else:
        rpc_errors = self._rpc_errors_per_pid.get(pid, 0) + 1
        self._rpc_errors_per_pid[pid] = rpc_errors

        if rpc_errors > self._MAXIMUM_RPC_ERRORS:
          process_is_alive = False

        if process_is_alive:
          rpc_port = process.rpc_port.value
          logger.warning((
              'Unable to retrieve process: {0:s} (PID: {1:d}) status via '
              'RPC socket: http://localhost:{2:d}').format(
                  process.name, pid, rpc_port))

          processing_status_string = 'RPC error'
          status_indicator = definitions.STATUS_INDICATOR_RUNNING
        else:
          processing_status_string = 'killed'
          status_indicator = definitions.STATUS_INDICATOR_KILLED

        process_status = {
            'processing_status': processing_status_string}

    self._UpdateProcessingStatus(pid, process_status, used_memory)

    if status_indicator in definitions.ERROR_STATUS_INDICATORS:
      logger.error((
          'Process {0:s} (PID: {1:d}) is not functioning correctly. '
          'Status code: {2!s}.').format(
              process.name, pid, status_indicator))

      self._TerminateProcessByPid(pid)

  def _MergeAttributeContainers(self, storage_writer, merge_helper):
    """Merges attribute containers from a task store into the storage writer.

    Args:
      storage_writer (StorageWriter): storage writer.
      merge_helper (AnalysisTaskMergeHelper): helper to merge attribute
          containers.

    Returns:
      int: number of containers merged.
    """
    number_of_containers = 0

    container = merge_helper.GetAttributeContainer()
    while container:
      number_of_containers += 1

      if container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT_TAG:
        storage_writer.AddOrUpdateEventTag(container)
      else:
        storage_writer.AddAttributeContainer(container)

      if container.CONTAINER_TYPE == self._CONTAINER_TYPE_ANALYSIS_REPORT:
        self._number_of_produced_analysis_reports += 1

      elif container.CONTAINER_TYPE == self._CONTAINER_TYPE_EVENT_TAG:
        self._number_of_produced_event_tags += 1

        for label in container.labels:
          self._event_labels_counter[label] += 1
        self._event_labels_counter['total'] += 1

      container = merge_helper.GetAttributeContainer()

    return number_of_containers

  def _StartAnalysisProcesses(self, analysis_plugins):
    """Starts the analysis processes.

    Args:
      analysis_plugins (dict[str, AnalysisPlugin]): analysis plugins that
          should be run and their names.
    """
    logger.info('Starting analysis plugins.')

    for analysis_plugin in analysis_plugins.values():
      self._analysis_plugins[analysis_plugin.NAME] = analysis_plugin

      process = self._StartWorkerProcess(analysis_plugin.NAME)
      if not process:
        logger.error('Unable to create analysis process: {0:s}'.format(
            analysis_plugin.NAME))

    logger.info('Analysis plugins running')

  def _StartWorkerProcess(self, process_name):
    """Creates, starts, monitors and registers a worker process.

    Args:
      process_name (str): process name.

    Returns:
      MultiProcessWorkerProcess: analysis worker process or None on error.
    """
    analysis_plugin = self._analysis_plugins.get(process_name, None)
    if not analysis_plugin:
      logger.error('Missing analysis plugin: {0:s}'.format(process_name))
      return None

    queue_name = '{0:s} output event queue'.format(process_name)
    output_event_queue = zeromq_queue.ZeroMQPushBindQueue(
        name=queue_name, timeout_seconds=self._QUEUE_TIMEOUT)
    # Open the queue so it can bind to a random port, and we can get the
    # port number to use in the input queue.
    output_event_queue.Open()

    self._event_queues[process_name] = output_event_queue

    queue_name = '{0:s} input event queue'.format(process_name)
    input_event_queue = zeromq_queue.ZeroMQPullConnectQueue(
        name=queue_name, delay_open=True, port=output_event_queue.port,
        timeout_seconds=self._QUEUE_TIMEOUT)

    process = analysis_process.AnalysisProcess(
        input_event_queue, analysis_plugin, self._processing_configuration,
        self._user_accounts, data_location=self._data_location,
        event_filter_expression=self._event_filter_expression,
        name=process_name)

    process.start()

    logger.info('Started analysis plugin: {0:s} (PID: {1:d}).'.format(
        process_name, process.pid))

    try:
      self._StartMonitoringProcess(process)
    except (IOError, KeyError) as exception:
      logger.error((
          'Unable to monitor analysis plugin: {0:s} (PID: {1:d}) '
          'with error: {2!s}').format(process_name, process.pid, exception))

      process.terminate()
      return None

    self._RegisterProcess(process)
    return process

  def _StopAnalysisProcesses(self, abort=False):
    """Stops the analysis processes.

    Args:
      abort (bool): True to indicate the stop is issued on abort.
""" logger.debug('Stopping analysis processes.') self._StopMonitoringProcesses() if abort: # Signal all the processes to abort. self._AbortTerminate() # Wake the processes to make sure that they are not blocking # waiting for the queue new items. for event_queue in self._event_queues.values(): event_queue.PushItem(plaso_queue.QueueAbort(), block=False) # Try waiting for the processes to exit normally. self._AbortJoin(timeout=self._PROCESS_JOIN_TIMEOUT) for event_queue in self._event_queues.values(): event_queue.Close(abort=abort) if abort: # Kill any remaining processes. self._AbortKill() else: # Check if the processes are still alive and terminate them if necessary. self._AbortTerminate() self._AbortJoin(timeout=self._PROCESS_JOIN_TIMEOUT) for event_queue in self._event_queues.values(): event_queue.Close(abort=True) def _UpdateForemanProcessStatus(self): """Update the foreman process status.""" used_memory = self._process_information.GetUsedMemory() or 0 display_name = getattr(self._merge_task, 'identifier', '') self._processing_status.UpdateForemanStatus( self._name, self._status, self._pid, used_memory, display_name, self._number_of_consumed_sources, self._number_of_produced_sources, self._number_of_consumed_event_data, self._number_of_produced_event_data, self._number_of_consumed_events, self._number_of_produced_events, self._number_of_consumed_event_tags, self._number_of_produced_event_tags, self._number_of_consumed_analysis_reports, self._number_of_produced_analysis_reports) self._processing_status.UpdateEventsStatus(self._events_status) def _UpdateProcessingStatus(self, pid, process_status, used_memory): """Updates the processing status. Args: pid (int): process identifier (PID) of the worker process. process_status (dict[str, object]): status values received from the worker process. used_memory (int): size of used memory in bytes. Raises: KeyError: if the process is not registered with the engine. 
""" self._RaiseIfNotRegistered(pid) if not process_status: return process = self._processes_per_pid[pid] status_indicator = process_status.get('processing_status', None) self._RaiseIfNotMonitored(pid) display_name = process_status.get('display_name', '') number_of_consumed_event_data = process_status.get( 'number_of_consumed_event_data', None) number_of_produced_event_data = process_status.get( 'number_of_produced_event_data', None) number_of_consumed_event_tags = process_status.get( 'number_of_consumed_event_tags', None) number_of_produced_event_tags = process_status.get( 'number_of_produced_event_tags', None) number_of_consumed_events = process_status.get( 'number_of_consumed_events', None) number_of_produced_events = process_status.get( 'number_of_produced_events', None) number_of_consumed_reports = process_status.get( 'number_of_consumed_reports', None) number_of_produced_reports = process_status.get( 'number_of_produced_reports', None) number_of_consumed_sources = process_status.get( 'number_of_consumed_sources', None) number_of_produced_sources = process_status.get( 'number_of_produced_sources', None) if status_indicator != definitions.STATUS_INDICATOR_IDLE: last_activity_timestamp = process_status.get( 'last_activity_timestamp', 0.0) if last_activity_timestamp: last_activity_timestamp += self._worker_timeout current_timestamp = time.time() if current_timestamp > last_activity_timestamp: logger.error(( 'Process {0:s} (PID: {1:d}) has not reported activity within ' 'the timeout period.').format(process.name, pid)) status_indicator = definitions.STATUS_INDICATOR_NOT_RESPONDING self._processing_status.UpdateWorkerStatus( process.name, status_indicator, pid, used_memory, display_name, number_of_consumed_sources, number_of_produced_sources, number_of_consumed_event_data, number_of_produced_event_data, number_of_consumed_events, number_of_produced_events, number_of_consumed_event_tags, number_of_produced_event_tags, number_of_consumed_reports, number_of_produced_reports) def _UpdateStatus(self): """Update the status.""" # Make a local copy of the PIDs in case the dict is changed by # the main thread. for pid in list(self._process_information_per_pid.keys()): self._CheckStatusAnalysisProcess(pid) self._UpdateForemanProcessStatus() if self._status_update_callback: self._status_update_callback(self._processing_status) # pylint: disable=too-many-arguments
  def AnalyzeEvents(
      self, session, storage_writer, data_location, analysis_plugins,
      processing_configuration, event_filter=None,
      event_filter_expression=None, status_update_callback=None,
      storage_file_path=None):
    """Analyzes events in a Plaso storage.

    Args:
      session (Session): session in which the events are analyzed.
      storage_writer (StorageWriter): storage writer.
      data_location (str): path to the location that data files should
          be loaded from.
      analysis_plugins (dict[str, AnalysisPlugin]): analysis plugins that
          should be run and their names.
      processing_configuration (ProcessingConfiguration): processing
          configuration.
      event_filter (Optional[EventObjectFilter]): event filter.
      event_filter_expression (Optional[str]): event filter expression.
      status_update_callback (Optional[function]): callback function for status
          updates.
      storage_file_path (Optional[str]): path to the session storage file.

    Returns:
      ProcessingStatus: processing status.

    Raises:
      KeyboardInterrupt: if a keyboard interrupt was raised.
      ValueError: if analysis plugins are missing.
    """
    if not analysis_plugins:
      raise ValueError('Missing analysis plugins')

    abort_kill = False
    keyboard_interrupt = False
    queue_full = False

    self._analysis_plugins = {}
    self._data_location = data_location
    self._event_filter_expression = event_filter_expression
    self._events_status = processing_status.EventsStatus()
    self._processing_configuration = processing_configuration
    self._session = session
    self._status_update_callback = status_update_callback
    self._storage_file_path = storage_file_path

    self._user_accounts = list(
        storage_writer.GetAttributeContainers('user_account'))

    stored_event_labels_counter = {}
    if storage_writer.HasAttributeContainers('event_label_count'):
      stored_event_labels_counter = {
          event_label_count.label: event_label_count
          for event_label_count in storage_writer.GetAttributeContainers(
              'event_label_count')}

    self._event_labels_counter = collections.Counter()

    if storage_writer.HasAttributeContainers('parser_count'):
      parsers_counter = {
          parser_count.name: parser_count.number_of_events
          for parser_count in storage_writer.GetAttributeContainers(
              'parser_count')}

      total_number_of_events = parsers_counter['total']

    else:
      total_number_of_events = 0
      for stored_session in storage_writer.GetSessions():
        total_number_of_events += stored_session.parsers_counter['total']

    self._events_status.total_number_of_events = total_number_of_events

    # Set up the storage writer before the analysis processes.
    self._StartTaskStorage(definitions.STORAGE_FORMAT_SQLITE)

    self._StartAnalysisProcesses(analysis_plugins)

    self._StartProfiling(self._processing_configuration.profiling)

    # Start the status update thread after open of the storage writer
    # so we don't have to clean up the thread if the open fails.
    self._StartStatusUpdateThread()

    try:
      self._AnalyzeEvents(
          storage_writer, analysis_plugins, event_filter=event_filter)

      for key, value in self._event_labels_counter.items():
        event_label_count = stored_event_labels_counter.get(key, None)
        if event_label_count:
          event_label_count.number_of_events += value
          storage_writer.UpdateAttributeContainer(event_label_count)
        else:
          event_label_count = counts.EventLabelCount(
              label=key, number_of_events=value)
          storage_writer.AddAttributeContainer(event_label_count)

      self._status = definitions.STATUS_INDICATOR_FINALIZING

    except errors.QueueFull:
      queue_full = True
      self._abort = True

    except KeyboardInterrupt:
      keyboard_interrupt = True
      self._abort = True

    finally:
      self._processing_status.aborted = self._abort
      session.aborted = self._abort

      # Stop the status update thread after close of the storage writer
      # so we include the storage sync to disk in the status updates.
      self._StopStatusUpdateThread()

    self._StopProfiling()

    # Update the status view one last time before the analysis processes are
    # stopped.
    self._UpdateStatus()

    if queue_full:
      # TODO: handle abort on queue full more elegantly.
      abort_kill = True
    else:
      try:
        self._StopAnalysisProcesses(abort=self._abort)

      except KeyboardInterrupt:
        keyboard_interrupt = True
        abort_kill = True

    if abort_kill:
      self._AbortKill()

      # The abort can leave the main process unresponsive
      # due to incorrectly finalized IPC.
      self._KillProcess(os.getpid())

    try:
      self._StopTaskStorage(
          definitions.STORAGE_FORMAT_SQLITE, session.identifier,
          abort=self._abort)
    except (IOError, OSError) as exception:
      logger.error('Unable to stop task storage with error: {0!s}'.format(
          exception))

    if self._abort:
      logger.debug('Analysis aborted.')
    else:
      logger.debug('Analysis completed.')

    # Update the status view one last time.
    self._UpdateStatus()

    # Reset values.
    self._analysis_plugins = {}
    self._data_location = None
    self._event_filter_expression = None
    self._processing_configuration = None
    self._session = None
    self._status_update_callback = None
    self._storage_file_path = None
    self._user_accounts = None

    if keyboard_interrupt:
      raise KeyboardInterrupt

    return self._processing_status
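
Usage note (not part of the module source): a minimal sketch of how the public AnalyzeEvents() entry point above might be driven. It assumes that session, storage_writer, data_location, analysis_plugins and processing_configuration have already been created by a front-end such as psort; only the AnalysisMultiProcessEngine calls come from the code above.

# Hypothetical driver sketch; session, storage_writer, data_location,
# analysis_plugins and processing_configuration are assumed to exist and are
# not defined here.
from plaso.multi_process import analysis_engine

engine = analysis_engine.AnalysisMultiProcessEngine(
    worker_memory_limit=0,   # per the __init__ docstring, 0 means no limit
    worker_timeout=15.0)     # minutes without status updates before a worker
                             # is considered inactive

processing_status = engine.AnalyzeEvents(
    session, storage_writer, data_location, analysis_plugins,
    processing_configuration,
    status_update_callback=lambda status: None)

if processing_status.aborted:
  print('Analysis was aborted.')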