Crystal: converting a thread pool idea to Fibers / spawn

I'm having a hard time grasping the idea of Fibers / coroutines and their implementation in Crystal.

I hope this is the right place to ask about it; I'll fully accept a "not here" answer :)

This is my usual way to handle multithreading in Ruby:

    threads = []
    max_threads = 10

    loop do
      begin
        threads << Thread.new do
          helper_method(1,2,3,4)
        end
      rescue Exception => e
        puts "Error Starting thread"
      end

      begin
        threads = threads.select { |t| t.alive? ? true : (t.join; false) }
        while threads.size >= max_threads
          puts 'Got Maximum threads'
          sleep 1
          threads = threads.select { |t| t.alive? ? true : (t.join; false) }
        end
      rescue Exception => e
        puts e
      end
    end

This way I open a new thread, usually for an incoming connection or some other task, add the Thread to the threads array, and then check that I don't have more threads than I wanted.

What would be a good way to implement something similar in Crystal using spawn / channels / fibers, etc.?

+5
2 answers

Something like this:

    require "socket"

    ch = Channel(TCPSocket).new

    10.times do
      spawn do
        loop do
          socket = ch.receive
          socket.puts "Hi!"
          socket.close
        end
      end
    end

    server = TCPServer.new(1234)

    loop do
      socket = server.accept
      ch.send socket
    end

This code pre-spawns 10 fibers to handle requests. The channel is unbuffered, so connections are not queued in the channel: when no fiber is free to receive, ch.send blocks and the accept loop waits.
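
For comparison with the Ruby version in the question, the same shape (a fixed set of workers fed from one hand-off point) can be sketched in Ruby with threads and a queue. This is only an illustration under assumptions: run_pool, worker_count and the handler block are hypothetical names, and SizedQueue.new(1) only approximates an unbuffered channel, since it holds one pending job instead of handing it off directly.

```ruby
# Hypothetical helper: a fixed pool of worker threads fed from a bounded queue.
# It mirrors the pre-spawned fibers reading from a channel in the Crystal code.
def run_pool(jobs, worker_count: 10, &handler)
  queue = SizedQueue.new(1)   # push blocks while one job is already waiting
  results = Queue.new         # thread-safe collection of handler results

  workers = Array.new(worker_count) do
    Thread.new do
      # pop returns nil once the queue is closed and drained, ending the loop
      # (so jobs themselves must not be nil or false in this sketch)
      while (job = queue.pop)
        results << handler.call(job)
      end
    end
  end

  jobs.each { |job| queue << job }  # blocks whenever all workers are busy
  queue.close                       # signal that no more jobs will arrive
  workers.each(&:join)

  # Results come back in completion order, not submission order.
  Array.new(results.size) { results.pop }
end
```

A call such as run_pool((1..20).to_a, worker_count: 3) { |n| n * 2 } returns the doubled values in whatever order the workers finished them.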

+13

You can't replicate the way this works with threads. spawn does not return a coroutine object, and there is no way to join coroutines.

However, we can open a channel for communication between the coroutines and a pool manager. The manager can run in its own coroutine, or it can be the main coroutine, which also keeps the process from exiting.

Here is a working example with a worker(&block) method, which spawns a coroutine and opens a channel to report its status (failed or completed), and a pool(&block) method, which maintains a pool of such workers and reads from their result channels to learn each coroutine's status and keep spawning new ones.

    def worker(&block)
      result = UnbufferedChannel(Exception?).new

      ::spawn do
        begin
          block.call
        rescue ex
          result.send(ex)
        else
          result.send(nil)
        end
      end

      result
    end

    def pool(size, &block)
      counter = 0
      results = [] of UnbufferedChannel(Exception?)

      loop do
        while counter < size
          counter += 1
          puts "spawning worker"
          results << worker(&block)
        end

        result = Channel.select(results)
        counter -= 1
        results.delete(result)

        if ex = result.receive
          puts "ERROR: #{ex.message}"
        else
          puts "worker terminated"
        end
      end
    end

    pool(5) do
      loop { helper_method(1, 2, 3, 4) }
    end
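
For readers coming from the Ruby code in the question, the manager idea can also be sketched in Ruby, with Thread standing in for spawn and Queue standing in for the channel. This is a rough sketch under assumptions, not a port of the code above: supervise and total_runs are hypothetical names, a single shared queue replaces the one-channel-per-worker design, and the number of runs is capped so the loop terminates.

```ruby
# Hypothetical manager loop: keeps up to `size` workers running and reads
# their status reports (nil on success, the exception on failure) from one
# shared queue instead of joining threads.
def supervise(size, total_runs, &work)
  reports = Queue.new
  running = 0
  started = 0
  outcomes = []

  while outcomes.size < total_runs
    # Top the pool up, but never start more than total_runs workers overall.
    while running < size && started < total_runs
      Thread.new(started) do |id|
        begin
          work.call(id)
          reports << nil
        rescue StandardError => e
          reports << e
        end
      end
      running += 1
      started += 1
    end

    # Block until any worker reports, then free its slot.
    outcomes << reports.pop
    running -= 1
  end

  outcomes
end
```

Calling supervise(3, 7) { |id| raise "boom #{id}" if id.odd? } returns seven entries: nil for each run that completed and the exception object for each run that failed.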
+5
