How to limit the sequence of consumption reactive?

We have an application in which we have a materialized array of elements that we will process through the jet pipeline. It's a bit like this

EventLoopScheduler eventLoop = new EventLoopScheduler();
IScheduler concurrency = new TaskPoolScheduler(
    new TaskFactory(
        new LimitedConcurrencyLevelTaskScheduler(threadCount)));
IEnumerable<int> numbers = Enumerable.Range(1, itemCount);

// 1. transform on single thread
IConnectableObservable<byte[]> source = 
    numbers.Select(Transform).ToObservable(eventLoop).Publish();

// 2. naive parallelization, restricts parallelization to Work 
// only; chunk up sequence into smaller sequences and process
// in parallel, merging results
IObservable<int> final = source.
    Buffer(10).
    Select(
        batch =>
        batch.
        ToObservable(concurrency).
        Buffer(10).
        Select(
            concurrentBatch =>
            concurrentBatch.
            Select(Work).
            ToArray().
            ToObservable(eventLoop)).
        Merge()).
    Merge();

final.Subscribe();

source.Connect();
Await(final).Wait();

If you're really interested in playing with this, the stand-in methods look like

private async static Task Await(IObservable<int> final)
{
    await final.LastOrDefaultAsync();
}

private static byte[] Transform(int number)
{
    if (number == itemCount)
    {
        Console.WriteLine("numbers exhausted.");
    }
    byte[] buffer = new byte[1000000];
    Buffer.BlockCopy(bloat, 0, buffer, 0, bloat.Length);
    return buffer;
}

private static int Work(byte[] buffer)
{
    Console.WriteLine("t {0}.", Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(50);
    return 1;
}

A little explanation. Range(1, itemCount)simulates the original inputs received from the data source. Transformimitates the enrichment process, each input of which must pass, and leads to an increase in memory. Workis a “long” process that works with a converted input.

, , , Work. (10 ), (threadCount).

, 5 50 Transform ; , , 1 , , 50 .

, , . , Reactive numbers Transform ( numbers exhausted.), (@1GB 1000 itemCount).

: , (.. , )?

: ; -, , paulpdaniels Enigmativity Work(Transform) ( , , , ), : .. .

+4
3

, .

:

IEnumerable<int> numbers = Enumerable.Range(1, itemCount);

Enumerable.Range, , numbers.Select(Transform) numbers , . Rx - , .

:

final.Subscribe();

source.Connect();
Await(final).Wait();

final.Subscribe() Await(final).Wait();, final.

source.Connect(), .

, , , , - .

:

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .Select(n => Transform(n))
        .Select(bs => Work(bs));

. , 20 1 .

. Work Transform, , .

concurrency.

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .Select(n => Transform(n))
        .SelectMany(bs => Observable.Start(() => Work(bs)));

20 0.284 , 5 . . Observable.Start, .

concurrency.

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .Select(n => Transform(n))
        .SelectMany(bs => Observable.Start(() => Work(bs), concurrency));

20 0,5 . , . , concurrency . , .

, , . , Transform(...) , Work(...), .

, :

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .SelectMany(n => Observable.Start(() => Work(Transform(n)), concurrency));

. , , , , Work(Transform(...)) .

+5

, , , , , . Rx , , , . , Rx - ; .

PLinq, Parallel.ForEach DataFlow? .

+4

@JamesWorld , , PLinq , , .

, Merge :

var source = numbers
  .Select(n => 
          Observable.Defer(() => Observable.Start(() => Work(Transform(n)), concurrency)))
  //Maximum concurrency
  .Merge(10)
  //Schedule all the output back onto the event loop scheduler
  .ObserveOn(eventLoop);

(, ), , Defer Merge, , x . Start() , . , Transform Work, Start.

await a Observable, , ..

await source; //== await source.LastAsync();
+2

All Articles