Cyclic dependencies between streams in reactive programming

While working with reactive programming, I often come across situations where two streams depend on each other. What is the idiomatic way to solve these cases?

Minimal example: there are two buttons, A and B, and each displays a value. Clicking A must increase the value of A by B. Clicking B must set the value of B to A.

The first solution I could come up with (the example is in F#, but answers in any language are welcome):

    let solution1 buttonA buttonB =
        let mutable lastA = 0
        let mutable lastB = 1
        let a = new Subject<_>()
        let b = new Subject<_>()
        // Clicking A adds B's value to A; clicking B copies A's value into B.
        (OnClick buttonA).Subscribe(fun _ -> lastA <- lastA + lastB; a.OnNext lastA)
        (OnClick buttonB).Subscribe(fun _ -> lastB <- lastA; b.OnNext lastB)
        a.Subscribe(SetText buttonA)
        b.Subscribe(SetText buttonB)
        a.OnNext 0
        b.OnNext 1

This solution uses mutable state and subjects; it is not very readable and does not look idiomatic.

The second solution I tried involves creating a function that links the two dependent streams together:

    let dependency (aGivenB: IObservable<_> -> IObservable<_>) (bGivenA: IObservable<_> -> IObservable<_>) =
        let bProxy = new ReplaySubject<_>()
        let a = aGivenB bProxy
        let b = bGivenA a
        b.Subscribe(bProxy.OnNext)
        a, b

    let solution2 buttonA buttonB =
        let aGivenB b =
            Observable.WithLatestFrom(OnClick buttonA, b, fun click bValue -> bValue)
                .Scan(fun acc x -> acc + x)
                .StartWith(0)
        let bGivenA a =
            Observable.Sample(a, OnClick buttonB)
                .StartWith(1)
        let a, b = dependency aGivenB bGivenA
        a.Subscribe(SetText buttonA)
        b.Subscribe(SetText buttonB)

This seems a little better, but since there is no method like dependency in the reactive library, I believe there is a more idiomatic solution. It is also easy to introduce infinite recursion using the second approach.

What is the recommended way to solve problems involving cyclic dependencies between streams, such as the example above, in reactive programming?

3 answers

EDIT

Here's the F# solution:

    type DU =
        | A
        | B

    type State = { AValue : int; BValue : int }

    let solution2 (aObservable: IObservable<_>, bObservable: IObservable<_>) =
        let union = aObservable.Select(fun _ -> A).Merge(bObservable.Select(fun _ -> B))
        let result =
            union.Scan({ AValue = 0; BValue = 1 }, fun state du ->
                match du with
                | A -> { state with AValue = state.AValue + state.BValue }
                | B -> { state with BValue = state.AValue })
        result

F# is actually a great language for this, thanks to its built-in discriminated unions and records. Below is an answer written in C# with a custom discriminated union; my F# is pretty rusty.

The trick is to turn your two observables into one observable using a discriminated union. In other words, merge a and b into a single observable whose items are values of the discriminated union:

    a : *---*---*---**
    b : -*-*--*---*---
    du: ab-ba-bab-aa

Once this is done, you can react according to whether the item is a press of button "A" or of button "B".

To be clear, I am assuming that there is no way to explicitly set the value displayed by ButtonA or ButtonB. If there is, those changes should also be modeled as observables and worked into the discriminated union.
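To make the merge-then-scan idea concrete, here is a minimal sketch in plain Java with no Rx dependency. It is an illustration under assumptions, not part of any library: the names `ButtonReducer`, `Kind`, `Event` and `step` are made up for this example, and the `SET_A` / `SET_B` cases are hypothetical extra union cases showing how explicit value changes could be worked in. The `step` function plays the role of the accumulator passed to Scan, and `run` folds a finite list of events in place of a live stream.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch (no Rx dependency) of the merge-then-scan idea. */
final class ButtonReducer {

    /** Which input produced the event; SET_A and SET_B are hypothetical extra cases. */
    enum Kind { CLICK_A, CLICK_B, SET_A, SET_B }

    /** One element of the merged stream: a tag plus an optional payload. */
    static final class Event {
        final Kind kind;
        final int value; // only meaningful for SET_A / SET_B

        private Event(Kind kind, int value) { this.kind = kind; this.value = value; }

        static Event clickA() { return new Event(Kind.CLICK_A, 0); }
        static Event clickB() { return new Event(Kind.CLICK_B, 0); }
        static Event setA(int v) { return new Event(Kind.SET_A, v); }
        static Event setB(int v) { return new Event(Kind.SET_B, v); }
    }

    /** Immutable pair of button values. */
    static final class State {
        final int a;
        final int b;
        State(int a, int b) { this.a = a; this.b = b; }
    }

    /** The accumulator that a Scan operator would call for every merged event. */
    static State step(State s, Event e) {
        switch (e.kind) {
            case CLICK_A: return new State(s.a + s.b, s.b); // A += B
            case CLICK_B: return new State(s.a, s.a);       // B = A
            case SET_A:   return new State(e.value, s.b);   // explicit set of A
            case SET_B:   return new State(s.a, e.value);   // explicit set of B
            default:      throw new IllegalArgumentException("unknown event " + e.kind);
        }
    }

    /** Folds a finite list of events, standing in for Scan over a live stream. */
    static State run(State seed, List<Event> events) {
        State s = seed;
        for (Event e : events) {
            s = step(s, e);
        }
        return s;
    }

    public static void main(String[] args) {
        State end = run(new State(0, 1), Arrays.asList(
                Event.clickA(), Event.clickA(), Event.clickB(),
                Event.setB(10), Event.clickA()));
        System.out.println("a = " + end.a + ", b = " + end.b);
    }
}
```

Starting from (0, 1), the events in `main` move the state through (1, 1), (2, 1), (2, 2), (2, 10) and (12, 10), so it prints `a = 12, b = 10`. Because all state lives in the accumulator, neither value needs to observe the other, which is what removes the cycle.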

    var a = new Subject<Unit>();
    var b = new Subject<Unit>();
    var observable = a.DiscriminatedUnion(b)
        .Scan(new State(0, 1), (state, du) => du.Unify(
            /* A clicked case */ _ => new State(state.A + state.B, state.B),
            /* B clicked case */ _ => new State(state.A, state.A)
        ));

    observable.Subscribe(state => Console.WriteLine($"a = {state.A}, b = {state.B}"));

    a.OnNext(Unit.Default);
    a.OnNext(Unit.Default);
    a.OnNext(Unit.Default);
    a.OnNext(Unit.Default);
    b.OnNext(Unit.Default);
    a.OnNext(Unit.Default);
    a.OnNext(Unit.Default);
    a.OnNext(Unit.Default);
    a.OnNext(Unit.Default);
    b.OnNext(Unit.Default);

Here are the classes this relies on in C#. Most of this translates easily to F# built-in types.

    using System;
    using System.Reactive.Linq;

    public class State /* easily replaced with an F# record */
    {
        public State(int a, int b) { A = a; B = b; }
        public int A { get; }
        public int B { get; }
    }

    /* easily replaced with built-in discriminated unions and pattern matching */
    public static class DiscriminatedUnionExtensions
    {
        // Merges two observables into one observable of tagged values.
        public static IObservable<DiscriminatedUnionClass<T1, T2>> DiscriminatedUnion<T1, T2>(
            this IObservable<T1> a, IObservable<T2> b)
        {
            return Observable.Merge(
                a.Select(t1 => DiscriminatedUnionClass<T1, T2>.Create(t1)),
                b.Select(t2 => DiscriminatedUnionClass<T1, T2>.Create(t2))
            );
        }

        public static IObservable<TResult> Unify<T1, T2, TResult>(
            this IObservable<DiscriminatedUnionClass<T1, T2>> source,
            Func<T1, TResult> f1, Func<T2, TResult> f2)
        {
            return source.Select(union => Unify(union, f1, f2));
        }

        // Applies f1 or f2 depending on which case the union holds.
        public static TResult Unify<T1, T2, TResult>(
            this DiscriminatedUnionClass<T1, T2> union,
            Func<T1, TResult> f1, Func<T2, TResult> f2)
        {
            return union.Item == 1
                ? f1(union.Item1)
                : f2(union.Item2);
        }
    }

    public class DiscriminatedUnionClass<T1, T2>
    {
        private readonly T1 _t1;
        private readonly T2 _t2;
        private readonly int _item;

        private DiscriminatedUnionClass(T1 t1, T2 t2, int item)
        {
            _t1 = t1;
            _t2 = t2;
            _item = item;
        }

        public int Item { get { return _item; } }
        public T1 Item1 { get { return _t1; } }
        public T2 Item2 { get { return _t2; } }

        public static DiscriminatedUnionClass<T1, T2> Create(T1 t1)
        {
            return new DiscriminatedUnionClass<T1, T2>(t1, default(T2), 1);
        }

        public static DiscriminatedUnionClass<T1, T2> Create(T2 t2)
        {
            return new DiscriminatedUnionClass<T1, T2>(default(T1), t2, 2);
        }
    }

Here's a very simple solution using Gjallarhorn:

    #r @"..\packages\Gjallarhorn\lib\portable-net45+netcore45+wpa81+wp8+MonoAndroid1+MonoTouch1\Gjallarhorn.dll"
    open Gjallarhorn

    (*
        Clicking on A must increment the value of A by B.
        Clicking on B must set the value of B to A.
    *)

    let a = Mutable.create 3
    let b = Mutable.create 4

    let clickA() = a.Value <- a.Value + b.Value
    let clickB() = b.Value <- a.Value

    let d1 = Signal.Subscription.create (fun x -> printfn "%A" <| "Clicked A: " + x.ToString()) a
    let d2 = Signal.Subscription.create (fun x -> printfn "%A" <| "Clicked B: " + x.ToString()) b

    clickA()
    clickB()

In fact, it is very similar to your first solution in that it uses mutable state, but it makes binding to the user interface simpler. For more idiomatic usage, see this blog post.
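For readers unfamiliar with the pattern, the following plain-Java sketch shows roughly what a mutable, observable cell does. It is not Gjallarhorn's API: `Cell`, `CellDemo` and their methods are hypothetical names invented for this illustration, and the sketch only assumes that setting a value notifies the subscribers registered on that cell.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

/** Hypothetical stand-in for a mutable, observable cell; not Gjallarhorn's API. */
final class Cell {
    private int value;
    private final List<IntConsumer> listeners = new ArrayList<>();

    Cell(int initial) { this.value = initial; }

    int get() { return value; }

    /** Stores the new value, then notifies every subscriber of this cell. */
    void set(int newValue) {
        value = newValue;
        for (IntConsumer listener : listeners) {
            listener.accept(newValue);
        }
    }

    void subscribe(IntConsumer listener) { listeners.add(listener); }
}

final class CellDemo {
    /** Performs one click on A and one click on B; returns the notifications in order. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Cell a = new Cell(3);
        Cell b = new Cell(4);
        a.subscribe(v -> log.add("A: " + v));
        b.subscribe(v -> log.add("B: " + v));
        a.set(a.get() + b.get()); // click A: A += B
        b.set(a.get());           // click B: B = A
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With the initial values 3 and 4 used in the answer, `run` records `A: 7` followed by `B: 7`. The click handlers read the other cell directly instead of subscribing to it, so no subscription cycle is created; the cost is that the state is mutable.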


Assuming that the output will eventually be fed back into the source, you can do this using basic operators. All you have to do is call withLatestFrom twice for each observed button/signal. My solution is in Java, but it should be easy enough to follow!

    private static Pair<Observable<Integer>, Observable<Integer>> test(
            final Observable<Integer> aValues,
            final Observable<Integer> bValues,
            final Observable<Void> aButton,
            final Observable<Void> bButton,
            final Func2<Integer, Integer, Integer> aFunction,
            final Func2<Integer, Integer, Integer> bFunction
    ) {
        return new Pair<>(
                aButton.withLatestFrom(aValues, (button, a) -> a).withLatestFrom(bValues, aFunction),
                bButton.withLatestFrom(aValues, (button, a) -> a).withLatestFrom(bValues, bFunction)
        );
    }

Here is the test code I used:

    final TestScheduler scheduler = new TestScheduler();
    final TestSubject<Integer> aSubject = TestSubject.create(scheduler);
    final TestSubject<Integer> bSubject = TestSubject.create(scheduler);
    aSubject.onNext(1);
    bSubject.onNext(1);

    final TestSubject<Void> aButton = TestSubject.create(scheduler);
    final TestSubject<Void> bButton = TestSubject.create(scheduler);

    final Pair<Observable<Integer>, Observable<Integer>> pair = test(
            aSubject, bSubject, aButton, bButton,
            (a, b) -> a + b,
            (a, b) -> a
    );

    // Feed the results back into the value subjects.
    pair.component1().subscribe(aSubject::onNext);
    pair.component2().subscribe(bSubject::onNext);

    pair.component1().map(a -> "A: " + a).subscribe(System.out::println);
    pair.component2().map(b -> "B: " + b).subscribe(System.out::println);

    aButton.onNext(null); scheduler.triggerActions();
    bButton.onNext(null); scheduler.triggerActions();
    aButton.onNext(null); scheduler.triggerActions();
    aButton.onNext(null); scheduler.triggerActions();
    bButton.onNext(null); scheduler.triggerActions();

Prints:

    A: 2
    B: 2
    A: 4
    A: 6
    B: 6