Solving the problem of slow consumption (eventProcessor) in the LMAX Disruptor pattern

When using a deactivator, there may be a consumer (s) that is lagging, and because of this slow consumer, the entire application is affected.

Remember that each producer (publisher) and consumer (EventProcessor) work on one thread each, which can be a solution to the problem of slow consumption?

Can I use multiple threads for one consumer? If not, which alternative is better?

+6
source share
3 answers

Generally speaking, use WorkerPool to allow multiple collaborative workflows to work with a single consumer, which is good if you have tasks that are independent and potentially variable in duration (for example: some short tasks, some longer).

Another option is to simultaneously execute several independent workflows on events, but each worker processes only handlers modulo N (for example, 2 threads and one thread are odd, one thread even processes event identifiers). This works great if you have ongoing processing tasks of duration, and also allows batch processing to work very efficiently.

Another thing to consider is that the consumer can perform โ€œbatch processing,โ€ which is especially useful, for example, in auditing. If your consumer has 10 pending events, and not to write 10 events to the audit log independently, you can collect all 10 events and record them at the same time. In my experience, this more than covers the need to run multiple threads.

+6
source

Try to divide the slow part into another stream (input / output, not O (1) or O (log) calculations, etc.) Or apply some back pressure when overloading the consumer (by assigning or temporarily stopping producers, playing back status codes 503 or 429, etc.): http://mechanical-sympathy.blogspot.com/2012/05/apply-back-pressure-when-overloaded.html

+3
source

Use a set of identical eventHandlers. To avoid more than one Handler event acting on a single event, I use the following approach.

Create a pool of threads the size of the number of cores in the system

Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // a thread pool to which we can assign tasks 

Then create an array of handlers

  HttpEventHandler [] handlers = new HttpEventHandler[Runtime.getRuntime().availableProcessors()]; for(int i = 0; i<Runtime.getRuntime().availableProcessors();i++){ handlers[i] = new HttpEventHandler(i); } disruptor.handleEventsWith(handlers); 

In EventHandler

 public void onEvent(HttpEvent event, long sequence, boolean endOfBatch) throws InterruptedException { if( sequence % Runtime.getRuntime().availableProcessors()==id){ System.out.println("-----On event Triggered on thread "+Thread.currentThread().getName()+" on sequence "+sequence+" -----"); //your event handler logic } 
0
source

All Articles