How do I handle this use case with EventMachine?

I have an application that responds to messages sent by clients. One message, reload_credentials, can arrive at any time a new client is registered. Handling it connects to the PostgreSQL database, runs a query for all credentials, and stores them in a plain Ruby hash (client_id => client_token).

Other messages the application may receive are start, stop, and pause, which are used to track sessions. The application works roughly as follows:

  • the client sends a message
  • the message is queued
  • the queue is processed

However, I do not want to block the reactor. Also, if a reload_credentials message is next in line, I do not want any other message from the queue to be processed until the credentials have been reloaded from the database. On the other hand, while I am processing an ordinary message (for example, waiting for a request to complete), I do want other messages in the queue to keep being handled.

Could you advise me on how to solve this problem? I think I might have to use em-synchrony, but I'm not sure.

+8
ruby eventmachine
2 answers

Use one of the EM PostgreSQL drivers, or EM.defer, so that you do not block the reactor.

When you receive a reload_credentials message, simply set a flag that causes all subsequent messages to be queued. Once the reload_credentials work has finished, process all messages from the queue. After the queue is empty, clear the flag so that messages are processed as they are received.

EM drivers for PostgreSQL are listed here: https://github.com/eventmachine/eventmachine/wiki/Protocol-Implementations

```ruby
module Server
  def post_init
    @queue = []
    @loading_credentials = false
  end

  def receive_message(type, data)
    return @queue << [type, data] if @loading_credentials || !@queue.empty?
    return process_msg(type, data) unless type == :reload_credentials

    @loading_credentials = true
    reload_credentials do
      @loading_credentials = false
      process_queue
    end
  end

  def reload_credentials(&when_done)
    # Run the blocking query on EM's thread pool; invoke the callback
    # back on the reactor when it completes.
    EM.defer(proc { query_and_load_credentials }, when_done)
  end

  def process_queue
    until @queue.empty?
      type, data = @queue.shift
      process_msg(type, data)
    end
  end

  # lots of other methods
end

EM.start_server(HOST, PORT, Server)
```

If you want all connections to queue messages whenever any one connection receives a reload_credentials message, you will need to coordinate through shared class-level state (i.e. the module's eigenclass).
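As a minimal, EventMachine-free sketch of that eigenclass coordination (the module-level flag, backlog, and method names here are invented for illustration, not taken from the answer's code):

```ruby
# Hypothetical sketch: the "loading" flag and backlog live on the module's
# eigenclass, so every connection instance that mixes in Server shares them.
module Server
  class << self
    attr_accessor :loading_credentials

    # One backlog shared by all connections.
    def backlog
      @backlog ||= []
    end
  end

  def receive_message(type, data)
    if Server.loading_credentials
      Server.backlog << [type, data] # held until the reload finishes
    else
      process_msg(type, data)
    end
  end

  def process_msg(type, data)
    # placeholder: real message handling goes here; the reload-completion
    # callback would clear the flag and replay Server.backlog
  end
end
```

In a real server this module would still be passed to `EM.start_server`, and the reload-completion callback would clear `Server.loading_credentials` and drain `Server.backlog`.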

+7

I assume your current implementation looks something like this:

```ruby
class Worker
  def initialize(queue)
    @queue = queue
    dequeue
  end

  def dequeue
    @queue.pop do |item|
      begin
        work_on item
      ensure
        dequeue
      end
    end
  end

  def work_on(item)
    case item.type
    when :reload_credentials
      # magic happens here
    else
      # more magic happens here
    end
  end
end

q = EM::Queue.new
workers = Array.new(10) { Worker.new q }
```

The problem, if I understand you correctly, is that you do not want workers picking up tasks that arrived later on the producer's timeline than a pending reload_credentials task. The following should serve that requirement (with some extra words of caution at the end).

```ruby
class Worker
  def initialize(queue)
    @queue = queue
    dequeue
  end

  def dequeue
    @queue.pop do |item|
      begin
        work_on item
      ensure
        dequeue
      end
    end
  end

  def work_on(item)
    case item.type
    when :reload_credentials
      # magic happens here
    else
      # more magic happens here
    end
  end
end

class LockingDispatcher
  def initialize(channel, queue)
    @channel = channel
    @queue   = queue
    @backlog = []
    @channel.subscribe method(:dispatch_with_locking)
    @locked = false
  end

  def dispatch_with_locking(item)
    if locked?
      @backlog << item
    else
      # You probably want to move the specialization here out into a method
      # or a block passed into the constructor, to make the LockingDispatcher
      # more of a generic processor.
      case item.type
      when :reload_credentials
        lock
        deferrable = CredentialReloader.new(item).start
        deferrable.callback { unlock }
        deferrable.errback  { unlock }
      else
        dispatch_without_locking item
      end
    end
  end

  def dispatch_without_locking(item)
    @queue << item
  end

  def locked?
    @locked
  end

  def lock
    @locked = true
  end

  def unlock
    @locked = false
    bl = @backlog.dup
    @backlog.clear
    bl.each { |item| dispatch_with_locking item }
  end
end

channel = EM::Channel.new
queue = EM::Queue.new
dispatcher = LockingDispatcher.new channel, queue
workers = Array.new(10) { Worker.new queue }
```

So, input to the first system comes in at q, but in this new system it comes in at channel. The queue is still used to distribute work among the workers, but it is not populated while a credential update operation is in progress. Unfortunately, as I didn't take more time, I have not generalized LockingDispatcher to decouple it from the item type and the dispatch code for CredentialReloader. I'll leave that to you.
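To see the locking behaviour in isolation, here is a trimmed, EventMachine-free rendition of the same idea: a plain array stands in for the EM::Queue, direct calls stand in for the channel subscription, and the deferrable's callback becomes an explicit `unlock` call. The MiniLockingDispatcher name and bare-symbol items are invented for this demo:

```ruby
# Simplified stand-in for LockingDispatcher: items are plain symbols and
# :reload_credentials triggers the lock; unlock replays the backlog.
class MiniLockingDispatcher
  def initialize(queue)
    @queue   = queue # stand-in for EM::Queue
    @backlog = []
    @locked  = false
  end

  def dispatch(item)
    return @backlog << item if @locked
    if item == :reload_credentials
      @locked = true # hold everything until unlock (the reload callback)
    else
      @queue << item
    end
  end

  # In the real system this runs from the deferrable's callback/errback.
  def unlock
    @locked = false
    bl = @backlog.dup
    @backlog.clear
    bl.each { |item| dispatch(item) }
  end
end
```

Dispatching `:start`, then `:reload_credentials`, then `:stop` leaves only `:start` in the queue; `:stop` reaches the queue only after `unlock` runs.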

Note that while this is the behaviour I understand from your original request, it is usually better to relax this requirement. There are several outstanding issues that essentially cannot be eliminated without changing it:

  • No other tasks are processed while a credentials job is running.
  • The system will handle bursts of credential messages very poorly: no other items can be processed during them.
  • If there is an error in the credential code, the backlog can fill up RAM and crash the system. A simple timeout may be enough to avoid catastrophic effects, provided the code can be interrupted and subsequent messages are processed well enough to avoid further deadlocks.

Actually, it sounds like you have a concept of a user ID in the system. If you think through your requirements, you probably only need to backlog items relating to the user whose credentials are being refreshed. That is a different problem, involving a different kind of scheduling. Try a hash of locked backlogs for those users, with a callback on credential completion that merges those backlogs back into the work queue, or some similar arrangement.
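A minimal sketch of that per-user arrangement, assuming each item arrives with a user ID (the class and method names here are hypothetical, not from any library):

```ruby
# Hypothetical per-user locking: only messages for users whose credentials
# are mid-reload are held back; everyone else's work flows straight through.
class PerUserLockingDispatcher
  def initialize(queue)
    @queue    = queue # downstream work queue (e.g. an EM::Queue)
    @backlogs = {}    # user_id => items held back for that user
  end

  def dispatch(user_id, item)
    if @backlogs.key?(user_id)
      @backlogs[user_id] << item # this user is locked; hold the item
    else
      @queue << [user_id, item]
    end
  end

  # Call when a reload_credentials job starts for this user.
  def lock(user_id)
    @backlogs[user_id] ||= []
  end

  # Call from the credential-reload completion callback: release the
  # user's backlog into the work queue in its original order.
  def unlock(user_id)
    (@backlogs.delete(user_id) || []).each { |item| @queue << [user_id, item] }
  end
end
```

With user 1 locked, user 2's messages keep flowing into the queue while user 1's accumulate until `unlock(1)` merges them back in.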

Good luck!

+4
