How to buffer stream events?

I have a web component that subscribes to a stream.

Since the web component is recreated every time it is displayed, I have to cancel the subscription and subscribe again each time.

Currently I add all the subscriptions to a list, and in the removed() lifecycle method I do:

 subscriptions.forEach((sub) => sub.cancel()); 

Now, to the problem: when the web component is not displayed, no one is listening to the stream, so the component misses every event emitted while it is not displayed.

I need buffering. Events should be buffered and dispatched immediately after registering the listener. According to the documentation, buffering takes place until a listener is registered:

The controller will buffer all incoming events until the subscriber registers.

This works, but the problem is that the listener is at some point removed and re-registered, and it seems that this does not trigger buffering again.

It seems that buffering occurs only initially, and not later, even if all listeners have disappeared.

So the question is: how can I buffer in this situation, when the listeners can leave and return?

dart
1 answer

Note: in general, you should not be able to resubscribe to a Stream that has already been closed. This looks like a bug that we forgot to fix.

I am not familiar with web components, but I hope the following suggestion addresses your problem.

One way (and there are, of course, many) is to create a new Stream for each subscriber (as html events do) and to pause the original stream while nobody is listening.

Let's say origin is the source stream. Then we implement a getter, stream, which returns a new stream linked to origin:

Untested code:

    import 'dart:async';

    late Stream origin; // The source stream; assign it before using `stream`.
    StreamSubscription? _subscription;
    final _listeners = <StreamController>{};

    void _addListener(StreamController controller) {
      _listeners.add(controller);
      // Subscribe to origin lazily, on the first listener.
      _subscription ??= origin.listen((event) {
        // When we emit the event we want listeners to be able to unsubscribe
        // or add new listeners. In order to avoid ConcurrentModificationErrors
        // we need to make sure that the _listeners set is not modified while
        // we are iterating over it with forEach. Here we just create a copy with
        // toList().
        // Alternatively (more efficient) we could also queue subscription
        // modification requests and do them after the forEach.
        _listeners.toList().forEach((c) => c.add(event));
      });
      _subscription!.resume(); // Just in case it was paused.
    }

    void _removeListener(StreamController controller) {
      _listeners.remove(controller);
      // Pause origin while nobody listens, so that its events are buffered.
      if (_listeners.isEmpty) _subscription?.pause();
    }

    Stream get stream {
      late StreamController controller;
      controller = StreamController(
          onListen: () => _addListener(controller),
          onCancel: () => _removeListener(controller));
      return controller.stream;
    }

If you need to buffer events right away, you need to start the subscription immediately rather than lazily, as in the code example.
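
An eager variant could look roughly like the sketch below. BufferedSource, dispose and demo are hypothetical names introduced only for illustration; they are not part of dart:async. The sketch assumes that a paused StreamSubscription buffers the events it receives, which is the documented behaviour of pause(). It subscribes to the source in the constructor, pauses at once, and resumes only while at least one listener is attached.

```dart
import 'dart:async';

/// Hypothetical helper (not part of dart:async): subscribes to the source
/// eagerly and keeps that subscription paused while nobody is listening.
class BufferedSource<T> {
  BufferedSource(Stream<T> origin) {
    _subscription = origin.listen((event) {
      // Iterate over a copy so listeners may come and go while we emit.
      for (final c in _listeners.toList()) {
        c.add(event);
      }
    });
    // No listeners yet: pause so that incoming events are buffered.
    _subscription.pause();
  }

  late final StreamSubscription<T> _subscription;
  final _listeners = <StreamController<T>>{};

  /// Returns a fresh stream for each subscriber.
  Stream<T> get stream {
    late final StreamController<T> controller;
    controller = StreamController<T>(
      onListen: () {
        _listeners.add(controller);
        // First listener arrived: release whatever was buffered.
        if (_listeners.length == 1) _subscription.resume();
      },
      onCancel: () {
        _listeners.remove(controller);
        // Last listener left: go back to buffering.
        if (_listeners.isEmpty) _subscription.pause();
      },
    );
    return controller.stream;
  }

  /// Stops listening to the source for good.
  Future<void> dispose() => _subscription.cancel();
}

/// Small demonstration: listeners leave and come back between events.
Future<List<String>> demo() async {
  final source = StreamController<int>();
  final buffered = BufferedSource<int>(source.stream);
  final log = <String>[];

  source.add(1); // Nobody is listening yet.
  final first = buffered.stream.listen((e) => log.add('first:$e'));
  await Future<void>.delayed(Duration.zero); // Let pending events flow.
  await first.cancel(); // The listener goes away.

  source.add(2); // Again nobody is listening.
  final second = buffered.stream.listen((e) => log.add('second:$e'));
  await Future<void>.delayed(Duration.zero);
  await second.cancel();

  await buffered.dispose();
  return log;
}
```

In this sketch, demo() should return [first:1, second:2]: each event is held until the next listener attaches. Note that the buffer is unbounded, so a source that keeps emitting while the component stays hidden for a long time will keep accumulating events.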
