C # Waiting for several events in a Producer / Consumer

I implement a data link layer using a producer / consumer pattern . The level of the data channel has its own stream and a state machine for transmitting the protocol of the data line via cable (Ethernet, RS-232 ...). The physical layer interface is represented as System.IO.Stream. Another thread writes messages and reads messages from the data link object.

A data link object has a wait state that must wait for one of four conditions:

  • Byte received
  • Message from network stream
  • Standby timer expired
  • All messages have been canceled by the network layer.

I find it difficult to determine the best way to do this without dividing the connection into a read / write stream (thereby greatly increasing complexity). Here's how I can get 3 out of 4:

// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token. stream.ReadTimeout = 10000; await stream.ReadAsync(buf, 0, 1, cts.Token); 

or

 BlockingCollection<byte[]> SendQueue = new ...; ... // Check for a message from network layer. Timeout after 10 seconds. // Monitor cancellation token. SendQueue.TryTake(out msg, 10000, cts.Token); 

What should I do to block a thread, waiting for all four conditions? All recommendations are welcome. I am not configured for any data architectures or structures.

EDIT: ******** Thanks for helping everyone. Here is my solution ********

At first, I don’t think there was an asynchronous implementation of the producer / consumer queue. So I did something similar overflowing https://stackoverflow.com/a/2126188/212880 .

I need an external and internal cancellation source to stop the consumer flow and cancel the intermediate tasks, respectively, similarly to this article .

 byte[] buf = new byte[1]; using (CancellationTokenSource internalTokenSource = new CancellationTokenSource()) { CancellationToken internalToken = internalTokenSource.Token; CancellationToken stopToken = stopTokenSource.Token; using (CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken)) { CancellationToken ct = linkedCts.Token; Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct); Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct); Task keepAliveTask = Task.Delay(m_keepAliveTime, ct); // Wait for at least one task to complete await Task.WhenAny(readTask, msgTask, keepAliveTask); // Next cancel the other tasks internalTokenSource.Cancel(); try { await Task.WhenAll(readTask, msgTask, keepAliveTask); } catch (OperationCanceledException e) { if (e.CancellationToken == stopToken) throw; } if (msgTask.IsCompleted) // Send the network layer message else if (readTask.IsCompleted) // Process the byte from the physical layer else Contract.Assert(keepAliveTask.IsCompleted); // Send a keep alive message } } 
+6
source share
3 answers

In this case, I would only use undo tokens to undo. A repeatable timeout, such as a save timer, is better represented as a timer.

So, I would simulate this as three excellent tasks. First, the cancellation token:

All messages were canceled by the network layer.

 CancellationToken token = ...; 

Then three simultaneous operations:

Byte received

 var readByteTask = stream.ReadAsync(buf, 0, 1, token); 

The standby timer has expired

 var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token); 

Message from network stream available

This is a little trickier. Your current code uses a BlockingCollection<T> , which is not compatible with asynchronous. I recommend switching to TPL Dataflow BufferBlock<T> or my own AsyncProducerConsumerQueue<T> , any of which can be used as async-compatible producer / consumer queues (which means that the producer can synchronize either asynchronously and the consumer can synchronize or asynchronously).

 BufferBlock<byte[]> SendQueue = new ...; ... var messageTask = SendQueue.ReceiveAsync(token); 

Then you can use Task.WhenAny to determine which of these tasks has been completed:

 var completedTask = await Task.WhenAny(readByteTask, keepAliveTimerTask, messageTask); 

Now you can get the results by comparing completedTask with others and await with them:

 if (completedTask == readByteTask) { // Throw an exception if there was a read error or cancellation. await readByteTask; var byte = buf[0]; ... // Continue reading readByteTask = stream.ReadAsync(buf, 0, 1, token); } else if (completedTask == keepAliveTimerTask) { // Throw an exception if there was a cancellation. await keepAliveTimerTask; ... // Restart keepalive timer. keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token); } else if (completedTask == messageTask) { // Throw an exception if there was a cancellation (or the SendQueue was marked as completed) byte[] message = await messageTask; ... // Continue reading messageTask = SendQueue.ReceiveAsync(token); } 
+2
source

I would go with your option two, expecting that any of 4 conditions will happen. Assuming you have 4 tasks as expected methods:

 var task1 = WaitForByteReceivedAsync(); var task2 = WaitForMessageAvailableAsync(); var task3 = WaitForKeepAliveTimerAsync(); var task4 = WaitForCommunicationCancelledAsync(); // now gather them IEnumerable<Task<bool>> theTasks = new List<IEnumerable<Task<bool>>>{ task1, task2, task3, task4 }; // Wait for any of the things to complete var result = await Task.WhenAny(theTasks); 

The above code will resume immediately after the completion of the first task and ignore the other 3.

Note:

The documentation for WhenAny says:

The returned task will always end in the RanToCompletion state, and its result will be set to the first task. This is true even if the first task completed in the Canceled or Failed state.

So, be sure to do a final check before trusting what happened:

 if(result.Result.Result == true) ... // First Result is the task, the second is the bool that the task returns 
+3
source

Cancel reading does not let you know if the data was read or not. Cancellation and reading are not atomic with respect to each other. This approach only works when closing the stream after canceling.

The line approach is better. You can create a related CancellationTokenSource that will be canceled whenever you want to. Instead of passing cts.Token you pass a token that you control.

You can then transfer this token based on time, another token, and any other event that you like. If you use the built-in timeout, the queue will internally do the same to associate the incoming token with the timeout.

+1
source

All Articles