Source code for plaso.multi_processing.base_process

# -*- coding: utf-8 -*-
"""Base class for a process used in multi-processing."""

import abc
import logging
import multiprocessing
import os
import random
import signal
import sys
import time

from plaso.engine import process_info
from plaso.engine import profilers
from plaso.lib import loggers
from plaso.multi_processing import logger
from plaso.multi_processing import plaso_xmlrpc

[docs]class MultiProcessBaseProcess(multiprocessing.Process): """Multi-processing process interface. Attributes: rpc_port (int): port number of the process status RPC server. """ _NUMBER_OF_RPC_SERVER_START_ATTEMPTS = 14 _PROCESS_JOIN_TIMEOUT = 5.0 def __init__( self, processing_configuration, enable_sigsegv_handler=False, **kwargs): """Initializes a process. Args: processing_configuration (ProcessingConfiguration): processing configuration. enable_sigsegv_handler (Optional[bool]): True if the SIGSEGV handler should be enabled. kwargs (dict[str,object]): keyword arguments to pass to multiprocessing.Process. """ super(MultiProcessBaseProcess, self).__init__(**kwargs) self._analyzers_profiler = None self._debug_output = False self._enable_sigsegv_handler = enable_sigsegv_handler self._log_filename = None self._memory_profiler = None self._original_sigsegv_handler = None # TODO: check if this can be replaced by or does this only apply # to the parent process? self._pid = None self._processing_configuration = processing_configuration self._process_information = None self._processing_profiler = None self._quiet_mode = False self._rpc_server = None self._serializers_profiler = None self._status_is_running = False self._storage_profiler = None self._tasks_profiler = None if self._processing_configuration: self._debug_output = self._processing_configuration.debug_output if processing_configuration.log_filename: log_path = os.path.dirname(self._processing_configuration.log_filename) log_filename = os.path.basename( self._processing_configuration.log_filename) log_filename = '{0:s}_{1:s}'.format(self._name, log_filename) self._log_filename = os.path.join(log_path, log_filename) # We need to share the RPC port number with the engine process. self.rpc_port = multiprocessing.Value('I', 0) @property def name(self): """str: process name.""" return self._name # pylint: disable=redundant-returns-doc @abc.abstractmethod def _GetStatus(self): """Returns status information. Returns: dict [str, object]: status attributes, indexed by name. """ @abc.abstractmethod def _Main(self): """The process main loop. This method is called when the process is ready to start. A sub class should override this method to do the necessary actions in the main loop. """ def _OnCriticalError(self): """The process on critical error handler. This method is called when the process encounters a critical error for example a segfault. A sub class should override this method to do the necessary actions before the original critical error signal handler it called. Be aware that the state of the process should not be trusted, as a significant part of memory could have been overwritten before a segfault. This callback is primarily intended to salvage what we need to troubleshoot the error. """ return # pylint: disable=unused-argument def _SigSegvHandler(self, signal_number, stack_frame): """Signal handler for the SIGSEGV signal. Args: signal_number (int): numeric representation of the signal. stack_frame (frame): current stack frame or None. """ self._OnCriticalError() # Note that the original SIGSEGV handler can be 0. if self._original_sigsegv_handler is not None: # Let the original SIGSEGV handler take over. signal.signal(signal.SIGSEGV, self._original_sigsegv_handler) os.kill(self._pid, signal.SIGSEGV) # pylint: disable=unused-argument def _SigTermHandler(self, signal_number, stack_frame): """Signal handler for the SIGTERM signal. Args: signal_number (int): numeric representation of the signal. stack_frame (frame): current stack frame or None. """ self.SignalAbort() def _StartProcessStatusRPCServer(self): """Starts the process status RPC server.""" if self._rpc_server: return self._rpc_server = plaso_xmlrpc.XMLProcessStatusRPCServer(self._GetStatus) hostname = 'localhost' # Try the PID as port number first otherwise pick something random # between 1024 and 60000. if self._pid < 1024 or self._pid > 60000: port = random.randint(1024, 60000) else: port = self._pid if not self._rpc_server.Start(hostname, port): port = 0 for _ in range(self._NUMBER_OF_RPC_SERVER_START_ATTEMPTS): port = random.randint(1024, 60000) if self._rpc_server.Start(hostname, port): break port = 0 if not port: logger.error(( 'Unable to start a process status RPC server for {0!s} ' '(PID: {1:d})').format(self._name, self._pid)) self._rpc_server = None return self.rpc_port.value = port logger.debug( 'Process: {0!s} process status RPC server started'.format(self._name)) def _StartProfiling(self, configuration): """Starts profiling. Args: configuration (ProfilingConfiguration): profiling configuration. """ if not configuration: return if configuration.HaveProfileMemory(): self._memory_profiler = profilers.MemoryProfiler( self._name, configuration) self._memory_profiler.Start() if configuration.HaveProfileAnalyzers(): identifier = '{0:s}-analyzers'.format(self._name) self._analyzers_profiler = profilers.AnalyzersProfiler( identifier, configuration) self._analyzers_profiler.Start() if configuration.HaveProfileProcessing(): identifier = '{0:s}-processing'.format(self._name) self._processing_profiler = profilers.ProcessingProfiler( identifier, configuration) self._processing_profiler.Start() if configuration.HaveProfileSerializers(): identifier = '{0:s}-serializers'.format(self._name) self._serializers_profiler = profilers.SerializersProfiler( identifier, configuration) self._serializers_profiler.Start() if configuration.HaveProfileStorage(): self._storage_profiler = profilers.StorageProfiler( self._name, configuration) self._storage_profiler.Start() if configuration.HaveProfileTasks(): self._tasks_profiler = profilers.TasksProfiler(self._name, configuration) self._tasks_profiler.Start() def _StopProcessStatusRPCServer(self): """Stops the process status RPC server.""" if not self._rpc_server: return # Make sure the engine gets one more status update so it knows # the worker has completed. self._WaitForStatusNotRunning() self._rpc_server.Stop() self._rpc_server = None self.rpc_port.value = 0 logger.debug( 'Process: {0!s} process status RPC server stopped'.format(self._name)) def _StopProfiling(self): """Stops profiling.""" if self._memory_profiler: self._memory_profiler.Stop() self._memory_profiler = None if self._analyzers_profiler: self._analyzers_profiler.Stop() self._analyzers_profiler = None if self._processing_profiler: self._processing_profiler.Stop() self._processing_profiler = None if self._serializers_profiler: self._serializers_profiler.Stop() self._serializers_profiler = None if self._storage_profiler: self._storage_profiler.Stop() self._storage_profiler = None if self._tasks_profiler: self._tasks_profiler.Stop() self._tasks_profiler = None def _WaitForStatusNotRunning(self): """Waits for the status is running to change to false.""" # We wait slightly longer than the status check sleep time. time.sleep(2.0) time_slept = 2.0 while self._status_is_running: time.sleep(0.5) time_slept += 0.5 if time_slept >= self._PROCESS_JOIN_TIMEOUT: break # This method is part of the multiprocessing.Process interface hence # its name does not follow the style guide.
[docs] def run(self): """Runs the process.""" if '_lsprof' in sys.modules: # If profiling is active make sure the worker process is included. # cProfile needs to be imported only when needed otherwise the _lsprof # module will be loaded. import cProfile # pylint: disable=import-outside-toplevel profile = cProfile.Profile() profile.enable() profile.runcall(self._RunProcess) profile.disable() profile.dump_stats('{0:s}-profile.output'.format(self._name)) else: self._RunProcess()
def _RunProcess(self): """Runs the process.""" # Prevent the KeyboardInterrupt being raised inside the process. # This will prevent a process from generating a traceback when interrupted. signal.signal(signal.SIGINT, signal.SIG_IGN) # A SIGTERM signal handler is necessary to make sure IPC is cleaned up # correctly on terminate. signal.signal(signal.SIGTERM, self._SigTermHandler) # A SIGSEGV signal handler is necessary to try to indicate where # worker failed. # WARNING the SIGSEGV handler will deadlock the process on a real segfault. if self._enable_sigsegv_handler: self._original_sigsegv_handler = signal.signal( signal.SIGSEGV, self._SigSegvHandler) self._pid = os.getpid() self._process_information = process_info.ProcessInfo(self._pid) # We need to set the is running status explicitly to True in case # the process completes before the engine is able to determine # the status of the process, such as in the unit tests. self._status_is_running = True # Logging needs to be configured before the first output otherwise we # mess up the logging of the parent process. loggers.ConfigureLogging( debug_output=self._debug_output, filename=self._log_filename, quiet_mode=self._quiet_mode) logger.debug( 'Process: {0!s} (PID: {1:d}) started'.format(self._name, self._pid)) self._StartProcessStatusRPCServer() self._Main() self._StopProcessStatusRPCServer() logger.debug( 'Process: {0!s} (PID: {1:d}) stopped'.format(self._name, self._pid)) # Make sure log files are cleanly closed. logging.shutdown() self._status_is_running = False
[docs] @abc.abstractmethod def SignalAbort(self): """Signals the process to abort."""