Exception handling in a pool of parallel ruby ​​flows

How to handle exceptions in pools of parallel ruby ​​threads ( http://ruby-concurrency.imtqy.com/concurrent-ruby/file.thread_pools.html )?

Example:

pool = Concurrent::FixedThreadPool.new(5) pool.post do raise 'something goes wrong' end # how to rescue this exception here 

Update:

Here is a simplified version of my code:

 def process pool = Concurrent::FixedThreadPool.new(5) products.each do |product| new_product = generate_new_product pool.post do store_in_db(new_product) # here exception is raised, eg connection to db failed end end pool.shutdown pool.wait_for_terminaton end 

What I want to achieve is to break processing (break loop) in case of any exception.

This exception is also preserved at a higher level of the application, and some cleaning tasks are performed (for example, setting the model state to failure and sending some notifications).

+8
multithreading ruby concurrency concurrent-ruby
source share
3 answers

The following answer from jdantonio is here https://github.com/ruby-concurrency/concurrent-ruby/issues/616

"Most applications should not directly use thread pools. Thread pools are a low-level abstraction intended for internal use. All high-level abstractions in this library (Promise, Actor, etc.) All send jobs to the global thread pool and provide processing exceptions: just select the abstraction that best suits your use case and use it.

If you need to configure your own thread pool and not use a global thread pool, you can still use high-level abstractions. All of them support the option: executor, which allows you to enter your own thread pool. You can then use the exception handling provided by the high-level abstraction.

If you absolutely insist on sending tasks directly to the thread pool, and not using our high-level abstractions (which I categorically reject), just create a task shell. You can find shell examples in all of our high-level abstractions, Rails ActiveJob, Sucker Punch, and other libraries that use our thread pools.

So what about an implementation with Promises? http://ruby-concurrency.imtqy.com/concurrent-ruby/Concurrent/Promise.html In your case, it will look something like this:

 promises = [] products.each do |product| new_product = generate_new_prodcut promises << Concurrent::Promise.execute do store_in_db(new_product) end end # .value will wait for the Thread to finish. # The ! means, that all exceptions will be propagated to the main thread # .zip will make one Promise which contains all other promises. Concurrent::Promise.zip(*promises).value! 
+2
source share

There may be a better way, but it works. You will want to change the error handling in wait_for_pool_to_finish .

 def process pool = Concurrent::FixedThreadPool.new(10) errors = Concurrent::Array.new 10_000.times do pool.post do begin # do the work rescue StandardError => e errors << e end end end wait_for_pool_to_finish(pool, errors) end private def wait_for_pool_to_finish(pool, errors) pool.shutdown until pool.shutdown? if errors.any? pool.kill fail errors.first end sleep 1 end pool.wait_for_termination end 
0
source share

I created problem # 634 . A parallel thread pool can support operation without problems.

 require "concurrent" Concurrent::RubyThreadPoolExecutor.class_eval do # Inspired by "ns_kill_execution". def ns_abort_execution aborted_worker @pool.each do |worker| next if worker == aborted_worker worker.kill end @pool = [aborted_worker] @ready.clear stopped_event.set nil end def abort_worker worker synchronize do ns_abort_execution worker end nil end def join shutdown # We should wait for stopped event. # We couldn't use timeout. stopped_event.wait nil @pool.each do |aborted_worker| # Rubinius could receive an error from aborted thread "join" only. # MRI Ruby doesn't care about "join". # It will receive error anyway. # We can "raise" error in aborted thread and than "join" it from this thread. # We can "join" aborted thread from this thread and than "raise" error in aborted thread. # The order of "raise" and "join" is not important. We will receive target error anyway. aborted_worker.join end @pool.clear nil end class AbortableWorker < self.const_get :Worker def initialize pool super @thread.abort_on_exception = true end def run_task pool, task, args begin task.call *args rescue StandardError => error pool.abort_worker self raise error end pool.worker_task_completed nil end def join @thread.join nil end end self.send :remove_const, :Worker self.const_set :Worker, AbortableWorker end class MyError < StandardError; end pool = Concurrent::FixedThreadPool.new 5 begin pool.post do sleep 1 puts "we shouldn't receive this message" end pool.post do puts "raising my error" raise MyError end pool.join rescue MyError => error puts "received my error, trace: \n#{error.backtrace.join("\n")}" end sleep 2 

Output:

 raising my error received my error, trace: ... 

This patch is great for any version of MRI Ruby and Rubinius. JRuby doesn't work, and I don't care. Please fix the JRuby artist if you want to support it. It should be easy.

0
source share

All Articles