Python Flask Stream Unit Testing

Does anyone have experience with, or tips on, testing a Flask content-streaming resource? My application uses Redis Pub/Sub: when it receives a message on the channel, it emits the text 'data: {"value": 42}'. The implementation follows the Flask documentation (docs), and my unit tests are also based on the Flask documentation. Messages are published to Redis Pub/Sub by a second resource (POST).

I create a thread to listen to the stream while, in the main thread, I POST to a resource that publishes to Redis.

Although the connection assertions pass (I get 200 OK and mimetype 'text/event-stream'), the data object is empty.

My unit test looks like this:

    def test_04_receiving_events(self):
        headers = [('Content-Type', 'application/json')]
        data = json.dumps({"value": 42})
        headers.append(('Content-Length', len(data)))

        def get_stream():
            rv_stream = self.app.get('stream/data')
            rv_stream_object = json.loads(rv_stream.data)  # error (empty)
            self.assertEqual(rv_stream.status_code, 200)
            self.assertEqual(rv_stream.mimetype, 'text/event-stream')
            self.assertEqual(rv_stream_object, "data: {'value': 42}")
            t.stop()

        threads = []
        t = Thread(target=get_stream)
        threads.append(t)
        t.start()
        time.sleep(1)
        rv_post = self.app.post('/sensor', headers=headers, data=data)

        threads_done = False
        while not threads_done:
            threads_done = True
            for t in threads:
                if t.is_alive():
                    threads_done = False
                    time.sleep(1)

Application Resource:

    @app.route('/stream/data')
    def stream():
        def generate():
            pubsub = db.pubsub()
            pubsub.subscribe('interesting')
            for event in pubsub.listen():
                if event['type'] == 'message':
                    yield 'data: {"value":%s}\n\n' % event['data']

        return Response(stream_with_context(generate()),
                        direct_passthrough=True,
                        mimetype='text/event-stream')
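One way to exercise the generator logic without a live Redis server is to pass the pub/sub object in as a parameter and hand it a fake in the test. The following is only a minimal sketch under that assumption, not the code from the question: `sse_events` and `FakePubSub` are hypothetical names, and the fake mimics nothing more than the `type` and `data` keys that the `listen()` loop above reads.

```python
def sse_events(pubsub, channel="interesting"):
    """Yield one event-stream frame per pub/sub message."""
    pubsub.subscribe(channel)
    for event in pubsub.listen():
        # Skip subscribe confirmations and any other non-message events.
        if event["type"] == "message":
            yield 'data: {"value":%s}\n\n' % event["data"]


class FakePubSub:
    """Hypothetical stand-in for the Redis pub/sub object (test use only)."""

    def __init__(self, events):
        self.events = events
        self.channels = []

    def subscribe(self, channel):
        # Record the channel so a test can assert on it.
        self.channels.append(channel)

    def listen(self):
        # Unlike a real subscription, this iterator ends, so a test cannot hang.
        return iter(self.events)


fake = FakePubSub([
    {"type": "subscribe", "data": 1},
    {"type": "message", "data": 42},
])
frames = list(sse_events(fake))
```

In the view, the same generator could presumably be wrapped as `Response(sse_events(db.pubsub()), mimetype='text/event-stream')`, which keeps the Redis dependency at the edge of the application.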

Any pointers or examples on how to test content streaming in Flask? Google doesn't seem to help much, unless I'm searching for the wrong keywords.

Thank you in advance.

1 answer

You start the thread that runs get_stream() before sending any data to Redis.

This means that it will not find any events and will return without streaming data.

I believe that you do not need threads at all: you can simply use the POST to set up the data for the integration test, and then call the stream.

    def test_04_receiving_events(self):
        headers = [('Content-Type', 'application/json')]
        data = json.dumps({"value": 42})
        headers.append(('Content-Length', len(data)))

        rv_post = self.app.post('/sensor', headers=headers, data=data)

        rv_stream = self.app.get('stream/data')
        rv_stream_object = json.loads(rv_stream.data)
        self.assertEqual(rv_stream.status_code, 200)
        self.assertEqual(rv_stream.mimetype, 'text/event-stream')
        self.assertEqual(rv_stream_object, "data: {'value': 42}")
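One detail worth checking in either version of the test: the stream resource emits an event-stream frame such as `data: {"value":42}` followed by a blank line, which is not valid JSON on its own, so passing the raw body to `json.loads` would raise an error even when data does arrive. A possible way to compare payloads is to strip the `data:` prefix first. The helper below is a hypothetical sketch (`parse_sse_data` is not part of Flask), and it assumes each `data:` line carries one complete JSON document.

```python
import json


def parse_sse_data(raw):
    """Return the decoded JSON payload of every `data:` line in an SSE body."""
    text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
    payloads = []
    for line in text.splitlines():
        if line.startswith("data:"):
            # Drop the field name and surrounding whitespace, then decode.
            payloads.append(json.loads(line[len("data:"):].strip()))
    return payloads


body = b'data: {"value":42}\n\n'
events = parse_sse_data(body)
```

With a helper like this, the final assertion could compare `parse_sse_data(rv_stream.data)` against `[{"value": 42}]` instead of comparing against a string.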