I'm assuming something like the following for your current implementation:

    require 'eventmachine'

    class Worker
      def initialize queue
        @queue = queue
        dequeue
      end

      def dequeue
        @queue.pop do |item|
          begin
            work_on item
          ensure
            dequeue
          end
        end
      end

      def work_on item
        case item.type
        when :reload_credentials
          # magic happens here
        else
          # more magic happens here
        end
      end
    end

    q = EM::Queue.new
    workers = Array.new(10) { Worker.new q }
The problem, if I understand you correctly, is that you do not want workers to pick up tasks that arrived after a reload_credentials task (later on the producer's timeline) until that reload has finished. The following should take care of that (extra words of caution at the end).

    require 'eventmachine'

    class Worker
      def initialize queue
        @queue = queue
        dequeue
      end

      def dequeue
        @queue.pop do |item|
          begin
            work_on item
          ensure
            dequeue
          end
        end
      end

      def work_on item
        case item.type
        when :reload_credentials
          # magic happens here
        else
          # more magic happens here
        end
      end
    end

    class LockingDispatcher
      def initialize channel, queue
        @channel = channel
        @queue = queue

        @backlog = []
        @channel.subscribe method(:dispatch_with_locking)

        @locked = false
      end

      def dispatch_with_locking item
        if locked?
          @backlog << item
        else
          # You probably want to move the specialization here out into a method
          # or a block that is passed into the constructor, to make the
          # LockingDispatcher more of a generic processor
          case item.type
          when :reload_credentials
            lock
            deferrable = CredentialReloader.new(item).start
            deferrable.callback { unlock }
            deferrable.errback { unlock }
          else
            dispatch_without_locking item
          end
        end
      end

      def dispatch_without_locking item
        @queue << item
      end

      def locked?
        @locked
      end

      def lock
        @locked = true
      end

      def unlock
        @locked = false
        bl = @backlog.dup
        @backlog.clear
        bl.each { |item| dispatch_with_locking item }
      end
    end

    channel = EM::Channel.new
    queue = EM::Queue.new

    dispatcher = LockingDispatcher.new channel, queue

    workers = Array.new(10) { Worker.new queue }
So input to the first system goes into q, but in this new system it goes into channel. The queue is still used to distribute work among the workers, but it is not populated while a credential reload is in progress. Unfortunately, since I did not take more time, I have not generalized LockingDispatcher so that it is decoupled from the item type and from the code that starts the CredentialReloader. I will leave that to you.
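One way that generalization might look is sketched below. It is plain Ruby so it runs without a reactor. The class name GenericLockingDispatcher, the `blocking:` predicate, the `release` callable and the `Job` struct are all hypothetical names chosen for illustration, not part of the code above.

```ruby
# Hypothetical generalization of LockingDispatcher: the item-type check and
# the "start the reload" step are injected instead of hard-coded.
class GenericLockingDispatcher
  def initialize(sink, blocking:, &handler)
    @sink = sink          # anything responding to <<, e.g. an EM::Queue
    @blocking = blocking  # predicate: does this item need exclusive handling?
    @handler = handler    # called with (item, release) for blocking items
    @backlog = []
    @locked = false
  end

  def dispatch(item)
    if @locked
      @backlog << item
    elsif @blocking.call(item)
      @locked = true
      # The handler must call release exactly once when it is done.
      @handler.call(item, method(:unlock))
    else
      @sink << item
    end
  end

  def locked?
    @locked
  end

  def unlock
    @locked = false
    pending, @backlog = @backlog, []
    # Re-dispatch so a backlogged blocking item locks again in order.
    pending.each { |item| dispatch(item) }
  end
end

# Usage with a plain Array standing in for the queue.
Job = Struct.new(:type, :id)
sink = []
releases = []
dispatcher = GenericLockingDispatcher.new(
  sink, blocking: ->(job) { job.type == :reload_credentials }
) { |_job, release| releases << release }

dispatcher.dispatch Job.new(:work, 1)
dispatcher.dispatch Job.new(:reload_credentials, 2)
dispatcher.dispatch Job.new(:work, 3) # held in the backlog
releases.shift.call                   # pretend the reload finished
sink.map(&:id) # => [1, 3]
```

Wired into the setup above, the handler block would presumably start the CredentialReloader and call `release` from both its callback and its errback, and the channel would subscribe to `dispatcher.method(:dispatch)`.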
Note that although this does what I understand your original request to be, it is generally better to relax this kind of requirement. There are several outstanding problems that essentially cannot be eradicated without changing the requirement:
- The system does not wait for jobs that are already executing to complete before it starts a credential job.
- The system will handle bursts of credential jobs very badly: other items that could have been processed will not be.
- In the event of a bug in the credential code, the backlog could fill up RAM and cause a failure. A simple timeout might be enough to avoid catastrophic effects, provided the code can be aborted and subsequent messages can be processed well enough to avoid further deadlocks.
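The timeout idea from the last point can be sketched roughly as follows. This is a standalone, reactor-free illustration under assumptions: the class name ExpiringLockDispatcher, the `timeout:` and `clock:` parameters and the polling approach are all made up for the example. Under EventMachine you would more likely arm a timer when locking (for example with EM.add_timer) so that the backlog is flushed even if no further item arrives.

```ruby
# Hypothetical dispatcher whose lock expires by itself after `timeout` seconds,
# so a hung credential reload cannot grow the backlog without bound.
class ExpiringLockDispatcher
  MONOTONIC = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

  def initialize(sink, timeout:, clock: MONOTONIC)
    @sink = sink        # anything responding to <<
    @timeout = timeout  # seconds the lock may be held
    @clock = clock      # injectable so the behavior can be tested
    @backlog = []
    @deadline = nil
  end

  # Call when a credential reload starts.
  def lock
    @deadline = @clock.call + @timeout
  end

  # Call when the reload finishes (successfully or not).
  def unlock
    @deadline = nil
    pending, @backlog = @backlog, []
    pending.each { |item| @sink << item }
  end

  def locked?
    !@deadline.nil?
  end

  def dispatch(item)
    # Expiry is only checked here, i.e. when the next item arrives.
    unlock if locked? && @clock.call >= @deadline
    if locked?
      @backlog << item
    else
      @sink << item
    end
  end
end
```

One caveat with this simple version: a reload that completes after its lock has already expired will still call `unlock`, which could release a newer lock taken in the meantime. A real implementation would probably tag each lock so that stale completions are ignored.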
Actually, it sounds like you have some notion of a user ID in the system. If you think your requirements through, you probably only need to backlog the items that belong to the user whose credentials are currently being refreshed. That is a different problem, and it calls for a different kind of dispatch. Try a hash of locked backlogs keyed by user, with a callback on credential completion that drains the user's backlog into the workers, or some similar arrangement.
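A minimal sketch of that per-user arrangement, again in plain Ruby. The class name PerUserDispatcher, the `begin_reload` and `finish_reload` methods, the `UserJob` struct and the `user_id` field are assumptions made for the example; your items may identify their user differently.

```ruby
# Hypothetical per-user backlog: only items for users whose credentials are
# being reloaded are held; everything else flows straight to the workers.
class PerUserDispatcher
  def initialize(sink)
    @sink = sink    # anything responding to <<, e.g. an EM::Queue
    @backlogs = {}  # user_id => items held while that user's reload runs
  end

  # Call when a credential reload starts for this user.
  def begin_reload(user_id)
    @backlogs[user_id] ||= []
  end

  # Call from the reload's completion callback (success or failure).
  def finish_reload(user_id)
    held = @backlogs.delete(user_id) || []
    held.each { |item| @sink << item }
  end

  def dispatch(item)
    held = @backlogs[item.user_id]
    if held
      held << item
    else
      @sink << item
    end
  end
end

# Usage with a plain Array standing in for the queue.
UserJob = Struct.new(:user_id, :payload)
sink = []
dispatcher = PerUserDispatcher.new(sink)

dispatcher.begin_reload(1)
dispatcher.dispatch UserJob.new(1, :a) # held: user 1 is reloading
dispatcher.dispatch UserJob.new(2, :b) # passes straight through
dispatcher.finish_reload(1)            # drains user 1's backlog
sink.map(&:payload) # => [:b, :a]
```

With this shape a burst of credential jobs for one user no longer stalls everyone else, although a stuck reload can still grow that one user's backlog, so some form of timeout is probably still worth having.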
Good luck
raggi