Demonstrator Example with 1 Publisher and 4 Concurrent Consumers

In this example, https://stackoverflow.com/a/3208758/ and here Why is my disruptor example so slow? (at the end of the question) there is 1 publisher who publishes articles and 1 consumer.

But in my case, consumer work is much more complicated and takes some time. Therefore, I want 4 consumers to process data in parallel.

So, for example, if the manufacturer produces numbers: 1,2,3,4,5,6,7,8,9,10,11 ..

I want consumer1 to catch 1,5,9, ... consumer2 to catch 2,6,10, ... consumer3 to catch 3,7,11, ... consumer4 to catch 4,8,12 ... (well, not quite these numbers, the idea is that the data should be processed in parallel, I don’t care which particular number is processed by which consumer)

And remember that this must be done in parallel, because in real work the application is quite expensive. I expect consumers to run in different threads to use the power of multi-core systems.

Of course, I can just create 4 ringbuffers and attach 1 consumer to 1 ring buffer. That way I can use the original example. But I feel that it will be wrong. It would probably be right to create 1 publisher (1 ringbuffer) and 4 consumers - since that is what I need.

Adding a link to a very simulating question in google groups: https://groups.google.com/forum/#!msg/lmax-disruptor/-CLapWuwWLU/GHEP4UkxrAEJ

So, we have two options:

  • one ring of many consumers (each consumer will "wake up" on each addition, each consumer must have the same WaitStrategy).
  • a lot of "one ring - one consumer" (each consumer will wake up only on the data that he needs to process, each consumer can have his own WaitStrategy).
+6
source share
2 answers

EDIT . I forgot to mention that the code is partially taken from the FAQ . I don't know if this approach was better or worse than Frank's suggestion.

The project is heavily documented, which is a shame how it looks good.
Anyway, try the following snapshot (based on your first link) - checked for mono and seems to be in order:

using System; using System.Threading.Tasks; using Disruptor; using Disruptor.Dsl; namespace DisruptorTest { public sealed class ValueEntry { public long Value { get; set; } } public class MyHandler : IEventHandler<ValueEntry> { private static int _consumers = 0; private readonly int _ordinal; public MyHandler() { this._ordinal = _consumers++; } public void OnNext(ValueEntry data, long sequence, bool endOfBatch) { if ((sequence % _consumers) == _ordinal) Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal); else Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal); } } class Program { private static readonly Random _random = new Random(); private const int SIZE = 16; // Must be multiple of 2 private const int WORKERS = 4; static void Main() { var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default); for (int i=0; i < WORKERS; i++) disruptor.HandleEventsWith(new MyHandler()); var ringBuffer = disruptor.Start(); while (true) { long sequenceNo = ringBuffer.Next(); ringBuffer[sequenceNo].Value = _random.Next();; ringBuffer.Publish(sequenceNo); Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value); Console.ReadKey(); } } } } 
+1
source

From the specifications of the ring buffer, you will see that each consumer will try to process your ValueEvent . in your case you do not need it.

I solved it like this:

Add the field processed to your ValueEvent , and when the consumer accepts the event that he will check in this field, if he has already processed, he proceeds to the next field.

Not the most beautiful way, but how the buffer works.

0
source

All Articles