Celery + rabbitmq empty queue

I use celery + rabbit. I cannot find a convenient way to clear the queue in celery + rabbitmq. I do this with removal and create vhost.

rabbitmqctl delete_vhost <vhostpath> rabbitmqctl add_vhost <vhostpath> 

Is this the preferred way to clear the celery line?

+4
source share
3 answers

I'm not quite sure how celery works, but I suspect that you want to clear the RabbitMQ queue (you are currently simulating this by removing queues and adding celery).

You can install the RabbitMQ Control Plugin . Its WebUI will allow you to clear the required queue. It should also tell you which queue you are aiming at, so you won’t need to delete everything.

As soon as you find out in which queue, you can purge it programmatically. For example, using py-amqplib , you would do something like:

 from amqplib import client_0_8 as amqp conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False) conn = conn.channel() conn.queue_purge("the-target-queue") 

Probably the best way to do this.

+13
source

If you run into this problem because you used rabbitmq for the backend of the result, and as a result you got too many queues, then I would suggest using a different backend of the result (redis or mongodb).

This is one of the known shortcomings of celery. It will create a separate queue for each result if you use the result database.

If you still want to stick with amqp as a result. It will be cleaned after 24 hours. However, you can set it to a lower value using the CELERY_AMQP_TASK_RESULT_EXPIRES parameter.

+2
source

If you need to delete ALL items in the queue (especially when the list is long)

1) Saves all elements in a file

 sudo rabbitmqctl list_queues -p /yourvhost name > queues.txt 

don't forget to remove the first and last lines from 'queues.txt'

2) Use the specified python code to complete the task

 from amqplib import client_0_8 as amqp conn = amqp.Connection(host="127.0.0.1:5672", userid="guest", password="guest", virtual_host="/yourvhost", insist=False) conn = conn.channel() queues = None with open('queues.txt', 'r') as f: queues = f.readlines() for q in queues: if q: #print 'deleting %s' % q conn.queue_purge(q.strip()) print 'purged %d items' % len(queues) 
+1
source

All Articles