Asynchronous barrier in F#

I wrote a program in F# that asynchronously lists all directories on a disk. An async task lists all the files in a given directory and creates separate async tasks (daemons: I start them using Async.Start) to list the subdirectories. They all communicate their results to a central MailboxProcessor.

My problem is how to detect that all the daemon tasks have finished and that no more files will arrive. Essentially, I need a barrier for all tasks that are (direct and indirect) children of my top-level task. I could not find anything like this in the F# async model.

Instead, I created a separate MailboxProcessor where I register the start and end of each task. When the active count drops to zero, I am done. But I am not happy with this solution. Any other suggestions?
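For reference, the counting approach described above might look roughly like the following. This is only a sketch under assumptions: the `CounterMsg` type, the `createTracker` helper, and the in-memory `Tree` standing in for the directory structure are all hypothetical names, not the asker's actual code.

```fsharp
open System.Threading.Tasks

// Hypothetical message type for the bookkeeping agent.
type CounterMsg =
    | Started
    | Finished

// Hypothetical in-memory tree standing in for the directory structure.
type Tree = Node of string * Tree list

// Creates the counting agent and a task that completes when the count returns to zero.
let createTracker () =
    let allDone = TaskCompletionSource<unit>()
    let agent =
        MailboxProcessor<CounterMsg>.Start(fun inbox ->
            let rec loop active =
                async {
                    let! msg = inbox.Receive()
                    let active' =
                        match msg with
                        | Started -> active + 1
                        | Finished -> active - 1
                    if active' = 0 then
                        allDone.SetResult(())   // barrier reached, stop the loop
                    else
                        return! loop active'
                }
            loop 0)
    agent, allDone.Task

let rec visit (tracker: MailboxProcessor<CounterMsg>) (Node (_, children)) =
    async {
        for child in children do
            tracker.Post Started               // register the child before starting it
            Async.Start (visit tracker child)
        tracker.Post Finished                  // this task is done
    }

let tree = Node ("root", [ Node ("a", [ Node ("a1", []) ]); Node ("b", []) ])
let tracker, finished = createTracker ()
tracker.Post Started                           // register the root task
Async.Start (visit tracker tree)
finished.Wait()                                // blocks until every task has reported Finished
```

The important detail is that the parent posts `Started` on behalf of each child before launching it; otherwise the count could reach zero while a child is still waiting to be scheduled.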

5 answers

Have you tried using Async.Parallel? That is, rather than calling Async.Start on each subdirectory, combine the subdirectory tasks into a single async using Async.Parallel. You then end up with a (nested) fork-join task that you can run with RunSynchronously to wait for the final result.

EDIT

Here is some approximate code that shows the gist, if not the full detail:

open System.IO

let agent = MailboxProcessor.Start(fun mbox ->
    async {
        while true do
            let! msg = mbox.Receive()
            printfn "%s" msg
    })

let rec traverse dir =
    async {
        agent.Post(dir)
        let subDirs = Directory.EnumerateDirectories(dir)
        return! [for d in subDirs do yield traverse d] 
                 |> Async.Parallel |> Async.Ignore 
    }

traverse "d:\\" |> Async.RunSynchronously
// now all will be traversed, 
// though Post-ed messages to agent may still be in flight

EDIT 2

Here is a version that waits, using replies from the agent:

open System.IO

let agent = MailboxProcessor.Start(fun mbox ->
    async {
        while true do
            let! dir, (replyChannel:AsyncReplyChannel<unit>) = mbox.Receive()
            printfn "%s" dir
            replyChannel.Reply()
    })

let rec traverse dir =
    async {
        let r = agent.PostAndAsyncReply(fun replyChannel -> dir, replyChannel)
        let subDirs = Directory.EnumerateDirectories(dir)
        do! [for d in subDirs do yield traverse d] 
                 |> Async.Parallel |> Async.Ignore 
        do! r // wait for Post to finish
    }

traverse "c:\\Projects\\" |> Async.RunSynchronously
// now all will be traversed to completion 

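You could use Interlocked to increment and decrement a counter as tasks begin and end, and be done when it reaches zero. This also works alongside MailboxProcessors.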
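A minimal sketch of an Interlocked-based counter, assuming a hypothetical in-memory `Dir` tree in place of the real file system; `traverseAll`, `pending`, and `visited` are illustrative names and not part of any library:

```fsharp
open System.Threading

// Hypothetical in-memory tree standing in for the directory structure.
type Dir = Dir of string * Dir list

// Visits every node with fire-and-forget asyncs and blocks until all of them are done.
// Returns the number of nodes visited.
let traverseAll (root: Dir) =
    let pending = ref 1                          // the root task is already counted
    let visited = ref 0
    use finished = new ManualResetEventSlim(false)
    let rec visit (Dir (_, children)) =
        async {
            Interlocked.Increment(visited) |> ignore
            for child in children do
                // Count the child before starting it, so the counter cannot hit zero early.
                Interlocked.Increment(pending) |> ignore
                Async.Start (visit child)
            // The task that brings the counter to zero signals the barrier.
            if Interlocked.Decrement(pending) = 0 then
                finished.Set()
        }
    Async.Start (visit root)
    finished.Wait()
    visited.Value

let sample = Dir ("root", [ Dir ("a", [ Dir ("a1", []) ]); Dir ("b", []) ])
let count = traverseAll sample
```

As with the mailbox variant, the increment has to happen in the parent before the child is started; incrementing inside the child would leave a window in which the counter reads zero while work is still queued.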


You may be better off simply using Task.Factory.StartNew() and Task.WaitAll().
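One possible reading of this suggestion, sketched under assumptions: child tasks are created with TaskCreationOptions.AttachedToParent, so a parent task does not complete until all of its attached children have completed, and waiting on the root task then acts as the barrier. The `Dir` tree, `visit`, and `visited` are hypothetical stand-ins for the real traversal.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Hypothetical in-memory tree standing in for the directory structure.
type Dir = Dir of string * Dir list

let visited = ref 0

let rec visit (Dir (_, children)) : unit =
    Interlocked.Increment(visited) |> ignore
    for child in children do
        // AttachedToParent: the enclosing task does not complete until this child does.
        Task.Factory.StartNew(Action(fun () -> visit child), TaskCreationOptions.AttachedToParent)
        |> ignore

let sample = Dir ("root", [ Dir ("a", [ Dir ("a1", []) ]); Dir ("b", []) ])

// Task.Factory.StartNew is used for the root as well, because tasks created
// with Task.Run do not allow children to attach.
let root = Task.Factory.StartNew(Action(fun () -> visit sample))
Task.WaitAll([| root |])
printfn "visited %d nodes" visited.Value
```

This trades the F# async model for TPL tasks, so results would still need to be forwarded to the MailboxProcessor from inside `visit`.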


If a lazy sequence of all the directories is enough for your purposes, something like this may do:

open System.IO

let rec traverse (dir: string) : seq<string> =
    seq {
        // yield this directory, then lazily walk its subdirectories
        let subDirs = Directory.EnumerateDirectories(dir)
        yield dir
        for d in subDirs do
            yield! traverse d
    }

, , Async F # "" , .


Just to clarify: I thought there might be a better solution, similar to what can be done in Chapel. There you have a "sync" statement, a barrier that waits for the completion of all tasks spawned within the statement. Here is an example from the Chapel manual:

def concurrentUpdate(tree: Tree) {
    if requiresUpdate(tree) then
        begin update(tree);
    if !tree.isLeaf {
        concurrentUpdate(tree.left);
        concurrentUpdate(tree.right);
    }
}
sync concurrentUpdate(tree);

The "begin" statement creates a task that runs in parallel, somewhat similar to an F# "async" block started with Async.Start.
