In Rx, how to group the last items after a certain period of time?

Sorry if the title is not very clear, I could not come up with anything better ...

I get user input in the form of IObservable<char>, and I would like to convert it to IObservable<char[]>, grouping the characters every time the user stops typing for more than 1 second. So, for example, if the input looks like this:

 h e l l o (pause) w o r l d (pause) ! (pause) 

I would like the output observable to be:

 ['h', 'e', 'l', 'l', 'o'] ['w', 'o', 'r', 'l', 'd'] ['!'] 

I suspect the solution is pretty simple, but I cannot find the right approach... I tried using Buffer, GroupByUntil, Throttle and several others, to no avail.

Any idea would be welcome!


EDIT: I have something that almost works:

    _input.Buffer(() => _input.Delay(TimeSpan.FromSeconds(1)))
          .ObserveOnDispatcher()
          .Subscribe(OnCompleteInput);

But I need the delay to be reset every time a new character is typed...

+7
4 answers

Buffer and Throttle will suffice if your source is hot. To make it hot, you can use .Publish().RefCount(), which ensures that there is only one subscription to the underlying source.

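    // Extension method: it has to be declared inside a static class.
    public static IObservable<IList<T>> BufferWithInactivity<T>(
        this IObservable<T> source, TimeSpan dueTime)
    {
        if (source == null)
            throw new ArgumentNullException("source");

        // Defer dueTime checking to Throttle.
        var hot = source.Publish().RefCount();

        // Throttle emits only after dueTime has passed without a new item,
        // and that emission closes the current buffer.
        return hot.Buffer(() => hot.Throttle(dueTime));
    }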
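To see the grouping without waiting on a real clock, here is a minimal sketch of how the method above might be exercised. It assumes the System.Reactive package, and it adds a hypothetical overload (not part of the answer) that accepts an IScheduler, so that a HistoricalScheduler can simulate the pauses. The Subject<char> merely stands in for the keyboard input from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class InactivityBufferExtensions
{
    // Hypothetical overload: same idea as the method above, plus an explicit
    // scheduler so that the demo can control time instead of sleeping.
    public static IObservable<IList<T>> BufferWithInactivity<T>(
        this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler)
    {
        if (source == null)
            throw new ArgumentNullException("source");

        var hot = source.Publish().RefCount();
        return hot.Buffer(() => hot.Throttle(dueTime, scheduler));
    }
}

public static class Demo
{
    public static void Main()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock
        var input = new Subject<char>();           // stands in for the user input
        var pause = TimeSpan.FromSeconds(2);       // longer than the 1 second limit

        using (input.BufferWithInactivity(TimeSpan.FromSeconds(1), scheduler)
                    .Subscribe(chars => Console.WriteLine(new string(chars.ToArray()))))
        {
            foreach (var c in "hello") input.OnNext(c);
            scheduler.AdvanceBy(pause);            // simulated pause closes the buffer

            foreach (var c in "world") input.OnNext(c);
            scheduler.AdvanceBy(pause);

            input.OnNext('!');
            scheduler.AdvanceBy(pause);
        }
    }
}
```

Under these assumptions the program prints hello, world and ! on three separate lines. The source is deliberately not completed here: Buffer flushes whatever is pending when the source completes, so calling OnCompleted would produce one more (empty) group.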
+7

OK, I found a solution:

    Func<IObservable<char>> bufferClosingSelector =
        () => _input.Timeout(TimeSpan.FromSeconds(1))
                    .Catch(Observable.Return('\0'))
                    .Where(i => i == '\0');

    _input.Buffer(bufferClosingSelector)
          .ObserveOnDispatcher()
          .Subscribe(OnCompleteInput);

Basically, bufferClosingSelector pushes a value whenever a timeout occurs, and that value closes the current buffer. There is probably a simpler and more elegant way, but it works... I'm open to better suggestions ;)

0

I wrote an extension method a while ago that does what you need: BufferWithInactivity.

Here it is:

    public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
        this IObservable<T> source,
        TimeSpan inactivity,
        int maximumBufferSize)
    {
        return Observable.Create<IEnumerable<T>>(o =>
        {
            var gate = new object();
            var buffer = new List<T>();
            var mutable = new SerialDisposable();
            var subscription = (IDisposable)null;
            var scheduler = Scheduler.ThreadPool;

            Action dump = () =>
            {
                var bts = buffer.ToArray();
                buffer = new List<T>();
                if (o != null)
                {
                    o.OnNext(bts);
                }
            };

            Action dispose = () =>
            {
                if (subscription != null)
                {
                    subscription.Dispose();
                }
                mutable.Dispose();
            };

            Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted = onAction =>
            {
                lock (gate)
                {
                    dispose();
                    dump();
                    if (o != null)
                    {
                        onAction(o);
                    }
                }
            };

            Action<Exception> onError = ex => onErrorOrCompleted(x => x.OnError(ex));
            Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted());

            Action<T> onNext = t =>
            {
                lock (gate)
                {
                    buffer.Add(t);
                    if (buffer.Count == maximumBufferSize)
                    {
                        dump();
                        mutable.Disposable = Disposable.Empty;
                    }
                    else
                    {
                        mutable.Disposable = scheduler.Schedule(inactivity, () =>
                        {
                            lock (gate)
                            {
                                dump();
                            }
                        });
                    }
                }
            };

            subscription = source
                .ObserveOn(scheduler)
                .Subscribe(onNext, onError, onCompleted);

            return () =>
            {
                lock (gate)
                {
                    o = null;
                    dispose();
                }
            };
        });
    }
0

This should work. It is not as concise as your solution, because it implements the logic in a class instead of chaining extension methods, but it may be a clearer way to do it. In short: every time a char arrives, add it to the List and (re)start a timer that expires after one second; when the timer expires, send the contents of the List to the subscribers as an array and reset the state so that it is ready for the next group.

    class Breaker : IObservable<char[]>, IObserver<char>
    {
        List<IObserver<char[]>> observers = new List<IObserver<char[]>>();
        List<char> currentChars;
        DispatcherTimer t;

        public Breaker(IObservable<char> source)
        {
            t = new DispatcherTimer { Interval = new TimeSpan(0, 0, 1) };
            t.Tick += TimerOver;
            currentChars = new List<char>();

            // Subscribe last, so that the fields above already exist
            // if the source pushes items synchronously on subscription.
            source.Subscribe(this);
        }

        public IDisposable Subscribe(IObserver<char[]> observer)
        {
            observers.Add(observer);
            return null; //TODO return a useful IDisposable
        }

        public void OnCompleted()
        {
            //TODO implement completion logic
        }

        public void OnError(Exception e)
        {
            //TODO implement error logic
        }

        public void OnNext(char value)
        {
            currentChars.Add(value);

            // Stop before starting, so that the one-second countdown
            // begins again with every new character.
            t.Stop();
            t.Start();
        }

        void TimerOver(object sender, EventArgs e)
        {
            char[] chars = currentChars.ToArray();
            foreach (var obs in observers)
                obs.OnNext(chars);

            currentChars.Clear();
            t.Stop();
        }
    }
0