Wikipedia is not to blame here; this is a consequence of how Async.Parallel works internally. The signature of Async.Parallel is `seq<Async<'T>> -> Async<'T[]>`. It returns a single Async value containing all of the results from the sequence, so it does not complete until every computation in the input `seq<Async<'T>>` has completed.
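A tiny sketch illustrating this behavior (the names here are just for the example): Async.Parallel yields nothing until the slowest job in the sequence finishes, at which point all results become available at once.

```fsharp
// Three jobs with staggered delays; Async.Parallel does not produce its
// result array until the slowest (300 ms) job has completed.
let demo =
    [ for i in 1 .. 3 ->
        async {
            do! Async.Sleep (i * 100)
            return i } ]
    |> Async.Parallel
    |> Async.RunSynchronously
// demo = [|1; 2; 3|] -- results are in input order, delivered all at once.
```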
To illustrate this, I modified your code to track the number of outstanding requests, that is, requests that have been sent to the server but whose responses have not yet been received and processed.
```fsharp
open Microsoft.FSharp.Control
open Microsoft.FSharp.Control.WebExtensions
open System
open System.Net
open System.Threading

type WebClientWithTimeout() =
    inherit WebClient()

    let mutable timeout = -1
    member __.Timeout
        with get () = timeout
        and set value = timeout <- value

    override x.GetWebRequest uri =
        let r = base.GetWebRequest(uri)
        r.Timeout <- x.Timeout
        r

type ParsedDoc = ParsedDoc
type ParsedArticle = ParsedArticle

let parseDoc (str : string) = ParsedDoc
let parseArticle (doc : ParsedDoc) = Some ParsedArticle

/// A synchronized wrapper around Console.Out so we don't
/// get garbled console output.
let synchedOut =
    System.Console.Out
    |> System.IO.TextWriter.Synchronized

let parseWikiAsync (url : string, outstandingRequestCount : int ref) =
    async {
    use wc = new WebClientWithTimeout(Timeout = 5000)
    wc.Headers.Add ("User-Agent",
        "Friendly Bot 1.0 ( FriendlyBot@friendlybot.com )")

    // Increment the outstanding request count just before we send the request.
    do
        // NOTE : The message must be created THEN passed to synchedOut.WriteLine --
        // piping it (|>) into synchedOut.WriteLine or using fprintfn causes a closure
        // to be created which somehow defeats the synchronization and garbles the output.
        let msg =
            Interlocked.Increment outstandingRequestCount
            |> sprintf "Outstanding requests: %i"
        synchedOut.WriteLine msg

    let! html = wc.AsyncDownloadString (Uri (url))
    let ret =
        try
            html
            |> parseDoc
            |> parseArticle
        with ex ->
            let msg = sprintf "%A" ex
            synchedOut.WriteLine msg
            None

    // Decrement the outstanding request count now that we've
    // received a response and parsed it.
    do
        let msg =
            Interlocked.Decrement outstandingRequestCount
            |> sprintf "Outstanding requests: %i"
        synchedOut.WriteLine msg

    return ret
    }

/// Writes a message to the console, passing a value through
/// so it can be used within a function pipeline.
let inline passThruWithMessage (msg : string) value =
    Console.WriteLine msg
    value

let en100 =
    let outstandingRequestCount = ref 0
    seq {
        for _ in 1 .. 120 ->
            parseWikiAsync ("http://en.wikipedia.org/wiki/Special:Random", outstandingRequestCount)
    }
    |> Async.Parallel
    |> Async.RunSynchronously
    |> passThruWithMessage "Finished running all of the requests."
    |> Seq.choose id
    |> Seq.take 100
```
If you compile and run this code, you'll see output like the following:
```
Outstanding requests: 4
Outstanding requests: 2
Outstanding requests: 1
Outstanding requests: 3
Outstanding requests: 5
Outstanding requests: 6
Outstanding requests: 7
Outstanding requests: 8
Outstanding requests: 9
Outstanding requests: 10
Outstanding requests: 12
Outstanding requests: 14
Outstanding requests: 15
Outstanding requests: 16
Outstanding requests: 17
Outstanding requests: 18
Outstanding requests: 13
Outstanding requests: 19
Outstanding requests: 20
Outstanding requests: 24
Outstanding requests: 22
Outstanding requests: 26
Outstanding requests: 27
Outstanding requests: 28
Outstanding requests: 29
Outstanding requests: 30
Outstanding requests: 25
Outstanding requests: 21
Outstanding requests: 23
Outstanding requests: 11
Outstanding requests: 29
Outstanding requests: 28
Outstanding requests: 27
Outstanding requests: 26
Outstanding requests: 25
Outstanding requests: 24
Outstanding requests: 23
Outstanding requests: 22
Outstanding requests: 21
Outstanding requests: 20
Outstanding requests: 19
Outstanding requests: 18
Outstanding requests: 17
Outstanding requests: 16
Outstanding requests: 15
Outstanding requests: 14
Outstanding requests: 13
Outstanding requests: 12
Outstanding requests: 11
Outstanding requests: 10
Outstanding requests: 9
Outstanding requests: 8
Outstanding requests: 7
Outstanding requests: 6
Outstanding requests: 5
Outstanding requests: 4
Outstanding requests: 3
Outstanding requests: 2
Outstanding requests: 1
Outstanding requests: 0
Finished running all of the requests.
```
As you can see, all of the requests are executed before any of them are parsed. So on a slower connection, or when retrieving a large number of documents, the server may drop the connection because it assumes you are no longer consuming the response it is trying to send. Another problem with the code is that you need to explicitly specify the number of elements to generate in the `seq`, which makes the code less reusable.
A better solution is to retrieve and parse the pages lazily, as they are demanded by some consuming code. (And if you think about it, this is exactly what F#'s `seq` is made for.) We'll start by creating a function that takes a Uri and produces a `seq<Async<'T>>`, i.e. an infinite sequence of `Async<'T>` values, each of which will retrieve the content from the Uri, parse it, and return the result.
```fsharp
/// Given a Uri, creates an infinite sequence of whose elements are retrieved
/// from the Uri.
let createDocumentSeq (uri : System.Uri) =
```
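The body of this function is not shown above, but here is a minimal sketch of one possible implementation, assuming each element should be an `Async<string option>` (so that failed downloads become `None`, matching the `Seq.choose (Option.bind ...)` step in the batched pipeline later in this answer):

```fsharp
// Sketch only: Seq.initInfinite produces an unbounded sequence, and each
// element is an Async that downloads the page only when it is actually run.
let createDocumentSeq (uri : System.Uri) =
    Seq.initInfinite <| fun _ ->
        async {
            // WebClient is used directly here for brevity; substitute
            // WebClientWithTimeout from the earlier listing if you want timeouts.
            use wc = new System.Net.WebClient ()
            try
                let! html = wc.AsyncDownloadString uri
                return Some html
            with _ ->
                // Swallow the download error and signal failure with None.
                return None
        }
```

Because `Seq.initInfinite` is lazy, no request is made until a consumer forces an element of the sequence.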
Now we use this function to retrieve pages as a stream:
```fsharp
// let en100_Streaming =
```
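The streaming listing above is incomplete, so here is a hedged sketch of what it might look like. It mirrors the batched pipeline later in this answer, but runs each request sequentially with `Async.RunSynchronously` instead of in parallel batches:

```fsharp
// Sketch of a streaming version: one request in flight at a time, results
// pulled lazily until Seq.take has collected enough parsed articles.
let en100_Streaming =
    Uri ("http://en.wikipedia.org/wiki/Special:Random")
    |> createDocumentSeq
    // Run each Async as it is demanded, one at a time.
    |> Seq.map Async.RunSynchronously
    // Drop failed downloads and unparseable documents.
    |> Seq.choose (Option.bind (parseDoc >> parseArticle))
    |> Seq.take 50
    // Forces evaluation; nothing runs before this point.
    |> Seq.toArray
```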
If you run this code, you'll see that it pulls results from the server one at a time and leaves no outstanding requests. It's also very easy to change the number of results you want to get: all you have to do is change the value passed to Seq.take .
Now, while this streaming code works fine, it doesn't execute requests in parallel, so it can be slow for a large number of documents. This is easy to fix, though the solution may be a little unintuitive. Instead of trying to execute the entire sequence of requests in parallel, which was the problem in the original code, let's create a function that uses Async.Parallel to execute small batches of requests in parallel, then uses Seq.collect to combine the results into a flat sequence.
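The `parallelBatch` function used below is not defined in this excerpt, so here is one possible sketch of it. It assumes `Seq.chunkBySize` (available in F# 4.0 and later) to split the lazy input into fixed-size batches:

```fsharp
// Sketch: split the (possibly infinite) sequence of Asyncs into batches of
// batchSize, run each batch in parallel, and lazily flatten the results.
let parallelBatch batchSize (jobs : seq<Async<'T>>) : seq<'T> =
    jobs
    |> Seq.chunkBySize batchSize
    |> Seq.collect (fun batch ->
        batch
        |> Async.Parallel
        |> Async.RunSynchronously)
```

Since Seq.collect is lazy, each batch is only downloaded when the consumer asks for its elements, so at most `batchSize` requests are ever in flight at once.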
To use this function, we need only a few small tweaks to the streaming version of the code:
```fsharp
let en100_Batched =
    let batchSize = 10
    #if DEBUG
    let documentCount = ref 0
    #endif

    Uri ("http://en.wikipedia.org/wiki/Special:Random")
    |> createDocumentSeq
    // Execute batches in parallel
    |> parallelBatch batchSize
    |> Seq.choose (Option.bind (parseDoc >> parseArticle))
    #if DEBUG
    |> Seq.map (fun x ->
        let msg =
            Interlocked.Increment documentCount
            |> sprintf "Parsed documents: %i"
        synchedOut.WriteLine msg
        x)
    #endif
    |> Seq.take 50
    // None of the computations actually take place until
    // this point, because Seq.toArray forces evaluation of the sequence.
    |> Seq.toArray
```
Again, it's easy to change the number of documents you want to retrieve, and the batch size can be tweaked just as easily (though I suggest keeping it fairly small). If you wanted to, you could hook up both the "streaming" and "batched" versions of the code so you could switch between them at runtime.
One last thing: with my code, the requests shouldn't time out, so you can probably get rid of the WebClientWithTimeout class and just use WebClient directly.