How to encapsulate the creation of long reactive observable chains

I currently have the following Rx / ReactiveUI code:

    this.WhenAnyValue(x => x.Listras)
        .Where(item => item != null)
        .Throttle(TimeSpan.FromMilliseconds(millis))
        .ObserveOn(TaskPoolScheduler.Default)
        .Select(im => GetArray.FromChannels(im, 0, 1))
        .ObserveOn(RxApp.MainThreadScheduler)
        .ToProperty(this, x => x.Grayscale, out _grayscale);

    this.WhenAnyValue(x => x.Grayscale)
        .Where(item => item != null)
        .Throttle(TimeSpan.FromMilliseconds(millis))
        .ObserveOn(TaskPoolScheduler.Default)
        .Select(ar => Gaussian.GaussianConvolution(ar, 1.5))
        .ObserveOn(RxApp.MainThreadScheduler)
        .ToProperty(this, x => x.BlurMenor, out _blurMenor);

    this.WhenAnyValue(x => x.BlurMenor)
        .Where(item => item != null)
        .Throttle(TimeSpan.FromMilliseconds(millis))
        .ObserveOn(TaskPoolScheduler.Default)
        .Select(ar => { ConversorImagem.Converter(ar, out BitmapSource im); return im; })
        .ObserveOn(RxApp.MainThreadScheduler)
        .ToProperty(this, x => x.ImagemBlurMenor, out _imagemBlurMenor);

    this.WhenAnyValue(x => x.BlurMenor)
        .Where(item => item != null)
        .Throttle(TimeSpan.FromMilliseconds(millis))
        .ObserveOn(TaskPoolScheduler.Default)
        .Select(ar => Gaussian.VerticalGaussianConvolution(ar, 5))
        .ObserveOn(RxApp.MainThreadScheduler)
        .ToProperty(this, x => x.BlurMaior, out _blurMaior);

    this.WhenAnyValue(x => x.BlurMaior)
        .Where(item => item != null)
        .Throttle(TimeSpan.FromMilliseconds(millis))
        .ObserveOn(TaskPoolScheduler.Default)
        .Select(ar => { ConversorImagem.Converter(ar, out BitmapSource im); return im; })
        .ObserveOn(RxApp.MainThreadScheduler)
        .ToProperty(this, x => x.ImagemBlurMaior, out _imagemBlurMaior);

    this.WhenAnyValue(x => x.BlurMenor, x => x.BlurMaior)
        .Where(tuple => tuple.Item1 != null && tuple.Item2 != null)
        .Throttle(TimeSpan.FromMilliseconds(millis))
        .ObserveOn(TaskPoolScheduler.Default)
        .Select(tuple => ArrayOperations.Diferença(tuple.Item1, tuple.Item2))
        .ObserveOn(RxApp.MainThreadScheduler)
        .ToProperty(this, x => x.Diferença, out _diferença);

    this.WhenAnyValue(x => x.Diferença)
        .Where(item => item != null)
        .Throttle(TimeSpan.FromMilliseconds(millis))
        .ObserveOn(TaskPoolScheduler.Default)
        .Select(ar => { ConversorImagem.Converter(ar, out BitmapSource im); return im; })
        .ObserveOn(RxApp.MainThreadScheduler)
        .ToProperty(this, x => x.ImagemDiferença, out _imagemDiferença);

As you can see, it grossly violates the DRY principle, but I don't know how to parameterize the properties and delegates that are passed in.

What is the usual way to automate the creation of these method chains in Rx / ReactiveUI?

1 answer

The beauty of Rx is that you can create your own operators based on other operators. This is due to the functional side of Rx.

You can create a new operator that encapsulates all the common behavior and takes the small differences as parameters:

    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using ReactiveUI;

    // Put this class somewhere useful: either beside your VM inside the same namespace,
    // or in a separate file for all your custom operators.
    public static class ObservableMixins
    {
        public static IObservable<TOut> ThrottledSelect<TIn, TOut>(
            this IObservable<TIn> source, int milliseconds, Func<TIn, TOut> selector) =>
            source
                .Where(item => item != null)
                .Throttle(TimeSpan.FromMilliseconds(milliseconds))
                .ObserveOn(TaskPoolScheduler.Default)
                .Select(selector)
                .ObserveOn(RxApp.MainThreadScheduler);
    }

Use it like this:

    this.WhenAnyValue(x => x.Listras)
        .ThrottledSelect(millis, im => GetArray.FromChannels(im, 0, 1))
        .ToProperty(this, x => x.Grayscale, out _grayscale);

    this.WhenAnyValue(x => x.Grayscale)
        .ThrottledSelect(millis, ar => Gaussian.GaussianConvolution(ar, 1.5))
        .ToProperty(this, x => x.BlurMenor, out _blurMenor);

    this.WhenAnyValue(x => x.BlurMenor)
        .ThrottledSelect(millis, ar => { ConversorImagem.Converter(ar, out BitmapSource im); return im; })
        .ToProperty(this, x => x.ImagemBlurMenor, out _imagemBlurMenor);

    this.WhenAnyValue(x => x.BlurMenor)
        .ThrottledSelect(millis, ar => Gaussian.VerticalGaussianConvolution(ar, 5))
        .ToProperty(this, x => x.BlurMaior, out _blurMaior);

    this.WhenAnyValue(x => x.BlurMaior)
        .ThrottledSelect(millis, ar => { ConversorImagem.Converter(ar, out BitmapSource im); return im; })
        .ToProperty(this, x => x.ImagemBlurMaior, out _imagemBlurMaior);

    this.WhenAnyValue(x => x.BlurMenor, x => x.BlurMaior)
        // Notice how I'm returning null if either item is null.
        // It will be filtered out inside the operator.
        .Select(tuple => tuple.Item1 == null || tuple.Item2 == null ? null : tuple)
        .ThrottledSelect(millis, tuple => ArrayOperations.Diferença(tuple.Item1, tuple.Item2))
        .ToProperty(this, x => x.Diferença, out _diferença);

    this.WhenAnyValue(x => x.Diferença)
        .ThrottledSelect(millis, ar => { ConversorImagem.Converter(ar, out BitmapSource im); return im; })
        .ToProperty(this, x => x.ImagemDiferença, out _imagemDiferença);

If you feel a little less adventurous, you can of course write it as a regular method that takes an observable:

    public IObservable<TOut> ThrottledSelect<TIn, TOut>(
        IObservable<TIn> source, int milliseconds, Func<TIn, TOut> selector) =>
        source
            .Where(item => item != null)
            .Throttle(TimeSpan.FromMilliseconds(milliseconds))
            .ObserveOn(TaskPoolScheduler.Default)
            .Select(selector)
            .ObserveOn(RxApp.MainThreadScheduler);

And use it as follows:

    ThrottledSelect(
            this.WhenAnyValue(x => x.Diferença),
            millis,
            ar => { ConversorImagem.Converter(ar, out BitmapSource im); return im; })
        .ToProperty(this, x => x.ImagemDiferença, out _imagemDiferença);
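
If you want to try the operator outside a ReactiveUI app (in a console project or a unit test, say), one option is a variant that takes the two schedulers as parameters instead of hard-coding them. The following is only a sketch under assumptions: it needs the System.Reactive package, and the class names `ObservableMixinsSketch` and `Demo`, plus the parameter names `workScheduler` and `resultScheduler`, are made up for this illustration and are not part of Rx or ReactiveUI.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class ObservableMixinsSketch
{
    // Hypothetical variant of ThrottledSelect: same pipeline, but the
    // schedulers are injected instead of being fixed to
    // TaskPoolScheduler.Default and RxApp.MainThreadScheduler.
    public static IObservable<TOut> ThrottledSelect<TIn, TOut>(
        this IObservable<TIn> source,
        TimeSpan dueTime,
        Func<TIn, TOut> selector,
        IScheduler workScheduler,
        IScheduler resultScheduler) =>
        source
            .Where(item => item != null)
            .Throttle(dueTime)
            .ObserveOn(workScheduler)    // run the selector off the caller's thread
            .Select(selector)
            .ObserveOn(resultScheduler); // deliver results where the consumer wants them
}

public static class Demo
{
    public static IList<int> Run()
    {
        // Three values pushed in quick succession; the null is filtered out
        // and Throttle keeps only the last value of the burst.
        var inputs = new[] { "a", null, "abc" }.ToObservable();

        return inputs
            .ThrottledSelect(
                TimeSpan.FromMilliseconds(50),
                s => s.Length,
                TaskPoolScheduler.Default,
                TaskPoolScheduler.Default) // a ReactiveUI app would pass RxApp.MainThreadScheduler here
            .ToList()
            .Wait(); // blocks until the sequence completes; fine for a demo, avoid on a UI thread
    }
}
```

Because `Throttle` forwards its pending value when the source completes, `Demo.Run()` should return a list containing only the length of the last string. In the view model you would keep calling it exactly as above, just passing `RxApp.MainThreadScheduler` as the last argument.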