How do I pass an async method to Observable.Do?

Given:

  • IObservable<T> src
  • async Task F(T){...}
  • F can only be called sequentially. So await F(x); await F(y); is okay, but Task.Factory.ContinueWhenAll(new[]{F(x),F(y)}, _ => {...}); is not, because F(x) and F(y) must not run at the same time.

It is clear to me that await src.Do(F) is wrong, because it would run F concurrently.

My question is: how do I do this correctly?

2 answers

The easiest way is to use TPL Dataflow, which by default will throttle to one simultaneous call, even for asynchronous methods:

var block = new ActionBlock<T>(F);
src.Subscribe(block.AsObserver());
await block.Completion;
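
To make the sequential behaviour visible, here is a minimal self-contained sketch of the same idea. It assumes the System.Reactive and System.Threading.Tasks.Dataflow packages are referenced; DataflowDemo, RunAsync and the logging version of F are hypothetical names used only for illustration, not part of the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        // Hypothetical stand-in for F: logs when each call starts and ends.
        async Task F(int x)
        {
            lock (log) log.Add($"start {x}");
            await Task.Delay(10);
            lock (log) log.Add($"end {x}");
        }

        IObservable<int> src = Observable.Range(1, 3);

        // MaxDegreeOfParallelism defaults to 1, so the block does not start
        // the next call until the Task returned by the previous one completes.
        var block = new ActionBlock<int>(new Func<int, Task>(F));

        // AsObserver forwards OnNext to the block and OnCompleted to block.Complete().
        using (src.Subscribe(block.AsObserver()))
        {
            await block.Completion;
        }

        return log;
    }
}
```

With this setup the log should never interleave: every "start n" is followed directly by its "end n" before the next call begins.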

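Alternatively, you can subscribe with an asynchronous method that you throttle yourself:

var semaphore = new SemaphoreSlim(1);
// The async lambda is passed as an Action<T> (async void), so Do does not
// wait for it; the semaphore is what keeps calls to F from overlapping.
// Do is lazy: nothing runs until the resulting sequence is subscribed to.
src.Do(async x =>
{
  await semaphore.WaitAsync();
  try
  {
    await F(x);
  }
  finally
  {
    semaphore.Release();
  }
});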

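Observable.Do is meant for side effects only, not for sequential composition. SelectMany may be what you want. As of Rx 2.0, there are overloads of SelectMany that make it easy to compose observables with Task<T>. (Just be aware of the additional concurrency that these and similar Task/Observable cooperation operators may introduce.)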

var q = from value in src
        from _ in F(value.X).AsVoidAsync()  // See helper definition below
        from __ in F(value.Y).AsVoidAsync()
        select value;

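However, just like Do, this does not serialize calls across elements: if src produces the next value before the previous calls have completed, F can still run concurrently. That is because SelectMany behaves like Select->Merge; what you probably want is Select->Concat.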

// using System.Reactive.Threading.Tasks

var q = src.Select(x => Observable.Defer(() => F(x).ToObservable())).Concat();

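Do not forget Defer; without it, F(x) would be called immediately for every element as Select runs, rather than when Concat subscribes to each inner sequence in turn.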
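
As a rough check that Select + Defer + Concat really serializes the calls, the following minimal sketch counts how many calls to F are in flight at once. It assumes the System.Reactive package is referenced; ConcatDemo, MaxConcurrencyAsync and the counting version of F are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class ConcatDemo
{
    private static readonly object Gate = new object();
    private static int _running;     // calls to F currently in flight
    private static int _maxRunning;  // highest value _running has reached

    // Hypothetical stand-in for F: tracks how many calls overlap.
    private static async Task F(int x)
    {
        lock (Gate)
        {
            _running++;
            _maxRunning = Math.Max(_maxRunning, _running);
        }
        await Task.Delay(20);
        lock (Gate) { _running--; }
    }

    public static async Task<int> MaxConcurrencyAsync()
    {
        lock (Gate) { _running = 0; _maxRunning = 0; }

        IObservable<int> src = Observable.Range(1, 5);

        // Defer postpones the call to F(x) until Concat subscribes to the
        // inner sequence, which only happens after the previous one completes.
        // Count() yields a single value, so the query can be awaited safely.
        await src
            .Select(x => Observable.Defer(() => F(x).ToObservable()))
            .Concat()
            .Count();

        lock (Gate) { return _maxRunning; }
    }
}
```

Under these assumptions MaxConcurrencyAsync should return 1. Replacing Concat() with Merge() in the sketch is a quick way to see the overlapping behaviour the question is trying to avoid.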

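Definition of AsVoidAsync:

IObservable<T> requires a T, whereas Task represents void, so Rx's conversion operators need a Task<T> rather than a Task. Rx's System.Reactive.Unit struct is typically used for T: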

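using System.Reactive;
using System.Threading.Tasks;

public static class TaskExtensions
{
  // Converts a non-generic Task into a Task<Unit> that completes, faults,
  // or is canceled in the same way as the original task.
  public static Task<Unit> AsVoidAsync(this Task task)
  {
    return task.ContinueWith(t =>
    {
      var tcs = new TaskCompletionSource<Unit>();

      if (t.IsCanceled)
      {
        tcs.SetCanceled();
      }
      else if (t.IsFaulted)
      {
        // Pass the inner exceptions so they are not wrapped in a second AggregateException.
        tcs.SetException(t.Exception.InnerExceptions);
      }
      else
      {
        tcs.SetResult(Unit.Default);
      }

      return tcs.Task;
    },
    TaskContinuationOptions.ExecuteSynchronously)
    .Unwrap();
  }
}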

Alternatively, you can simply call the ToObservable extension method on the Task instead.
