Using Rx to Simplify an Asynchronous Request to a Silverlight Web Service

I wrote a simplified Silverlight client library for my WCF web service using Rx, but I sometimes miss completed events.

    public IObservable<XElement> GetReport(string reportName)
    {
        return from client in Observable.Return(new WebServiceClient())
               from request in Observable.ToAsync<string>(client.GetReportDataAsync)(reportName)
               from result in Observable.FromEvent<GetReportDataCompletedEventArgs>(client, "GetReportDataCompleted").Take(1)
               from close in this.CloseClient(client)
               select result.EventArgs.Result;
    }

I believe the problem is that the web service is called, and sometimes returns, before the subscription to the completed event is in place. I can't figure out how to get Rx to subscribe to the event before calling the Async method. I tried StartWith, but that requires the input and output types to be the same. Any ideas?

2 answers

It seems the best answer is to use Observable.CreateWithDisposable(), which lets you subscribe to the completed event before starting the asynchronous call.

For example:

    public IObservable<XElement> GetReport(string reportName)
    {
        return from client in Observable.Return(new WebServiceClient())
               from completed in Observable.CreateWithDisposable<GetReportDataCompletedEventArgs>(observer =>
               {
                   // Subscribe to the completed event first...
                   var subscription = Observable.FromEvent<GetReportDataCompletedEventArgs>(client, "GetReportDataCompleted")
                       .Take(1)
                       .Select(e => e.EventArgs)
                       .Subscribe(observer);

                   // ...and only then start the asynchronous call.
                   client.GetReportDataAsync(reportName);
                   return subscription;
               })
               from close in this.CloseClient(client)
               select completed.Result;
    }
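To see why the order matters, here is a minimal sketch that uses no Rx at all. FakeClient is a hypothetical stand-in for the generated service client (it is not part of the original code), and it raises its completed event immediately, which is the worst case for the race:

```csharp
using System;

// Hypothetical stand-in for the generated service client. It raises its
// completed event synchronously, the worst case for the race.
class FakeClient
{
    public event EventHandler<EventArgs> Completed;

    public void StartAsync()
    {
        var handler = Completed;
        if (handler != null) handler(this, EventArgs.Empty);
    }
}

static class Program
{
    static void Main()
    {
        // Start first, subscribe second: the event has already fired.
        var late = new FakeClient();
        bool lateSawEvent = false;
        late.StartAsync();
        late.Completed += (s, e) => lateSawEvent = true;

        // Subscribe first, start second: the handler is in place in time.
        var early = new FakeClient();
        bool earlySawEvent = false;
        early.Completed += (s, e) => earlySawEvent = true;
        early.StartAsync();

        Console.WriteLine("start then subscribe: " + lateSawEvent);  // False
        Console.WriteLine("subscribe then start: " + earlySawEvent); // True
    }
}
```

A real service call rarely completes this quickly, which would explain why the events in the question are only missed some of the time.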

To make this easier to work with, I wrapped CreateWithDisposable in a generic function that can be used with all my web service calls. It can also detect the event name automatically from the event args type:

    private IObservable<T> CallService<T>(ICommunicationObject serviceClient, Action start)
        where T : AsyncCompletedEventArgs
    {
        if (typeof(T) == typeof(AsyncCompletedEventArgs))
        {
            throw new InvalidOperationException(
                "Event arguments type cannot be used to determine event name, use event name overload instead.");
        }

        // Derive the event name by stripping the "EventArgs" suffix, e.g.
        // GetReportDataCompletedEventArgs -> GetReportDataCompleted.
        // (string.TrimEnd takes a set of characters, not a suffix.)
        const string suffix = "EventArgs";
        string typeName = typeof(T).Name;
        string completedEventName = typeName.EndsWith(suffix, StringComparison.Ordinal)
            ? typeName.Substring(0, typeName.Length - suffix.Length)
            : typeName;

        return CallService<T>(serviceClient, start, completedEventName);
    }

    private IObservable<T> CallService<T>(ICommunicationObject serviceClient, Action start, string completedEventName)
        where T : AsyncCompletedEventArgs
    {
        return Observable.CreateWithDisposable<T>(observer =>
        {
            // Subscribe before starting the call so the completion cannot be missed.
            var subscription = Observable.FromEvent<T>(serviceClient, completedEventName)
                .Take(1)
                .Select(e => e.EventArgs)
                .Subscribe(observer);
            start();
            return subscription;
        });
    }

    // Example usage:
    public IObservable<XElement> GetReport(string reportName)
    {
        return from client in Observable.Return(new WebServiceClient())
               from completed in this.CallService<GetReportDataCompletedEventArgs>(client, () => client.GetReportDataAsync(reportName))
               from close in this.CloseClient(client)
               select completed.Result;
    }

    /// <summary>
    /// Asynchronously closes the web service client
    /// </summary>
    /// <param name="client">The web service client to be closed.</param>
    /// <returns>Returns a cold observable sequence that yields the single close-completed event args.</returns>
    private IObservable<AsyncCompletedEventArgs> CloseClient(WebServiceClient client)
    {
        return this.CallService<AsyncCompletedEventArgs>(client, client.CloseAsync, "CloseCompleted");
    }

Hope this helps someone else!


I needed to do the same with the plain WebClient.DownloadStringAsync, so here is my version.

First wrap the event:

    public static IObservable<IEvent<DownloadStringCompletedEventArgs>> GetDownloadStringObservableEvent(this WebClient wc)
    {
        return Observable.FromEvent<DownloadStringCompletedEventArgs>(wc, "DownloadStringCompleted");
    }

Then create an extension method:

    public static IObservable<string> GetDownloadString(this WebClient wc, Uri uri)
    {
        return Observable.CreateWithDisposable<string>(observer =>
        {
            // Several downloads may be going on simultaneously. The token allows
            // us to establish that we're retrieving the right one.
            Guid token = Guid.NewGuid();
            var stringDownloaded = wc.GetDownloadStringObservableEvent()
                // The type check skips completions that carry no Guid token.
                .Where(evt => evt.EventArgs.UserState is Guid && (Guid)evt.EventArgs.UserState == token)
                .Take(1); // implicitly unhooks handler after event is received

            bool errorOccurred = false;
            IDisposable unsubscribe = stringDownloaded.Subscribe(
                // OnNext action
                ev =>
                {
                    // Propagate the exception if one is reported.
                    if (ev.EventArgs.Error != null)
                    {
                        errorOccurred = true;
                        observer.OnError(ev.EventArgs.Error);
                    }
                    else if (!ev.EventArgs.Cancelled)
                    {
                        observer.OnNext(ev.EventArgs.Result);
                    }
                },
                // OnError action (propagate exception)
                ex => observer.OnError(ex),
                // OnCompleted action
                () =>
                {
                    if (!errorOccurred)
                    {
                        observer.OnCompleted();
                    }
                });

            try
            {
                wc.DownloadStringAsync(uri, token);
            }
            catch (Exception ex)
            {
                observer.OnError(ex);
            }

            return unsubscribe;
        });
    }
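The token filtering can be tried in isolation. The following is only a sketch: FakeDownloader is a hypothetical stand-in for WebClient (not part of the original code), and plain event handlers are used instead of Rx so that it runs on its own. It shows a handler ignoring a completion that carries someone else's token:

```csharp
using System;
using System.ComponentModel;

// Hypothetical stand-in for a client shared by several pending requests.
class FakeDownloader
{
    public event EventHandler<AsyncCompletedEventArgs> Completed;

    // Raises the completion for the request identified by userState.
    public void Complete(object userState)
    {
        var handler = Completed;
        if (handler != null)
            handler(this, new AsyncCompletedEventArgs(null, false, userState));
    }
}

static class Program
{
    static void Main()
    {
        var downloader = new FakeDownloader();
        Guid mine = Guid.NewGuid();
        Guid other = Guid.NewGuid();
        int matches = 0;

        // Only count completions whose UserState carries our token.
        downloader.Completed += (s, e) =>
        {
            if (e.UserState is Guid && (Guid)e.UserState == mine) matches++;
        };

        downloader.Complete(other); // someone else's request: ignored
        downloader.Complete(mine);  // our request: counted

        Console.WriteLine(matches); // prints 1
    }
}
```

Every subscriber sees every completion raised by the shared instance, so without the token check a result could be delivered to the wrong caller.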

Usage is simple:

    wc.GetDownloadString(new Uri("http://myservice"))
      .Subscribe(resultCallback, errorCallback);