How to pause and resume consumption gracefully in rabbitmq, pika python

I use basic_consume () to receive messages and basic_cancel to cancel consumption, but there is a problem.

Here is the pika.channel code

def basic_consume(self, consumer_callback, queue='', no_ack=False, exclusive=False, consumer_tag=None): """Sends the AMQP command Basic.Consume to the broker and binds messages for the consumer_tag to the consumer callback. If you do not pass in a consumer_tag, one will be automatically generated for you. Returns the consumer tag. For more information on basic_consume, see: http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume :param method consumer_callback: The method to callback when consuming :param queue: The queue to consume from :type queue: str or unicode :param bool no_ack: Tell the broker to not expect a response :param bool exclusive: Don't allow other consumers on the queue :param consumer_tag: Specify your own consumer tag :type consumer_tag: str or unicode :rtype: str """ self._validate_channel_and_callback(consumer_callback) # If a consumer tag was not passed, create one consumer_tag = consumer_tag or 'ctag%i.%s' % (self.channel_number, uuid.uuid4().get_hex()) if consumer_tag in self._consumers or consumer_tag in self._cancelled: raise exceptions.DuplicateConsumerTag(consumer_tag) self._consumers[consumer_tag] = consumer_callback self._pending[consumer_tag] = list() self._rpc(spec.Basic.Consume(queue=queue, consumer_tag=consumer_tag, no_ack=no_ack, exclusive=exclusive), self._on_eventok, [(spec.Basic.ConsumeOk, {'consumer_tag': consumer_tag})]) return consumer_tag def basic_cancel(self, callback=None, consumer_tag='', nowait=False): """This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply. It may also be sent from the server to the client in the event of the consumer being unexpectedly cancelled (ie cancelled for any reason other than the server receiving the corresponding basic.cancel from the client). This allows clients to be notified of the loss of consumers due to events such as queue deletion. :param method callback: Method to call for a Basic.CancelOk response :param str consumer_tag: Identifier for the consumer :param bool nowait: Do not expect a Basic.CancelOk response :raises: ValueError """ self._validate_channel_and_callback(callback) if consumer_tag not in self.consumer_tags: return if callback: if nowait is True: raise ValueError('Can not pass a callback if nowait is True') self.callbacks.add(self.channel_number, spec.Basic.CancelOk, callback) self._cancelled.append(consumer_tag) self._rpc(spec.Basic.Cancel(consumer_tag=consumer_tag, nowait=nowait), self._on_cancelok, [(spec.Basic.CancelOk, {'consumer_tag': consumer_tag})] if nowait is False else []) 

As you can see every time I cancel consumption, consumer_tag is added to the list with the list. And if I use this tag again in basic_consume, a duplicateConsumer exception will be thrown. Well, I could use a new user_tag every time, but actually it is not. Because sooner or later, the generated tag will exactly match some of the previous ones.

How can I pause and resume consumption gracefully in pika?

+6
source share
2 answers

Is there a reason why you define your own consumer_tags ? You can pass an empty string and let RabbitMQ generate consumer tags for you. The answer from basic.consume , which is basic.consume-ok , will return the generated consumer_tag , so you can use it later to stop consumption.

See: http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume-ok

+1
source

It seems that Pika is doing more than necessary - there is no need to create a consumer tag if it is not provided (the server will be), and it also does not need to look for duplicate consumer tags (renewal using the same tag is supported by the server).

So I'm not sure how to do this with Pika - write a mistake, I suppose.

+1
source

All Articles