Queue like Subject in RxJava

I am looking for a topic (or something similar) that might:

  • Can receive items and hold them in a queue or buffer if there are no subscribers
  • Once we have a subscriber, all items will be consumed and will never be released again.
  • I can subscribe / unsubscribe to / from the topic

BehaviorSubject almost do the job, but save the last observed item.

UPDATE

Based on the accepted answer, I developed a similar solution for one observed element. Also, the unsubscribe part has been added to avoid memory leaks.

 class LastEventObservable private constructor( private val onSubscribe: OnSubscribe<Any>, private val state: State ) : Observable<Any>(onSubscribe) { fun emit(value: Any) { if (state.subscriber.hasObservers()) { state.subscriber.onNext(value) } else { state.lastItem = value } } companion object { fun create(): LastEventObservable { val state = State() val onSubscribe = OnSubscribe<Any> { subscriber -> just(state.lastItem) .filter { it != null } .doOnNext { subscriber.onNext(it) } .doOnCompleted { state.lastItem = null } .subscribe() val subscription = state.subscriber.subscribe(subscriber) subscriber.add(Subscriptions.create { subscription.unsubscribe() }) } return LastEventObservable(onSubscribe, state) } } private class State { var lastItem: Any? = null val subscriber = PublishSubject.create<Any>() } } 
+7
rx-java
source share
3 answers

I achieve the expected result by creating a custom Observable that wraps the topic of publication and processes the emission cache if there are no connected subscribers. Check this.

 public class ExampleUnitTest { @Test public void testSample() throws Exception { MyCustomObservable myCustomObservable = new MyCustomObservable(); myCustomObservable.emit("1"); myCustomObservable.emit("2"); myCustomObservable.emit("3"); Subscription subscription = myCustomObservable.subscribe(System.out::println); myCustomObservable.emit("4"); myCustomObservable.emit("5"); subscription.unsubscribe(); myCustomObservable.emit("6"); myCustomObservable.emit("7"); myCustomObservable.emit("8"); myCustomObservable.subscribe(System.out::println); } } class MyCustomObservable extends Observable<String> { private static PublishSubject<String> publishSubject = PublishSubject.create(); private static List<String> valuesCache = new ArrayList<>(); protected MyCustomObservable() { super(subscriber -> { Observable.from(valuesCache) .doOnNext(subscriber::onNext) .doOnCompleted(valuesCache::clear) .subscribe(); publishSubject.subscribe(subscriber); }); } public void emit(String value) { if (publishSubject.hasObservers()) { publishSubject.onNext(value); } else { valuesCache.add(value); } } } 

Hope this helps!

Best wishes.

+7
source share

I had a similar problem, my requirements were:

  • Must support value reproduction when Observer subscriber is not subscribed
  • Must allow only one observer, signed at a time
  • Allows another observer to subscribe when placing the first observer

I implemented it as RxRelay , but the implementation for Subject would be similar:

 public final class CacheRelay<T> extends Relay<T> { private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>(); private final PublishRelay<T> relay = PublishRelay.create(); private CacheRelay() { } public static <T> CacheRelay<T> create() { return new CacheRelay<>(); } @Override public void accept(T value) { if (relay.hasObservers()) { relay.accept(value); } else { queue.add(value); } } @Override public boolean hasObservers() { return relay.hasObservers(); } @Override protected void subscribeActual(Observer<? super T> observer) { if (hasObservers()) { EmptyDisposable.error(new IllegalStateException("Only a single observer at a time allowed."), observer); } else { for (T element; (element = queue.poll()) != null; ) { observer.onNext(element); } relay.subscribeActual(observer); } } } 

Look at the gist for more

+2
source share

If you only want to wait for one subscriber, use UnicastSubject , but note that if you unsubscribe in the middle, all subsequent queues will be lost.

Edit:

As soon as we have a subscriber, all items will be consumed and will never be thrown away again

For multiple subscribers, use ReplaySubject .

+1
source share

All Articles