How to deal with ZMQ sockets that do not have thread safety?

I used ZMQ in some Python applications for a while, but most recently I decided to override one of them in Go, and I realized that ZMQ sockets are not thread safe.

The original Python implementation uses an event loop that looks like this:

while running:
    socks = dict(poller.poll(TIMEOUT))
    if socks.get(router) == zmq.POLLIN:
        client_id = router.recv()
        _ = router.recv()
        data = router.recv()
        requests.append((client_id, data))

    for req in requests:
        rep = handle_request(req)
        if rep:
            replies.append(rep)
            requests.remove(req)

    for client_id, data in replies:
        router.send(client_id, zmq.SNDMORE)
        router.send(b'', zmq.SNDMORE)
        router.send(data)
        del replies[:]

The problem is that the answer may not be ready for the first skip, so when I have pending requests, I have to ask for a very short timeout, or the clients will wait longer than necessary, and the application ends up using a lot of CPU to poll .

When I decided to override it in Go, I thought it would be as simple as this, avoiding the problem, using an infinite timeout when polling:

for {
    sockets, _ := poller.Poll(-1) 
    for _, socket := range sockets {
        switch s := socket.Socket; s {
        case router:
            msg, _ := s.RecvMessage(0)
            client_id := msg[0]
            data := msg[2]
            go handleRequest(router, client_id, data)                
        }
    }
}

, . libzmq. :

  • zmq4 docs, sync.Mutex / . . , ZMQ .

  • goroutine / , req/rep Python. , .

  • , 2, GOMAXPROCS=1. , , , Poll().

  • req/rep 2, runtime.LockOSThread, , . , . , .

  • , 4, - Python. , , Python.

  • goroutines, . , .

  • zmq TCP, . , .

, , 6 - , ZMQ , goroutines , , , . ?


, POPL inproc PULL goroutine , . , , , Python.

+4
2

I 1,5 , https://github.com/vaughan0/go-zmq/blob/master/channels.go pebbe/zmq4. , ( ) .

gist , pebbe/zmq4 ( Socket). , Socket Socket, , .

- Socket (, s), :

channels := s.Channels()
outBound := channels.Out()
inBound := channels.In()

[][]byte, goroutines, goroutine, , Poller .

+4

pebbe/zmq4 Reactor. Go, , , , - , , Python. zmq inproc, , - , . , , .

+1

All Articles