MissingBackpressureException exceptions

Coming from C#: when I used Rx there and backpressure occurred, elements were simply added to internal queues until the application ran out of memory (as far as I remember).

In ReactiveX (RxJava), it looks like they take a different position and throw exceptions when backpressure begins to build.

This means that I should use something like onBackpressureBuffer() and, in the subscribe() call, pass a Subscriber<? super T> that makes requests upstream to release the pressure.

Maybe it is because I am used to the Rx.NET approach, but this seems mental to me.

Firstly, have I understood this correctly?

Secondly, is there any way to "disable" this behavior so that it works the same as Rx.NET? I do not want to complicate my call to subscribe() by checking whether I have applied one of these backpressure operators to decide if I need to call request() or not.

+7
java rx-java
3 answers

In Scala (I don't know the Java syntax, but the method calls will be the same), you just need to turn

fastHotObservable.subscribe(next => slowFunction(next))

into

fastHotObservable.onBackpressureBuffer.subscribe(next => slowFunction(next))

That should do it. Of course, once it is running there need to be some quiet periods, so that the consumer has time to catch up and process the buffered elements.

I do not think this is mental. I am glad that you can choose for yourself how to handle backpressure rather than having a strategy forced on you. I also prefer to specify it explicitly.

In fact, the Rx.NET strategy is not always the best one. Recently I used several onBackpressureDrop calls to simply forget about mouse movements that I did not have time to process, and I am glad that I can avoid buffering them so easily.
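To make the difference between the two strategies concrete, here is a rough stdlib-only model. It is not RxJava code and does not reproduce how onBackpressureBuffer or onBackpressureDrop are implemented; StrategySketch, Strategy and simulate are hypothetical names, and the "one item per tick, consumer ready every few ticks" timing is an assumption made purely for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Stdlib-only model of two backpressure strategies. This is NOT RxJava code:
// StrategySketch, Strategy and simulate are hypothetical names for illustration.
class StrategySketch {

    enum Strategy { BUFFER, DROP }

    /** Outcome of one simulated run. */
    static final class Result {
        final List<Integer> delivered = new ArrayList<>(); // items the consumer handled
        int dropped = 0;                                    // items discarded (DROP only)
        int stillQueued = 0;                                // items left waiting (BUFFER only)
    }

    /**
     * The producer emits one item per tick; the slow consumer becomes ready
     * for one more item only every `period` ticks.
     */
    static Result simulate(Strategy strategy, int ticks, int period) {
        if (period <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        Result result = new Result();
        Queue<Integer> buffer = new ArrayDeque<>();
        int demand = 0; // how many items the consumer is currently ready for

        for (int item = 0; item < ticks; item++) {
            if (item % period == 0) {
                demand++; // the consumer finished its previous piece of work
            }
            if (strategy == Strategy.BUFFER) {
                buffer.add(item); // never discard: the queue grows while the consumer is busy
                while (demand > 0 && !buffer.isEmpty()) {
                    result.delivered.add(buffer.poll());
                    demand--;
                }
            } else if (demand > 0) {
                result.delivered.add(item); // consumer is ready: hand the item over
                demand--;
            } else {
                result.dropped++; // consumer is busy: forget the item
            }
        }
        result.stillQueued = buffer.size();
        return result;
    }

    public static void main(String[] args) {
        Result buffered = simulate(Strategy.BUFFER, 10, 3);
        Result droppedRun = simulate(Strategy.DROP, 10, 3);
        System.out.println("BUFFER delivered=" + buffered.delivered
                + " stillQueued=" + buffered.stillQueued);
        System.out.println("DROP   delivered=" + droppedRun.delivered
                + " dropped=" + droppedRun.dropped);
    }
}
```

With 10 ticks and a consumer that is ready every 3rd tick, the buffering run delivers the oldest items in order and ends with 6 items still queued, while the dropping run delivers only the items that arrived when the consumer was free and discards the other 6. That is why buffering only works if the source goes quiet now and then, and why dropping suits things like mouse movements where stale values are worthless.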

+8

A backpressure exception occurs when the source Observable does not support backpressure. Some of the time-related operators behave this way, as do many Observables that were written for versions prior to 0.20. Subscribers, on the other hand, run in unbounded mode by default (they request Long.MAX_VALUE and never need to call request() again). Most operators have a fast path for this case or simply do not interfere with backpressure.

The most common sources of the exception are the observeOn and merge operators. You will need to override them: observeOn can be switched to an unbounded queue, and merge can skip using queues altogether. Here is an example implementation of these two operators.
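As an illustration of what "unbounded mode" means compared with a subscriber that calls request() itself, here is a small sketch. Note the swap: it uses the JDK's java.util.concurrent.Flow interfaces (JDK 9+) as a stand-in, not RxJava's Subscriber, and RequestSketch, RangePublisher and CollectingSubscriber are hypothetical names. The publisher is deliberately simplified and is not a complete Reactive Streams implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Model built on java.util.concurrent.Flow (JDK 9+), NOT on RxJava.
// RequestSketch, RangePublisher and CollectingSubscriber are hypothetical names.
class RequestSketch {

    /** Synchronous source of 0..count-1 that never emits more than was requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private long requested = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done || n <= 0) return; // simplification: invalid requests are ignored
                    requested = (requested + n < 0) ? Long.MAX_VALUE : requested + n; // cap on overflow
                    if (emitting) return; // called from inside onNext: the loop below picks it up
                    emitting = true;
                    while (requested > 0 && next < count && !done) {
                        requested--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next == count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Requests `initialRequest` items up front and, optionally, one more after each item. */
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        int requestCalls = 0;
        boolean completed = false;
        Throwable error;
        private final long initialRequest;
        private final boolean requestMore;
        private Flow.Subscription subscription;

        CollectingSubscriber(long initialRequest, boolean requestMore) {
            this.initialRequest = initialRequest;
            this.requestMore = requestMore;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            requestCalls++;
            s.request(initialRequest);
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (requestMore) {
                requestCalls++;
                subscription.request(1); // ask for the next item only once this one is handled
            }
        }

        @Override
        public void onError(Throwable t) { error = t; }

        @Override
        public void onComplete() { completed = true; }
    }

    static CollectingSubscriber run(int count, long initialRequest, boolean requestMore) {
        CollectingSubscriber s = new CollectingSubscriber(initialRequest, requestMore);
        new RangePublisher(count).subscribe(s);
        return s;
    }

    public static void main(String[] args) {
        CollectingSubscriber unbounded = run(5, Long.MAX_VALUE, false);
        CollectingSubscriber oneByOne = run(5, 1, true);
        System.out.println("unbounded:  items=" + unbounded.received + " requestCalls=" + unbounded.requestCalls);
        System.out.println("one by one: items=" + oneByOne.received + " requestCalls=" + oneByOne.requestCalls);
    }
}
```

Both subscribers receive the same five items. The unbounded one calls request() exactly once and never again, which is the default behavior described above; the other one pays for flow control with one request() call per item.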

+5

The right approach would be to use a good backpressure strategy such as onBackpressureDrop. However, remember that this strategy is lossy.

When we turned to RxJava 2, we realized that approximately 100% of our Observables just wanted to use onBackpressureBuffer. Given this, we increased rx.ring-buffer.size, which prevented all of the backpressure crashes, and got on with our lives.
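For what it is worth, a minimal sketch of setting that property from code might look like the following. It assumes that RxJava 1.x reads rx.ring-buffer.size once, when its internal buffer class is first loaded, so the call has to happen before any RxJava class is touched (passing -Drx.ring-buffer.size=... on the command line should have the same effect). RingBufferConfigSketch and configure are hypothetical names, and 1024 is an arbitrary example value, not a recommendation.

```java
// Sketch only: RingBufferConfigSketch and configure are hypothetical names.
// Assumption: RxJava 1.x reads this property once at class-load time, so it
// must be set before the first RxJava class is used.
class RingBufferConfigSketch {

    static final String PROPERTY = "rx.ring-buffer.size";

    /** Stores the desired buffer size as a JVM system property. */
    static void configure(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        System.setProperty(PROPERTY, Integer.toString(size));
    }

    public static void main(String[] args) {
        configure(1024); // arbitrary example value
        System.out.println(PROPERTY + "=" + System.getProperty(PROPERTY));
        // ... only now start code that touches RxJava ...
    }
}
```

Bear in mind that a bigger buffer only postpones the exception: if the consumer never catches up, the larger queue will overflow too.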

https://eng.uber.com/rxjava-backpressure/

0
