How to connect a socket to asyncio?

I would like to create two protocols (TcpClient and UdpServer) with asyncio in app.py, where TcpClient keeps a permanent connection to server.py and UdpServer acts as a UDP server:

What I need:
a) Both protocols must be able to communicate by calling each other's methods. This only works for the first connection. If TcpClient connects again, the string "send to tcp." coming from UdpServer can no longer be sent. I checked with print(self): TcpClient creates a new instance while the old one still exists, but without a connection, and I don't know how to deal with that. I think I am using asyncio incorrectly.
b) When TcpClient disconnects from server.py, wait 5 seconds and try to connect again, and so on. I tried to do this with asyncio's call_later(), but I think there should be a native way to do this rather than an artificial one. c) When I start app.py and TcpClient cannot connect, I would like it to try connecting again after 5 seconds, and so on. I do not know how to do that.

Here are my app.py and server.py test examples. server.py is only for testing - the real server will be written in a different language.

Here is what I observed while testing:
1) When I run app.py while server.py is not running, app.py does not try to connect again.
2) When app.py is connected to server.py and the server stops and quickly starts again, TcpClient reconnects, but the other methods cannot reach the new instance to send the string "send to tcp." to server.py; they only reach the old instance, which no longer has a connection.
3) If I use asyncio.async() instead of run_until_complete(), I cannot call the methods of one protocol from the other.

Here I put app.py and server.py so you can just copy and run the tests.

I use ncat localhost 9000 -u -v to send the string "send to tcp." This line should be printed in the UdpServer class and passed to the send_data_to_tcp method of the TcpClient class, which in turn sends it to server.py. <- This does not work after the first reconnection of TcpClient.

I am using Python 3.4.0.

Thank you very much.

app.py:

 import asyncio


 # TCP client
 class TcpClient(asyncio.Protocol):
     """TCP side of app.py: talks to server.py and reports to the UDP server.

     Relies on the module-level ``server_udp`` pair (transport, protocol)
     created at the bottom of this script.
     """

     message = 'Testing'

     def connection_made(self, transport):
         """Remember the transport, send the greeting and notify UdpServer."""
         self.transport = transport
         transport.write(self.message.encode())
         print('data sent: {}'.format(self.message))
         server_udp[1].tcp_client_connected()

     def data_received(self, data):
         """Print incoming data and forward the 'Testing' echo to UdpServer."""
         text = data.decode()
         self.data = format(text)
         print('data received: {}'.format(text))
         if self.data == 'Testing':
             server_udp[1].send_data_to_udp(self.data)

     def send_data_to_tcp(self, data):
         """Encode ``data`` and write it to the TCP connection."""
         self.transport.write(data.encode())

     def connection_lost(self, exc):
         """Tell UdpServer that the TCP connection is gone."""
         server_udp[1].tcp_client_disconnected(
             'Connection lost with the server...',
             self.transport.get_extra_info('peername'))


 # UDP Server
 class UdpServer(asyncio.DatagramProtocol):
     """UDP side of app.py: receives datagrams and relays them to TcpClient.

     Relies on the module-level ``client_tcp`` pair (transport, protocol)
     and on the module-level ``loop``.
     """

     CLIENT_TCP_TIMEOUT = 5.0

     def __init__(self):
         self.client_tcp_timeout = None

     def connection_made(self, transport):
         """Print the transport and keep a reference to it."""
         print('start', transport)
         self.transport = transport

     def datagram_received(self, data, addr):
         """Print the datagram and pass 'send to tcp.' on to TcpClient."""
         self.data = data.strip().decode()
         print('Data received:', self.data, addr)
         if self.data == 'send to tcp.':
             client_tcp[1].send_data_to_tcp(self.data)

     def connection_lost(self, exc):
         print('stop', exc)

     def send_data_to_udp(self, data):
         """Print data handed over by TcpClient."""
         print('Receiving on UDPServer Class: ', (data))

     def connect_client_tcp(self):
         """Start a new TCP connection attempt (called by the timer)."""
         coro = loop.create_connection(TcpClient, 'localhost', 8000)
         #client_tcp = loop.run_until_complete(coro)
         # asyncio.Task(coro) is what asyncio.async(coro) builds on Python 3.4;
         # it is spelled this way because `async` is a reserved word from 3.7.
         # Note that client_tcp is only a local name inside this method.
         client_tcp = asyncio.Task(coro)

     def tcp_client_disconnected(self, data, info):
         """Print the message, store the peer info and arm the reconnect timer."""
         print(data)
         self.client_tcp_info = info
         event_loop = asyncio.get_event_loop()
         self.client_tcp_timeout = event_loop.call_later(
             self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)

     def tcp_client_connected(self):
         """Cancel a pending reconnect timer, if there is one."""
         if not self.client_tcp_timeout:
             return
         self.client_tcp_timeout.cancel()
         print('call_later cancel.')


 loop = asyncio.get_event_loop()

 # UDP Server
 coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000))
 #server_udp = asyncio.Task(coro)
 server_udp = loop.run_until_complete(coro)

 # TCP client
 coro = loop.create_connection(TcpClient, 'localhost', 8000)
 #client_tcp = asyncio.async(coro)
 client_tcp = loop.run_until_complete(coro)

 loop.run_forever()

