TPL FromAsync with TaskScheduler and TaskFactory

I am trying to build an ordered task scheduler that works together with TaskFactory.FromAsync.

I want to be able to fire off web service requests (using FromAsync so that they use I/O completion ports), but keep them in order and have only one executing at any time.

At the moment I am not using FromAsync, so I can call TaskFactory.StartNew(() => api.DoSyncWebServiceCall()) and rely on the OrderedTaskScheduler used by the TaskFactory to ensure that only one request is outstanding at a time.

I assumed that this behavior would carry over to the FromAsync method, but it does not:

    TaskFactory<Stuff> taskFactory = new TaskFactory<Stuff>(new OrderedTaskScheduler());
    // The trailing null is the state object required by this FromAsync overload.
    var t1 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a), null);
    var t2 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a), null);
    var t3 = taskFactory.FromAsync((a, s) => api.beginGetStuff(a, s), a => api.endGetStuff(a), null);

All of these beginGetStuff methods are called inside the FromAsync call (so although they are dispatched in order, n API calls are in flight at the same time).
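The overlap can be reproduced with a small sketch. FakeApi below is a hypothetical stand-in for the api object (it is not from the question), and a TaskCompletionSource plays the role of the IAsyncResult. Since FromAsync invokes the begin method synchronously, all three begin calls should already have happened before any request completes:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical APM-style API standing in for the question's api object.
public static class FakeApi
{
    public static int BeginCalls;

    public static IAsyncResult BeginGetStuff(AsyncCallback callback, object state)
    {
        Interlocked.Increment(ref BeginCalls);
        var tcs = new TaskCompletionSource<int>(state);
        // Complete later, then notify the caller the way an APM implementation would.
        Task.Delay(50).ContinueWith(_ =>
        {
            tcs.SetResult(42);
            callback?.Invoke(tcs.Task);
        });
        return tcs.Task; // Task implements IAsyncResult
    }

    public static int EndGetStuff(IAsyncResult asyncResult)
    {
        return ((Task<int>)asyncResult).Result;
    }
}

public static class FromAsyncDemo
{
    // Returns how many begin calls were made before any request had time to finish.
    public static int CountBeginCallsBeforeAnyCompletion()
    {
        FakeApi.BeginCalls = 0;
        var factory = new TaskFactory<int>();
        var t1 = factory.FromAsync(FakeApi.BeginGetStuff, FakeApi.EndGetStuff, null);
        var t2 = factory.FromAsync(FakeApi.BeginGetStuff, FakeApi.EndGetStuff, null);
        var t3 = factory.FromAsync(FakeApi.BeginGetStuff, FakeApi.EndGetStuff, null);
        int started = FakeApi.BeginCalls; // read before waiting on anything
        Task.WaitAll(t1, t2, t3);
        return started;
    }
}
```

The scheduler passed to the TaskFactory never gets a chance to intervene, because no task is queued to it before the begin method runs.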

There is a FromAsync overload that accepts a TaskScheduler:

    public Task FromAsync(
        IAsyncResult asyncResult,
        Action<IAsyncResult> endMethod,
        TaskCreationOptions creationOptions,
        TaskScheduler scheduler
    )

but the docs say:

The TaskScheduler that is used to schedule the task that executes the end method.

And as you can see, it accepts an already constructed IAsyncResult, not a Func<IAsyncResult>.

Does this call for a custom FromAsync method, or am I missing something? Can anyone suggest where to start with such an implementation?

Greetings

EDIT:

I want to abstract this behavior away from the caller, so, in line with how the TaskFactory behaves (with a specialized TaskScheduler), I need the Task to be returned immediately. This Task should encapsulate not only the FromAsync Task but also the queuing of that task while it waits for its turn to execute.

One possible solution:

    class TaskExecutionQueue
    {
        private readonly OrderedTaskScheduler _orderedTaskScheduler;
        private readonly TaskFactory _taskFactory;

        public TaskExecutionQueue(OrderedTaskScheduler orderedTaskScheduler)
        {
            _orderedTaskScheduler = orderedTaskScheduler;
            _taskFactory = new TaskFactory(orderedTaskScheduler);
        }

        public Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator)
        {
            return _taskFactory.StartNew(taskGenerator).Unwrap();
        }
    }

However, this uses a thread while the FromAsync call is being made. Ideally I would not need to do this.

+4
3 answers

I decided to create my own solution here ... The locks are dirty and unwanted, but at the moment it does the work that I want.

    public interface ITaskExecutionQueue
    {
        Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator);
        Task<TResult> QueueTask<TResult>(Task<Task<TResult>> taskGenerator);
        int OutstandingTaskCount { get; }
        event EventHandler OutstandingTaskCountChanged;
    }

    /// This class ensures that only a single Task is executed at any one time. Tasks are executed sequentially in the order they were queued.
    /// The advantage of this class over OrderedTaskScheduler is that you can use any type of Task, such as FromAsync (I/O completion ports),
    /// which cannot be scheduled using a traditional TaskScheduler.
    /// Ensure that the `outer` tasks you queue are unstarted. Eg <![CDATA[
    /// _taskExeQueue.QueueTask(new Task<Task<TResult>>(() => StartMyRealTask()));
    /// ]]>
    /// Note: Raise and TrySetFromTaskIncomplete are extension methods that are not shown in this post.
    class OrderedTaskExecutionQueue : ITaskExecutionQueue
    {
        private readonly Queue<Task> _queuedTasks = new Queue<Task>();
        private Task _currentTask;
        private readonly object _lockSync = new object();

        /// <summary>
        /// Queues a task for execution
        /// </summary>
        /// <typeparam name="TResult"></typeparam>
        /// <param name="taskGenerator">An unstarted Task that creates your started real-work task</param>
        /// <returns></returns>
        public Task<TResult> QueueTask<TResult>(Func<Task<TResult>> taskGenerator)
        {
            return QueueTask(new Task<Task<TResult>>(taskGenerator));
        }

        public Task<TResult> QueueTask<TResult>(Task<Task<TResult>> taskGenerator)
        {
            Task<TResult> unwrapped = taskGenerator.Unwrap();
            // When the inner task finishes, dequeue it and start the next one.
            unwrapped.ContinueWith(_ =>
            {
                EndTask();
                StartNextTaskIfQueued();
            }, TaskContinuationOptions.ExecuteSynchronously);

            lock (_lockSync)
            {
                _queuedTasks.Enqueue(taskGenerator);
                if (_currentTask == null)
                {
                    StartNextTaskIfQueued();
                }
            }

            TaskCompletionSource<TResult> tcs = new TaskCompletionSource<TResult>();
            tcs.TrySetFromTaskIncomplete(unwrapped);
            OutstandingTaskCountChanged.Raise(this);
            return tcs.Task;
        }

        private void EndTask()
        {
            lock (_lockSync)
            {
                _currentTask = null;
                _queuedTasks.Dequeue();
            }
            OutstandingTaskCountChanged.Raise(this);
        }

        private void StartNextTaskIfQueued()
        {
            lock (_lockSync)
            {
                if (_queuedTasks.Count > 0)
                {
                    // The running task stays at the head of the queue until EndTask removes it.
                    _currentTask = _queuedTasks.Peek();
                    _currentTask.RunSynchronously();
                }
            }
        }

        /// <summary>
        /// Includes the currently executing task.
        /// </summary>
        public int OutstandingTaskCount
        {
            get
            {
                lock (_lockSync)
                {
                    return _queuedTasks.Count;
                }
            }
        }

        public event EventHandler OutstandingTaskCountChanged;
    }

It accepts an unstarted Task<Task<TResult>>, which lets the queue decide when to run it and so when to start the FromAsync call (the inner task). Usage:

    Task<Task<TResult>> queueTask = new Task<Task<TResult>>(() => Task.Factory.FromAsync(beginAction, endAction));
    Task<TResult> asyncCallTask = _taskExecutionQueue.QueueTask(queueTask);
0

You cannot schedule I/O tasks because they do not have a thread associated with them. The Windows kernel provides thread-less I/O operations. Starting these operations does not involve managed code, and the TaskScheduler class does not come into play.

Therefore, you must delay starting the I/O until you are sure you actually want to hit the network. You can use SemaphoreSlim.WaitAsync to throttle the number of tasks currently running. Await the result of that method before starting each individual I/O operation, and then await that operation as well.
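A minimal sketch of that idea follows. SerialAsyncGate and the demo class are hypothetical names introduced here for illustration, and Task.Delay stands in for a real FromAsync-based request. Note that SemaphoreSlim does not document a FIFO guarantee for waiters, so treating it as strictly ordered is an assumption:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: lets at most one asynchronous operation run at a time.
public sealed class SerialAsyncGate
{
    // A single permit means a single operation in flight.
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);

    public async Task<T> RunAsync<T>(Func<Task<T>> startOperation)
    {
        // No thread is blocked while waiting for the permit.
        await _gate.WaitAsync().ConfigureAwait(false);
        try
        {
            // The I/O is started only once the permit is held.
            return await startOperation().ConfigureAwait(false);
        }
        finally
        {
            _gate.Release();
        }
    }
}

public static class SerialAsyncGateDemo
{
    private static int _running;
    private static int _maxRunning;

    // Stand-in for a real request; records how many requests overlap.
    private static async Task<int> FakeRequestAsync(int id)
    {
        int now = Interlocked.Increment(ref _running);
        if (now > _maxRunning) _maxRunning = now;
        await Task.Delay(20).ConfigureAwait(false);
        Interlocked.Decrement(ref _running);
        return id;
    }

    // Queues five requests and returns the highest observed concurrency.
    public static async Task<int> RunDemoAsync()
    {
        var gate = new SerialAsyncGate();
        var tasks = new List<Task<int>>();
        for (int i = 0; i < 5; i++)
        {
            int id = i; // copy for the lambda
            tasks.Add(gate.RunAsync(() => FakeRequestAsync(id)));
        }
        await Task.WhenAll(tasks).ConfigureAwait(false);
        return _maxRunning;
    }
}
```

Each call to RunAsync returns its Task immediately, so the caller does not need to know whether the request has started or is still waiting for the permit.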

+2

The easiest way to do this is to use TPL Dataflow.

You can define a "block" that receives a stream of asynchronous delegates and executes them one at a time (waiting until each one completes before starting the next):

 var block = new ActionBlock<Func<Task>>(func => func()); 

Then, to fire off a web service request:

 block.Post(() => Task.Factory.FromAsync(...)); 

or (which I prefer):

 block.Post(() => client.GetStuffAsync(a, b, c)); 

The ActionBlock approach is fine if you just want to execute tasks. If you want to produce an output stream, take a look at TransformBlock:

 var block = new TransformBlock<Func<Task<Stuff>>, Stuff>(func => func()); 

You fire off your requests the same way, and you can get the results by calling Receive or ReceiveAsync.
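Put together, the TransformBlock variant might look like the sketch below. It assumes the System.Threading.Tasks.Dataflow library is available (a NuGet package on .NET Framework), and it uses int results with Task.Delay as a stand-in for the real web service call; DataflowDemo is a hypothetical name:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowDemo
{
    public static async Task<int[]> RunAsync()
    {
        // The default MaxDegreeOfParallelism is 1, so the delegates run one at a time.
        var block = new TransformBlock<Func<Task<int>>, int>(func => func());

        for (int i = 1; i <= 3; i++)
        {
            int id = i; // copy for the lambda
            block.Post(async () =>
            {
                await Task.Delay(10); // stand-in for a real request
                return id * 10;
            });
        }
        block.Complete(); // no more requests will be posted

        // Results come out in the order the requests were posted.
        var results = new int[3];
        for (int i = 0; i < results.Length; i++)
        {
            results[i] = await block.ReceiveAsync();
        }
        return results;
    }
}
```

Post returns as soon as the delegate is queued, so the caller is not blocked while earlier requests are still running.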

+2
