Time-based event flow handling in rxjs

I have a process that sends me data packets at intervals, and I need to control this flow depending on the time packets arrived, etc. At some point, I also close the thread and the process.

Right now, I am using a set of timers for this, but I hope I can do it with rxjs as it seems very suitable for this kind of thing. So far I have not had much success.

Problem

A thread should send me packets at regular intervals, but it usually rejects very often and sometimes gets stuck.

I want to close the stream at some point under the following conditions:

  • If I need to send the first package more than startDelay .
  • After sending the first packet, if there is a pause longer than middleDelay between two packets.
  • After a specified period of time maxChannelTime .

When I am about to close the stream for any of the above reasons, I first ask it to be closed politely so that it can do some cleaning. Sometimes it also sends me the final data packet during cleanup. But I want to wait more than cleanupTime to clean up and get the latest data before I close the stream and ignore any messages.

Development

I will create "threads" by wrapping the Observable event. I have no problem with this.

By "closing" the stream, I want to say that the process has stopped emitting data and, possibly, closing (i.e., dying).

+7
javascript asynchronous rxjs
source share
3 answers

Tough problem.

I broke it into two phases - “adjusted” (since we want to check regular intervals) and “clean”.

Work back, exit

 const regulated = source.takeUntil(close) const cleanup = source.skipUntil(close).takeUntil(cleanupCloser) const output = regulated.merge(cleanup) 

'Closers' are observables that emit when time closes (one closer to the timeout value).

 const startTimeout = 600 const intervalTimeout = 200 const maxtimeTimeout = 3000 const cleanupTimeout = 300 const startCloser = Observable.timer(startTimeout) // emit once after initial delay .takeUntil(source) // cancel after source emits .mapTo('startTimeoutMarker') const intervalCloser = source.switchMap(x => // reset interval after each source emit Observable.timer(intervalTimeout) // emit once after intervalTimeout .mapTo('intervalTimeoutMarker') ) const maxtimeCloser = Observable.timer(maxtimeTimeout) // emit once after maxtime .takeUntil(startCloser) // cancel if startTimeout .takeUntil(intervalCloser) // cancel if intervalTimeout .mapTo('maxtimeTimeoutMarker') const close = Observable.merge(startCloser, intervalCloser, maxtimeCloser).take(1) const cleanupCloser = close.switchMap(x => // start when close emits Observable.timer(cleanupTimeout) // emit once after cleanup time ) .mapTo('cleanupTimeoutMarker') 

Here's a working CodePen sample (please run the tests one at a time)

+2
source share

It is hard to give any advice without knowing how you create “streams” using RxJS or how you want to use them later.

In general, you can achieve what you want only with takeUntil() , switchMap() and timeout() .

 Observable.defer(...) .startWith(undefined) // Trigger the first `timeout` .switchMap((val, i) => { if (i === 0) { // waiting for the first value return Observable.of().timeout(startDelay); } else { return Observable.of(val).timeout(middleDelay); } }) .takeUntil(Observable.timer(maxChannelTime)); 

I don’t know what you mean by “closing the thread at some point”. Do you expect error or complete notification? This solution will throw an error when the timeout expires, and complete if takeUntil emits.

+1
source share

In the end, this is what I did. My answer is mainly based on the answer of Richard Matsen, so I leave his answer as accepted.

There were several additional changes that I needed to make.

This code is a code that receives a data message flow and returns a singleton observable containing all the data collected and the reason for the termination.

 let startCloser$ = Observable.timer(this.options.maxStartDelay).takeUntil(dataStream$).mapTo(TerminationReason.StartTimeout); let intervalCloser$ = dataStream$.switchMap(x => Observable.timer(this.options.timeBetweenPackets).mapTo(TerminationReason.Inactivity)); let maxTimeCloser$ = Observable.timer(this.options.totalConnectionTime).takeUntil(startCloser$).takeUntil(intervalCloser$).mapTo(TerminationReason.ChannelTimeout); //we need to publishReplay it so we can get the reason afterwards... let close$ = startCloser$.merge(intervalCloser$, maxTimeCloser$).take(1).publishReplay(1); //basically treating close$ like a promise close$.connect(); //cleanupAction has side-effects so it must only be subscribed to once. let cleanupAction$ = Observable.defer(async () => { //it just a promise that yields nothing and waits until requestTermination has terminated //requestTermination is an async function and it already has a timeout thing in promise-language await this.requestTermination(); }); let result$ = dataStream$.takeUntil(close$).concat(dataStream$.takeUntil(cleanupAction$)).toArray().switchMap(arrs => { //switchMap will only resolve once because the observable is a singleton return close$.map(reason => { //this should fire immediately because close is publishReplay(1) and has already happened let totalArr = _.flattenDeep(arrs); return { reason : reason, data : totalArr } }) }); return result$; 
0
source share

All Articles