Asyncio decode utf-8 with StreamReader

I'm used to asyncio and I find that handling the task is pretty nice, but it can be difficult to mix asynchronous file libraries with traditional io libraries. The problem I'm currently facing is how to decode async StreamReader correctly.

The simplest solution is to read() snippets of byte strings, and then decode each snippet - see the code below. (In my program, I did not print each fragment, but decoded it into a string and sent it to another method for processing):

 import asyncio import aiohttp async def get_data(port): url = 'http://localhost:{}/'.format(port) r = await aiohttp.get(url) stream = r.content while not stream.at_eof(): data = await stream.read(4) print(data.decode('utf-8')) 

This works fine until the utf-8 character is found, which will be split between pieces. For example, if the answer is b'M\xc3\xa4dchen mit Bi\xc3\x9f\n' , then reading fragments from 3 will work, but fragments from 4 will not (since \xc3 and \x9f are in different fragments and decoding a fragment ending in \xc3 will result in the following error:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 3: unexpected end of data

I examined the correct solutions to this problem and, at least in the blocking world, it looks like either io.TextIOWrapper or codecs.StreamReaderWriter (the differences are discussed in PEP 0400 ). However, they both rely on typical blocking threads.

I spent 30 minutes searching for examples with asyncio and continued to find my decode () solution. Does anyone know of a better solution or is it a missing feature in python asynchronous mode?

For reference, here are the results of using two "standard" decoders with asynchronous streams.

Using a codec reader:

 r = yield from aiohttp.get(url) decoder = codecs.getreader('utf-8') stream = decoder(r.content) 

An exception:

 File "echo_client.py", line 13, in get_data data = yield from stream.read(4) File "/usr/lib/python3.5/codecs.py", line 497, in read data = self.bytebuffer + newdata TypeError: can't concat bytes to generator 

(it calls read () directly, not yield from or await it)

I also tried to wrap the stream using io.TextIOWrapper:

 stream = TextIOWrapper(r.content) 

But this leads to the following:

 File "echo_client.py", line 10, in get_data stream = TextIOWrapper(r.content) AttributeError: 'FlowControlStreamReader' object has no attribute 'readable' 

PS If you need an example for an example, look at this meaning . You can run it with python3.5 to reproduce the error. If you change the block size from 4 to 3 (or 30), it will work correctly.

EDIT

The accepted answer fixed this as a charm. Thank you If anyone has this problem, here is a simple wrapper class that I made to handle decoding in StreamReader:

 import codecs class DecodingStreamReader: def __init__(self, stream, encoding='utf-8', errors='strict'): self.stream = stream self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors) async def read(self, n=-1): data = await self.stream.read(n) if isinstance(data, (bytes, bytearray)): data = self.decoder.decode(data) return data def at_eof(self): return self.stream.at_eof() 
+7
python asynchronous encoding utf-8 python-asyncio
source share
1 answer

You can use IncrementalDecoder :

 Utf8Decoder = codecs.getincrementaldecoder('utf-8') 

In your example:

 decoder = Utf8Decoder(error='strict') while not stream.at_eof(): data = await stream.read(4) print(decoder.decode(data), end='') 

Output:

 Mädchen mit Biß 
+3
source share

All Articles