How to provide a delayed AMP response without locking the system?

(I am very open to suggestions for a better name.)

I use the AMP protocol on top of Twisted to create a scheduler that transfers tasks to my agents. The agent sends jobs from the scheduler, so the scheduler is an AMP server, and the agents connect as clients.

The idea is that the agent connects, picks up the task from the upper queue (internal scheduler), and then continues its path of its implementation. However, this queue is not always guaranteed to be nonempty. Thus, I would like to use a twisted deferred mechanic to just have deferred fire on the agent side when the scheduler managed to kick the task out of the queue.

Implementing this on the scheduler’s side is a bit complicated. The way AMP works is to assign a function to each (predefined by me) command that the agent can send, while the function takes all the arguments that the command has and returns a dictionary of all returned values. This means that I need to do all this from one function. This is usually not a problem, but twisted here seems to bother me: I need to pause the function a bit without stopping the twisted-write cycle, which allows it to actually add more jobs to the queue, so one may slip out. (For this reason, I don’t think that normal sleep() will have the desired effect.) More importantly, it means that I cannot think of a way to use some twisted functions, for example. deferToThread() , because I would have to process the results from this (and only have access to them) in a separate function, which I would call a deferred callback, so I would not know what to return to the responder AMP after starting a separate thread and destination his callback. This illustrates what I mean more clearly:

 def assignJob(agentID): # We expect the agentID, so we can store who we've given a job to. # Get a job without blocking even if the queue is originally empty. job = None while job is None: try: job = jobqueue.pop(0) except IndexError: # Imagine getJob simply tries to get a job every 5 seconds # (using sleep() safely because it in a separate thread) # until it eventually gets one, which it returns d = deferToThread(getJob) # We would then need to have a separate function # , eg jobReturn() pick up the firing deferred and do # something with the result... d.addCallback(jobReturn) # But if we do... We don't (necessarily) have a job to return here # because for all we know, the deferred from that thread hasn't even # fired yet. return {'job': ???} 

(This is obviously not complete code for the function - for one, it is a subclass method of amp.AMP as needed.)

The callInThread() reactor method also seems useful at first (since it doesn't return a deferred), but it does not offer a way to get the return value of the callee that it executes (as far as I can see), and even if that were the case, it would mean waiting terminating a thread to block this method for so long, which makes using a separate thread pointless.

So, how do I block this method until I have a task, but not the entire Twisted event loop, or, as an alternative, how to return the AMP response outside the direct response method?

+4
source share
1 answer

One thing that you might have missed is that the AMP response method itself is also allowed to return Pending (searches can also return Pending to AMP API documents ). As long as Deferred finally fires with a dictionary that matches the definition of the command response, everything will work fine.

Also somewhat related, if you want to avoid using threads, you can take a look at twisted.internet.defer.DeferredQueue , the queue data structure that knows about Deferring initially.

+4
source

All Articles