Fair warning: this is long.
I have a list of usernames and passwords. For each pair I want to log in to the account and do something, and I want to use several machines to make it faster. The way I thought about it is this: a main machine whose only job is a cron task that periodically checks whether the rabbitmq queue is empty. When it is, the cron task reads the list of usernames and passwords from a file and publishes them to the rabbitmq queue. Then a bunch of worker machines subscribe to that queue; each worker receives a user/pass, does something with it, acknowledges the message, and moves on to the next one until the queue is empty, at which point the main machine fills it again. So far I think I have everything covered.
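To make the cron side concrete, here is a rough sketch of the producer I have in mind (the "user:pass" line format and the helper names are my own assumptions, not settled code). The publish helper only needs an object with queue_declare and basic_publish, so the sketch exercises it with a stand-in channel; on the real main machine it would get a pika channel instead.

```python
# Sketch of the cron-side producer (names and file format are assumptions).
# Each line of the credentials file is "username:password"; every pair
# becomes one rabbitmq message, so one worker ack == one finished account.

def load_credentials(path):
    """Parse 'username:password' lines, skipping blank lines."""
    creds = []
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line:
                user, _, password = line.partition(':')
                creds.append((user, password))
    return creds

def publish_credentials(channel, creds, queue='hello'):
    """Publish one message per user/pass pair to the given queue."""
    channel.queue_declare(queue=queue)
    for user, password in creds:
        channel.basic_publish(exchange='', routing_key=queue,
                              body='%s:%s' % (user, password))

# Stand-in channel so the sketch runs without a broker; the real cron job
# would use pika.BlockingConnection(...).channel() here instead.
class FakeChannel(object):
    def __init__(self):
        self.published = []
    def queue_declare(self, queue):
        pass
    def basic_publish(self, exchange, routing_key, body):
        self.published.append(body)

if __name__ == '__main__':
    ch = FakeChannel()
    publish_credentials(ch, [('alice', 's3cret'), ('bob', 'hunter2')])
    print(ch.published)  # ['alice:s3cret', 'bob:hunter2']
```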
Now my problem. I checked that the work to be done for each user/pass is not so intensive that a single machine couldn't run three of them at the same time using Python threads. On one machine I have already implemented this: I load the user/pass pairs into a Python Queue() and have three threads consume that Queue(). Now I want to do something similar, but instead of consuming from a Python Queue(), each thread on each machine should consume from the rabbitmq queue. This is where I'm stuck. To get started, I ran some tests based on the rabbitmq tutorial.
send.py:
    import pika, sys

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')

    message = ' '.join(sys.argv[1:])
    channel.basic_publish(exchange='', routing_key='hello', body=message)
    connection.close()
worker.py
    import time, pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print ' [x] received %r' % (body,)
        time.sleep(body.count('.'))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue='hello', no_ack=False)
    channel.start_consuming()
With the above I can run two instances of worker.py; both subscribe to the rabbitmq queue and consume it as expected.
My threaded version without rabbitmq looks something like this:
runit.py
    import threading, Queue

    # do_stuff(user=..., password=...) is defined elsewhere; note that the
    # second keyword can't be 'pass', since that's a Python keyword
    class Threaded_do_stuff(threading.Thread):
        def __init__(self, user_queue):
            threading.Thread.__init__(self)
            self.user_queue = user_queue

        def run(self):
            while True:
                user, password = self.user_queue.get()
                do_stuff(user=user, password=password)
                self.user_queue.task_done()

    user_queue = Queue.Queue()
    for i in range(3):
        td = Threaded_do_stuff(user_queue)
        td.setDaemon(True)
        td.start()
This also works as expected: I fill the queue and the three threads consume it. Now I want to do something like runit.py, but where each thread, instead of consuming from a Python Queue(), consumes from a rabbitmq queue the way worker.py does.
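For reference, here is a self-contained, runnable version of that pattern (do_stuff is stubbed out, since the real one isn't shown): fill the Queue(), start three daemon threads, and let user_queue.join() block until every item has been marked task_done().

```python
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2
import threading

done = []                    # records which users were processed
lock = threading.Lock()

def do_stuff(user, password):
    # stand-in for the real per-account work
    with lock:
        done.append(user)

class Threaded_do_stuff(threading.Thread):
    def __init__(self, user_queue):
        threading.Thread.__init__(self)
        self.user_queue = user_queue

    def run(self):
        while True:
            user, password = self.user_queue.get()
            do_stuff(user=user, password=password)
            self.user_queue.task_done()

user_queue = queue.Queue()
for user_pass in [('alice', 'a'), ('bob', 'b'), ('carol', 'c'), ('dave', 'd')]:
    user_queue.put(user_pass)

for i in range(3):
    td = Threaded_do_stuff(user_queue)
    td.setDaemon(True)       # daemons are fine here because join() below waits
    td.start()

user_queue.join()            # blocks until every item is task_done()
print(sorted(done))          # ['alice', 'bob', 'carol', 'dave']
```

The user_queue.join() at the end is what keeps the main thread alive while the daemon threads work.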
Here is what I tried, and it doesn't work (and I don't understand why):
rabbitmq_runit.py
    import time, threading, pika

    class Threaded_worker(threading.Thread):
        def callback(self, ch, method, properties, body):
            print ' [x] received %r' % (body,)
            time.sleep(body.count('.'))
            ch.basic_ack(delivery_tag=method.delivery_tag)

        def __init__(self):
            threading.Thread.__init__(self)
            self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
            self.channel = self.connection.channel()
            self.channel.queue_declare(queue='hello')
            self.channel.basic_qos(prefetch_count=1)
            self.channel.basic_consume(self.callback, queue='hello')

        def run(self):
            print 'start consuming'
            self.channel.start_consuming()

    for _ in range(3):
        print 'launch thread'
        td = Threaded_worker()
        td.setDaemon(True)
        td.start()
I would expect this to start three threads, each of which blocks in .start_consuming(), just sitting there waiting for the rabbitmq queue to send it something. Instead, the program starts, prints some output, and exits. The pattern of the output is also strange:
    launch thread
    launch thread
    start consuming
    launch thread
    start consuming
In particular, notice that one "start consuming" is missing.
What's happening?
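For what it's worth, my current guess (unverified) is that nothing keeps the main thread alive after the for loop: when the main thread returns, daemon threads are killed, whether or not they got as far as start_consuming(). A pika-free sketch of that effect:

```python
import threading
import time

results = []

def slow_worker():
    time.sleep(0.2)              # stands in for the work before/inside start_consuming()
    results.append('consumed')

t = threading.Thread(target=slow_worker)
t.setDaemon(True)
t.start()

assert results == []             # right after start(), no work has happened yet;
                                 # if the main thread returned here, the daemon
                                 # thread would be killed before appending
t.join()                         # keeping the main thread alive lets it finish
print(results)                   # ['consumed']
```

If that guess is right, the fix would be to keep the main thread alive (e.g. join the workers) rather than letting it fall off the end of the script.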
EDIT: One answer I found to a similar question, Consuming a rabbitmq message queue with multiple threads (Python Kombu), is "use celery", whatever that means. I don't buy it: I shouldn't need anything remotely as complex as celery. In particular, I'm not trying to set up RPC, and I don't need to read replies from the do_stuff routines.
EDIT 2: The print pattern I'd like to see is the following. I run
    python send.py first message......
    python send.py second message.
    python send.py third message.
    python send.py fourth message.
and I'd expect the output to be
    launch thread
    start consuming
    [x] received 'first message......'
    launch thread
    start consuming
    [x] received 'second message.'
    launch thread
    start consuming
    [x] received 'third message.'
    [x] received 'fourth message.'