Closing JeroMQ correctly

I am wondering how to close JeroMQ correctly. So far I know three methods, each with its pros and cons, and I do not know which one is best.

Situation:

  • Thread A: owns the context, provides start / stop methods
  • Thread B: the actual listener thread

My current method:

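Thread A

    static ZContext CONTEXT = new ZContext();
    B listener;
    Thread thread;

    public void start() {
        listener = new B();
        thread = new Thread(listener);
        thread.start(); // start() returns void, so it cannot be chained into the assignment
    }

    public void stop() throws InterruptedException {
        listener.stopping = true; // the flag lives on B, not on Thread
        thread.join();
    }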

Thread B

    volatile boolean stopping = false; // written by A, read by B
    ZMQ.Socket socket;

    public void run() {
        socket = CONTEXT.createSocket(ROUTER);
        ... // socket setup
        socket.setReceiveTimeout(10); // recv() returns after at most 10 ms
        while (!stopping) {
            socket.recv();
        }
        if (NUM_SOCKETS >= 1) {
            CONTEXT.destroySocket(socket);
        } else {
            CONTEXT.destroy();
        }
    }

It works just fine. A shutdown delay of up to 10 ms is not a problem for me, but the thread wakes up every 10 ms even when there are no messages, which adds unnecessary CPU load. At the moment, this is the method I prefer.


The second method shares the socket between the two threads:

Thread A

    static ZContext CONTEXT = new ZContext();
    ZMQ.Socket socket;
    B listener;
    Thread thread;

    public void start() {
        socket = CONTEXT.createSocket(ROUTER);
        ... // socket setup
        listener = new B(socket);
        thread = new Thread(listener);
        thread.start();
    }

    public void stop() {
        listener.stopping = true;
        CONTEXT.destroySocket(socket); // closes the socket while B is blocked in recv()
    }

Thread B

    volatile boolean stopping = false;
    ZMQ.Socket socket;

    public void run() {
        try {
            while (!stopping) {
                socket.recv();
            }
        } catch (ClosedSelectorException e) {
            // socket closed by A
            socket = null;
        }
        if (socket != null) {
            // close socket myself
            if (NUM_SOCKETS >= 1) {
                CONTEXT.destroySocket(socket);
            } else {
                CONTEXT.destroy();
            }
        }
    }

This works like a charm too, but even when recv is already blocking, the exception is sometimes not thrown. If I wait one millisecond after starting the thread, the exception is always thrown. I don't know whether this is a bug or just a consequence of my misuse, since I am sharing a socket.


"revite" asked this question earlier ( https://github.com/zeromq/jeromq/issues/116 ) and received an answer, which is the third solution: https://github.com/zeromq/jeromq/blob/master/src/test/java/guide/interrupt.java

Summary: They call ctx.term(), which unblocks the thread that is blocked in socket.recv().

This works fine, but I don't want to terminate my entire context, only this single socket. I would have to use one context per socket, so I could not use inproc.

Summary

At the moment, I don't know how to get thread B out of its blocking state other than by using timeouts, sharing the socket, or terminating the whole context.

What is the right way to do this?

+7
sockets zeromq recv jeromq
3 answers

It is often mentioned that you can simply destroy the zmq context and everything that shares this context will exit, but this creates a nightmare, because your exit code has to do everything possible to avoid the minefield of accidentally calling into dead socket objects.

Trying to close the socket from another thread does not work either, because sockets are not thread safe and you will end up with a crash.

ANSWER: It is best to do what the ZeroMQ guide suggests for anything that spans multiple threads: use zmq sockets, not mutexes / thread locks / etc. Set up an additional listener socket that you connect to and send something to when you want to shut down, and have your run() use the JeroMQ Poller to check which of the two sockets received something. If the additional socket received something, quit.
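
A minimal sketch of that idea, assuming a recent JeroMQ (0.5.x, where SocketType and ZContext.createPoller are available). The class names, the two inproc endpoints, and the "STOP" payload are hypothetical and chosen only for illustration; the real socket setup from the question is not reproduced here.

```java
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

// Hypothetical names throughout; not part of JeroMQ or of the question's code.
class ControlSocketShutdown {
    static final String DATA_ENDPOINT = "inproc://listener-data";       // assumed
    static final String CONTROL_ENDPOINT = "inproc://listener-control"; // assumed

    /** Thread B: creates and uses its sockets only inside run(). */
    static class Listener implements Runnable {
        private final ZContext context;

        Listener(ZContext context) {
            this.context = context;
        }

        @Override
        public void run() {
            ZMQ.Socket router = context.createSocket(SocketType.ROUTER);
            ZMQ.Socket control = context.createSocket(SocketType.PAIR);
            router.bind(DATA_ENDPOINT);
            control.connect(CONTROL_ENDPOINT); // A bound this before starting B

            ZMQ.Poller poller = context.createPoller(2);
            int routerIndex = poller.register(router, ZMQ.Poller.POLLIN);
            int controlIndex = poller.register(control, ZMQ.Poller.POLLIN);

            while (true) {
                poller.poll(-1); // block until one of the two sockets is readable
                if (poller.pollin(controlIndex)) {
                    break; // A asked us to stop
                }
                if (poller.pollin(routerIndex)) {
                    ZMsg message = ZMsg.recvMsg(router); // identity + payload frames
                    // ... handle the message here
                    message.destroy();
                }
            }
            context.destroySocket(router);
            context.destroySocket(control);
        }
    }

    /** Thread A: owns the sending end of the control pair and the thread. */
    static class Owner {
        private final ZContext context;
        private ZMQ.Socket controlSender;
        private Thread thread;

        Owner(ZContext context) {
            this.context = context;
        }

        void start() {
            controlSender = context.createSocket(SocketType.PAIR);
            controlSender.bind(CONTROL_ENDPOINT); // bind before B connects
            thread = new Thread(new Listener(context));
            thread.start();
        }

        void stop() {
            controlSender.send("STOP"); // wakes B's poller
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            context.destroySocket(controlSender);
        }

        boolean isAlive() {
            return thread != null && thread.isAlive();
        }
    }
}
```

Each socket here is touched by one thread only, the listener blocks in poll() without a timeout, and only this listener is shut down while the shared context (and inproc) stays usable. The sketch assumes start() and stop() are called from the same thread, since the control sender is not thread safe either.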

+4

Old question, but just in case ...

I would recommend checking out the source of ZThread. You should be able to create an instance of IAttachedRunnable, which you can pass to the fork method. The instance's run method will be passed a PAIR socket and executed in another thread, and fork will return the connected PAIR socket, which you use to communicate with the PAIR socket that your IAttachedRunnable received.
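
A rough sketch of how that could look, assuming JeroMQ 0.5.x and its ZThread.fork(ZContext, IAttachedRunnable, Object...) signature. ForkedListener, the endpoint passed as an argument, and the "STOP" / "DONE" payloads are hypothetical names used only for this illustration.

```java
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import org.zeromq.ZThread;

// Hypothetical listener; only ZThread, IAttachedRunnable and fork come from JeroMQ.
class ForkedListener implements ZThread.IAttachedRunnable {
    @Override
    public void run(Object[] args, ZContext ctx, ZMQ.Socket pipe) {
        // Runs in the forked thread; 'pipe' is this thread's end of the PAIR link.
        ZMQ.Socket router = ctx.createSocket(SocketType.ROUTER);
        router.bind((String) args[0]); // endpoint supplied by the parent (assumed)

        ZMQ.Poller poller = ctx.createPoller(2);
        int routerIndex = poller.register(router, ZMQ.Poller.POLLIN);
        int pipeIndex = poller.register(pipe, ZMQ.Poller.POLLIN);

        while (true) {
            poller.poll(-1); // block until the router or the pipe is readable
            if (poller.pollin(pipeIndex)) {
                pipe.recvStr();    // consume the stop request
                pipe.send("DONE"); // tell the parent we are leaving
                break;
            }
            if (poller.pollin(routerIndex)) {
                ZMsg message = ZMsg.recvMsg(router);
                // ... handle the message here
                message.destroy();
            }
        }
        ctx.destroySocket(router);
    }

    /** Parent side: fork the listener, ask it to stop, and wait for its reply. */
    static String startAndStop(ZContext ctx) {
        ZMQ.Socket pipe = ZThread.fork(ctx, new ForkedListener(), "inproc://forked-listener");
        pipe.send("STOP");
        String reply = pipe.recvStr(); // blocks until the child has answered
        ctx.destroySocket(pipe);
        return reply;
    }
}
```

Compared with wiring the control socket by hand, fork creates and connects the PAIR sockets for you, so the parent only has to keep the returned socket and send the stop request on it.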

+3

Check the jeromq source: even when you do a "blocking" recv, you still burn CPU all the time (the thread never sleeps). If you are worried about this, have the second thread sleep between polls and let the parent thread interrupt it. Something like this (only the relevant parts):

Thread A

    public void stop() throws InterruptedException {
        thread.interrupt(); // wakes B if it is sleeping
        thread.join();
    }

Thread B

    while (!Thread.interrupted()) {
        socket.recv();
        // do whatever
        try {
            Thread.sleep(10); // milliseconds
        } catch (InterruptedException e) {
            break;
        }
    }

In addition, regarding your second solution: in general, you should not share sockets between threads. The ZeroMQ guide is quite clear about this: "Do not share ØMQ sockets between threads. ØMQ sockets are not thread safe." Remember that the main strength of ZMQ is IPC: threads exchange data through connected sockets rather than sharing the same end of the same socket. There is no need for things like shared boolean stop variables.

+1
