How to create bidirectional messaging using AMP in Twisted / Python

I am trying to create a twisted daemon (part of the server) that communicates with my local code base (client part). Basically, the client should call Remote (), using AMP for the daemon, to start some processing (updating the database) using some methods. After each method completes processing on the server, I need a server to call Remote () for my client so that the user knows about the server’s progress.

I managed to call the server from the client and get a response, but I can not get the server to send a response to the client.

I have a solution to solve the problem, but I can not find an example code that uses AMP for bi-directional communication - it always calls the client on the server.

I am trying to get the client to call the server to start processing (ServerStart AMP command), and then the server will send several callbacks to the client to provide updates during processing (MessageClient AMP command).

Any help would be greatly appreciated. A super simple example that shows how to call a server from a client and then pass the server back two calls to the client will be awesome!

ampclient.py

from client_server import MessageServer, Client, ServerStart from twisted.internet.protocol import ClientCreator from twisted.internet import reactor from twisted.protocols import amp from time import sleep from twisted.internet.protocol import Factory from twisted.internet.endpoints import TCP4ServerEndpoint from twisted.application.service import Application from twisted.application.internet import StreamServerEndpointService def startServerProcess(): def show_start(result): print 'result from server: %r' % result d = ClientCreator(reactor, amp.AMP).connectTCP( '127.0.0.1', 1234).addCallback( lambda p: p.callRemote(ServerStart, truncate=True)).addCallback( show_start) pf = Factory() pf.protocol = Client reactor.listenTCP(1235, pf) print 'client listening' startServerProcess() sleep(4) reactor.run() 

ampserver.py

 from client_server import MessageClient, Server from twisted.internet.protocol import ClientCreator from twisted.internet import reactor from twisted.protocols import amp from time import sleep from twisted.internet.protocol import Factory from twisted.internet.endpoints import TCP4ServerEndpoint from twisted.application.service import Application from twisted.application.internet import StreamServerEndpointService def makeClientCall(): def show_result(result): print 'result from client: %r' % result d = ClientCreator(reactor, amp.AMP).connectTCP( '127.0.0.1', 1235).addCallback( lambda p: p.callRemote(MessageClient)).addCallback( show_result) application = Application("server app") endpoint = TCP4ServerEndpoint(reactor, 1234) factory = Factory() factory.protocol = Server service = StreamServerEndpointService(endpoint, factory) service.setServiceParent(application) sleep(4) makeClientCall() makeClientCall() 

client_server.py

 from twisted.protocols import amp from twisted.internet import reactor from twisted.internet.protocol import Factory from twisted.internet.endpoints import TCP4ServerEndpoint from twisted.application.service import Application from twisted.application.internet import StreamServerEndpointService class MessageServer(amp.Command): response = [('msg', amp.String())] class ServerStart(amp.Command): arguments = [('truncate', amp.Boolean())] response = [('msg', amp.String())] class Server(amp.AMP): def message_it(self): msg = 'This is a message from the server' print 'msg sent to client: %s' % msg return {'msg': msg} MessageServer.responder(message_it) def start_it(self, truncate): msg = 'Starting processing...' return {'msg': msg} ServerStart.responder(start_it) class MessageClient(amp.Command): response = [('msg', amp.String())] class Client(amp.AMP): def message_it(self): msg = 'This is a message from the client' return {'msg': msg} MessageClient.responder(message_it) 
+6
source share
2 answers

Here is a simple example of a bi-directional client and AMP server. The key is that the AMP protocol class contains a link to a client connection and provides a callRemote method.

Of course, I only know this from digging AMP code. At best, there is a flaw in virtualization documentation, at least outside the kernel.

File: count_server.tac

 from twisted.protocols.amp import AMP from twisted.internet import reactor from twisted.internet.protocol import Factory from twisted.internet.endpoints import TCP4ServerEndpoint from twisted.application.service import Application from twisted.application.internet import StreamServerEndpointService from count_client import Counter application = Application('test AMP server') endpoint = TCP4ServerEndpoint(reactor, 8750) factory = Factory() factory.protocol = Counter service = StreamServerEndpointService(endpoint, factory) service.setServiceParent(application) 

File: count_client.py

 if __name__ == '__main__': import count_client raise SystemExit(count_client.main()) from sys import stdout from twisted.python.log import startLogging, err from twisted.protocols import amp from twisted.internet import reactor from twisted.internet.protocol import Factory from twisted.internet.endpoints import TCP4ClientEndpoint class Count(amp.Command): arguments = [('n', amp.Integer())] response = [('ok', amp.Boolean())] class Counter(amp.AMP): @Count.responder def count(self, n): print 'received:', n n += 1 if n < 10: print 'sending:', n self.callRemote(Count, n=n) return {'ok': True} def connect(): endpoint = TCP4ClientEndpoint(reactor, '127.0.0.1', 8750) factory = Factory() factory.protocol = Counter return endpoint.connect(factory) def main(): startLogging(stdout) d = connect() d.addErrback(err, 'connection failed') d.addCallback(lambda p: p.callRemote(Count, n=1)) d.addErrback(err, 'call failed') reactor.run() 

