Source code for plaso.multi_process.zeromq_queue

# -*- coding: utf-8 -*-
"""ZeroMQ implementations of the Plaso queue interface."""

import abc
import errno
import queue
import threading
import time

import zmq

from plaso.engine import logger
from plaso.lib import errors
from plaso.multi_process import plaso_queue


# pylint: disable=no-member

class ZeroMQQueue(plaso_queue.Queue):
  """Interface for a ZeroMQ backed queue.

  Attributes:
    name (str): name to identify the queue.
    port (int): TCP port that the queue is connected or bound to. If the queue
        is not yet bound or connected to a port, this value will be None.
    timeout_seconds (int): number of seconds that calls to PopItem and PushItem
        may block for, before raising QueueEmpty or QueueFull.
  """

  _SOCKET_ADDRESS = 'tcp://127.0.0.1'
  _SOCKET_TYPE = None

  _ZMQ_SOCKET_SEND_TIMEOUT_MILLISECONDS = 1500
  _ZMQ_SOCKET_RECEIVE_TIMEOUT_MILLISECONDS = 1500

  SOCKET_CONNECTION_BIND = 1
  SOCKET_CONNECTION_CONNECT = 2
  SOCKET_CONNECTION_TYPE = None

  def __init__(
      self, delay_open=True, linger_seconds=10, maximum_items=1000,
      name='Unnamed', port=None, timeout_seconds=5):
    """Initializes a ZeroMQ backed queue.

    Args:
      delay_open (Optional[bool]): whether a ZeroMQ socket should be created
          the first time the queue is pushed to or popped from, rather than at
          queue object initialization. This is useful if a queue needs to be
          passed to a child process from a parent process.
      linger_seconds (Optional[int]): number of seconds that the underlying
          ZeroMQ socket can remain open after the queue has been closed,
          to allow queued items to be transferred to other ZeroMQ sockets.
      maximum_items (Optional[int]): maximum number of items to queue on the
          ZeroMQ socket. ZeroMQ refers to this value as "high water mark" or
          "hwm". Note that this limit only applies at one "end" of the queue.
          The default of 1000 is the ZeroMQ default value.
      name (Optional[str]): Optional name to identify the queue.
      port (Optional[int]): The TCP port to use for the queue. The default is
          None, which indicates that the queue should choose a random port to
          bind to.
      timeout_seconds (Optional[int]): number of seconds that calls to PopItem
          and PushItem may block for, before raising QueueEmpty or QueueFull.

    Raises:
      ValueError: if the queue is configured to connect to an endpoint,
          but no port is specified.
    """
    if (self.SOCKET_CONNECTION_TYPE == self.SOCKET_CONNECTION_CONNECT
        and not port):
      raise ValueError('No port specified to connect to.')

    super(ZeroMQQueue, self).__init__()
    self._closed_event = None
    self._high_water_mark = maximum_items
    self._linger_seconds = linger_seconds
    self._terminate_event = None
    self._zmq_context = None
    self._zmq_socket = None

    self.name = name
    self.port = port
    self.timeout_seconds = timeout_seconds

    if not delay_open:
      self._CreateZMQSocket()

  def _SendItem(self, zmq_socket, item, block=True):
    """Attempts to send an item to a ZeroMQ socket.

    Args:
      zmq_socket (zmq.Socket): used to the send the item.
      item (object): sent on the queue. Will be pickled prior to sending.
      block (Optional[bool]): whether the push should be performed in blocking
          or non-blocking mode.

    Returns:
      bool: whether the item was sent successfully.
    """
    try:
      logger.debug('{0:s} sending item'.format(self.name))
      if block:
        zmq_socket.send_pyobj(item)
      else:
        zmq_socket.send_pyobj(item, zmq.DONTWAIT)
      logger.debug('{0:s} sent item'.format(self.name))
      return True

    except zmq.error.Again:
      logger.debug('{0:s} could not send an item'.format(self.name))

    except zmq.error.ZMQError as exception:
      if exception.errno == errno.EINTR:
        logger.error(
            'ZMQ syscall interrupted in {0:s}.'.format(self.name))
      else:
        # The send is still reported as failed via the return value, but the
        # cause is logged so that it is not lost silently.
        logger.error(
            'ZMQ error in {0:s} while sending item: {1!s}'.format(
                self.name, exception))

    return False

  def _ReceiveItemOnActivity(self, zmq_socket):
    """Attempts to receive an item from a ZeroMQ socket.

    Args:
      zmq_socket (zmq.Socket): used to the receive the item.

    Returns:
      object: item from the socket.

    Raises:
      QueueEmpty: if no item could be received within the timeout.
      zmq.error.ZMQError: if an error occurs in ZeroMQ
    """
    events = zmq_socket.poll(self._ZMQ_SOCKET_RECEIVE_TIMEOUT_MILLISECONDS)
    if events:
      try:
        # Receive on the same socket that was polled.
        # NOTE(review): recv_pyobj unpickles the received data; this relies on
        # the peer being trusted (_SOCKET_ADDRESS is the loopback interface).
        received_object = zmq_socket.recv_pyobj()
        return received_object

      except zmq.error.Again:
        logger.error(
            '{0:s}. Failed to receive item in time.'.format(self.name))
        raise

      except zmq.error.ZMQError as exception:
        if exception.errno == errno.EINTR:
          logger.error(
              'ZMQ syscall interrupted in {0:s}. Queue aborting.'.format(
                  self.name))
        raise

    raise errors.QueueEmpty

  def _SetSocketTimeouts(self):
    """Sets the timeouts for socket send and receive."""
    # Note that timeout must be an integer value. If timeout is a float
    # it appears that zmq will not enforce the timeout.
    timeout = int(self.timeout_seconds * 1000)
    receive_timeout = min(
        self._ZMQ_SOCKET_RECEIVE_TIMEOUT_MILLISECONDS, timeout)
    send_timeout = min(self._ZMQ_SOCKET_SEND_TIMEOUT_MILLISECONDS, timeout)

    self._zmq_socket.setsockopt(zmq.RCVTIMEO, receive_timeout)
    self._zmq_socket.setsockopt(zmq.SNDTIMEO, send_timeout)

  def _SetSocketHighWaterMark(self):
    """Sets the high water mark for the socket.

    This number is the maximum number of items that will be queued in the socket
    on this end of the queue.
    """
    self._zmq_socket.hwm = self._high_water_mark

  def _CreateZMQSocket(self):
    """Creates a ZeroMQ socket.

    An existing socket is closed and replaced. The socket connects to or binds
    to the configured port, or binds to a random port if none was specified,
    in which case the port attribute is updated.
    """
    logger.debug('Creating socket for {0:s}'.format(self.name))

    if not self._zmq_context:
      self._zmq_context = zmq.Context()  # pylint: disable=abstract-class-instantiated

    # The terminate and close threading events need to be created when the
    # socket is opened. Threading events are unpickleable objects and cannot
    # passed in multiprocessing on Windows.

    if not self._terminate_event:
      self._terminate_event = threading.Event()

    if not self._closed_event:
      self._closed_event = threading.Event()

    if self._zmq_socket:
      logger.debug('Closing old socket for {0:s}'.format(self.name))
      self._zmq_socket.close()
      self._zmq_socket = None

    self._zmq_socket = self._zmq_context.socket(self._SOCKET_TYPE)
    self._SetSocketTimeouts()
    self._SetSocketHighWaterMark()

    if self.port:
      address = '{0:s}:{1:d}'.format(self._SOCKET_ADDRESS, self.port)
      if self.SOCKET_CONNECTION_TYPE == self.SOCKET_CONNECTION_CONNECT:
        self._zmq_socket.connect(address)
        logger.debug('{0:s} connected to {1:s}'.format(self.name, address))
      else:
        self._zmq_socket.bind(address)
        logger.debug(
            '{0:s} bound to specified port {1:s}'.format(self.name, address))
    else:
      self.port = self._zmq_socket.bind_to_random_port(self._SOCKET_ADDRESS)
      logger.debug(
          '{0:s} bound to random port {1:d}'.format(self.name, self.port))

  def Open(self):
    """Opens this queue, causing the creation of a ZeroMQ socket.

    Raises:
      QueueAlreadyStarted: if the queue is already started, and a socket already
          exists.
    """
    if self._zmq_socket:
      raise errors.QueueAlreadyStarted()

    self._CreateZMQSocket()

  def Close(self, abort=False):
    """Closes the queue.

    Args:
      abort (Optional[bool]): whether the Close is the result of an abort
          condition. If True, queue contents may be lost.

    Raises:
      QueueAlreadyClosed: if the queue has already been closed and abort
          is False.
      RuntimeError: if closed or terminate event is missing, which is the case
          when no socket has been created yet.
    """
    if not self._closed_event or not self._terminate_event:
      raise RuntimeError('Missing closed or terminate event.')

    # Remember the state before the closed event is set, otherwise the abort
    # warning below could never be logged.
    already_closed = self._closed_event.is_set()
    if not abort and already_closed:
      raise errors.QueueAlreadyClosed()

    self._closed_event.set()

    if abort:
      if not already_closed:
        logger.warning(
            '{0:s} queue aborting. Contents may be lost.'.format(self.name))

      self._linger_seconds = 0

      # We can't determine whether a there might be an operation being performed
      # on the socket in a separate method or thread, so we'll signal that any
      # such operation should cease.
      self._terminate_event.set()

    else:
      logger.debug(
          '{0:s} queue closing, will linger for up to {1:d} seconds'.format(
              self.name, self._linger_seconds))

  def IsBound(self):
    """Checks if the queue is bound to a port.

    Returns:
      bool: True if the queue is of the bind type and has a port.
    """
    return (self.SOCKET_CONNECTION_TYPE == self.SOCKET_CONNECTION_BIND and
            self.port is not None)

  def IsConnected(self):
    """Checks if the queue is connected to a port.

    Returns:
      bool: True if the queue is of the connect type and has a port.
    """
    return (self.SOCKET_CONNECTION_TYPE == self.SOCKET_CONNECTION_CONNECT and
            self.port is not None)

  def IsEmpty(self):
    """Checks if the queue is empty.

    ZeroMQ queues don't have a concept of "empty" - there could always be
    messages on the queue that a producer or consumer is unaware of. Thus,
    the queue is never empty, so we return False. Note that it is possible that
    a queue is unable to pop an item from a queue within a timeout, which will
    cause PopItem to raise a QueueEmpty exception, but this is a different
    condition.

    Returns:
      bool: False, to indicate the the queue isn't empty.
    """
    return False

  @abc.abstractmethod
  def PushItem(self, item, block=True):
    """Pushes an item on to the queue.

    Args:
      item (object): item to push on the queue.
      block (Optional[bool]): whether the push should be performed in blocking
          or non-blocking mode.

    Raises:
      QueueAlreadyClosed: if the queue is closed.
    """

  # pylint: disable=redundant-returns-doc
  @abc.abstractmethod
  def PopItem(self):
    """Pops an item off the queue.

    Returns:
      object: item from the queue.

    Raises:
      QueueEmpty: if the queue is empty, and no item could be popped within the
          queue timeout.
    """