server.py:

 import asyncio


 class EchoServer(asyncio.Protocol):
     """Test server: prints whatever it receives and echoes it back."""

     def connection_made(self, transport):
         """Print the peer address and keep the transport for replies."""
         self.transport = transport
         print('connection from {}'.format(transport.get_extra_info('peername')))

     def data_received(self, data):
         """Print the received data and write the same bytes back."""
         print('data received: {}'.format(data.decode()))
         self.transport.write(data)

         # close the socket
         #self.transport.close()

     #def connection_lost(self):
     #    print('server closed the connection')


 loop = asyncio.get_event_loop()
 server = loop.run_until_complete(
     loop.create_server(EchoServer, 'localhost', 8000))

 # Debug output describing the listening server object.
 print(server)
 print(dir(server))
 print(dir(server.sockets))
 print('serving on {}'.format(server.sockets[0].getsockname()))

 try:
     loop.run_forever()
 except KeyboardInterrupt:
     print("exit")
 finally:
     server.close()
     loop.close()
+7
python client-server sockets python-asyncio
source share
1 answer

You really need only minor corrections. First, I wrote a coroutine to handle connection attempts:

 @asyncio.coroutine
 def do_connect():
     """Keep trying to connect TcpClient to localhost:8000 until it succeeds.

     On success the (transport, protocol) pair is stored in the module-level
     ``client_tcp`` - the name UdpServer reads when it relays a datagram -
     and is also returned so a Task wrapping this coroutine has a result.
     A failed attempt (OSError) is retried after 5 seconds.
     """
     global client_tcp  # Rebind the global that UdpServer uses, not a local
     while True:
         try:
             client_tcp = yield from loop.create_connection(
                 TcpClient, 'localhost', 8000)
         except OSError:
             print("Server not up retrying in 5 seconds...")
             yield from asyncio.sleep(5)
         else:
             return client_tcp

Then we use it to start everything:

 loop = asyncio.get_event_loop()

 # UDP server: run the endpoint coroutine to completion so that server_udp
 # holds the (transport, protocol) pair before the TCP client starts.
 server_udp = loop.run_until_complete(
     loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000)))

 # TCP client: do_connect() only finishes once a connection is established.
 loop.run_until_complete(do_connect())

 loop.run_forever()

The next part handles the server going down and coming back up after app.py has started. We need to fix tcp_client_disconnected and connect_client_tcp to handle this properly:

 def connect_client_tcp(self):
     """Start a background reconnect attempt and publish its result.

     do_connect() is wrapped in a Task so this timer callback returns
     immediately. asyncio.Task(...) is what asyncio.async(...) creates for a
     coroutine on Python 3.4; it is used here because `async` is a reserved
     word from Python 3.7 on.
     """
     task = asyncio.Task(do_connect(), loop=loop)

     def cb(future):
         # A done-callback receives the finished Task, not its result.
         global client_tcp
         if future.cancelled():
             return
         result = future.result()
         # Only rebind when the coroutine actually returned a connection, so
         # a do_connect() that returns nothing cannot clobber the global.
         if result is not None:
             client_tcp = result

     task.add_done_callback(cb)

 def tcp_client_disconnected(self, data, info):
     """Print the message, store the peer info and arm the reconnect timer."""
     print(data)
     self.client_tcp_info = info
     self.client_tcp_timeout = loop.call_later(
         self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)

The interesting part is connect_client_tcp. You had two problems with your original version:

  • You assigned the result of asyncio.async(coro) directly to client_tcp, which means that client_tcp became an asyncio.Task. That was not what you wanted; client_tcp needs to hold the result of the completed asyncio.Task. We achieve this by using task.add_done_callback to assign the Task's result to client_tcp once the Task has completed.

  • You forgot the global client_tcp declaration at the top of the method. Without it, you simply created a local variable called client_tcp that was thrown away at the end of connect_client_tcp.

