Python producer / consumer with exception handling

I'm trying to write a seemingly simple implementation of the classic producer–consumer idiom in Python. There is one comparatively fast producer for several slower consumers. In principle, this is easy to do with the Queue module, and the library documentation has an example spanning only a few lines of code.

However, I also want the code to work correctly when exceptions occur. Both the producer and all consumers should stop if any of the following events occurs:

  • the producer fails with an exception
  • any consumer fails with an exception
  • the user stops the program (causing a KeyboardInterrupt)

After that, the whole process should fail, raising the initial exception in order to inform the caller of what went wrong.

The main problem is to cleanly terminate the consumer threads without ending up in a blocking join(). It seems popular to set Thread.daemon = True, but to my understanding this leads to resource leaks if the producer fails with an exception.

I managed to write an implementation that meets my requirements (see below). However, I find the code to be much more complicated than expected.

Is there an easier way to handle this scenario?

Here are some sample calls and the final summary log message from my current implementation:

Produce and consume 10 items:

$ python procon.py
INFO:root:processed all items

No items:

$ python procon.py --items 0
INFO:root:processed all items

Produce 5 items for 10 consumers, thus using only a few of the available consumers:

$ python procon.py --items 5 --consumers 10
INFO:root:processed all items

Interrupt by pressing Control-C:

$ python procon.py
^CWARNING:root:interrupted by user

Make the producer fail at item 3:

$ python procon.py --producer-fails-at 3
ERROR:root:cannot produce item 3

Make a consumer fail at item 3:

$ python procon.py --consumer-fails-at 3
ERROR:root:cannot consume item 3

Make a consumer fail at the last item:

$ python procon.py --items 10 --consumer-fails-at 9
ERROR:root:cannot consume item 9

And here is the source code, which is probably more complex than necessary:

"""
Consumer/producer to test exception handling in threads. Both the producer
and the consumer can be made to fail deliberately when processing a certain
item using command line options.
"""
import logging
import optparse
import Queue
import threading
import time

_PRODUCTION_DELAY = 0.1
_CONSUMPTION_DELAY = 0.3

# Delay for ugly hacks and polling loops.
_HACK_DELAY = 0.05

class _Consumer(threading.Thread):
    """
    Thread to consume items from an item queue filled by a producer, which can
    be told to terminate in two ways:

    1. using `finish()`, which keeps processing the remaining items on the
       queue until it is empty
    2. using `cancel()`, which finishes consuming the current item and then
       terminates

    If consuming fails with an exception, the exception is stored in `error`
    and the consumer puts itself on the `failedConsumers` queue so that the
    thread controlling the consumers can detect and report the failure.
    """
    def __init__(self, name, itemQueue, failedConsumers):
        """
        Create a consumer thread named `name` that takes its items from
        `itemQueue` and reports a failure by putting itself on
        `failedConsumers`.
        """
        super(_Consumer, self).__init__(name=name)
        self._log = logging.getLogger(name)
        self._itemQueue = itemQueue
        self._failedConsumers = failedConsumers
        # Exception that terminated `run()`, or None if there was none.
        self.error = None
        # Item at which `consume()` deliberately fails, or None to never fail.
        self.itemToFailAt = None
        self._log.info(u"waiting for items to consume")
        self._isFinishing = False
        self._isCanceled = False

    def finish(self):
        """
        Ask the consumer to terminate once the item queue is empty.
        """
        self._isFinishing = True

    def cancel(self):
        """
        Ask the consumer to terminate after the item currently being consumed.
        """
        self._isCanceled = True

    def consume(self, item):
        """
        Consume `item`, which takes `_CONSUMPTION_DELAY` seconds. Raise
        `ValueError` if `item` equals `itemToFailAt`.
        """
        self._log.info(u"consume item %d", item)
        if item == self.itemToFailAt:
            raise ValueError("cannot consume item %d" % item)
        time.sleep(_CONSUMPTION_DELAY)

    def run(self):
        """
        Consume items until finished or canceled. Any exception raised while
        consuming is stored in `error` and reported using `failedConsumers`
        instead of being propagated out of the thread.
        """
        try:
            while not (self._isFinishing and self._itemQueue.empty()) \
                    and not self._isCanceled:
                # HACK: Use a timeout when getting the item from the queue
                # because between `empty()` and `get()` another consumer might
                # have removed it.
                try:
                    item = self._itemQueue.get(timeout=_HACK_DELAY)
                except Queue.Empty:
                    continue
                # Consume outside of the `try` above so that a `Queue.Empty`
                # raised by `consume()` itself is reported as a failure
                # instead of being mistaken for an empty item queue.
                self.consume(item)
            if self._isCanceled:
                self._log.info(u"canceled")
            if self._isFinishing:
                self._log.info(u"finished")
        except Exception as error:
            self._log.error(u"cannot continue to consume: %s", error)
            self.error = error
            self._failedConsumers.put(self)


