0mq one-to-many connection

What is the best way to establish two-way communication between processes using 0mq? I need to create some background processes that will wait for commands from the main process, perform some calculations and return the result back to the main process.


There are several ways to do this. The simplest approach is probably REQ/REP sockets. Each background process (worker) gets a REP socket, and the main process uses a REQ socket to talk to each one:

    import zmq
    from multiprocessing import Process


    def worker(addr):
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(addr)
        while True:
            # get message from boss
            msg = socket.recv()
            # ...do something with msg
            # send back results
            socket.send(msg)


    if __name__ == '__main__':
        # spawn 5 workers, one port each (5000-5004)
        for i in range(5):
            Process(target=worker, args=('tcp://127.0.0.1:500%d' % i,)).start()

You will need to connect to each worker to send it a message and get the result:

    # worker_addr is one of the addresses a worker bound to above
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect(worker_addr)
    socket.send(b'message')  # pyzmq sends bytes on Python 3
    msg = socket.recv()

Another approach is to use PUB/SUB to dispatch messages to the workers and PUSH/PULL to collect the results:

    import zmq
    from multiprocessing import Process


    def worker(worker_id, publisher_addr, results_addr):
        context = zmq.Context()
        sub = context.socket(zmq.SUB)
        sub.connect(publisher_addr)
        # only receive messages whose first frame starts with this worker's id
        sub.setsockopt(zmq.SUBSCRIBE, worker_id)
        push = context.socket(zmq.PUSH)
        push.connect(results_addr)

        while True:
            # frame 0 is the worker id, frame 1 is the payload
            msg = sub.recv_multipart()[1]
            # do something, send off results
            push.send_multipart([worker_id, msg])


    if __name__ == '__main__':
        publisher_addr = 'tcp://127.0.0.1:5000'
        results_addr = 'tcp://127.0.0.1:5001'

        # launch some workers; ids are bytes because zmq frames are bytes
        for i in range(5):
            worker_id = ('worker-%d' % i).encode()
            Process(target=worker,
                    args=(worker_id, publisher_addr, results_addr)).start()

To send a command to a specific worker, do something like:

    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    pub.bind(publisher_addr)
    # note: PUB drops messages sent before a worker has connected and subscribed
    # send message to worker-1
    pub.send_multipart([b'worker-1', b'hello'])

Collect the results:

    context = zmq.Context()
    pull = context.socket(zmq.PULL)
    pull.bind(results_addr)

    while True:
        worker_id, result = pull.recv_multipart()
        print(worker_id, result)

Consider using the Request-Reply Broker pattern, but replace the REQ socket with a DEALER. Unlike REQ, a DEALER does not have to wait for a reply before sending again, and it automatically load-balances traffic across your workers.

In the picture, Client is your main process and Service A/B/C are your background processes (workers). The main process must bind to an endpoint, and the workers must connect to that endpoint to receive work items.
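A rough sketch of that layout, assuming pyzmq is installed. To keep it runnable as a single script it uses threads and an `inproc://` endpoint instead of separate processes and TCP; the endpoint name, the `run_jobs` helper, and the squaring "calculation" are all made up for illustration and are not part of the answer above.

```python
import threading
import zmq

ENDPOINT = "inproc://work"  # hypothetical endpoint name, chosen for this sketch


def worker(context, stop):
    """Background worker: connect to the main process and answer requests."""
    rep = context.socket(zmq.REP)
    rep.connect(ENDPOINT)
    try:
        while not stop.is_set():
            # Poll with a timeout so the worker can notice the stop flag.
            if rep.poll(100):
                n = int(rep.recv().decode())
                rep.send(str(n * n).encode())  # stand-in for a real calculation
    finally:
        rep.close(linger=0)


def run_jobs(numbers, n_workers=3):
    """Send each number to the worker pool and collect the squared results."""
    context = zmq.Context()
    dealer = context.socket(zmq.DEALER)
    dealer.bind(ENDPOINT)  # the main process binds; the workers connect

    stop = threading.Event()
    threads = [threading.Thread(target=worker, args=(context, stop))
               for _ in range(n_workers)]
    for t in threads:
        t.start()

    # All jobs are sent before any reply is read.
    # The empty first frame is the delimiter a REP socket expects.
    for n in numbers:
        dealer.send_multipart([b"", str(n).encode()])

    results = []
    for _ in numbers:
        _empty, reply = dealer.recv_multipart()
        results.append(int(reply.decode()))

    # Shut the workers down, then release the sockets and the context.
    stop.set()
    for t in threads:
        t.join()
    dealer.close(linger=0)
    context.term()
    return sorted(results)
```

Calling `run_jobs([1, 2, 3, 4])` should hand the four jobs to whichever workers are connected and return the squares in sorted order. Replies can arrive in any order, so a real main process would probably tag each job with an id rather than rely on sorting.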

In the main process, keep a list of outstanding work items along with the time each one was sent. If no response arrives within some timeout, simply resend the work item, since the worker may have died.
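The bookkeeping described above could look something like this. It is only a sketch using the standard library: the `PendingWork` class, its method names, and the idea of giving each work item an id are assumptions made for the example, not something the answer specifies.

```python
import time


class PendingWork:
    """Track sent work items and report the ones that have gone unanswered."""

    def __init__(self, timeout):
        self.timeout = timeout  # seconds to wait before an item counts as lost
        self._sent = {}         # item id -> (payload, time it was last sent)

    def mark_sent(self, item_id, payload, now=None):
        # Record (or refresh) the send time; `now` can be injected for testing.
        sent_at = time.monotonic() if now is None else now
        self._sent[item_id] = (payload, sent_at)

    def mark_done(self, item_id):
        # A reply arrived; duplicate replies for the same id are ignored.
        self._sent.pop(item_id, None)

    def overdue(self, now=None):
        # Return (id, payload) pairs that have waited at least `timeout` seconds.
        now = time.monotonic() if now is None else now
        return [(item_id, payload)
                for item_id, (payload, sent_at) in self._sent.items()
                if now - sent_at >= self.timeout]
```

The main loop would call `mark_sent` whenever it sends an item, `mark_done` when a reply comes back, and periodically resend whatever `overdue()` returns (calling `mark_sent` again to restart the clock). A worker that was merely slow may still answer after a resend, so the main process should be prepared to see the same result twice.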

[Figure: Request-Reply Broker]
