Akka HTTP Streaming JSON Deserialization

Is it possible to dynamically deserialize an external stream of unknown length (a ByteString stream from Akka HTTP) into domain objects?


Context

I call an HTTP endpoint that never finishes responding; it outputs a JSON array that keeps growing:

 [
   { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
   { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
   { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
   { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
   { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
   ...
 ] <- never sees the light of day
Tags: json, akka, akka-stream
3 answers

I think play-iteratees-extras should help you. This library lets you parse JSON using the Enumerator/Iteratee pattern, and it does not wait until all the data has been received.

For example, let's create an "infinite" byte stream that represents an "infinite" JSON array:

 import java.nio.charset.StandardCharsets.UTF_8
 import scala.concurrent.Future
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.util.Random
 import play.api.libs.json.Json
 import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}

 var i = 0
 var isFirstWas = false
 val max = 10000

 val stream = Enumerator("[".getBytes(UTF_8)) andThen Enumerator.generateM {
   Future {
     i += 1
     if (i < max) {
       val json = Json.stringify(Json.obj(
         "prop" -> Random.nextBoolean(),
         "prop2" -> Random.nextBoolean(),
         "prop3" -> Random.nextInt(),
         "prop4" -> Random.alphanumeric.take(5).mkString("")
       ))
       // every object after the first is prefixed with the "," separator
       val string = if (isFirstWas) "," + json else { isFirstWas = true; json }
       Some(string.getBytes(UTF_8))
     } else if (i == max) Some("]".getBytes(UTF_8)) // <------ this is the last jsArray closing tag
     else None // ends the enumerator
   }
 }

This stream emits a JSON array of max - 1 (here 9,999) objects; raise max to make it longer. Next, define a case class that holds the data of each object in the array:

 case class Props(prop: Boolean, prop2: Boolean, prop3: Int, prop4: String) 

Now write a parser that parses each element:

 import play.extras.iteratees._
 import JsonBodyParser._
 import JsonIteratees._
 import JsonEnumeratees._

 // Parse each element of the top-level array as a simple object, then map it to
 // Some(Props) if every field is present with the expected type, otherwise None
 val parser = jsArray(jsValues(jsSimpleObject)) ><> Enumeratee.map { json =>
   for {
     prop  <- (json \ "prop").asOpt[Boolean]
     prop2 <- (json \ "prop2").asOpt[Boolean]
     prop3 <- (json \ "prop3").asOpt[Int]
     prop4 <- (json \ "prop4").asOpt[String]
   } yield Props(prop, prop2, prop3, prop4)
 }

See the documentation for jsArray, jsValues and jsSimpleObject. To build the resulting enumerator:

 val result = stream &> Encoding.decode() ><> parser 

Encoding.decode() from play-iteratees-extras decodes the bytes into a CharString. The result value is of type Enumerator[Option[Props]], and you can apply an iteratee to this enumerator to start the parsing process.
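To see why each element comes out as Option[Props], here is a minimal stdlib-only sketch of the same for-comprehension. It assumes a Map[String, Any] as a stand-in for a parsed play-json object, and the names PropsExtraction, bool, int and str are hypothetical helpers playing the role of asOpt[T]. An element yields Some(Props) only when all four fields exist with the expected types; anything else becomes None instead of failing the whole stream.

```scala
final case class Props(prop: Boolean, prop2: Boolean, prop3: Int, prop4: String)

object PropsExtraction {
  // Hypothetical stand-in for a parsed JSON object (the answer uses play-json values)
  type Obj = Map[String, Any]

  // Typed lookups: Some(value) only if the key exists AND has the expected type,
  // which is the behaviour asOpt[T] gives in the parser above
  private def bool(o: Obj, k: String): Option[Boolean] = o.get(k).collect { case b: Boolean => b }
  private def int(o: Obj, k: String): Option[Int]      = o.get(k).collect { case i: Int => i }
  private def str(o: Obj, k: String): Option[String]   = o.get(k).collect { case s: String => s }

  // Short-circuits to None at the first missing or mistyped field
  def toProps(o: Obj): Option[Props] =
    for {
      prop  <- bool(o, "prop")
      prop2 <- bool(o, "prop2")
      prop3 <- int(o, "prop3")
      prop4 <- str(o, "prop4")
    } yield Props(prop, prop2, prop3, prop4)
}
```

If malformed elements should simply be dropped, flattening the Options downstream is enough; if they should be reported, an Either-based variant of the same comprehension would carry the reason along.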

I don't know exactly how you receive the bytes (the solution depends heavily on that), but I think this is one possible solution to your problem.


I had a very similar problem trying to parse the Twitter stream (an infinite string) into domain objects. I solved it with Json4s, for example:

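 import org.json4s._
 import org.json4s.jackson.JsonMethods._ // or org.json4s.native.JsonMethods._

 case class Tweet(username: String, geolocation: Option[Geo])
 case class Geo(latitude: Float, longitude: Float)

 object Tweet {
   implicit val formats: Formats = DefaultFormats // required by extract[T]

   def apply(s: String): Tweet =
     parse(StringInput(s), useBigDecimalForDouble = false, useBigIntForLong = false).extract[Tweet]
 }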

Then I just buffer the stream and map each line to a Tweet:

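 import java.io.{BufferedReader, InputStreamReader}
 import java.util.zip.GZIPInputStream

 // inputStream is the gzipped response body; each line holds one complete JSON object
 val reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(inputStream), "UTF-8"))
 var line = reader.readLine()
 while (line != null) {
   store(Tweet.apply(line)) // store is your own persistence function
   line = reader.readLine()
 }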
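If you prefer to avoid the mutable loop, the same line-by-line reading can be expressed as a lazy Iterator. This is a minimal sketch using only the JDK and the Scala standard library; the object name GzipLines is hypothetical, and, like the loop above, it assumes each line carries one complete JSON object.

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPInputStream

object GzipLines {
  // Lazy iterator over the lines of a gzipped, UTF-8 encoded stream.
  // Lines are pulled on demand, so the stream may be arbitrarily long;
  // closing `in` remains the caller's responsibility.
  def lines(in: InputStream): Iterator[String] = {
    val reader = new BufferedReader(
      new InputStreamReader(new GZIPInputStream(in), StandardCharsets.UTF_8))
    // readLine() returns null at end of stream, which terminates the iterator
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }
}
```

With the answer's Tweet and store, usage would be GzipLines.lines(inputStream).map(Tweet.apply).foreach(store). Because Iterator is lazy, only one line is held at a time, which is what makes it suitable for an endless stream.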

Json4s fully supports Option fields (as well as nested case classes inside an object, such as Geo in the example). So you can declare a field as an Option, as I did, and if the field is missing from the JSON it will be set to None.

Hope this helps!


I think JsonFraming.objectScanner(Int.MaxValue) is the right tool here. According to the docs:

Returns a Flow that implements a "brace counting" based framing operator for emitting valid JSON chunks. It scans the incoming data stream for valid JSON objects and returns chunks of ByteStrings containing only those valid chunks. Typical examples of data that one may want to frame using this operator include: very large arrays

So you could write something like this:

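 import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
 import akka.stream.scaladsl.{JsonFraming, Sink}
 import io.circe.parser.decode // circe; needs an implicit Decoder[MyEntity]
 import scala.concurrent.Future
 import scala.util.{Failure, Success}

 implicit val system: ActorSystem = ActorSystem() // Akka 2.6+ derives the Materializer from it
 import system.dispatcher

 val response: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = serviceUrl))
 response.onComplete {
   case Success(value) =>
     value.entity.dataBytes
       .via(JsonFraming.objectScanner(Int.MaxValue))
       .map(_.utf8String) // In case you have ByteString
       .map(decode[MyEntity](_)) // Use any Unmarshaller here
       .grouped(20)
       .runWith(Sink.ignore) // Do whatever you need here
   case Failure(exception) => log.error(exception, "Api call failed")
 }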
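To make the "brace counting" idea concrete, here is a simplified, stdlib-only sketch of such a framer. It is not Akka's implementation: the class name BraceCountingFramer is hypothetical, it works on already-decoded text chunks rather than ByteStrings, it assumes the top-level array contains only objects, and it has no maximum-length guard. It keeps its state between chunks, so an object split across two network chunks is still emitted whole, and braces inside string literals (including escaped quotes) are not counted.

```scala
// Hypothetical, simplified brace-counting framer (not Akka's JsonFraming code).
// Feed it text chunks of any size; it returns every top-level JSON object that
// each chunk completes, skipping the enclosing array's '[', ',', ']' and whitespace.
final class BraceCountingFramer {
  private val current = new StringBuilder // characters of the object in progress
  private var depth = 0                   // current {} nesting depth
  private var inString = false            // inside a "..." literal?
  private var escaped = false             // previous char was a backslash inside a string?

  def feed(chunk: String): Vector[String] = {
    val completed = Vector.newBuilder[String]
    for (c <- chunk) {
      if (depth == 0) {
        // Between objects: only an opening brace starts a new frame
        if (c == '{') { depth = 1; current.append(c) }
      } else {
        current.append(c)
        if (inString) {
          if (escaped) escaped = false
          else if (c == '\\') escaped = true
          else if (c == '"') inString = false
        } else c match {
          case '"' => inString = true
          case '{' => depth += 1
          case '}' =>
            depth -= 1
            // Back at depth 0: the buffered text is one complete object
            if (depth == 0) { completed += current.toString; current.clear() }
          case _ =>
        }
      }
    }
    completed.result()
  }
}
```

This also shows why objectScanner takes a maximum object length: a framer like this buffers an incomplete object until its closing brace arrives, so passing Int.MaxValue effectively removes the protection against a single unbounded object.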