class ZeroMQPullQueue(ZeroMQQueue):
  """Base class for Plaso queues that read from a ZeroMQ PULL socket.

  Instantiate one of the subclasses instead of this class. Queues of this
  type support popping items only; pushing raises WrongQueueType.
  """

  _SOCKET_TYPE = zmq.PULL

  def PopItem(self):
    """Pops an item off the queue.

    The ZeroMQ socket is created on first use if it does not exist yet.
    Receiving is retried until an item arrives, the queue timeout expires or
    both the closed and the terminate event are set.

    Returns:
      object: item from the queue or None if both the closed and terminate
          events were set before an item was received.

    Raises:
      KeyboardInterrupt: if the process is sent a KeyboardInterrupt while
          popping an item. The queue is closed with abort before re-raising.
      QueueEmpty: if the queue is empty, and no item could be popped within the
          queue timeout.
      RuntimeError: if closed or terminate event is missing.
      zmq.error.ZMQError: if a ZeroMQ error occurs.
    """
    if not self._zmq_socket:
      self._CreateZMQSocket()

    if not (self._closed_event and self._terminate_event):
      raise RuntimeError('Missing closed or terminate event.')

    logger.debug(
        'Pop on {0:s} queue, port {1:d}'.format(self.name, self.port))

    deadline = time.time() + self.timeout_seconds
    # The loop only stops once both events are set, so a queue that was
    # closed without abort keeps receiving until the deadline passes.
    while not (
        self._closed_event.is_set() and self._terminate_event.is_set()):
      try:
        return self._ReceiveItemOnActivity(self._zmq_socket)

      except errors.QueueEmpty:
        if time.time() > deadline:
          raise

      except KeyboardInterrupt:
        self.Close(abort=True)
        raise

    return None

  def PushItem(self, item, block=True):
    """Rejects the push, since PULL queues cannot send items.

    Only present to satisfy the queue interface.

    Args:
      item (object): item to push on the queue.
      block (Optional[bool]): whether the push should be performed in blocking
          or non-blocking mode.

    Raises:
      WrongQueueType: always, as pushing is not supported by this queue.
    """
    raise errors.WrongQueueType()
