Source code for plaso.multi_processing.multi_process_queue

# -*- coding: utf-8 -*-
"""A multiprocessing-backed queue."""

from __future__ import unicode_literals

import multiprocessing
# We need to check that we aren't asking for a bigger queue than the
# platform supports, which requires access to this protected module.
import _multiprocessing


# The 'Queue' module was renamed to 'queue' in Python 3
try:
  import Queue
except ImportError:
  import queue as Queue  # pylint: disable=import-error

from plaso.engine import plaso_queue
from plaso.lib import errors
from plaso.multi_processing import logger


[docs]class MultiProcessingQueue(plaso_queue.Queue): """Multi-processing queue.""" def __init__(self, maximum_number_of_queued_items=0, timeout=None): """Initializes a multi-processing queue. Args: maximum_number_of_queued_items (Optional[int]): maximum number of queued items, where 0 represents no limit. timeout (Optional[float]): number of seconds for the get to time out, where None will block until a new item is put onto the queue. """ super(MultiProcessingQueue, self).__init__() self._timeout = timeout # maxsize contains the maximum number of items allowed to be queued, # where 0 represents unlimited. # We need to check that we aren't asking for a bigger queue than the # platform supports, which requires access to this internal # multiprocessing value. # pylint: disable=no-member,protected-access queue_max_length = _multiprocessing.SemLock.SEM_VALUE_MAX # pylint: enable=no-member,protected-access if maximum_number_of_queued_items > queue_max_length: logger.warning(( 'Requested maximum queue size: {0:d} is larger than the maximum ' 'size supported by the system. Defaulting to: {1:d}').format( maximum_number_of_queued_items, queue_max_length)) maximum_number_of_queued_items = queue_max_length # This queue appears not to be FIFO. self._queue = multiprocessing.Queue(maxsize=maximum_number_of_queued_items)
[docs] def Open(self): """Opens the queue."""
return
[docs] def Close(self, abort=False): """Closes the queue. This needs to be called from any process or thread putting items onto the queue. Args: abort (Optional[bool]): True if the close was issued on abort. """ if abort: # Prevent join_thread() from blocking. self._queue.cancel_join_thread() self._queue.close()
self._queue.join_thread()
[docs] def Empty(self): """Empties the queue.""" try: while True: self._queue.get(False) except Queue.Empty:
pass
[docs] def IsEmpty(self): """Determines if the queue is empty."""
return self._queue.empty()
[docs] def PushItem(self, item, block=True): """Pushes an item onto the queue. Args: item (object): item to add. block (Optional[bool]): True to block the process when the queue is full. Raises: QueueFull: if the item could not be pushed the queue because it's full. """ try: self._queue.put(item, block=block) except Queue.Full as exception:
raise errors.QueueFull(exception)
[docs] def PopItem(self): """Pops an item off the queue. Returns: object: item from the queue. Raises: QueueClose: if the queue has already been closed. QueueEmpty: if no item could be retrieved from the queue within the specified timeout. """ try: # If no timeout is specified the queue will block if empty otherwise # a Queue.Empty exception is raised. return self._queue.get(timeout=self._timeout) except KeyboardInterrupt: raise errors.QueueClose # If close() is called on the multiprocessing.Queue while it is blocking # on get() it will raise IOError. except IOError: raise errors.QueueClose except Queue.Empty:
raise errors.QueueEmpty