F# Asynchronous Code Hangs

I have some fairly simple F# asynchronous code that downloads a hundred random Wikipedia articles (for research).

For some reason, the code hangs at arbitrary points during the download. Sometimes it's after 50 articles, sometimes after 80.

The asynchronous code itself is pretty simple:

    let parseWikiAsync (url : string, count : int ref) =
        async {
            use wc = new WebClientWithTimeout(Timeout = 5000)
            let! html = wc.AsyncDownloadString(Uri(url))
            let ret =
                try
                    html |> parseDoc |> parseArticle
                with
                | ex ->
                    printfn "%A" ex
                    None
            lock count (fun () ->
                if !count % 10 = 0 then printfn "%d" !count
                count := !count + 1)
            return ret
        }

Because I couldn't work out what the problem was, I created WebClientWithTimeout, a System.Net.WebClient wrapper that lets me specify a timeout:

    type WebClientWithTimeout() =
        inherit WebClient()

        member val Timeout = 60000 with get, set

        override x.GetWebRequest uri =
            let r = base.GetWebRequest(uri)
            r.Timeout <- x.Timeout
            r

Then I use async combinators to retrieve a little over a hundred pages and filter out the parseWikiAsync calls that return None (most of which are disambiguation pages), until I have exactly 100 articles:

    let en100 =
        let count = ref 0
        seq { for _ in 1..110 -> parseWikiAsync("http://en.wikipedia.org/wiki/Special:Random", count) }
        |> Async.Parallel
        |> Async.RunSynchronously
        |> Seq.choose id
        |> Seq.take 100

When I compile the code and run it under the debugger, there are only three threads, of which only one is running actual code - the Async pipeline. The other two show "not available" for their location and have nothing in their call stacks.

I think this means it isn't stuck in AsyncDownloadString or anywhere in parseWikiAsync. What else could be causing this?

Oh, and also, it takes a full minute before the asynchronous code even starts. After that it proceeds at a reasonable pace, until it hangs again indefinitely.

Here's the call stack for the main thread:

    > mscorlib.dll!System.Threading.WaitHandle.InternalWaitOne(System.Runtime.InteropServices.SafeHandle waitableSafeHandle, long millisecondsTimeout, bool hasThreadAffinity, bool exitContext) + 0x22 bytes
      mscorlib.dll!System.Threading.WaitHandle.WaitOne(int millisecondsTimeout, bool exitContext) + 0x28 bytes
      FSharp.Core.dll!Microsoft.FSharp.Control.AsyncImpl.ResultCell<Microsoft.FSharp.Control.AsyncBuilderImpl.Result<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]>>.TryWaitForResultSynchronously(Microsoft.FSharp.Core.FSharpOption<int> timeout) + 0x36 bytes
      FSharp.Core.dll!Microsoft.FSharp.Control.CancellationTokenOps.RunSynchronously<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]>(System.Threading.CancellationToken token, Microsoft.FSharp.Control.FSharpAsync<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]> computation, Microsoft.FSharp.Core.FSharpOption<int> timeout) + 0x1ba bytes
      FSharp.Core.dll!Microsoft.FSharp.Control.FSharpAsync.RunSynchronously<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]>(Microsoft.FSharp.Control.FSharpAsync<Microsoft.FSharp.Core.FSharpOption<Program.ArticleData>[]> computation, Microsoft.FSharp.Core.FSharpOption<int> timeout, Microsoft.FSharp.Core.FSharpOption<System.Threading.CancellationToken> cancellationToken) + 0xb9 bytes
      WikiSurvey.exe!<StartupCode$WikiSurvey>.$Program.main@() Line 97 + 0x55 bytes  F#
2 answers

Wikipedia is not to blame here; this is a consequence of how Async.Parallel works. The signature of Async.Parallel is seq<Async<'T>> -> Async<'T[]>. It returns a single Async value containing all of the results from the sequence, so it does not return until every computation in the seq<Async<'T>> has completed.
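
To see what that signature implies in isolation, here is a tiny, self-contained sketch (not part of the original program; the function name and the delays are purely illustrative). Each computation finishes at a different time, but RunSynchronously only returns once the last one has:

    // Each computation finishes at a different time, but Async.Parallel only
    // yields its array of results after ALL of them have completed.
    let demoParallelWaitsForAll () =
        [ for i in 1 .. 5 ->
            async {
                do! Async.Sleep (i * 1000)
                printfn "computation %d finished" i
                return i } ]
        |> Async.Parallel            // seq<Async<int>> -> Async<int[]>
        |> Async.RunSynchronously    // blocks until every computation is done
        |> printfn "all results arrive together: %A"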

To illustrate how this plays out in your program, I modified your code to track the number of outstanding requests, that is, requests that have been sent to the server but whose responses have not yet been received and processed.

    open Microsoft.FSharp.Control
    open Microsoft.FSharp.Control.WebExtensions
    open System
    open System.Net
    open System.Threading

    type WebClientWithTimeout() =
        inherit WebClient()
        let mutable timeout = -1
        member __.Timeout
            with get () = timeout
            and set value = timeout <- value
        override x.GetWebRequest uri =
            let r = base.GetWebRequest(uri)
            r.Timeout <- x.Timeout
            r

    type ParsedDoc = ParsedDoc
    type ParsedArticle = ParsedArticle

    let parseDoc (str : string) = ParsedDoc
    let parseArticle (doc : ParsedDoc) = Some ParsedArticle

    /// A synchronized wrapper around Console.Out so we don't
    /// get garbled console output.
    let synchedOut =
        System.Console.Out
        |> System.IO.TextWriter.Synchronized

    let parseWikiAsync (url : string, outstandingRequestCount : int ref) =
        async {
            use wc = new WebClientWithTimeout(Timeout = 5000)
            wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 ( FriendlyBot@friendlybot.com )")

            // Increment the outstanding request count just before we send the request.
            do
                // NOTE : The message must be created THEN passed to synchedOut.WriteLine --
                // piping it (|>) into synchedOut.WriteLine or using fprintfn causes a closure
                // to be created which somehow defeats the synchronization and garbles the output.
                let msg =
                    Interlocked.Increment outstandingRequestCount
                    |> sprintf "Outstanding requests: %i"
                synchedOut.WriteLine msg

            let! html = wc.AsyncDownloadString(Uri(url))
            let ret =
                try
                    html |> parseDoc |> parseArticle
                with ex ->
                    let msg = sprintf "%A" ex
                    synchedOut.WriteLine msg
                    None

            // Decrement the outstanding request count now that we've
            // received a response and parsed it.
            do
                let msg =
                    Interlocked.Decrement outstandingRequestCount
                    |> sprintf "Outstanding requests: %i"
                synchedOut.WriteLine msg

            return ret
        }

    /// Writes a message to the console, passing a value through
    /// so it can be used within a function pipeline.
    let inline passThruWithMessage (msg : string) value =
        Console.WriteLine msg
        value

    let en100 =
        let outstandingRequestCount = ref 0
        seq { for _ in 1..120 -> parseWikiAsync("http://en.wikipedia.org/wiki/Special:Random", outstandingRequestCount) }
        |> Async.Parallel
        |> Async.RunSynchronously
        |> passThruWithMessage "Finished running all of the requests."
        |> Seq.choose id
        |> Seq.take 100

If you compile and run this code, you will see output like the following:

    Outstanding requests: 4
    Outstanding requests: 2
    Outstanding requests: 1
    Outstanding requests: 3
    Outstanding requests: 5
    Outstanding requests: 6
    Outstanding requests: 7
    Outstanding requests: 8
    Outstanding requests: 9
    Outstanding requests: 10
    Outstanding requests: 12
    Outstanding requests: 14
    Outstanding requests: 15
    Outstanding requests: 16
    Outstanding requests: 17
    Outstanding requests: 18
    Outstanding requests: 13
    Outstanding requests: 19
    Outstanding requests: 20
    Outstanding requests: 24
    Outstanding requests: 22
    Outstanding requests: 26
    Outstanding requests: 27
    Outstanding requests: 28
    Outstanding requests: 29
    Outstanding requests: 30
    Outstanding requests: 25
    Outstanding requests: 21
    Outstanding requests: 23
    Outstanding requests: 11
    Outstanding requests: 29
    Outstanding requests: 28
    Outstanding requests: 27
    Outstanding requests: 26
    Outstanding requests: 25
    Outstanding requests: 24
    Outstanding requests: 23
    Outstanding requests: 22
    Outstanding requests: 21
    Outstanding requests: 20
    Outstanding requests: 19
    Outstanding requests: 18
    Outstanding requests: 17
    Outstanding requests: 16
    Outstanding requests: 15
    Outstanding requests: 14
    Outstanding requests: 13
    Outstanding requests: 12
    Outstanding requests: 11
    Outstanding requests: 10
    Outstanding requests: 9
    Outstanding requests: 8
    Outstanding requests: 7
    Outstanding requests: 6
    Outstanding requests: 5
    Outstanding requests: 4
    Outstanding requests: 3
    Outstanding requests: 2
    Outstanding requests: 1
    Outstanding requests: 0
    Finished running all of the requests.

As you can see, all of the requests are executed before any of the results are handed back to the calling code - so if you are on a slower connection, or you try to retrieve a large number of documents, the server may drop the connection, because it assumes you aren't retrieving the response it's trying to send. Another problem with the code is that you need to explicitly specify the number of elements to generate in the seq, which makes the code less reusable.

A better solution would be to retrieve and parse the pages as they're needed by the consuming code. (If you think about it, that is exactly what an F# seq is good for.) We'll start by creating a function that takes a Uri and produces a seq<Async<'T>> - that is, an infinite sequence of Async<'T> values, each of which retrieves the content from the Uri, parses it, and returns the result.

    /// Given a Uri, creates an infinite sequence whose elements are retrieved
    /// from the Uri.
    let createDocumentSeq (uri : System.Uri) =
    #if DEBUG
        let outstandingRequestCount = ref 0
    #endif
        Seq.initInfinite <| fun _ ->
            async {
                use wc = new WebClientWithTimeout(Timeout = 5000)
                wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 ( FriendlyBot@friendlybot.com )")

    #if DEBUG
                // Increment the outstanding request count just before we send the request.
                do
                    // NOTE : The message must be created THEN passed to synchedOut.WriteLine --
                    // piping it (|>) into synchedOut.WriteLine or using fprintfn causes a closure
                    // to be created which somehow defeats the synchronization and garbles the output.
                    let msg =
                        Interlocked.Increment outstandingRequestCount
                        |> sprintf "Outstanding requests: %i"
                    synchedOut.WriteLine msg
    #endif

                let! html = wc.AsyncDownloadString uri
                let ret =
                    try
                        Some html
                    with ex ->
                        let msg = sprintf "%A" ex
                        synchedOut.WriteLine msg
                        None

    #if DEBUG
                // Decrement the outstanding request count now that we've
                // received a response and parsed it.
                do
                    let msg =
                        Interlocked.Decrement outstandingRequestCount
                        |> sprintf "Outstanding requests: %i"
                    synchedOut.WriteLine msg
    #endif

                return ret
            }

Now we use this function to retrieve pages as a stream:

    let en100_Streaming =
    #if DEBUG
        let documentCount = ref 0
    #endif
        Uri ("http://en.wikipedia.org/wiki/Special:Random")
        |> createDocumentSeq
        |> Seq.choose (fun asyncDoc ->
            Async.RunSynchronously asyncDoc
            |> Option.bind (parseDoc >> parseArticle))
    #if DEBUG
        |> Seq.map (fun x ->
            let msg =
                Interlocked.Increment documentCount
                |> sprintf "Parsed documents: %i"
            synchedOut.WriteLine msg
            x)
    #endif
        |> Seq.take 50
        // None of the computations actually take place until
        // this point, because Seq.toArray forces evaluation of the sequence.
        |> Seq.toArray

If you run this code, you will see that it pulls results from the server one at a time and never leaves outstanding requests. It is also very easy to change the number of results you want to retrieve - all you have to do is change the value you pass to Seq.take.

Now, while this streaming code works fine, it doesn't execute any requests in parallel, so it could be slow for a large number of documents. This is an easy problem to fix, although the solution may be a little unintuitive. Instead of trying to execute the entire sequence of requests in parallel - which was the problem in the original code - let's create a function that uses Async.Parallel to execute small batches of requests in parallel, then uses Seq.collect to combine the results into a flat sequence.

    /// Given a sequence of Async<'T>, creates a new sequence whose elements
    /// are computed in batches of a specified size.
    let parallelBatch batchSize (sequence : seq<Async<'T>>) =
        sequence
        |> Seq.windowed batchSize
        |> Seq.collect (fun batch ->
            batch
            |> Async.Parallel
            |> Async.RunSynchronously)

To use this function, we only need a few small tweaks to the code from the streaming version:

    let en100_Batched =
        let batchSize = 10
    #if DEBUG
        let documentCount = ref 0
    #endif
        Uri ("http://en.wikipedia.org/wiki/Special:Random")
        |> createDocumentSeq
        // Execute batches in parallel
        |> parallelBatch batchSize
        |> Seq.choose (Option.bind (parseDoc >> parseArticle))
    #if DEBUG
        |> Seq.map (fun x ->
            let msg =
                Interlocked.Increment documentCount
                |> sprintf "Parsed documents: %i"
            synchedOut.WriteLine msg
            x)
    #endif
        |> Seq.take 50
        // None of the computations actually take place until
        // this point, because Seq.toArray forces evaluation of the sequence.
        |> Seq.toArray

Again, it's easy to change the number of documents you want to retrieve, and the batch size can be tweaked just as easily (I'd suggest keeping it fairly small). If you wanted to, you could parameterize the "streaming" and "batched" code so you can switch between them at runtime, as in the sketch below.
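
As a rough sketch of that last idea, here is one way it might look, reusing createDocumentSeq, parallelBatch, parseDoc and parseArticle from above. The fetchArticles name and its parameters are my own, purely illustrative:

    /// Sketch only: choose between the streaming and batched strategies at runtime.
    /// Relies on createDocumentSeq, parallelBatch, parseDoc and parseArticle defined above.
    let fetchArticles useBatching batchSize articleCount (uri : System.Uri) =
        let docs = createDocumentSeq uri
        let downloaded =
            if useBatching then
                // Run the requests in small parallel batches.
                docs |> parallelBatch batchSize
            else
                // Run the requests one at a time, as they are consumed.
                docs |> Seq.map (fun doc -> Async.RunSynchronously doc)
        downloaded
        |> Seq.choose (Option.bind (parseDoc >> parseArticle))
        |> Seq.take articleCount
        |> Seq.toArray

Calling fetchArticles true 10 100 (Uri "http://en.wikipedia.org/wiki/Special:Random") would then behave like the batched version, while passing false would give the streaming behaviour.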

One last thing: with my code the requests shouldn't time out, so you can probably get rid of the WebClientWithTimeout class and just use WebClient directly.


Your code doesn't appear to be doing anything out of the ordinary, so I'm going to assume Wikipedia simply doesn't like your activity. Take a look at their bot policy. Digging a little deeper, they also seem to have a strict User-Agent policy:

As of February 15, 2010, Wikimedia sites require an HTTP User-Agent header for all requests. This was an operational decision made by the technical staff and was announced and discussed on the technical mailing list. [1] [2] The rationale is that clients that do not send a User-Agent string are mostly badly behaved scripts that cause a lot of load on the servers without benefiting the projects. Note that non-descriptive default values for the User-Agent string, such as those used by Perl's libwww, may also be blocked from using Wikimedia web sites (or parts of the web sites, such as api.php).

User agents (browsers or scripts) that do not send the User-Agent header may now receive an error message that resembles the following:

Scripts should use an informative User-Agent string with contact information, or they may be blocked by IP without notice.

So, from everything I've found, they may still not like what you're doing even if you add a proper user agent, but it's worth a try:

    wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 ( FriendlyBot@friendlybot.com )")

It would also be polite not to make so many simultaneous connections to their servers.
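
If you want to be gentler on their servers, a minimal sketch along those lines could look like the following. It is not part of the original code; the one-request-at-a-time loop, the politeFetch name, and the 500 ms pause are just illustrative assumptions:

    open System
    open System.Net
    open Microsoft.FSharp.Control.WebExtensions

    /// Sketch only: download the pages strictly one at a time, pausing briefly
    /// between requests, so there is never more than one open connection.
    let politeFetch (urls : seq<string>) =
        async {
            let results = ResizeArray<string> ()
            for url in urls do
                use wc = new WebClient()
                wc.Headers.Add ("User-Agent", "Friendly Bot 1.0 ( FriendlyBot@friendlybot.com )")
                let! html = wc.AsyncDownloadString (Uri url)
                results.Add html
                // An arbitrary pause between requests; tune or remove as appropriate.
                do! Async.Sleep 500
            return results.ToArray ()
        }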