class ZeroMQPullConnectQueue(ZeroMQPullQueue):
  """Plaso queue using a ZeroMQ PULL socket that connects to a port.

  Items can only be popped from this queue, not pushed on to it.
  """

  SOCKET_CONNECTION_TYPE = ZeroMQQueue.SOCKET_CONNECTION_CONNECT
class ZeroMQPushQueue(ZeroMQQueue):
  """Base class for Plaso queues that write to a ZeroMQ PUSH socket.

  Instantiate one of the subclasses instead of this class. Queues of this
  type support pushing items only; popping raises WrongQueueType.
  """

  _SOCKET_TYPE = zmq.PUSH

  def PopItem(self):
    """Rejects the pop, since PUSH queues cannot receive items.

    Only present to satisfy the queue interface.

    Raises:
      WrongQueueType: always, as popping is not supported by this queue.
    """
    raise errors.WrongQueueType()

  def PushItem(self, item, block=True):
    """Push an item on to the queue.

    The ZeroMQ socket is created on first use if it does not exist yet.
    Sending is retried until it succeeds, the queue timeout expires or the
    terminate event is set. In the latter case the method returns without
    the item having been sent.

    Args:
      item (object): item to push on the queue.
      block (Optional[bool]): whether the push should be performed in blocking
          or non-blocking mode.

    Raises:
      KeyboardInterrupt: if the process is sent a KeyboardInterrupt while
          pushing an item. The queue is closed with abort before re-raising.
      QueueFull: if it was not possible to push the item to the queue
          within the timeout.
      RuntimeError: if terminate event is missing.
      zmq.error.ZMQError: if a ZeroMQ specific error occurs.
    """
    if not self._zmq_socket:
      self._CreateZMQSocket()

    if not self._terminate_event:
      raise RuntimeError('Missing terminate event.')

    logger.debug(
        'Push on {0:s} queue, port {1:d}'.format(self.name, self.port))

    deadline = time.time() + self.timeout_seconds
    while not self._terminate_event.is_set():
      try:
        if self._SendItem(self._zmq_socket, item, block):
          return

        if time.time() > deadline:
          logger.error('{0:s} unable to push item, raising.'.format(
              self.name))
          raise errors.QueueFull

      except KeyboardInterrupt:
        self.Close(abort=True)
        raise
