WebSockets proxy for IPython laptop using Flask and WebSocket-for-Python (ws4py)

Inspired by ipython-notebook-proxy and based on ipydra , and extending the latter to support more complex user authentication, as well as a proxy server, since in my use case only port 80 can be opened.

I use flask-sockets for working gunicorn , but I am having problems with proxies. IPython uses three different WebSockets connections, /shell , /stdin and /iopub , but I can only get 101 Switching Protocols for the first two. And /stdin gets Connection Close Frame right after creation.

This is the excerpt code in question:

 # Flask imports... from werkzeug import LocalProxy from ws4py.client.geventclient import WebSocketClient # I use my own LocalProxy because flask-sockets does not support Werkzeug Rules websocket = LocalProxy(lambda: request.environ.get('wsgi.websocket', None)) websockets = {} PROXY_DOMAIN = "127.0.0.1:8888" # IPython host and port methods = ["GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS", "PATCH", "CONNECT"] @app.route('/', defaults={'url': ''}, methods=methods) @app.route('/<path:url>', methods=methods) def proxy(url): with app.test_request_context(): if websocket: while True: data = websocket.receive() websocket_url = 'ws://{}/{}'.format(PROXY_DOMAIN, url) if websocket_url not in websockets: client = WebSocketClient(websocket_url, protocols=['http-only', 'chat']) websockets[websocket_url] = client else: client = websockets[websocket_url] client.connect() if data: client.send(data) client_data = client.receive() if client_data: websocket.send(client_data) return Response() 

I also tried to create my own WebSocket proxy class, but it does not work either.

 class WebSocketProxy(WebSocketClient): def __init__(self, to, *args, **kwargs): self.to = to print(("Proxy to", self.to)) super(WebSocketProxy, self).__init__(*args, **kwargs) def opened(self): m = self.to.receive() print("<= %d %s" % (len(m), str(m))) self.send(m) def closed(self, code, reason): print(("Closed down", code, reason)) def received_message(self, m): print("=> %d %s" % (len(m), str(m))) self.to.send(m) 

The normal request-response loop works like a charm, so I deleted this code. If you are interested, the full code is hosted in hidra .

I start the server using

 $ gunicorn -k flask_sockets.worker hidra:app 
+7
flask proxy websocket ipython-notebook ws4py
source share
1 answer

Here is my solution (ish). It is rude, but should serve as a starting point for creating a websocket proxy. The full code is available in an unreleased project, pyramid_notebook .

  • This uses ws4py and uWSGI instead of gunicorn

  • We use the internal uWSGI mechanism to receive a websocket downstream message flow. There is nothing like WSGI for websites in the Python world (yet?), But it looks like every web server implements its own mechanism.

  • A custom ws4py ProxyConnection is created that can combine a ws4py event loop with a uWSGI event loop

  • The thing starts and messages start flying

  • This uses a Pyramid request (based on WebOb), but it really doesn't matter, and the code should be good for any Python WSGI application with a few changes

  • As you can see, this does not take advantage of asynchrony, but just sleep () if nothing comes from the socket

The code goes here:

 """UWSGI websocket proxy.""" from urllib.parse import urlparse, urlunparse import logging import time import uwsgi from ws4py import WS_VERSION from ws4py.client import WebSocketBaseClient #: HTTP headers we need to proxy to upstream websocket server when the Connect: upgrade is performed CAPTURE_CONNECT_HEADERS = ["sec-websocket-extensions", "sec-websocket-key", "origin"] logger = logging.getLogger(__name__) class ProxyClient(WebSocketBaseClient): """Proxy between upstream WebSocket server and downstream UWSGI.""" @property def handshake_headers(self): """ List of headers appropriate for the upgrade handshake. """ headers = [ ('Host', self.host), ('Connection', 'Upgrade'), ('Upgrade', 'websocket'), ('Sec-WebSocket-Key', self.key.decode('utf-8')), # Origin is proxyed from the downstream server, don't set it twice # ('Origin', self.url), ('Sec-WebSocket-Version', str(max(WS_VERSION))) ] if self.protocols: headers.append(('Sec-WebSocket-Protocol', ','.join(self.protocols))) if self.extra_headers: headers.extend(self.extra_headers) logger.info("Handshake headers: %s", headers) return headers def received_message(self, m): """Push upstream messages to downstream.""" # TODO: No support for binary messages m = str(m) logger.debug("Incoming upstream WS: %s", m) uwsgi.websocket_send(m) logger.debug("Send ok") def handshake_ok(self): """ Called when the upgrade handshake has completed successfully. Starts the client thread. """ self.run() def terminate(self): raise RuntimeError("NO!") super(ProxyClient, self).terminate() def run(self): """Combine async uwsgi message loop with ws4py message loop. TODO: This could do some serious optimizations and behave asynchronously correct instead of just sleep(). """ self.sock.setblocking(False) try: while not self.terminated: logger.debug("Doing nothing") time.sleep(0.050) logger.debug("Asking for downstream msg") msg = uwsgi.websocket_recv_nb() if msg: logger.debug("Incoming downstream WS: %s", msg) self.send(msg) s = self.stream self.opened() logger.debug("Asking for upstream msg") try: bytes = self.sock.recv(self.reading_buffer_size) if bytes: self.process(bytes) except BlockingIOError: pass except Exception as e: logger.exception(e) finally: logger.info("Terminating WS proxy loop") self.terminate() def serve_websocket(request, port): """Start UWSGI websocket loop and proxy.""" env = request.environ # Send HTTP response 101 Switch Protocol downstream uwsgi.websocket_handshake(env['HTTP_SEC_WEBSOCKET_KEY'], env.get('HTTP_ORIGIN', '')) # Map the websocket URL to the upstream localhost:4000x Notebook instance parts = urlparse(request.url) parts = parts._replace(scheme="ws", netloc="localhost:{}".format(port)) url = urlunparse(parts) # Proxy initial connection headers headers = [(header, value) for header, value in request.headers.items() if header.lower() in CAPTURE_CONNECT_HEADERS] logger.info("Connecting to upstream websockets: %s, headers: %s", url, headers) ws = ProxyClient(url, headers=headers) ws.connect() # Happens only if exceptions fly around return "" 
+2
source share

All Articles