Throttle only if certain conditions are met

I watch myself sign up. This obsevable will return an object that has an ActivationType property that can be set multiple times.

What I'm trying to achieve is to log a message whenever the ActivationType parameter is set to "Type1". However, if ActivationType is set to Type2, register the message only once and wait 30 seconds before logging in again if ActivationType is Type2.

So, if I have:

myObservable .Where(o => o.ActivationType == "Type1" || o.ActivationType == "Type2") //listen for types 1 and 2 .Throttle() // ??? somehow only throttle if we are currently looking at Type2 .Subscribe(Log); //log some stuff 

I believe that Throttle () is what I'm looking for, but not sure how to call it conditionally.

Any suggestions?

+4
source share
2 answers

Ah, an ideal case for the almost impossible to understand Window operator!

EDIT: I post this link a dozen times a month, I swear - I read best that I saw Window , Join , Buffer , GroupJoin , etc:

Lee Campbell: Rx Part 9-Join, Window, Buffer and Group Join

 var source = new Subject<Thing>(); var feed = source.Publish().RefCount(); var ofType1 = feed.Where(t => t.ActivationType == "Type1"); var ofType2 = feed // only window the type2s .Where(t => t.ActivationType == "Type2") // our "end window selector" will be a tick 30s off from start .Window(() => Observable.Timer(TimeSpan.FromSeconds(30))) // we want the first one in each window... .Select(lst => lst.Take(1)) // moosh them all back together .Merge(); // We want all "type 1s" and the buffered outputs of "type 2s" var query = ofType1.Merge(ofType2); // Let set up a fake stream of data var running = true; var feeder = Task.Factory.StartNew( () => { // until we say stop... while(running) { // pump new Things into the stream every 500ms source.OnNext(new Thing()); Thread.Sleep(500); } }); using(query.Subscribe(Console.WriteLine)) { // Block until we hit enter so we can see the live output // from the above subscribe Console.ReadLine(); // Shutdown our fake feeder running = false; feeder.Wait(); } 
+6
source

Why not just use two streams?

 var baseStream = myObservable.Publish().RefCount(); // evaluate once var type1 = baseStream.Where(o => o.ActivationType == "Type1"); var type2 = baseStream.Where(o => o.ActivationType == "Type2").Throttle(TimeSpan.FromSeconds(30)); type1.Merge(type2).Subscribe(Log); 
+2
source

All Articles