I am looking for a Subject (or something similar) that:
- Can receive items and hold them in a queue or buffer while there are no subscribers
- Once a subscriber arrives, hands over all buffered items, which are then consumed and never emitted again
- Lets me subscribe to and unsubscribe from it
BehaviorSubject almost does the job, but it retains the last observed item.
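To make the behavior I am after concrete, here is a minimal sketch in plain Kotlin with no RxJava involved. The class name `BufferingRelay` and its methods are hypothetical, made up for illustration only; it assumes a single listener at a time and is not thread-safe.

```kotlin
// Hypothetical illustration of the desired semantics; this is not an RxJava API.
class BufferingRelay<T : Any> {
    // Items received while nobody is listening.
    private val pending = ArrayDeque<T>()
    private var listener: ((T) -> Unit)? = null

    // Deliver immediately if a listener is attached, otherwise queue the item.
    fun emit(item: T) {
        val current = listener
        if (current != null) current(item) else pending.addLast(item)
    }

    // Attach a listener and drain the backlog; drained items are never replayed.
    fun subscribe(onNext: (T) -> Unit) {
        listener = onNext
        while (pending.isNotEmpty()) onNext(pending.removeFirst())
    }

    // Detach the listener; later items are queued again.
    fun unsubscribe() {
        listener = null
    }
}

fun main() {
    val relay = BufferingRelay<String>()
    relay.emit("a")
    relay.emit("b")                      // no subscriber yet: both are queued

    val first = mutableListOf<String>()
    relay.subscribe { first.add(it) }    // drains "a" and "b"
    relay.emit("c")                      // delivered directly
    relay.unsubscribe()

    relay.emit("d")                      // queued again
    val second = mutableListOf<String>()
    relay.subscribe { second.add(it) }   // receives only "d"

    println(first)   // [a, b, c]
    println(second)  // [d]
}
```

The point of the sketch is the contrast with BehaviorSubject: the second subscriber sees only "d", not a replay of "c".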
UPDATE
Based on the accepted answer, I worked out a similar solution that holds a single observed item. I also added unsubscription handling to avoid memory leaks.
    import rx.Observable
    import rx.Observable.OnSubscribe
    import rx.subjects.PublishSubject
    import rx.subscriptions.Subscriptions

    class LastEventObservable private constructor(
        private val onSubscribe: OnSubscribe<Any>,
        private val state: State
    ) : Observable<Any>(onSubscribe) {

        // Forward the value if someone is subscribed; otherwise keep it as the last item.
        fun emit(value: Any) {
            if (state.subscriber.hasObservers()) {
                state.subscriber.onNext(value)
            } else {
                state.lastItem = value
            }
        }

        companion object {
            fun create(): LastEventObservable {
                val state = State()

                val onSubscribe = OnSubscribe<Any> { subscriber ->
                    // Hand over the stored item (if any) once, then clear it.
                    Observable.just(state.lastItem)
                        .filter { it != null }
                        .doOnNext { subscriber.onNext(it) }
                        .doOnCompleted { state.lastItem = null }
                        .subscribe()

                    // Relay later items and detach from the subject on unsubscribe.
                    val subscription = state.subscriber.subscribe(subscriber)
                    subscriber.add(Subscriptions.create { subscription.unsubscribe() })
                }

                return LastEventObservable(onSubscribe, state)
            }
        }

        private class State {
            var lastItem: Any? = null
            val subscriber = PublishSubject.create<Any>()
        }
    }
rx-java
Martynas jurkus