class Worker(object):
    """
    Controller for interaction between producer and consumers.

    The producer runs in the thread calling `work()`; each consumer runs in
    its own `_Consumer` thread. Consumers report failures using a queue that
    the producer polls between items.
    """
    def __init__(self, itemsToProduceCount, itemProducerFailsAt,
            itemConsumerFailsAt, consumerCount):
        """
        Set up a worker that produces `itemsToProduceCount` items for
        `consumerCount` consumers. The producer deliberately fails at item
        `itemProducerFailsAt` and any consumer at item `itemConsumerFailsAt`;
        use `None` for either to not fail at all.
        """
        self._itemsToProduceCount = itemsToProduceCount
        self._itemProducerFailsAt = itemProducerFailsAt
        self._itemConsumerFailsAt = itemConsumerFailsAt
        self._consumerCount = consumerCount
        self._itemQueue = Queue.Queue()
        # Consumers that terminated with an error put themselves here.
        self._failedConsumers = Queue.Queue()
        self._log = logging.getLogger("producer")
        self._consumers = []

    def _possiblyRaiseConsumerError(self):
        """
        Raise the error of a failed consumer; do nothing if no consumer has
        reported a failure.
        """
        try:
            failedConsumer = self._failedConsumers.get_nowait()
        except Queue.Empty:
            return
        self._log.info(u"handling failed %s", failedConsumer.name)
        raise failedConsumer.error

    def _waitForConsumers(self, raiseConsumerErrors):
        """
        Poll all consumers until every thread has terminated. With
        `raiseConsumerErrors` set, raise the error of a failed consumer as
        soon as it is noticed.
        """
        # Work on a separate list of pending consumers instead of appending
        # to `self._consumers` while iterating over it.
        pendingConsumers = list(self._consumers)
        while pendingConsumers:
            stillRunningConsumers = []
            for consumer in pendingConsumers:
                if raiseConsumerErrors:
                    self._possiblyRaiseConsumerError()
                # HACK: Join with a timeout so the main thread remains
                # responsive instead of blocking on a single consumer.
                consumer.join(_HACK_DELAY)
                if consumer.is_alive():
                    stillRunningConsumers.append(consumer)
            pendingConsumers = stillRunningConsumers

    def _cancelAllConsumers(self):
        """
        Tell all consumers to cancel and wait until they have terminated.
        """
        self._log.info(u"canceling all consumers")
        for consumerToCancel in self._consumers:
            consumerToCancel.cancel()
        self._log.info(u"waiting for consumers to be canceled")
        # In this case, we ignore possible consumer errors because there
        # already is an error to report.
        self._waitForConsumers(False)

    def work(self):
        """
        Launch consumer threads and produce items. In case any consumer or the
        producer raises an exception, fail by raising this exception. The
        consumers keep running in that case; the caller is expected to
        `_cancelAllConsumers()`.
        """
        self._consumers = []
        for consumerId in range(self._consumerCount):
            consumerToStart = _Consumer(u"consumer %d" % consumerId,
                self._itemQueue, self._failedConsumers)
            # Configure the consumer before starting it so the thread never
            # runs with a not yet assigned `itemToFailAt`.
            if self._itemConsumerFailsAt is not None:
                consumerToStart.itemToFailAt = self._itemConsumerFailsAt
            self._consumers.append(consumerToStart)
            consumerToStart.start()

        self._log = logging.getLogger("producer  ")
        self._log.info(u"producing %d items", self._itemsToProduceCount)

        for itemNumber in range(self._itemsToProduceCount):
            self._possiblyRaiseConsumerError()
            self._log.info(u"produce item %d", itemNumber)
            if itemNumber == self._itemProducerFailsAt:
                raise ValueError(u"cannot produce item %d" % itemNumber)
            # Do the actual work.
            time.sleep(_PRODUCTION_DELAY)
            self._itemQueue.put(itemNumber)

        self._log.info(u"telling consumers to finish the remaining items")
        for consumerToFinish in self._consumers:
            consumerToFinish.finish()
        self._log.info(u"waiting for consumers to finish")
        self._waitForConsumers(True)
        # A consumer might have failed while it was joined for the last time,
        # so check once more after all threads have terminated.
        self._possiblyRaiseConsumerError()