class ZeroMQPushBindQueue(ZeroMQPushQueue):
  """Plaso queue using a ZeroMQ PUSH socket that binds to a port.

  Items can only be pushed on to this queue, not popped from it.
  """

  SOCKET_CONNECTION_TYPE = ZeroMQQueue.SOCKET_CONNECTION_BIND
class ZeroMQRequestQueue(ZeroMQQueue):
  """Base class for Plaso queues that use a ZeroMQ REQ socket.

  Instantiate one of the subclasses instead of this class. Queues of this
  type support popping items only; pushing raises WrongQueueType.
  """

  _SOCKET_TYPE = zmq.REQ

  def PopItem(self):
    """Pops an item off the queue.

    The ZeroMQ socket is created on first use if it does not exist yet.
    An item is obtained by first sending a request (None) and then waiting
    for the reply. Only sending the request is bounded by the queue timeout,
    waiting for the reply continues until the terminate event is set.

    Returns:
      object: item from the queue or None if the terminate event was set
          before an item was received.

    Raises:
      KeyboardInterrupt: if the process is sent a KeyboardInterrupt while
          waiting for the reply. The queue is closed with abort before
          re-raising.
      QueueEmpty: if the request for an item could not be sent within the
          queue timeout.
      RuntimeError: if terminate event is missing.
      zmq.error.ZMQError: if an error occurs in ZeroMQ.
    """
    if not self._zmq_socket:
      self._CreateZMQSocket()

    if not self._terminate_event:
      raise RuntimeError('Missing terminate event.')

    logger.debug('Pop on {0:s} queue, port {1:d}'.format(
        self.name, self.port))

    give_up_time = time.time() + self.timeout_seconds

    # Phase 1: send the request for an item.
    while not self._terminate_event.is_set():
      try:
        self._zmq_socket.send_pyobj(None)

      except zmq.error.Again:
        # The existing socket is now out of sync, so we need to open a new one.
        self._CreateZMQSocket()
        if time.time() > give_up_time:
          logger.warning('{0:s} timeout requesting item'.format(self.name))
          raise errors.QueueEmpty

      else:
        break

    # Phase 2: wait for the reply that carries the item.
    while not self._terminate_event.is_set():
      try:
        return self._ReceiveItemOnActivity(self._zmq_socket)

      except errors.QueueEmpty:
        pass

      except KeyboardInterrupt:
        self.Close(abort=True)
        raise

    return None

  def PushItem(self, item, block=True):
    """Rejects the push, since REQ queues cannot send items.

    Only present to satisfy the queue interface.

    Args:
      item (object): item to push on the queue.
      block (Optional[bool]): whether the push should be performed in blocking
          or non-blocking mode.

    Raises:
      WrongQueueType: always, as pushing is not supported by this queue.
    """
    raise errors.WrongQueueType()
