# Source code for plaso.multi_process.engine

# -*- coding: utf-8 -*-
"""The multi-process processing engine."""

import abc
import ctypes
import os
import signal
import sys
import threading
import time

from plaso.engine import engine
from plaso.engine import process_info
from plaso.lib import definitions
from plaso.multi_process import logger
from plaso.multi_process import plaso_xmlrpc


class MultiProcessEngine(engine.BaseEngine):
  """Multi-process engine base.

  This class contains functionality to:
  * monitor and manage worker processes;
  * retrieve a process status information via RPC;
  * manage the status update thread.

  A process is "registered" once it is stored in _processes_per_pid and
  "monitored" once it has an entry in _process_information_per_pid (and an
  RPC client in _rpc_clients_per_pid).
  """

  # Note that on average Windows seems to require a longer wait.
  _RPC_SERVER_TIMEOUT = 8.0
  _MAXIMUM_RPC_ERRORS = 10
  # Maximum number of attempts to try to start a replacement worker process.
  _MAXIMUM_REPLACEMENT_RETRIES = 3
  # Number of seconds to wait between attempts to start a replacement worker
  # process
  _REPLACEMENT_WORKER_RETRY_DELAY = 1

  _PROCESS_JOIN_TIMEOUT = 5.0

  _ZEROMQ_NO_WORKER_REQUEST_TIME_SECONDS = 300

  def __init__(self):
    """Initializes a multi-process engine."""
    super(MultiProcessEngine, self).__init__()
    self._debug_output = False
    self._name = 'Main'
    self._last_worker_number = 0
    self._log_filename = None
    self._pid = os.getpid()
    self._process_information = process_info.ProcessInfo(self._pid)
    # Per-PID bookkeeping of the processes managed by this engine.
    self._process_information_per_pid = {}
    self._processes_per_pid = {}
    self._quiet_mode = False
    self._rpc_clients_per_pid = {}
    self._rpc_errors_per_pid = {}
    self._status_update_active = False
    self._status_update_thread = None
    self._storage_writer = None
    self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT

  def _AbortJoin(self, timeout=None):
    """Aborts all registered processes by joining with the parent process.

    Args:
      timeout (int): number of seconds to wait for processes to join, where
          None represents no timeout.
    """
    for pid, process in self._processes_per_pid.items():
      logger.debug('Waiting for process: {0:s} (PID: {1:d}).'.format(
          process.name, pid))
      process.join(timeout=timeout)
      # join() returns silently on timeout, hence check if the process
      # actually stopped.
      if not process.is_alive():
        logger.debug('Process {0:s} (PID: {1:d}) stopped.'.format(
            process.name, pid))

  def _AbortKill(self):
    """Aborts all registered processes by sending a SIGKILL or equivalent."""
    for pid, process in self._processes_per_pid.items():
      if not process.is_alive():
        continue

      logger.warning('Killing process: {0:s} (PID: {1:d}).'.format(
          process.name, pid))
      self._KillProcess(pid)

  def _AbortTerminate(self):
    """Aborts all registered processes by sending a SIGTERM or equivalent."""
    for pid, process in self._processes_per_pid.items():
      if not process.is_alive():
        continue

      logger.warning('Terminating process: {0:s} (PID: {1:d}).'.format(
          process.name, pid))
      process.terminate()

  def _CheckStatusWorkerProcess(self, pid):
    """Checks the status of a worker process.

    If a worker process is not responding the process is terminated and
    a replacement process is started.

    Args:
      pid (int): process ID (PID) of a registered worker process.

    Raises:
      KeyError: if the process is not registered with the engine.
    """
    # TODO: Refactor this method, simplify and separate concerns (monitoring
    # vs management).
    self._RaiseIfNotRegistered(pid)

    process = self._processes_per_pid[pid]

    # A status of None indicates the process is no longer alive.
    process_status = self._QueryProcessStatus(process)
    if process_status is None:
      process_is_alive = False
    else:
      process_is_alive = True

    process_information = self._process_information_per_pid[pid]
    used_memory = process_information.GetUsedMemory() or 0

    if self._worker_memory_limit and used_memory > self._worker_memory_limit:
      logger.warning((
          'Process: {0:s} (PID: {1:d}) killed because it exceeded the '
          'memory limit: {2:d}.').format(
              process.name, pid, self._worker_memory_limit))
      self._KillProcess(pid)

    if isinstance(process_status, dict):
      # A valid status response resets the consecutive RPC error count.
      self._rpc_errors_per_pid[pid] = 0
      status_indicator = process_status.get('processing_status', None)

    else:
      rpc_errors = self._rpc_errors_per_pid.get(pid, 0) + 1
      self._rpc_errors_per_pid[pid] = rpc_errors

      # Too many consecutive RPC errors: consider the process dead.
      if rpc_errors > self._MAXIMUM_RPC_ERRORS:
        process_is_alive = False

      if process_is_alive:
        rpc_port = process.rpc_port.value
        logger.warning((
            'Unable to retrieve process: {0:s} (PID: {1:d}) status via '
            'RPC socket: http://localhost:{2:d}').format(
                process.name, pid, rpc_port))

        processing_status_string = 'RPC error'
        status_indicator = definitions.STATUS_INDICATOR_RUNNING
      else:
        processing_status_string = 'killed'
        status_indicator = definitions.STATUS_INDICATOR_KILLED

      # Synthesize a status so the processing status can still be updated.
      process_status = {
          'processing_status': processing_status_string}

    self._UpdateProcessingStatus(pid, process_status, used_memory)

    # _UpdateProcessingStatus can also change the status of the worker,
    # So refresh the status if applicable.
    for worker_status in self._processing_status.workers_status:
      if worker_status.pid == pid:
        status_indicator = worker_status.status
        break

    if status_indicator in definitions.ERROR_STATUS_INDICATORS:
      logger.error((
          'Process {0:s} (PID: {1:d}) is not functioning correctly. '
          'Status code: {2!s}.').format(process.name, pid, status_indicator))

      self._TerminateProcessByPid(pid)

      replacement_process = None
      replacement_process_name = 'Worker_{0:02d}'.format(
          self._last_worker_number)
      for replacement_process_attempt in range(
          self._MAXIMUM_REPLACEMENT_RETRIES):
        logger.info((
            'Attempt: {0:d} to start replacement worker process for '
            '{1:s}').format(replacement_process_attempt + 1, process.name))

        replacement_process = self._StartWorkerProcess(replacement_process_name)
        if replacement_process:
          break

        time.sleep(self._REPLACEMENT_WORKER_RETRY_DELAY)

      if not replacement_process:
        logger.error(
            'Unable to create replacement worker process for: {0:s}'.format(
                process.name))

  def _KillProcess(self, pid):
    """Issues a SIGKILL or equivalent to the process.

    Failures are logged and not raised.

    Args:
      pid (int): process identifier (PID).
    """
    if sys.platform.startswith('win'):
      # 1 is the value of the PROCESS_TERMINATE access right.
      process_terminate = 1
      kernel32 = ctypes.windll.kernel32

      handle = kernel32.OpenProcess(process_terminate, False, pid)
      if not handle:
        # OpenProcess returns NULL on failure, in which case there is no
        # handle to terminate or close.
        logger.error(
            'Unable to kill process {0:d}: unable to open process.'.format(
                pid))
        return

      try:
        if not kernel32.TerminateProcess(handle, -1):
          logger.error(
              'Unable to kill process {0:d}: unable to terminate '
              'process.'.format(pid))
      finally:
        kernel32.CloseHandle(handle)

    else:
      try:
        os.kill(pid, signal.SIGKILL)
      except OSError as exception:
        logger.error('Unable to kill process {0:d} with error: {1!s}'.format(
            pid, exception))

  def _QueryProcessStatus(self, process):
    """Queries a process to determine its status.

    Args:
      process (MultiProcessBaseProcess): process to query for its status.

    Returns:
      dict[str, str]: status values received from the worker process or None
          if the process is not alive.
    """
    if not process.is_alive():
      return None

    # NOTE(review): assumes an RPC client has been set up for the process by
    # _StartMonitoringProcess; otherwise rpc_client is None here.
    rpc_client = self._rpc_clients_per_pid.get(process.pid, None)
    return rpc_client.CallFunction()

  def _RaiseIfNotMonitored(self, pid):
    """Raises if the process is not monitored by the engine.

    Args:
      pid (int): process identifier (PID).

    Raises:
      KeyError: if the process is not monitored by the engine.
    """
    if pid not in self._process_information_per_pid:
      raise KeyError(
          'Process (PID: {0:d}) not monitored by engine.'.format(pid))

  def _RaiseIfNotRegistered(self, pid):
    """Raises if the process is not registered with the engine.

    Args:
      pid (int): process identifier (PID).

    Raises:
      KeyError: if the process is not registered with the engine.
    """
    if pid not in self._processes_per_pid:
      raise KeyError(
          'Process (PID: {0:d}) not registered with engine'.format(pid))

  def _RegisterProcess(self, process):
    """Registers a process with the engine.

    Args:
      process (MultiProcessBaseProcess): process.

    Raises:
      KeyError: if the process is already registered with the engine.
      ValueError: if the process is missing.
    """
    if process is None:
      raise ValueError('Missing process.')

    if process.pid in self._processes_per_pid:
      raise KeyError(
          'Already managing process: {0!s} (PID: {1:d})'.format(
              process.name, process.pid))

    self._processes_per_pid[process.pid] = process

  # pylint: disable=redundant-returns-doc
  @abc.abstractmethod
  def _StartWorkerProcess(self, process_name):
    """Creates, starts, monitors and registers a worker process.

    Args:
      process_name (str): process name.

    Returns:
      MultiProcessWorkerProcess: extraction worker process.
    """

  def _StartMonitoringProcess(self, process):
    """Starts monitoring a process.

    Waits for the process to publish the port of its RPC server, connects
    an RPC client to it and starts tracking process information.

    Args:
      process (MultiProcessBaseProcess): process.

    Raises:
      IOError: if the RPC client cannot connect to the server.
      KeyError: if the process is not registered with the engine or
          if the process is already being monitored.
      OSError: if the RPC client cannot connect to the server.
      ValueError: if the process is missing.
    """
    if process is None:
      raise ValueError('Missing process.')

    pid = process.pid

    if pid in self._process_information_per_pid:
      raise KeyError(
          'Already monitoring process (PID: {0:d}).'.format(pid))

    if pid in self._rpc_clients_per_pid:
      raise KeyError(
          'RPC client (PID: {0:d}) already exists'.format(pid))

    rpc_client = plaso_xmlrpc.XMLProcessStatusRPCClient()

    # Make sure that a worker process has started its RPC server.
    # The RPC port will be 0 if no server is available.
    rpc_port = process.rpc_port.value
    time_waited_for_process = 0.0
    while not rpc_port:
      time.sleep(0.1)
      rpc_port = process.rpc_port.value
      time_waited_for_process += 0.1

      if time_waited_for_process >= self._RPC_SERVER_TIMEOUT:
        raise IOError(
            'RPC client unable to determine server (PID: {0:d}) port.'.format(
                pid))

    hostname = 'localhost'

    if not rpc_client.Open(hostname, rpc_port):
      raise IOError((
          'RPC client unable to connect to server (PID: {0:d}) '
          'http://{1:s}:{2:d}').format(pid, hostname, rpc_port))

    # Only track the process once the RPC client is connected.
    self._rpc_clients_per_pid[pid] = rpc_client
    self._process_information_per_pid[pid] = process_info.ProcessInfo(pid)

  def _StartStatusUpdateThread(self):
    """Starts the status update thread."""
    self._status_update_active = True
    self._status_update_thread = threading.Thread(
        name='Status update', target=self._StatusUpdateThreadMain)
    self._status_update_thread.start()

  def _StatusUpdateThreadMain(self):
    """Main function of the status update thread."""
    # NOTE(review): _status_update_interval is not set in this class;
    # presumably provided by the base class or a subclass - confirm.
    while self._status_update_active:
      self._UpdateStatus()

      time.sleep(self._status_update_interval)

  def _StopMonitoringProcess(self, process):
    """Stops monitoring a process.

    Args:
      process (MultiProcessBaseProcess): process.

    Raises:
      KeyError: if the process is not monitored.
      ValueError: if the process is missing.
    """
    if process is None:
      raise ValueError('Missing process.')

    pid = process.pid

    self._RaiseIfNotMonitored(pid)

    del self._process_information_per_pid[pid]

    rpc_client = self._rpc_clients_per_pid.get(pid, None)
    if rpc_client:
      rpc_client.Close()
      del self._rpc_clients_per_pid[pid]

    if pid in self._rpc_errors_per_pid:
      del self._rpc_errors_per_pid[pid]

    logger.debug('Stopped monitoring process: {0:s} (PID: {1:d})'.format(
        process.name, pid))

  def _StopMonitoringProcesses(self):
    """Stops monitoring all processes.

    Raises:
      KeyError: if a monitored process is not registered with the engine.
    """
    # We need to make a copy of the list of pids since we are changing
    # the dict in the loop.
    for pid in list(self._process_information_per_pid.keys()):
      self._RaiseIfNotRegistered(pid)
      process = self._processes_per_pid[pid]

      self._StopMonitoringProcess(process)

  def _StopStatusUpdateThread(self):
    """Stops the status update thread."""
    if self._status_update_thread:
      # Signal the thread main loop to exit before joining.
      self._status_update_active = False
      if self._status_update_thread.is_alive():
        self._status_update_thread.join()
      self._status_update_thread = None

    # Update the status view one last time so we have the latest worker process
    # status information.
    self._UpdateStatus()

  def _TerminateProcessByPid(self, pid):
    """Terminate a process that's monitored by the engine.

    Args:
      pid (int): process identifier (PID).

    Raises:
      KeyError: if the process is not registered with and monitored by the
          engine.
    """
    self._RaiseIfNotRegistered(pid)

    process = self._processes_per_pid[pid]

    self._TerminateProcess(process)
    self._StopMonitoringProcess(process)

  def _TerminateProcess(self, process):
    """Terminate a process.

    The process is first terminated and, if it is still alive after
    _PROCESS_JOIN_TIMEOUT seconds, killed.

    Args:
      process (MultiProcessBaseProcess): process to terminate.
    """
    pid = process.pid
    logger.warning('Terminating process: (PID: {0:d}).'.format(pid))
    process.terminate()

    # Wait for the process to exit.
    process.join(timeout=self._PROCESS_JOIN_TIMEOUT)

    if process.is_alive():
      logger.warning('Killing process: (PID: {0:d}).'.format(pid))
      self._KillProcess(pid)

  @abc.abstractmethod
  def _UpdateProcessingStatus(self, pid, process_status, used_memory):
    """Updates the processing status.

    Args:
      pid (int): process identifier (PID) of the worker process.
      process_status (dict[str, object]): status values received from
          the worker process.
      used_memory (int): size of used memory in bytes.

    Raises:
      KeyError: if the process is not registered with the engine.
    """

  @abc.abstractmethod
  def _UpdateStatus(self):
    """Updates the status."""