How to use IObservable / IObserver with ConcurrentQueue or ConcurrentStack

I realized that when I process items in a concurrent queue using multiple threads, while multiple threads may also be putting items into it, the ideal solution would be to use the Reactive Extensions together with the concurrent data structures.

My original question was:

While using ConcurrentQueue, trying to dequeue while looping through in parallel

So I'm curious whether there is a way to have a LINQ (or PLINQ) query that continuously dequeues items as they are put into the queue.

I am trying to set this up so that I can have n producers pushing into the queue and a limited number of threads processing it, so that I do not overload the database.

If I could use the Rx framework, I would expect to be able to just start it, and if 100 items were queued within 100 ms, the 20 threads that are part of the PLINQ query would simply work through the queue.

There are three technologies that I am trying to get to work together:

  • Rx Framework (Reactive LINQ)
  • PLINQ
  • System.Collections.Concurrent structures
+5
2 answers

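ConcurrentQueue sounds like the right tool for the job, but it is the structure that BlockingCollection uses underneath by default, so BlockingCollection is the one to work with directly. Chapter 7 of this book, http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1 , explains how to use BlockingCollection with multiple producers and multiple consumers, each taking items off the "queue". Look at the GetConsumingEnumerable() method, and possibly just call .ToObservable() on the result.

Here is a sample program that I think does what you are after: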

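// The Rx namespaces below assume Rx 2.0 or later; the earliest Rx builds used different ones.
// In those later releases Scheduler.TaskPool is exposed as TaskPoolScheduler.Default.
using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program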
{
    private static ManualResetEvent _mre = new ManualResetEvent(false);
    static void Main(string[] args)
    {
        var theQueue = new BlockingCollection<string>();
        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));


        LoadQueue(theQueue, "Producer A");
        LoadQueue(theQueue, "Producer B");
        LoadQueue(theQueue, "Producer C");

        _mre.Set();

        Console.WriteLine("Processing now....");

        Console.ReadLine();
    }

    private static void ProcessNewValue(string value, string consumerName, int delay)
    {
        Thread.SpinWait(delay);
        Console.WriteLine("{1} consuming {0}", value, consumerName);
    }

    private static void LoadQueue(BlockingCollection<string> target, string prefix)
    {
        var thread = new Thread(() =>
        {
            _mre.WaitOne();
            for (int i = 0; i < 100; i++)
            {
                target.Add(string.Format("{0} {1}", prefix, i));
            }
        });
        thread.Start();
    }
}
+6

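I don't know how best to do this with Rx, but I would recommend simply using BlockingCollection<T> and the producer-consumer pattern. Your main thread adds items to the collection, which uses ConcurrentQueue<T> underneath by default. A separate Task, started beforehand, runs Parallel::ForEach over the BlockingCollection<T> to process as many items concurrently as makes sense for the system. You will probably also want to look at the GetConsumingPartitioner method from the ParallelExtensions library, because the default partitioner adds more overhead than you want in this case.

When the main thread is finished, call CompleteAdding on the BlockingCollection<T> and Task::Wait on the Task you started, so that you wait until the consumers have processed every item in the collection.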
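
A minimal sketch of the pattern described in this answer, using only the base class library. The class name BoundedConsumerSketch, the Run helper, and its parameters are hypothetical and exist only for illustration. The built-in Partitioner.Create(..., EnumerablePartitionerOptions.NoBuffering) is used here as a stand-in for GetConsumingPartitioner, which ships separately from the framework; the two are not the same method. The counter increment is a placeholder for the real work, such as a database call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedConsumerSketch
{
    // Hypothetical helper: starts `producerCount` producers that each add
    // `itemsPerProducer` strings, while at most `maxConsumers` items are
    // processed at the same time. Returns the number of items processed.
    public static int Run(int producerCount, int itemsPerProducer, int maxConsumers)
    {
        // BlockingCollection<T> wraps a ConcurrentQueue<T> by default.
        var queue = new BlockingCollection<string>();
        int processed = 0;

        // Start the consuming task before any producer runs.
        Task consumer = Task.Run(() =>
        {
            // NoBuffering hands items out one at a time instead of in chunks.
            // Assumption: this stands in for GetConsumingPartitioner.
            var source = Partitioner.Create(
                queue.GetConsumingEnumerable(),
                EnumerablePartitionerOptions.NoBuffering);

            Parallel.ForEach(
                source,
                new ParallelOptions { MaxDegreeOfParallelism = maxConsumers },
                item =>
                {
                    // Placeholder for the real work on `item`.
                    Interlocked.Increment(ref processed);
                });
        });

        var producers = new Task[producerCount];
        for (int p = 0; p < producerCount; p++)
        {
            int id = p; // copy the loop variable for the closure
            producers[p] = Task.Run(() =>
            {
                for (int i = 0; i < itemsPerProducer; i++)
                {
                    queue.Add($"Producer {id} item {i}");
                }
            });
        }

        Task.WaitAll(producers);
        queue.CompleteAdding(); // lets GetConsumingEnumerable end once the queue is empty
        consumer.Wait();        // wait until every queued item has been processed
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(Run(3, 100, 4)); // prints 300
    }
}
```

MaxDegreeOfParallelism is what caps the number of simultaneous consumers, which is the part that keeps the database from being overloaded. Without the CompleteAdding call the consuming task would block forever waiting for more items.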

+3
