Consuming a RabbitMQ queue from inside Python threads

Fair warning: this is a long one.

I have a list of usernames and passwords. For each of them I want to log in to the account and do some operations. I want to use several machines to make this faster. The way I'm thinking of doing it is to have a main machine whose only job is a cron that periodically checks whether the RabbitMQ queue is empty. If it is, it reads the list of usernames and passwords from a file and publishes them to the queue. Then a bunch of worker machines subscribe to that queue; each one receives a user/pass pair, does something with it, acknowledges it, and moves on to the next one until the queue is empty, at which point the main machine fills it up again. So far I think I have everything down.
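For illustration, here is a sketch of what I have in mind for the cron job. The file name accounts.txt, its one-"user password"-pair-per-line format, and the queue name accounts are just placeholders:

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='accounts')

    # each message carries one "user password" pair
    with open('accounts.txt') as f:
        for line in f:
            channel.basic_publish(exchange='', routing_key='accounts',
                                  body=line.strip())

    connection.close()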

Now my problem. I checked what needs to be done with each user/pass, and it is not so resource-intensive, so each machine could handle three of them at the same time using Python threads. On a single machine I implemented this by loading the user/pass pairs into a Python Queue() and having three threads consume that Queue(). Now I want to do something similar, but instead of consuming from a Python Queue(), each thread of each machine should consume from the RabbitMQ queue. This is where I'm stuck. To run some tests, I started with the code from the RabbitMQ tutorial.

send.py:

    import pika, sys

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')

    message = ' '.join(sys.argv[1:])
    channel.basic_publish(exchange='', routing_key='hello', body=message)
    connection.close()

worker.py:

    import time, pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print ' [x] received %r' % (body,)
        time.sleep(body.count('.'))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue='hello', no_ack=False)
    channel.start_consuming()

With the above, you can run two copies of worker.py; each subscribes to the RabbitMQ queue and consumes from it as expected.

My threading code without RabbitMQ looks something like this:

runit.py:

    import threading, Queue

    class Threaded_do_stuff(threading.Thread):
        def __init__(self, user_queue):
            threading.Thread.__init__(self)
            self.user_queue = user_queue

        def run(self):
            while True:
                login = self.user_queue.get()
                # 'pass' is a reserved word in Python, so it can't be a keyword
                # argument; do_stuff and list_users are defined elsewhere
                do_stuff(user=login[0], password=login[1])
                self.user_queue.task_done()

    user_queue = Queue.Queue()
    for i in range(3):
        td = Threaded_do_stuff(user_queue)
        td.setDaemon(True)
        td.start()

    ## fill up the queue
    for user in list_users:
        user_queue.put(user)

    ## go!
    user_queue.join()

This also works as expected: you fill up the queue and three threads consume it. Now I want to do something like runit.py, but instead of using a Python Queue(), use something like worker.py, where the queue is an actual RabbitMQ queue.

Here is something I tried that doesn't work (and I don't understand why):

rabbitmq_runit.py:

    import time, threading, pika

    class Threaded_worker(threading.Thread):
        def callback(self, ch, method, properties, body):
            print ' [x] received %r' % (body,)
            time.sleep(body.count('.'))
            ch.basic_ack(delivery_tag=method.delivery_tag)

        def __init__(self):
            threading.Thread.__init__(self)
            self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
            self.channel = self.connection.channel()
            self.channel.queue_declare(queue='hello')
            self.channel.basic_qos(prefetch_count=1)
            self.channel.basic_consume(self.callback, queue='hello')

        def run(self):
            print 'start consuming'
            self.channel.start_consuming()

    for _ in range(3):
        print 'launch thread'
        td = Threaded_worker()
        td.setDaemon(True)
        td.start()

I would expect this to start three threads, each of them blocked on .start_consuming(), just sitting there waiting for the RabbitMQ queue to send them something. Instead, the program starts, prints some output, and exits. The pattern of the output is also strange:

    launch thread
    launch thread
    start consuming
    launch thread
    start consuming

In particular, note that a "start consuming" is missing.

What's happening?

EDIT: An answer I found to a similar question is here: Consuming a rabbitmq message queue with multiple threads (Python Kombu), and the answer is to "use celery", whatever that means. I don't buy it; I shouldn't need anything remotely as complex as celery. In particular, I'm not trying to set up RPC, and I don't need to read replies from the do_stuff routines.

EDIT 2: The print pattern I expected is as follows. I run

    python send.py first message......
    python send.py second message.
    python send.py third message.
    python send.py fourth message.

and the print pattern would be:

    launch thread
    start consuming
    [x] received 'first message......'
    launch thread
    start consuming
    [x] received 'second message.'
    launch thread
    start consuming
    [x] received 'third message.'
    [x] received 'fourth message.'
Tags: python, multithreading, rabbitmq

1 Answer

The problem is that you are creating a daemon thread:

    td = Threaded_worker()
    td.setDaemon(True)  # Shouldn't do that.
    td.start()

Daemon threads are killed off as soon as the main thread exits. From the threading documentation:

A thread can be flagged as a "daemon thread". The significance of this flag is that the entire Python program exits when only daemon threads are left. The initial value is inherited from the creating thread. The flag can be set through the daemon property.

Leave out setDaemon(True), and you will see that it behaves the way you expect.
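That is, the launch loop at the bottom of rabbitmq_runit.py simply becomes:

    for _ in range(3):
        print 'launch thread'
        td = Threaded_worker()
        td.start()  # no setDaemon(True): the worker threads keep the process alive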

In addition, the pika FAQ has a note about using it with threads:

Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads.

This suggests that you should move everything you're doing in __init__() into run(), so that the connection is created in the same thread you're actually consuming from.
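Putting the two fixes together, a sketch of the reworked worker could look like this (untested, and keeping the same pika 0.x basic_consume signature your code uses):

    import time, threading, pika

    class Threaded_worker(threading.Thread):
        def callback(self, ch, method, properties, body):
            print ' [x] received %r' % (body,)
            time.sleep(body.count('.'))
            ch.basic_ack(delivery_tag=method.delivery_tag)

        def run(self):
            # The connection, channel, and consumer are all created here,
            # in the thread that uses them, as the pika FAQ recommends.
            connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
            channel = connection.channel()
            channel.queue_declare(queue='hello')
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(self.callback, queue='hello')
            print 'start consuming'
            channel.start_consuming()

    for _ in range(3):
        print 'launch thread'
        Threaded_worker().start()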