Once these problems are fixed, I can run app.py and start / stop server.py whenever I want, and I always see all messages delivered from ncat to server.py whenever all three components are running together.

Here's the full app.py , for easy copy / paste:

 import asyncio


 # TCP client
 class TcpClient(asyncio.Protocol):
     """TCP side of app.py: talks to server.py and reports to the UDP server.

     Relies on the module-level ``server_udp`` pair (transport, protocol).
     """

     message = 'Testing'

     def connection_made(self, transport):
         """Remember the transport, send the greeting and notify UdpServer."""
         self.transport = transport
         self.transport.write(self.message.encode())
         print('data sent: {}'.format(self.message))
         server_udp[1].tcp_client_connected()

     def data_received(self, data):
         """Print incoming data and forward the 'Testing' echo to UdpServer."""
         self.data = format(data.decode())
         print('data received: {}'.format(data.decode()))
         if self.data == 'Testing':
             server_udp[1].send_data_to_udp(self.data)

     def send_data_to_tcp(self, data):
         """Encode ``data`` and write it to the TCP connection."""
         self.transport.write(data.encode())

     def connection_lost(self, exc):
         """Tell UdpServer that the TCP connection is gone."""
         msg = 'Connection lost with the server...'
         info = self.transport.get_extra_info('peername')
         server_udp[1].tcp_client_disconnected(msg, info)


 # UDP Server
 class UdpServer(asyncio.DatagramProtocol):
     """UDP side of app.py: receives datagrams and relays them to TcpClient.

     Always looks up the module-level ``client_tcp`` pair at call time, so it
     reaches whichever TCP connection was established most recently.
     """

     CLIENT_TCP_TIMEOUT = 5.0

     def __init__(self):
         self.client_tcp_timeout = None

     def connection_made(self, transport):
         """Print the transport and keep a reference to it."""
         print('start', transport)
         self.transport = transport

     def datagram_received(self, data, addr):
         """Print the datagram and pass 'send to tcp.' on to TcpClient."""
         self.data = data.strip()
         self.data = self.data.decode()
         print('Data received:', self.data, addr)
         if self.data == 'send to tcp.':
             client_tcp[1].send_data_to_tcp(self.data)

     def connection_lost(self, exc):
         print('stop', exc)

     def send_data_to_udp(self, data):
         """Print data handed over by TcpClient."""
         print('Receiving on UDPServer Class: ', (data))

     def connect_client_tcp(self):
         """Start a background reconnect attempt and publish its result.

         asyncio.Task(...) is what asyncio.async(...) creates for a coroutine
         on Python 3.4; it is used here because `async` is a reserved word
         from Python 3.7 on.
         """
         task = asyncio.Task(do_connect(), loop=loop)

         def cb(future):
             # A done-callback receives the finished Task, not its result.
             global client_tcp
             if future.cancelled():
                 return
             result = future.result()
             if result is not None:
                 client_tcp = result

         task.add_done_callback(cb)

     def tcp_client_disconnected(self, data, info):
         """Print the message, store the peer info and arm the reconnect timer."""
         print(data)
         self.client_tcp_info = info
         self.client_tcp_timeout = loop.call_later(
             self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)

     def tcp_client_connected(self):
         """Cancel a pending reconnect timer, if there is one."""
         if self.client_tcp_timeout:
             self.client_tcp_timeout.cancel()
             print('call_later cancel.')


 @asyncio.coroutine
 def do_connect():
     """Keep trying to connect TcpClient to localhost:8000 until it succeeds.

     On success the (transport, protocol) pair is stored in the module-level
     ``client_tcp`` and returned. A failed attempt (OSError) is retried after
     5 seconds, matching the printed message.
     """
     global client_tcp
     while True:
         try:
             client_tcp = yield from loop.create_connection(
                 TcpClient, 'localhost', 8000)
         except OSError:
             print("Server not up retrying in 5 seconds...")
             yield from asyncio.sleep(5)
         else:
             return client_tcp


 loop = asyncio.get_event_loop()

 # UDP Server
 coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000))
 server_udp = loop.run_until_complete(coro)

 # TCP client
 loop.run_until_complete(do_connect())

 loop.run_forever()
+9
source share

All Articles