class ZeroMQRequestConnectQueue(ZeroMQRequestQueue):
  """Plaso queue using a ZeroMQ REQ socket that connects to a port.

  Items can only be popped from this queue, not pushed on to it.
  """

  SOCKET_CONNECTION_TYPE = ZeroMQQueue.SOCKET_CONNECTION_CONNECT
class ZeroMQBufferedQueue(ZeroMQQueue):
  """Parent class for buffered Plaso queues.

  Buffered queues use a regular Python queue to store items that are pushed or
  popped from the queue without blocking on underlying ZeroMQ operations.

  This class should not be instantiated directly, a subclass should be
  instantiated instead.
  """

  def __init__(
      self, buffer_timeout_seconds=2, buffer_max_size=10000, delay_open=True,
      linger_seconds=10, maximum_items=1000, name='Unnamed', port=None,
      timeout_seconds=5):
    """Initializes a buffered, ZeroMQ backed queue.

    Args:
      buffer_timeout_seconds (Optional[int]): number of seconds to wait when
          doing a put or get to/from the internal buffer.
      buffer_max_size (Optional[int]): maximum number of items to store in
          the buffer, before or after they are sent/received via ZeroMQ.
      delay_open (Optional[bool]): whether a ZeroMQ socket should be created
          the first time the queue is pushed to or popped from, rather than at
          queue object initialization. This is useful if a queue needs to be
          passed to a child process from a parent process.
      linger_seconds (Optional[int]): number of seconds that the underlying
          ZeroMQ socket can remain open after the queue object has been closed,
          to allow queued items to be transferred to other ZeroMQ sockets.
      maximum_items (Optional[int]): maximum number of items to queue on the
          ZeroMQ socket. ZeroMQ refers to this value as "high water mark" or
          "hwm". Note that this limit only applies at one "end" of the queue.
          The default of 1000 is the ZeroMQ default value.
      name (Optional[str]): name to identify the queue.
      port (Optional[int]): The TCP port to use for the queue. None indicates
          that the queue should choose a random port to bind to.
      timeout_seconds (Optional[int]): number of seconds that calls to PopItem
          and PushItem may block for, before raising QueueEmpty or QueueFull.
    """
    self._buffer_timeout_seconds = buffer_timeout_seconds
    self._queue = queue.Queue(maxsize=buffer_max_size)
    self._zmq_thread = None

    # We need to set up the internal buffer queue before we call super, so that
    # if the call to super opens the ZMQSocket, the backing thread will work.
    super(ZeroMQBufferedQueue, self).__init__(
        delay_open=delay_open, linger_seconds=linger_seconds,
        maximum_items=maximum_items, name=name, port=port,
        timeout_seconds=timeout_seconds)

  def _CreateZMQSocket(self):
    """Creates a ZeroMQ socket and starts the responder thread.

    The thread is only created and started the first time, it runs
    _ZeroMQResponder with the internal buffer as its source queue.
    """
    super(ZeroMQBufferedQueue, self)._CreateZMQSocket()
    if not self._zmq_thread:
      thread_name = '{0:s}_zmq_responder'.format(self.name)
      self._zmq_thread = threading.Thread(
          target=self._ZeroMQResponder, args=[self._queue], name=thread_name)
      self._zmq_thread.start()

  @abc.abstractmethod
  def _ZeroMQResponder(self, source_queue):
    """Listens for requests and replies to clients.

    Args:
      source_queue (queue.Queue): queue to to pull items from.
    """

  def Close(self, abort=False):
    """Closes the queue.

    On abort the terminate event is set and the responder thread, if any, is
    given up to timeout_seconds to exit.

    Args:
      abort (Optional[bool]): whether the Close is the result of an abort
          condition. If True, queue contents may be lost.

    Raises:
      QueueAlreadyClosed: if the queue has already been closed and abort
          is False.
      RuntimeError: if closed or terminate event is missing, which is the case
          when no socket has been created yet.
    """
    if not self._closed_event or not self._terminate_event:
      raise RuntimeError('Missing closed or terminate event.')

    # Remember the state before the closed event is set, otherwise the abort
    # warning below could never be logged.
    already_closed = self._closed_event.is_set()
    if not abort and already_closed:
      raise errors.QueueAlreadyClosed()

    self._closed_event.set()

    if abort:
      if not already_closed:
        logger.warning(
            '{0:s} queue aborting. Contents may be lost.'.format(self.name))

      # We can't determine whether a there might be an operation being performed
      # on the socket in a separate method or thread, so we'll signal that any
      # such operation should cease.
      self._terminate_event.set()

      self._linger_seconds = 0

      if self._zmq_thread:
        logger.debug('[{0:s}] Waiting for thread to exit.'.format(self.name))
        self._zmq_thread.join(timeout=self.timeout_seconds)
        if self._zmq_thread.is_alive():
          logger.error((
              '{0:s} ZMQ responder thread did not exit within timeout').format(
                  self.name))
    else:
      logger.debug(
          '{0:s} queue closing, will linger for up to {1:d} seconds'.format(
              self.name, self._linger_seconds))

  def Empty(self):
    """Removes all items from the internal buffer."""
    try:
      while True:
        self._queue.get(False)
    except queue.Empty:
      pass
