Waiting for an asynchronous workflow to be cancelled

The Cancel member of the CancellationTokenSource object "communicates a request for cancellation", which, I believe, means that it is fire-and-forget and does not wait until the cancellation has completed (for example, until all the exception handlers have run). This is fine, but I need to wait until an outstanding async has been completely cancelled before starting another one. Is there an easy way to do this?

2 answers

I don’t think there is a direct way to do this using the standard functions of the F# asynchronous library. The closest available operation is Async.TryCancelled, which invokes a callback once the workflow has actually been cancelled, but sending a message from that callback to the code that started the workflow has to be done by hand.

This is relatively easy to solve using events and an extension from the F# async extensions that I wrote (also included in the FSharpX package): GuardedAwaitObservable, which waits for an occurrence of an event that may be triggered immediately by some operation.

The following Async.StartCancellable method takes an asynchronous workflow and returns Async<Async<unit>>. When you bind the outer workflow, it starts the argument (much like Async.StartChild), and when you bind the returned inner workflow, it cancels the computation and waits until it has actually been cancelled:

    open System.Threading

    module Async =
      /// Returns an asynchronous workflow 'Async<Async<unit>>'. When called
      /// using 'let!', it starts the workflow provided as an argument and returns
      /// a token that can be used to cancel the started work - this is an
      /// (asynchronously) blocking operation that waits until the workflow
      /// is actually cancelled
      let StartCancellable work = async {
        let cts = new CancellationTokenSource()
        // Creates an event used for notification
        let evt = new Event<_>()
        // Wrap the workflow with TryCancelled and notify when cancelled
        Async.Start(Async.TryCancelled(work, ignore >> evt.Trigger), cts.Token)
        // Return a workflow that waits for 'evt' and triggers 'Cancel'
        // after it attaches the event handler (to avoid missing event occurrence)
        let waitForCancel = Async.GuardedAwaitObservable evt.Publish cts.Cancel
        return async.TryFinally(waitForCancel, cts.Dispose) }

EDIT: Wrapped the result in TryFinally to dispose of the CancellationTokenSource, as John suggested. I think this should be enough to make sure that it is properly disposed of.
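The two-step Async<Async<...>> shape may be easier to picture with the standard Async.StartChild, which the answer uses as its point of comparison. The sketch below is only an illustration: `compute` is a hypothetical stand-in for real background work and is not part of the original answer.

```fsharp
// Hypothetical stand-in for some background work (not from the original answer)
let compute = async {
    do! Async.Sleep 100
    return 42 }

let result =
    async {
        // Binding the outer workflow starts 'compute' in the background
        let! pending = Async.StartChild compute
        // Binding the inner workflow waits for the child to produce its result
        let! value = pending
        return value }
    |> Async.RunSynchronously

printfn "%d" result // prints 42
```

StartCancellable follows the same pattern, except that binding its inner workflow cancels the started work and waits for the cancellation instead of waiting for a result.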

Here is an example that uses this method. The loop function is a simple workflow that I used for testing. The rest of the code runs it, waits 5.5 seconds, and then cancels it:

    /// Sample workflow that repeatedly starts and stops long running operation
    let loop = async {
      for i in 0 .. 9999 do
        printfn "Starting: %d" i
        do! Async.Sleep(1000)
        printfn "Done: %d" i }

    // Start the 'loop' workflow, wait for 5.5 seconds and then
    // cancel it and wait until it finishes current operation
    async { let! cancelToken = Async.StartCancellable(loop)
            printfn "started"
            do! Async.Sleep(5500)
            printfn "cancelling"
            do! cancelToken
            printfn "done" }
    |> Async.Start

For completeness, the sample with the necessary definitions from FSharpX is available on F# Snippets.
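If FSharpX is not at hand, the same idea can probably be sketched with the standard library alone. The version below is an assumption-laden variant, not the code from the answer: the lower-case name `startCancellable` is hypothetical, and a TaskCompletionSource takes the place of the event and GuardedAwaitObservable pair. Because a completed task stays completed, there is no window in which the notification could be missed.

```fsharp
open System.Threading
open System.Threading.Tasks

/// Hypothetical variant of StartCancellable that needs no FSharpX:
/// a TaskCompletionSource replaces the event + GuardedAwaitObservable pair.
let startCancellable (work: Async<unit>) : Async<Async<unit>> = async {
    let cts = new CancellationTokenSource()
    let cancelled = TaskCompletionSource<unit>()
    // TryCancelled runs the callback once 'work' has actually been cancelled
    let guarded =
        Async.TryCancelled(work, fun _ -> cancelled.TrySetResult(()) |> ignore)
    Async.Start(guarded, cts.Token)
    // Inner workflow: request cancellation, then wait for the callback to fire
    let cancelAndWait = async {
        cts.Cancel()
        do! Async.AwaitTask cancelled.Task }
    return async.TryFinally(cancelAndWait, cts.Dispose) }
```

As with the original, the callback only fires on cancellation, so binding the inner workflow after the work has already finished on its own would wait indefinitely. Treat this as a starting point under those assumptions.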


This should not be difficult, given easy-to-use synchronization primitives. I especially like write-once “logic” variables:

    type Logic<'T> =
        new : unit -> Logic<'T>
        member Set : 'T -> unit
        member Await : Async<'T>

It is easy to wrap an Async so that it sets a logic variable on completion, and then to wait on that variable, for example:

    type IWork =
        abstract member Cancel : unit -> Async<unit>

    let startWork (work: Async<unit>) =
        let v = Logic<unit>()
        let s = new CancellationTokenSource()
        let main = async.TryFinally(work, fun () -> s.Dispose(); v.Set())
        Async.Start(main, s.Token)
        {
            new IWork with
                member this.Cancel() =
                    s.Cancel()
                    v.Await
        }

A possible implementation of logic variables could be:

    type LogicState<'T> =
        | New
        | Value of 'T
        | Waiting of ('T -> unit)

    [<Sealed>]
    type Logic<'T>() =
        let lockRoot = obj ()
        let mutable st = New
        // Apply a state transition under the lock, then run its continuation outside it
        let update up =
            let k =
                lock lockRoot <| fun () ->
                    let (n, k) = up st
                    st <- n
                    k
            k ()
        let wait (k: 'T -> unit) =
            update <| function
                | New -> (Waiting k, ignore)
                | Value value as st -> (st, fun () -> k value)
                | Waiting f -> (Waiting (fun x -> f x; k x), ignore)
        let await = Async.FromContinuations(fun (ok, _, _) -> wait ok)
        member this.Set(value: 'T) =
            update <| function
                | New -> (Value value, ignore)
                | Value _ as st -> (st, ignore)
                | Waiting f as st -> (Value value, fun () -> f value)
        member this.Await = await
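The write-once behavior can also be illustrated with a much smaller stand-in. The sketch below is not the implementation above: `TcsLogic` is a hypothetical name, and it assumes that a TaskCompletionSource is an acceptable substitute for the hand-written state machine. It exposes the same Set and Await members.

```fsharp
open System.Threading.Tasks

/// Hypothetical stand-in for Logic<'T>, backed by TaskCompletionSource
type TcsLogic<'T>() =
    let tcs = TaskCompletionSource<'T>()
    /// The first call wins; later calls are silently ignored
    member this.Set(value: 'T) = tcs.TrySetResult value |> ignore
    /// Completes once a value has been set
    member this.Await : Async<'T> = Async.AwaitTask tcs.Task

let v = TcsLogic<int>()
v.Set 1
v.Set 2 // ignored, because the variable is write-once
let observed = Async.RunSynchronously v.Await
printfn "%d" observed // prints 1
```

Any number of waiters can bind Await, before or after Set is called, and all of them observe the same value.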