if __name__ == "__main__":
    # Command line entry point: parse the options, run the worker and log a
    # single summary message describing how the run ended.
    logging.basicConfig(level=logging.INFO)
    parser = optparse.OptionParser()
    parser.add_option("-c", "--consumer-fails-at", metavar="NUMBER",
        type="long", help="number of items at which consumer fails (default: %default)")
    parser.add_option("-i", "--items", metavar="NUMBER", type="long",
        help="number of items to produce (default: %default)", default=10)
    parser.add_option("-n", "--consumers", metavar="NUMBER", type="long",
        help="number of consumers (default: %default)", default=2)
    parser.add_option("-p", "--producer-fails-at", metavar="NUMBER",
        type="long", help="number of items at which producer fails (default: %default)")
    options, others = parser.parse_args()
    worker = Worker(options.items, options.producer_fails_at,
        options.consumer_fails_at, options.consumers)
    try:
        worker.work()
        logging.info(u"processed all items")
    except KeyboardInterrupt:
        # KeyboardInterrupt is not an `Exception`, so it needs its own clause.
        logging.warning(u"interrupted by user")
        worker._cancelAllConsumers()
    except Exception as error:
        # Error raised by the producer or passed on from a failed consumer;
        # the remaining consumers are still running and have to be canceled.
        logging.error(u"%s", error)
        worker._cancelAllConsumers()
+5
2

The code from the question is available as a library, proconex, at http://pypi.python.org/pypi/proconex/; its source code is at https://github.com/roskakori/proconex.

Use the `with` statement, or call `worker.close()` in a `finally:` clause, so that the worker is always closed.

Here is a short example with one producer and two consumers of integers:

import logging
import proconex

class IntegerProducer(proconex.Producer):
    """
    Producer whose `items()` logs and yields the integers 0 to 9 in order.
    """
    def items(self):
        itemNumber = 0
        while itemNumber < 10:
            logging.info('produce %d', itemNumber)
            yield itemNumber
            itemNumber += 1

class IntegerConsumer(proconex.Consumer):
    """
    Consumer that only logs each item together with its own name.
    """
    def consume(self, item):
        consumerName = self.name
        logging.info('consume %d with %s', item, consumerName)

if __name__ == '__main__':
    # Log at INFO level so the produce/consume messages become visible.
    logging.basicConfig(level=logging.INFO)
    integerProducer = IntegerProducer()
    integerConsumers = [
        IntegerConsumer(consumerName)
        for consumerName in ('consumer1', 'consumer2')
    ]

    # Run the worker as a context manager so its exit handler is called even
    # if `work()` raises an exception.
    with proconex.Worker(integerProducer, integerConsumers) as integerWorker:
        integerWorker.work()
0

, , , . join(), . get() . cancel() .

, Python Queue . :

  • ( )
  • python ( Queue Python)
  • / ( , /)
  • /
+2

All Articles