Zeromq pub / sub examples with C # winform

I am trying to create a C # Winform application that uses ZeroMQ (clrzmq.net bindings (x86) via nuget) in a pub / sub-model.

After a long search, I can only find individual C # examples where the code uses the while statement to process new messages indefinitely. When I try to use these examples, I don’t know where to put the code, and it just blocks gui and everything else.

I don’t know if it is impossible to do without using another thread, but I got the impression that the asynchronous behavior of ZeroMQ could work without encoding additional threads. Maybe I just don’t know where to put the zeromq code, or maybe I really need another thread.

If someone could provide a simple pub / sub with directions on where to actually paste the code into the default C # Winform application, it would be very helpful.

+8
c # winforms zeromq
source share
1 answer

I assume that you are using clrzmq ZeroMq wrapper in your project. As far as I know, it is not possible to get a non-blocking message in a simple loop using clrzmq, it will block either indefinitely for a certain time (by submitting a timeout to the receiving method), or until you receive the message.

However, it’s quite simple to configure the thread to periodically poll the socket and enter incoming messages on Queue . Then you can use, for example, simple WinForms Timer to periodically delete any pending messages from this (common) Queue . Here is an example of a streaming subscriber:

 public class ZeroMqSubscriber { private readonly ZmqContext _zmqContext; private readonly ZmqSocket _zmqSocket; private readonly Thread _workerThread; private readonly ManualResetEvent _stopEvent = new ManualResetEvent(false); private readonly object _locker = new object(); private readonly Queue<string> _queue = new Queue<string>(); public ZeroMqSubscriber(string endPoint) { _zmqContext = ZmqContext.Create(); _zmqSocket = _zmqContext.CreateSocket(SocketType.SUB); _zmqSocket.Connect(endPoint); _zmqSocket.SubscribeAll(); _workerThread = new Thread(ReceiveData); _workerThread.Start(); } public string[] GetMessages() { lock (_locker) { var messages = _queue.ToArray(); _queue.Clear(); return messages; } } public void Stop() { _stopEvent.Set(); _workerThread.Join(); } private void ReceiveData() { try { while (!_stopEvent.WaitOne(0)) { var message = _zmqSocket.Receive(Encoding.UTF8, new TimeSpan(0, 0, 0, 1)); if (string.IsNullOrEmpty(message)) continue; lock (_locker) _queue.Enqueue(message); } } finally { _zmqSocket.Dispose(); _zmqContext.Dispose(); } } } 

From Form you periodically poll the queue (this example uses Forms Timer and simply adds the message data to the Textbox ):

 private readonly ZeroMqSubscriber _zeroMqSubscriber = new ZeroMqSubscriber("tcp://127.0.0.1:5000"); void ReceiveTimerTick(object sender, EventArgs e) { var messages = _zeroMqSubscriber.GetMessages(); foreach (var message in messages) _textbox.AppendText(message + Environment.NewLine); } 
+7
source share

All Articles