Does anyone have experience / tips on testing a Flask content streaming resource? My application uses Redis Pub / Sub, when it receives a message in the channel, it sends the text 'data: {"value": 42}' Implementation is performed in the following Flask documents at: docs , and my unit tests are also done on Flask documents. Messages in Redis pub / sub are sent by a second resource (POST).
I create a stream to listen to the stream while I post in the main application on a resource that is published to Redis.
Although the connection statements go through (I get OK 200 and mimetype 'text / event-stream'), the data object is empty.
My unit test looks like this:
def test_04_receiving_events(self): headers = [('Content-Type', 'application/json')] data = json.dumps({"value": 42}) headers.append(('Content-Length', len(data))) def get_stream(): rv_stream = self.app.get('stream/data') rv_stream_object = json.loads(rv_stream.data)
Application Resource:
@app.route('/stream/data') def stream(): def generate(): pubsub = db.pubsub() pubsub.subscribe('interesting') for event in pubsub.listen(): if event['type'] == 'message': yield 'data: {"value":%s}\n\n' % event['data'] return Response(stream_with_context(generate()), direct_passthrough=True, mimetype='text/event-stream')
Any pointers or examples on how to test content flow in Flask? Google doesn't seem to help much if Iām not looking for the wrong keywords.
Thank you in advance.
source share