Python Multiprocessing and Network Connections on Windows

I am trying to implement a TCP echo server. The protocol is simple:

  • The client sends a message to the server.
  • The server receives the message.
  • The server converts the message to uppercase.
  • The server sends the modified message back to the client.
  • The client prints the response.
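
The steps above fit in a few lines. Here is a minimal single-process sketch of the protocol, using localhost and an ephemeral port (the function name and buffer size are mine, purely for illustration):

```python
import socket

def run_echo_once(host="127.0.0.1"):
    # Server side: bind to an ephemeral port and listen.
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.bind((host, 0))
    srv.listen(1)
    port = srv.getsockname()[1]

    # Client side: connect (completes via the listen backlog).
    cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    cli.connect((host, port))

    conn, address = srv.accept()
    cli.sendall(b"hello")
    data = conn.recv(1024)
    conn.sendall(data.upper())    # the server's only job: uppercase the message
    reply = cli.recv(1024)

    conn.close()
    cli.close()
    srv.close()
    return reply                  # b"HELLO"
```

This is the sequential version that worked; everything below is about making the server side concurrent.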

This worked, so I decided to parallelize the server and make it able to handle multiple clients at once. Since most Python interpreters have a GIL, multithreading won't cut it, so I had to use multiple processes... and boy, this is where things went downhill.

I am using Windows 10 x64 and a WinPython installation with Python 3.5.2 x64.

My idea is to create a socket, initialize it (bind and listen), create subprocesses, and pass the socket to the children. But for the life of me... I cannot make this work; my subprocesses die almost instantly. At first I had problems with pickling the socket, so I did some searching and thought that was the issue. I tried passing the socket through a multiprocessing queue, then through a pipe, and my latest attempt is ForkingPickler-ing it and passing it as a bytes object at process-creation time. Nothing works.

Can someone shed some light here and tell me what's wrong? Perhaps the whole idea of sharing the socket is bad... If so, PLEASE tell me how I can achieve my original goal: letting my server ACTIVELY handle several clients at once, on Windows (and don't tell me about threading; we all know Python threads don't cut it).

It is also worth noting that the debug function creates no files. I believe no process lives long enough to even reach it.

Typical output of my server code (the only difference between runs is the process IDs):

    Server is running...
    Degree of parallelism: 4
    Socket created.
    Socket bount to: ('', 0)
    Process 3604 is alive: True
    Process 5188 is alive: True
    Process 6800 is alive: True
    Process 2844 is alive: True
    Press ctrl+c to kill all processes.
    Process 3604 is alive: False
    Process 3604 exit code: 1
    Process 5188 is alive: False
    Process 5188 exit code: 1
    Process 6800 is alive: False
    Process 6800 exit code: 1
    Process 2844 is alive: False
    Process 2844 exit code: 1
    The children died...
    Why god?
    WHYYyyyyy!!?!?!?

Server Code:

    # Imports
    import socket
    import packet
    import sys
    import os
    from time import sleep
    import multiprocessing as mp
    import pickle
    import io

    # Constants
    DEGREE_OF_PARALLELISM = 4
    DEFAULT_HOST = ""
    DEFAULT_PORT = 0

    def _parse_cmd_line_args():
        arguments = sys.argv
        if len(arguments) == 1:
            return DEFAULT_HOST, DEFAULT_PORT
        else:
            raise NotImplementedError()

    def debug(data):
        pid = os.getpid()
        with open('C:\\Users\\Trauer\\Desktop\\debug\\' + str(pid) + '.txt',
                  mode='a', encoding='utf8') as file:
            file.write(str(data) + '\n')

    def handle_connection(client):
        client_data = client.recv(packet.MAX_PACKET_SIZE_BYTES)
        debug('received data from client: ' + str(len(client_data)))
        response = client_data.upper()
        client.send(response)
        debug('sent data from client: ' + str(response))

    def listen(picklez):
        debug('started listen function')
        pid = os.getpid()
        server_socket = pickle.loads(picklez)
        debug('acquired socket')
        while True:
            debug('Sub process {0} is waiting for connection...'.format(str(pid)))
            client, address = server_socket.accept()
            debug('Sub process {0} accepted connection {1}'.format(str(pid), str(client)))
            handle_connection(client)
            client.close()
            debug('Sub process {0} finished handling connection {1}'.format(str(pid), str(client)))

    if __name__ == "__main__":
        # Since most python interpreters have a GIL, multithreading won't cut
        # it... Oughta bust out some processes, yo!
        host_port = _parse_cmd_line_args()
        print('Server is running...')
        print('Degree of parallelism: ' + str(DEGREE_OF_PARALLELISM))
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        print('Socket created.')
        server_socket.bind(host_port)
        server_socket.listen(DEGREE_OF_PARALLELISM)
        print('Socket bount to: ' + str(host_port))

        buffer = io.BytesIO()
        mp.reduction.ForkingPickler(buffer).dump(server_socket)
        picklez = buffer.getvalue()

        children = []
        for i in range(DEGREE_OF_PARALLELISM):
            child_process = mp.Process(target=listen, args=(picklez,))
            child_process.daemon = True
            child_process.start()
            children.append(child_process)
            while not child_process.pid:
                sleep(.25)
            print('Process {0} is alive: {1}'.format(str(child_process.pid),
                                                     str(child_process.is_alive())))
        print()

        kids_are_alive = True
        while kids_are_alive:
            print('Press ctrl+c to kill all processes.\n')
            sleep(1)
            exit_codes = []
            for child_process in children:
                print('Process {0} is alive: {1}'.format(str(child_process.pid),
                                                         str(child_process.is_alive())))
                print('Process {0} exit code: {1}'.format(str(child_process.pid),
                                                          str(child_process.exitcode)))
                exit_codes.append(child_process.exitcode)
            if all(exit_codes):
                # Why do they die so young? :(
                print('The children died...')
                print('Why god?')
                print('WHYYyyyyy!!?!?!?')
                kids_are_alive = False

edit: fixed the signature of listen. My processes still die instantly.

edit2: User cmidi pointed out that this code does work on Linux, so my question becomes: how can I make it work on Windows?

2 answers

You can pass the socket to the child process directly. multiprocessing registers a reduction for this purpose; on Windows the implementation uses the following DupSocket class from multiprocessing.resource_sharer:

    class DupSocket(object):
        '''Picklable wrapper for a socket.'''
        def __init__(self, sock):
            new_sock = sock.dup()
            def send(conn, pid):
                share = new_sock.share(pid)
                conn.send_bytes(share)
            self._id = _resource_sharer.register(send, new_sock.close)

        def detach(self):
            '''Get the socket.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                share = conn.recv_bytes()
                return socket.fromshare(share)

This calls the Windows-only socket share method, which returns the protocol-information buffer produced by the WSADuplicateSocket call, and registers a callback with the resource sharer to send that buffer over a connection to the child process. The child in turn calls detach, which receives the protocol-information buffer and rebuilds the socket via socket.fromshare.

It is not directly related to your problem, but I recommend that you restructure the server to call accept in the main process instead, as is commonly done (for example, in Python's socketserver.ForkingTCPServer). Pass the resulting (conn, address) tuple to the first available worker over a multiprocessing.Queue that is shared by all of the workers in the process pool. Or consider a multiprocessing.Pool with apply_async.
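
A minimal sketch of that design, with accept() in the main process and (conn, address) handed to workers over a multiprocessing.Queue. multiprocessing's ForkingPickler knows how to serialize sockets placed on a queue (via DupSocket on Windows, file-descriptor duplication on Unix), which is what makes the handoff work. All names below are illustrative, and the "client" is embedded in the same script only so the demo is self-contained:

```python
import multiprocessing as mp
import socket

def worker(queue):
    # Each worker blocks on the queue and serves one connection at a time.
    while True:
        item = queue.get()
        if item is None:              # sentinel: tell the worker to exit
            break
        conn, address = item
        data = conn.recv(4096)
        conn.sendall(data.upper())    # the echo-server logic from the question
        conn.close()

def serve_one_request(n_workers=2):
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.bind(("127.0.0.1", 0))        # ephemeral port for the demo
    srv.listen(n_workers)

    queue = mp.Queue()
    workers = [mp.Process(target=worker, args=(queue,))
               for _ in range(n_workers)]
    for w in workers:
        w.start()

    # Demo client; in real use this would be a separate program.
    cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    cli.connect(srv.getsockname())

    conn, address = srv.accept()      # accept() happens in the MAIN process
    queue.put((conn, address))        # the socket is pickled for the worker
    cli.sendall(b"ping")
    reply = cli.recv(4096)

    conn.close()                      # parent's copy, closed after the handoff
    for _ in workers:
        queue.put(None)
    for w in workers:
        w.join()
    cli.close()
    srv.close()
    return reply
```

Here serve_one_request() accepts a single connection, lets a worker uppercase it, and returns the reply the client received; a real server would loop over accept() and put forever.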


The listen() target for your child processes originally took no arguments, but you pass the pickled socket via args=(picklez,). That raises an exception in each child process, which then exits immediately:

 TypeError: listen() takes no arguments (1 given) 

Changing it to def listen(picklez) should solve the problem; it gives the target a parameter to receive that single argument.
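
To see why this looks like a silent death from the parent's side, here is a minimal reproduction (hypothetical code, not the asker's): a child whose target rejects its args raises TypeError during process bootstrap and exits with code 1, matching the "exit code: 1" lines in the question's output.

```python
import multiprocessing as mp

def listen():                         # broken: accepts no arguments
    pass

def run_broken_child():
    p = mp.Process(target=listen, args=("pretend this is a pickled socket",))
    p.start()
    p.join()
    return p.exitcode                 # 1: the child died on TypeError
```

The traceback is printed on the child's stderr, which is easy to miss when the parent keeps running and only polls is_alive()/exitcode.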

