Rx Observable.TakeWhile checks the condition before each element, but I need to check after

Observable.TakeWhile lets a sequence run while a condition is true (the condition is a delegate, so it can be evaluated against the actual elements of the sequence), but it checks the condition BEFORE each element. How can I perform the same check AFTER each element?

The following code demonstrates the problem:

    void RunIt()
    {
        List<SomeCommand> listOfCommands = new List<SomeCommand>();
        listOfCommands.Add(new SomeCommand { CurrentIndex = 1, TotalCount = 3 });
        listOfCommands.Add(new SomeCommand { CurrentIndex = 2, TotalCount = 3 });
        listOfCommands.Add(new SomeCommand { CurrentIndex = 3, TotalCount = 3 });

        var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex != c.TotalCount);

        obs.Subscribe(x =>
        {
            Debug.WriteLine("{0} of {1}", x.CurrentIndex, x.TotalCount);
        });
    }

    class SomeCommand
    {
        public int CurrentIndex;
        public int TotalCount;
    }

It displays:

    1 of 3
    2 of 3

I cannot get the third element.

Looking at this example, you might think that all I need to do is change my condition like this:

 var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex <= c.TotalCount); 

But then the observable will never complete (because in my real-world code the stream does not end after these three commands).

+6
5 answers

Final edit:

I based my solution on Sergey's TakeWhileInclusive implementation in this thread: How to populate Rx Observable depending on the state in the event

    public static IObservable<TSource> TakeUntil<TSource>(
        this IObservable<TSource> source, Func<TSource, bool> predicate)
    {
        return Observable
            .Create<TSource>(o => source.Subscribe(x =>
                {
                    // Forward the element first, then check whether to stop
                    o.OnNext(x);
                    if (predicate(x))
                        o.OnCompleted();
                },
                o.OnError,
                o.OnCompleted
            ));
    }
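Applied to the commands from the question, the extension above could be used roughly as follows. This is a minimal sketch, assuming the System.Reactive package is referenced. The class names InclusiveExtensions and Program are hypothetical, and the method is renamed to the hypothetical TakeUntilInclusive so that it cannot clash with a same-named operator in the referenced Rx version. Note that the predicate here is the STOP condition, so it is the inverse of the one passed to TakeWhile.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

class SomeCommand
{
    public int CurrentIndex;
    public int TotalCount;
}

static class InclusiveExtensions
{
    // Same body as the answer's extension; only the name is changed (hypothetical).
    public static IObservable<TSource> TakeUntilInclusive<TSource>(
        this IObservable<TSource> source, Func<TSource, bool> predicate)
    {
        return Observable.Create<TSource>(o => source.Subscribe(
            x =>
            {
                o.OnNext(x);                        // emit the element first
                if (predicate(x)) o.OnCompleted();  // then decide whether to stop
            },
            o.OnError,
            o.OnCompleted));
    }
}

static class Program
{
    static void Main()
    {
        var listOfCommands = new List<SomeCommand>
        {
            new SomeCommand { CurrentIndex = 1, TotalCount = 3 },
            new SomeCommand { CurrentIndex = 2, TotalCount = 3 },
            new SomeCommand { CurrentIndex = 3, TotalCount = 3 },
        };

        // Stop AFTER the element whose index equals the total count.
        listOfCommands.ToObservable()
            .TakeUntilInclusive(c => c.CurrentIndex == c.TotalCount)
            .Subscribe(x => Console.WriteLine("{0} of {1}", x.CurrentIndex, x.TotalCount));
    }
}
```

With the three commands from the question this should print "1 of 3", "2 of 3" and "3 of 3", and the sequence completes right after the third element even if the underlying source keeps running.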
+4

There is no built-in operator that does what you ask, but here is one that uses Publish to run two queries while subscribing to the underlying observable only once:

    // Emits matching values, but includes the value that failed the filter
    public static IObservable<T> TakeWhileInclusive<T>(
        this IObservable<T> source, Func<T, bool> predicate)
    {
        return source.Publish(co => co.TakeWhile(predicate)
            .Merge(co.SkipWhile(predicate).Take(1)));
    }

And then:

    var obs = listOfCommands.ToObservable()
        .TakeWhileInclusive(c => c.CurrentIndex != c.TotalCount);
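To illustrate the point about Publish sharing one subscription, here is a small sketch, again assuming the System.Reactive package is referenced. The class names and the subscriptions counter are hypothetical and exist only for this demonstration. Observable.Defer is used to count how often the underlying source is actually subscribed to.

```csharp
using System;
using System.Reactive.Linq;

static class InclusiveExtensions
{
    // Emits matching values, plus the first value that failed the filter.
    public static IObservable<T> TakeWhileInclusive<T>(
        this IObservable<T> source, Func<T, bool> predicate)
    {
        return source.Publish(co => co.TakeWhile(predicate)
            .Merge(co.SkipWhile(predicate).Take(1)));
    }
}

static class Program
{
    static void Main()
    {
        int subscriptions = 0; // hypothetical counter, for illustration only

        // Defer runs the factory on every subscription, so we can count them.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return new[] { 1, 2, 3, 4, 5 }.ToObservable();
        });

        source.TakeWhileInclusive(x => x < 3)
            .Subscribe(x => Console.WriteLine(x));

        Console.WriteLine("subscriptions: {0}", subscriptions);
    }
}
```

This should print 1, 2 and 3 followed by "subscriptions: 1": both the TakeWhile and the SkipWhile branch observe the same published sequence, so a cold source is not replayed for each branch.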
+13

You can use TakeUntil to pass each element through until a secondary source produces a value; in this case, we can define the second stream as the first value after the predicate stops holding:

    public static IObservable<TSource> TakeWhileInclusive<TSource>(
        this IObservable<TSource> source, Func<TSource, bool> predicate)
    {
        return source.TakeUntil(source.SkipWhile(x => predicate(x)).Skip(1));
    }
+3

(Struck out by the author:) I think you are after TakeWhile, not TakeUntil:

    var list = (new List<int>(){1,2,3,4,5,6,7,8,9,10});
    var takeWhile = list
        .ToObservable()
        .Select((_, i) => Tuple.Create(i, _))
        .TakeWhile(tup => tup.Item1 < list.Count)
        .Do(_ => Console.WriteLine("Outputting {0}", _.Item2));

Well, the thing you want doesn't exist out of the box; at least, I don't know of anything with that particular syntax. However, you can piece it together pretty easily (and it's not too nasty):

    var fakeCmds = Enumerable
        .Range(1, 100)
        .Select(i => new SomeCommand() {CurrentIndex = i, TotalCount = 10})
        .ToObservable();

    var beforeMatch = fakeCmds
        .TakeWhile(c => c.CurrentIndex != c.TotalCount);

    var theMatch = fakeCmds
        .SkipWhile(c => c.CurrentIndex != c.TotalCount)
        .TakeWhile(c => c.CurrentIndex == c.TotalCount);

    var upToAndIncluding = Observable.Concat(beforeMatch, theMatch);
+1

A combination using the new SkipUntil and TakeUntil:

SkipUntil:

    return source.Publish(s => s.SkipUntil(s.Where(predicate)));

TakeUntil (inclusive):

    return source.Publish(s => s.TakeUntil(s.SkipUntil(predicate)));

Full source: https://gist.github.com/GeorgeTsiokos/a4985b812c4048c428a981468a965a86

0