C#: One writer, many readers, each reading exactly once

I have 4 threads. One reads information from the network, writes it into a variable, and should signal after each piece. The other 3 read this variable and must read each value exactly once. My current solution is that the writer sets an event after it writes and then waits for the readers' events. The readers wait for that event, then read the variable and set their own events (meaning they have read it). The problem is that the readers can read more than once, so I get duplicates. How can I enforce the rule that each reader reads exactly once?

+4
5 answers

One way to implement this is as follows.

The data is shared between the threads as a singly linked list. Each node in the list is either a marker or a data node. The list starts as a single marker node. When data is read from the network, a new list is built that consists of a series of data nodes followed by a marker. This list is then appended to the last marker in the shared list.

Each reader thread starts with a reference to the original marker node and an AutoResetEvent. Whenever a new piece of data is written, the writer signals the AutoResetEvent of each reader thread. The reader thread then simply walks the list until it finds a marker with no next node.

This scheme ensures that every reader sees each piece of data exactly once. The biggest complication is constructing the list so that it can be written and read without a lock. That is fairly straightforward with Interlocked.CompareExchange, though.
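The writer in this answer assumes a single writer thread, so it links new nodes with a plain assignment. If several writers could append at once, the Interlocked.CompareExchange idea mentioned above might look roughly like the sketch below. The names CasNode and LockFreeAppend are hypothetical and not part of the answer; the sketch appends one node at a time and walks forward from a known node, which is simple rather than fast.

```csharp
using System.Threading;

// Hypothetical node type used only for this sketch.
public sealed class CasNode
{
    public int Data;
    public CasNode Next;
}

public static class LockFreeAppend
{
    // Appends 'node' at the end of the list reachable from 'start'
    // without taking a lock.
    public static void Append(CasNode start, CasNode node)
    {
        CasNode tail = start;
        while (true)
        {
            // Install 'node' only if tail.Next is still null.
            // CompareExchange returns the value that was there before.
            CasNode seen = Interlocked.CompareExchange(ref tail.Next, node, null);
            if (seen == null)
            {
                return;      // we won the race: 'node' is now linked in
            }
            tail = seen;     // another thread appended first: move forward and retry
        }
    }
}
```

Readers never see a half-linked node, because a node only becomes reachable through the single atomic write to Next.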

Linked List Type

    class Node<T>
    {
        public bool IsMarker;
        public T Data;
        public Node<T> Next;
    }

Sample Writer Type

    // Requires: using System.Collections.Generic; using System.Threading;
    class Writer<T>
    {
        private List<AutoResetEvent> m_list;
        private Node<T> m_lastMarker;

        public Writer(List<AutoResetEvent> list, Node<T> marker)
        {
            m_lastMarker = marker;
            m_list = list;
        }

        // Assuming calls to this can't overlap. If they can overlap, you will
        // need synchronization in this method around the writing of
        // m_lastMarker.
        public void OnDataRead(T[] items)
        {
            if (items.Length == 0)
            {
                return;
            }

            // Build up a linked list of the new data followed by a
            // marker to signify the end of the data.
            var head = new Node<T>() { Data = items[0] };
            var current = head;
            for (int i = 1; i < items.Length; i++)
            {
                current.Next = new Node<T> { Data = items[i] };
                current = current.Next;
            }
            var marker = new Node<T> { IsMarker = true };
            current.Next = marker;

            // Append the list to the last marker node the writer created.
            m_lastMarker.Next = head;
            m_lastMarker = marker;

            // Tell each of the readers that there is new data.
            foreach (var e in m_list)
            {
                e.Set();
            }
        }
    }

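Sample Reader Type

    class Reader<T>
    {
        private AutoResetEvent m_event;
        private Node<T> m_marker;

        // Each reader gets its own event and the initial marker node.
        public Reader(AutoResetEvent e, Node<T> marker)
        {
            m_event = e;
            m_marker = marker;
        }

        void Go()
        {
            while (true)
            {
                m_event.WaitOne();
                var current = m_marker.Next;
                while (current != null)
                {
                    if (current.IsMarker)
                    {
                        // Found a new marker. Always record the marker because it may
                        // be the last marker in the chain.
                        m_marker = current;
                    }
                    else
                    {
                        // Actually process the data. ProcessData is whatever the
                        // reader does with each item (not shown here).
                        ProcessData(current.Data);
                    }
                    current = current.Next;
                }
            }
        }
    }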
+2

I agree with the comment that says you should code the consumer threads to accept the possibility of getting the same value several times. Perhaps the easiest way to do this is to add a sequential identifier to each update. That way, a thread can compare the sequential identifier with the last identifier it read and know whether it has received a duplicate.

It would also know if it had missed a value.
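A minimal sketch of the sequence-number idea, assuming the writer stamps each update with a counter that starts at 1 and increases by one per write. The Update and DedupReader types are hypothetical names introduced only for illustration; each reader thread would own one DedupReader.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical update type: the value plus a sequence number set by the writer.
public sealed class Update
{
    public readonly long Sequence;   // assumed to increase by one with every write
    public readonly string Payload;

    public Update(long sequence, string payload)
    {
        Sequence = sequence;
        Payload = payload;
    }
}

// Hypothetical per-reader state: remembers the last sequence number it processed.
public sealed class DedupReader
{
    private long _lastSeen = 0;      // 0 means "nothing read yet"

    public readonly List<string> Processed = new List<string>();

    // Number of updates this reader never saw (gaps in the sequence).
    public long Missed { get; private set; }

    // Returns true if the update was new and has been processed,
    // false if it was a duplicate and was ignored.
    public bool TryConsume(Update update)
    {
        if (update.Sequence <= _lastSeen)
        {
            return false;                               // already seen: ignore
        }
        Missed += update.Sequence - _lastSeen - 1;      // count skipped values
        _lastSeen = update.Sequence;
        Processed.Add(update.Payload);
        return true;
    }
}
```

With this in place a reader that wakes up twice for the same value simply gets false from the second TryConsume call, so the signalling between threads no longer has to be perfect.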

But if you really need them to run in lock step and get each value only once, I would suggest that you use two kinds of object, ManualResetEvent and CountdownEvent. Here's how to use them.

    // ManualResetEvent has no parameterless constructor; false = initially unsignaled.
    ManualResetEvent DataReadyEvent = new ManualResetEvent(false);
    ManualResetEvent WaitForResultEvent = new ManualResetEvent(false);
    CountdownEvent Acknowledgement = new CountdownEvent(NumWaitingThreads);

The reader threads wait on DataReadyEvent.

When another thread reads a value from the network, it does this:

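    // Re-arm the gate left open by the previous round. This assumes every
    // reader has already passed WaitForResultEvent before the next value arrives.
    WaitForResultEvent.Reset();
    Acknowledgement.Reset(NumWaitingThreads);
    DataReadyEvent.Set();       // signal waiting threads to process
    Acknowledgement.Wait();     // wait for all threads to signal they got it
    DataReadyEvent.Reset();     // block threads' reading
    WaitForResultEvent.Set();   // tell threads they can continue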

The waiting threads do this:

    DataReadyEvent.WaitOne();       // wait for value to be available
    // read the value
    Acknowledgement.Signal();       // acknowledge receipt
    WaitForResultEvent.WaitOne();   // wait for signal to proceed

This has the same effect as having two events per waiting thread, but is much simpler.

It does have a drawback, however: if a reader thread crashes, the writer will hang on the countdown event. But then, your method has the same problem if the producer thread waits for a signal from every reader thread.

+1
source

This is a good fit for the Barrier class.

You can use two Barriers to flip-flop between two states.

Here is an example:

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    namespace Demo
    {
        internal class Program
        {
            private static void Main(string[] args)
            {
                int readerCount = 4;

                Barrier barrier1 = new Barrier(readerCount + 1);
                Barrier barrier2 = new Barrier(readerCount + 1);

                for (int i = 0; i < readerCount; ++i)
                {
                    Task.Factory.StartNew(() => reader(barrier1, barrier2));
                }

                while (true)
                {
                    barrier1.SignalAndWait(); // Wait for all threads to reach the "new data available" point.

                    if ((value % 10000) == 0) // Print message every so often.
                        Console.WriteLine(value);

                    barrier2.SignalAndWait(); // Wait for the reader threads to read the current value.
                    ++value;                  // Produce the next value.
                }
            }

            private static void reader(Barrier barrier1, Barrier barrier2)
            {
                int expected = 0;

                while (true)
                {
                    barrier1.SignalAndWait(); // Wait for "new data available".

                    if (value != expected)
                    {
                        Console.WriteLine("Expected " + expected + ", got " + value);
                    }

                    ++expected;
                    barrier2.SignalAndWait(); // Signal that we've read the data, and wait for all other threads.
                }
            }

            private static volatile int value;
        }
    }
+1

I would recommend ConcurrentQueue: it ensures that each thread gets a unique instance from the queue. Here is a good explanation of how to use it.

ConcurrentQueue<T>.TryDequeue() is a thread-safe method that checks whether the queue is empty and, if it is not, removes an element from the queue. Since it performs both operations atomically, programmers do not have to worry about a race condition.
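A small sketch of what TryDequeue guarantees: several consumers drain one queue and no item is ever handed out twice. The QueueDemo class and its Drain method are hypothetical names for this illustration. Note that each item goes to exactly one consumer, so delivering every value to all three readers would presumably need something extra, such as one queue per reader; that is an assumption and not something the answer states.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class QueueDemo
{
    // Enqueues the numbers 0..itemCount-1, lets consumerCount tasks drain the
    // queue, and returns how many times each number was dequeued.
    public static int[] Drain(int itemCount, int consumerCount)
    {
        var queue = new ConcurrentQueue<int>();
        for (int i = 0; i < itemCount; i++)
        {
            queue.Enqueue(i);
        }

        var taken = new int[itemCount];
        var tasks = new Task[consumerCount];
        for (int c = 0; c < consumerCount; c++)
        {
            tasks[c] = Task.Run(() =>
            {
                int item;
                // TryDequeue tests for emptiness and removes the head in one
                // atomic step, so two consumers never receive the same item.
                while (queue.TryDequeue(out item))
                {
                    Interlocked.Increment(ref taken[item]);
                }
            });
        }

        Task.WaitAll(tasks);
        return taken;
    }
}
```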

0

I think I found a way. I created 2 arrays of AutoResetEvent, so each reader has 2 events: it waits on its "written" event and then sets its "read" event. The writer sets all the "written" events and then waits for all the "read" events.
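The approach described above could be sketched as follows. This is only one reading of the description, not the poster's actual code: the TwoEventArraysDemo class, the Run method, and the fixed list of values standing in for network data are all assumptions made for the example.

```csharp
using System.Collections.Generic;
using System.Threading;

public static class TwoEventArraysDemo
{
    // Runs one writer (the calling thread) and readerCount reader threads.
    // Returns, for each reader, the values it read in order.
    public static List<int>[] Run(int readerCount, int[] values)
    {
        var written = new AutoResetEvent[readerCount]; // writer -> reader i: new value is ready
        var read = new AutoResetEvent[readerCount];    // reader i -> writer: value has been read
        var seen = new List<int>[readerCount];
        var threads = new Thread[readerCount];
        int shared = 0;                                // the shared variable

        for (int i = 0; i < readerCount; i++)
        {
            written[i] = new AutoResetEvent(false);
            read[i] = new AutoResetEvent(false);
            seen[i] = new List<int>();
            int id = i;                                // per-thread copy of the loop variable
            threads[i] = new Thread(() =>
            {
                for (int n = 0; n < values.Length; n++)
                {
                    written[id].WaitOne();             // wait for the writer
                    seen[id].Add(shared);              // read the value exactly once
                    read[id].Set();                    // tell the writer we are done
                }
            });
            threads[i].Start();
        }

        foreach (int v in values)
        {
            shared = v;                                // "receive" the next value
            foreach (var e in written) e.Set();        // wake every reader
            foreach (var e in read) e.WaitOne();       // wait until every reader has read
        }

        foreach (var t in threads) t.Join();
        return seen;
    }
}
```

Because each reader has its own auto-reset "written" event, one Set wakes that reader once, and the writer cannot overwrite the variable until every "read" event has been set.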

JaredPar, your answer was helpful and helped me

0