Server output:

 $ twistd -n -y count_server.tac 2013-03-27 11:05:18-0500 [-] Log opened. 2013-03-27 11:05:18-0500 [-] twistd 12.2.0 (/usr/bin/python 2.7.3) starting up. 2013-03-27 11:05:18-0500 [-] reactor class: twisted.internet.epollreactor.EPollReactor. 2013-03-27 11:05:18-0500 [-] Factory starting on 8750 2013-03-27 11:05:18-0500 [-] Starting factory <twisted.internet.protocol.Factory instance at 0x2adc368> 2013-03-27 11:05:22-0500 [twisted.internet.protocol.Factory] Counter connection established (HOST:IPv4Address(TCP, '127.0.0.1', 8750) PEER:IPv4Address(TCP, '127.0.0.1', 58195)) 2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 1 2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 2 2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 3 2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 4 2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 5 2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 6 2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 7 2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 8 2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 9 2013-03-27 11:05:26-0500 [Counter,0,127.0.0.1] Counter connection lost (HOST:IPv4Address(TCP, '127.0.0.1', 8750) PEER:IPv4Address(TCP, '127.0.0.1', 58195)) ^C2013-03-27 11:05:31-0500 [-] Received SIGINT, shutting down. 2013-03-27 11:05:31-0500 [-] (TCP Port 8750 Closed) 2013-03-27 11:05:31-0500 [-] Stopping factory <twisted.internet.protocol.Factory instance at 0x2adc368> 2013-03-27 11:05:31-0500 [-] Main loop terminated. 2013-03-27 11:05:31-0500 [-] Server Shut Down. 

Client Output:

 $ python count_client.py 2013-03-27 11:05:22-0500 [-] Log opened. 2013-03-27 11:05:22-0500 [-] Starting factory <twisted.internet.protocol.Factory instance at 0x246bf80> 2013-03-27 11:05:22-0500 [Uninitialized] Counter connection established (HOST:IPv4Address(TCP, '127.0.0.1', 58195) PEER:IPv4Address(TCP, '127.0.0.1', 8750)) 2013-03-27 11:05:22-0500 [Counter,client] received: 2 2013-03-27 11:05:22-0500 [Counter,client] sending: 3 2013-03-27 11:05:22-0500 [Counter,client] received: 4 2013-03-27 11:05:22-0500 [Counter,client] sending: 5 2013-03-27 11:05:22-0500 [Counter,client] received: 6 2013-03-27 11:05:22-0500 [Counter,client] sending: 7 2013-03-27 11:05:22-0500 [Counter,client] received: 8 2013-03-27 11:05:22-0500 [Counter,client] sending: 9 ^C2013-03-27 11:05:26-0500 [-] Received SIGINT, shutting down. 2013-03-27 11:05:26-0500 [Counter,client] Counter connection lost (HOST:IPv4Address(TCP, '127.0.0.1', 58195) PEER:IPv4Address(TCP, '127.0.0.1', 8750)) 2013-03-27 11:05:26-0500 [Counter,client] Stopping factory <twisted.internet.protocol.Factory instance at 0x246bf80> 2013-03-27 11:05:26-0500 [-] Main loop terminated. 
+4
source

Ryan P's previous answer is poor. In particular, he never used the AMP answer, preferring instead to transfer callRemote calls everywhere. Here is my answer based on examples of ampserver.py (unchanged) and ampclient.py (rewritten) in twisted. This answers the basic question of bidirectional messaging in a way that fits the question (although not quite the description).

Short summary, add a callback to the deferred one that you will get from callRemote, which will have the answer in its argument. This is a normal dictionary, so you can do anything with it.

File: ampserver.py

 from twisted.protocols import amp class Sum(amp.Command): arguments = [('a', amp.Integer()), ('b', amp.Integer())] response = [('total', amp.Integer())] class Divide(amp.Command): arguments = [('numerator', amp.Integer()), ('denominator', amp.Integer())] response = [('result', amp.Float())] errors = {ZeroDivisionError: 'ZERO_DIVISION'} class Math(amp.AMP): def sum(self, a, b): total = a + b print 'Did a sum: %d + %d = %d' % (a, b, total) return {'total': total} Sum.responder(sum) def divide(self, numerator, denominator): result = float(numerator) / denominator print 'Divided: %d / %d = %f' % (numerator, denominator, result) return {'result': result} Divide.responder(divide) def main(): from twisted.internet import reactor from twisted.internet.protocol import Factory pf = Factory() pf.protocol = Math reactor.listenTCP(1234, pf) print 'started' reactor.run() if __name__ == '__main__': main() 

File: ampclient.py

 from twisted.internet import reactor, protocol from twisted.internet.task import deferLater from twisted.protocols import amp from ampserver import Sum, Divide connection = None class MathClient(amp.AMP): def connectionMade(self): global connection connection = self class MathFactory(protocol.ReconnectingClientFactory): protocol = MathClient if __name__ == '__main__': reactor.connectTCP('127.0.0.1', 1234, MathFactory()) def simpleSum(): global connection d = connection.callRemote(Sum, a=1, b=5) def prin(result): print(result) d.addCallback(prin) return d deferLater(reactor, 1, simpleSum) deferLater(reactor, 3, simpleSum) deferLater(reactor, 6, simpleSum) deferLater(reactor, 9, simpleSum) deferLater(reactor, 12, simpleSum) deferLater(reactor, 15, simpleSum) deferLater(reactor, 18, simpleSum).addCallback(lambda _: reactor.stop()) reactor.run() 
+1
source

All Articles