Background pattern: a Twisted server that populates an incoming message queue and empties an outgoing message queue?

I would like to do something like this:

    twistedServer.start()  # This would be a nonblocking call
    while True:
        while twistedServer.haveMessage():
            message = twistedServer.getMessage()
            response = handleMessage(message)
            twistedServer.sendResponse(response)
        doSomeOtherLogic()

The key thing I want to do is to start the server in a background thread. I hope to do this with a thread rather than with multiprocessing/queueing, because I already have one messaging layer in my application and I would like to avoid a second one. I mention this because I can already see how to do it in a separate process, but I would like to know how to do it in a thread, if that is possible. Or perhaps there is some other pattern I can use that accomplishes the same thing, for example writing my own reactor.run method. Thanks for any help. :)

+4
1 answer

"The key thing I want to do is to start the server in a background thread."

You do not explain why this is the key thing. In general, things like "use threads" are implementation details. Perhaps threads are suitable, perhaps not, but the actual goal is agnostic on the matter. What is your goal? To handle multiple clients at the same time? To process messages of this sort concurrently with events from another source (for example, a web server)? Without knowing the ultimate goal, there is no way to know whether the implementation strategy I propose will work or not.

With this in mind, there are two possibilities.

First, you can forget about threads. This would entail defining the event-handling logic above as just that: the handling part of event handling. The part that tries to receive events would be delegated to another part of the application, probably something ultimately based on one of the reactor APIs (for example, you might set up a TCP server that receives messages and turns them into events to be handled, in which case you would start by calling some form of reactor.listenTCP).
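
As a rough sketch of what that receiving side could look like (the port number, protocol, factory, and PrintingHandler below are invented for illustration, not taken from your code; any object with a messageReceived method could stand in for the handler):

    from twisted.internet import reactor
    from twisted.internet.protocol import ServerFactory
    from twisted.protocols.basic import LineReceiver

    class MessageProtocol(LineReceiver):
        def lineReceived(self, line):
            # Turn each received line into a "message event" and hand it to
            # whatever handler object the factory was given.
            self.factory.handler.messageReceived(line)

    class MessageFactory(ServerFactory):
        protocol = MessageProtocol

        def __init__(self, handler):
            self.handler = handler

    class PrintingHandler(object):
        # Stand-in handler; the MessageReverser below would play this role.
        def messageReceived(self, message):
            print("received: %r" % (message,))

    reactor.listenTCP(12345, MessageFactory(PrintingHandler()))
    reactor.run()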

Thus, your example might turn into something like the following (with some specifics invented to try to increase the illustrative value):

    from twisted.internet import reactor

    class MessageReverser(object):
        """
        Accept messages, reverse them, and send them onwards.
        """
        def __init__(self, server):
            self.server = server

        def messageReceived(self, message):
            """
            Callback invoked whenever a message is received. This
            implementation will reverse and re-send the message.
            """
            self.server.sendMessage(message[::-1])
            doSomeOtherLogic()

    def main():
        twistedServer = ...
        twistedServer.start(MessageReverser(twistedServer))
        reactor.run()

    main()

A few notes about this example:

  • I'm not sure how your twistedServer is defined. I imagine it interfaces with the network in some way. Your version of the code seemed to presume that it receives messages and buffers them until they are pulled out of the buffer by your loop for processing. This version would probably not have any buffer, but would instead call the messageReceived method of the object passed to start as soon as a message arrives. You could still add buffering of some sort if you want, by putting it into that messageReceived method.

  • The code now makes a call to reactor.run, which will block. You might instead want to write this code as a twistd plugin or a .tac file, in which case you would not be directly responsible for starting the reactor. However, someone must start the reactor, or most Twisted APIs will not do anything. reactor.run blocks, of course, until someone calls reactor.stop.

  • There are no threads used by this approach. Twisted's cooperative multitasking approach to concurrency means that you can still do multiple things at once, as long as you are mindful to cooperate (which usually means giving control back to the reactor from time to time).

  • The exact points at which the doSomeOtherLogic function is called change slightly, because there is no longer a notion of "the buffer is empty right now" distinct from "I just handled a message". You could change this so that the function is scheduled to run once per second, or after every N messages, or whatever suits you; a sketch of the once-per-second variant follows this list.
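
A minimal sketch of that once-per-second scheduling, assuming doSomeOtherLogic is the function from your pseudocode (its body here is a placeholder):

    from twisted.internet import reactor
    from twisted.internet.task import LoopingCall

    def doSomeOtherLogic():
        # Whatever periodic work you want done alongside message handling.
        print("doing other logic")

    # Run doSomeOtherLogic once per second, starting immediately, for as long
    # as the reactor is running.
    LoopingCall(doSomeOtherLogic).start(1.0)
    reactor.run()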

The second option is to really use threads. This might look very much like the previous example, but you would call reactor.run in a different thread rather than in the main thread. For instance,

    from Queue import Queue
    from threading import Thread

    from twisted.internet import reactor

    class MessageQueuer(object):
        def __init__(self, queue):
            self.queue = queue

        def messageReceived(self, message):
            self.queue.put(message)

    def main():
        queue = Queue()
        twistedServer = ...
        twistedServer.start(MessageQueuer(queue))

        # Run the reactor in a secondary thread; False disables signal
        # handler installation (see the notes below).
        Thread(target=reactor.run, args=(False,)).start()

        while True:
            message = queue.get()
            response = handleMessage(message)
            # Hand the response back to the reactor thread.
            reactor.callFromThread(twistedServer.sendResponse, response)

    main()

This version assumes a twistedServer that works similarly, but uses a thread so that you can have your while True: loop. Note:

  • You must call reactor.run(False) if you are using a thread, to prevent Twisted from trying to install any signal handlers, which Python only allows to be installed in the main thread. This means Ctrl-C handling will be disabled, and reactor.spawnProcess will not work reliably.

  • MessageQueuer has the same interface as MessageReverser; only its messageReceived implementation is different. It uses the thread-safe Queue object to communicate between the reactor thread (in which it will be called) and your main thread, where the while True: loop runs.

  • You must use reactor.callFromThread to send the message back to the reactor thread (assuming twistedServer.sendResponse is actually based on Twisted APIs). Twisted APIs are generally not thread-safe and must be called in the reactor thread. This is what reactor.callFromThread does for you.

  • You will want to implement some way to stop the loop and the reactor, I imagine. The Python process will not exit cleanly until after you call reactor.stop; one possible shutdown path is sketched after this list.
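
A minimal sketch of such a shutdown, assuming the queue, handleMessage, and twistedServer from the example above (the SHUTDOWN sentinel and the mainLoop name are invented here):

    from twisted.internet import reactor

    SHUTDOWN = object()  # sentinel meaning "no more messages are coming"

    def mainLoop(queue, twistedServer):
        while True:
            message = queue.get()
            if message is SHUTDOWN:
                break
            response = handleMessage(message)
            reactor.callFromThread(twistedServer.sendResponse, response)
        # Ask the reactor, which runs in its own thread, to shut itself down.
        reactor.callFromThread(reactor.stop)

Something, for example your handler or another part of the application, would put SHUTDOWN on the queue when it is time to exit.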

Note that while the threaded version gives you the familiar, comfortable while True: loop, it does not actually do anything better than the non-threaded version. It is just more complicated. So consider whether you really need threads, or whether they are just an implementation detail that could be traded for something else.

+10