class ZeroMQBufferedReplyQueue(ZeroMQBufferedQueue):
  """Parent class for buffered Plaso queues backed by ZeroMQ REP sockets.

  This class should not be instantiated directly, a subclass should be
  instantiated instead.

  Instances of this class or subclasses may only be used to push items, not
  to pop.
  """

  _ZMQ_SOCKET_RECEIVE_TIMEOUT_MILLISECONDS = 4000
  _ZMQ_SOCKET_SEND_TIMEOUT_MILLISECONDS = 2000

  _SOCKET_TYPE = zmq.REP

  def _ZeroMQResponder(self, source_queue):
    """Listens for requests and replies to clients.

    Each iteration takes an item from the source queue, waits for a request on
    the ZeroMQ socket and replies with the item. An item for which no request
    was received yet is kept for the next iteration. The socket is closed when
    the loop ends.

    Args:
      source_queue (queue.Queue): queue to use to pull items from.

    Raises:
      RuntimeError: if closed or terminate event is missing.
    """
    if not self._closed_event or not self._terminate_event:
      raise RuntimeError('Missing closed or terminate event.')

    logger.debug('{0:s} responder thread started'.format(self.name))

    # An explicit flag tracks whether an item is pending, so that falsy items
    # such as 0, '' or None are not mistaken for "no item" and dropped.
    pending_item = None
    has_pending_item = False

    while not self._terminate_event.is_set():
      if not has_pending_item:
        try:
          if self._closed_event.is_set():
            # Once closed, only drain what is already in the buffer.
            pending_item = source_queue.get_nowait()
          else:
            pending_item = source_queue.get(
                True, self._buffer_timeout_seconds)
          has_pending_item = True

        except queue.Empty:
          if self._closed_event.is_set():
            break

          continue

      try:
        # We need to receive a request before we can reply with the item.
        self._ReceiveItemOnActivity(self._zmq_socket)

      except errors.QueueEmpty:
        if self._closed_event.is_set() and source_queue.empty():
          break

        continue

      sent_successfully = self._SendItem(self._zmq_socket, pending_item)
      pending_item = None
      has_pending_item = False
      if not sent_successfully:
        logger.error('Queue {0:s} unable to send item.'.format(self.name))
        break

    logger.info('Queue {0:s} responder exiting.'.format(self.name))
    self._zmq_socket.close(self._linger_seconds)

  def PopItem(self):
    """Pops an item of the queue.

    Provided for compatibility with the API, but doesn't actually work.

    Raises:
      WrongQueueType: As Pop is not supported by this queue.
    """
    raise errors.WrongQueueType()

  def PushItem(self, item, block=True):
    """Push an item on to the queue.

    If no ZeroMQ socket has been created, one will be created the first time
    this method is called. The item is placed in the internal buffer, from
    which the responder thread sends it.

    Args:
      item (object): item to push on the queue.
      block (Optional[bool]): whether the push should be performed in blocking
          or non-blocking mode.

    Raises:
      QueueAlreadyClosed: if the queue is closed.
      QueueFull: if the internal buffer was full and it was not possible to
          push the item to the buffer within the timeout.
      RuntimeError: if closed event is missing.
    """
    # The socket must be created first, since that is what creates the closed
    # event for a queue that was initialized with delay_open.
    if not self._zmq_socket:
      self._CreateZMQSocket()

    if not self._closed_event:
      raise RuntimeError('Missing closed event.')

    if self._closed_event.is_set():
      raise errors.QueueAlreadyClosed()

    try:
      if block:
        self._queue.put(item, timeout=self.timeout_seconds)
      else:
        self._queue.put(item, block=False)
    except queue.Full as exception:
      raise errors.QueueFull(exception)
class ZeroMQBufferedReplyBindQueue(ZeroMQBufferedReplyQueue):
  """Plaso queue using a buffered ZeroMQ REP socket that binds to a port.

  Items can only be pushed on to this queue, not popped from it.
  """

  SOCKET_CONNECTION_TYPE = ZeroMQQueue.SOCKET_CONNECTION_BIND