How to organize these calls using Reactive Extensions (Rx) in Silverlight?

I have several calls that must be executed sequentially. Consider an IService that has a Query and Load method. The request contains a list of widgets, and the load is a default widget. Therefore, my service looks like this.

void IService.Query(Action<IEnumerable<Widget>,Exception> callback); void IService.Load(Action<Widget,Exception> callback); 

With that in mind, here is an approximate sketch of a view model:

 public class ViewModel : BaseViewModel { public ViewModel() { Widgets = new ObservableCollection<Widget>(); WidgetService.Query((widgets,exception) => { if (exception != null) { throw exception; } Widgets.Clear(); foreach(var widget in widgets) { Widgets.Add(widget); } WidgetService.Load((defaultWidget,ex) => { if (ex != null) { throw ex; } if (defaultWidget != null) { CurrentWidget = defaultWidget; } } }); } public IService WidgetService { get; set; } // assume this is wired up public ObservableCollection<Widget> Widgets { get; private set; } private Widget _currentWidget; public Widget CurrentWidget { get { return _currentWidget; } set { _currentWidget = value; RaisePropertyChanged(()=>CurrentWidget); } } } 

What I would like to do is simplify the sequential workflow of calling the request, and then by default. Perhaps the best way to do this is nested in lambda expressions, as I have shown, but I decided there could be a more elegant way with Rx. I do not want to use Rx for the sake of Rx, but if this allows me to organize the logic above, so it is easier to read / maintain in the method, I will use it. Ideally, something like:

 Observable.Create( ()=>firstAction(), ()=>secondAction()) .Subscribe(action=>action(),error=>{ throw error; }); 

Using a streaming library, I would do something like:

 Service.Query(list=>{result=list}; yield return 1; ProcessList(result); Service.Query(widget=>{defaultWidget=widget}; yield return 1; CurrentWidget = defaultWidget; 

This makes it much more obvious that the workflow is sequential and eliminates nesting (the output is part of an asynchronous enumerator and are boundaries that are locked until the results return).

Something like that would make sense to me.

So the gist of the question is: Am I trying to put a square anchor in a circular hole, or is there a way to override nested asynchronous calls using Rx?

+7
asynchronous silverlight system.reactive
source share
2 answers

You can convert maintenance methods to return IObservable instead of accepting a callback as a parameter. In this case, a sequential workflow can be implemented using SelectMany , something like this ...

  WidgetService.Query() .SelectMany( widgets => { Widgets.Clear(); foreach (var w in widgets) { Widgets.Add(w); } return WidgetService.Load(); } ) .Do( defaultWidget => { if (defaultWidget != null) Default = defaultWidget; } ) .Subscribe( _ => { }, e => { throw e; } ); 

However, asynchronous IMO F # will look much clearer (in the example, I assume that the service methods return Async> and Async, respectively). Please note that the example does not take into account which stream changes data fields, in the real world code you should pay attention to this:

  let load = async { let! widgets = WidgetService.Query() Widgets.Clear() for w in widgets do Widgets.Add(w) let! defaultWidget = WidgetService.Load() if defaultWidget <> null then Default <- defaultWidget return () } Async.StartWithContinuations( load, ignore, // success continuation - ignore result raise, // error continuation - reraise exception ignore // cancellation continuation - ignore ) 

EDITED

You can actually use the technique with the iterators mentioned in your question:

  private IEnumerable<IObservable<object>> Intialize() { var widgetsList = WidgetService.Query().Start(); yield return widgetsList; Widgets.Clear(); foreach (var w in widgetsList[0]) { Widgets.Add(w); } var defaultWidgetList = WidgetService.Load().Start(); yield return defaultWidgetList; if (defaultWidgetList[0] != null) Default = defaultWidgetList[0]; } Observable .Iterate(Intialize) .Subscribe( _ => { }, ex => { throw ex; } ); 
+3
source share

You can also do this with ReactiveXaml , although since your CurrentWidget and Widgets are both mutable, you cannot do it as clean (there is a class called ObservableAsPropertyHelper that will update the property based on IObservable and run RaisePropertyChanged):

 public class ViewModel { public ViewModel() { // These return a Func that wraps an async call in an IObservable<T> // that always yields only one item (the result of the call) var QueryAsObservable = Observable.FromAsyncCommand<IEnumerable<Widget>>(WebService.BeginQuery, WebService.EndQuery); var LoadAsObservable = Observable.FromAsyncCommand<Widget>(WebService.BeginLoad, WebService.EndLoad); // Create a new command QueryAndLoad = new ReactiveAsyncCommand(); // QueryAndLoad fires every time someone calls ICommand.Execute // The .Do is the hacky part, for sync calls it hidden by RegisterAsyncFunction var async_results = QueryAndLoad.SelectMany(_ => QueryAsObservable()) .Do(_ => DoTranslate.AsyncCompletedNotification.OnNext(new Unit())); // Query up the Widgets async_results.Subscribe(x => x.Run(Widgets.Add)); // Now execute the Load async_results.SelectMany(_ => LoadAsObservable()) .Subscribe(x => CurrentWidget = x); QueryAndLoad.Execute(); } public ReactiveAsyncCommand QueryAndLoad {get; private set; } public ObservableCollection<Widget> Widgets {get; private set; } public Widget CurrentWidget {get; set; } } 
+1
source share

All Articles