I don't have time to finish this, but it should get you started.
The idea is to collapse the RDD[Row] into a single map that represents your JSON structure. RDD.aggregate is a fold that takes a zero value plus two function parameters:
- seqOp: how to collapse a set of elements into the target type
- combOp: how to merge two values of the target type
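To see how the two functions interact without a Spark cluster, here is a minimal sketch that mimics RDD.aggregate on plain Scala collections. The object and method names (AggregateSemantics, aggregate, sumAndCount) are hypothetical, and each inner Seq stands in for one partition. As with Spark, every partition is folded with seqOp starting from the zero value, and the per-partition results are then merged with combOp, again starting from the zero value. Here the target type is a (sum, count) pair.

```scala
// Mimics RDD.aggregate semantics with plain collections (no Spark needed).
// Each inner Seq plays the role of one partition.
object AggregateSemantics {
  // Hypothetical stand-in for rdd.aggregate(zero)(seqOp, combOp):
  // fold each partition with seqOp from `zero`, then merge the partial
  // results with combOp (also starting from `zero`).
  def aggregate[T, U](partitions: Seq[Seq[T]], zero: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U): U =
    partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

  // Target type U = (sum, count); element type T = Int.
  def sumAndCount(partitions: Seq[Seq[Int]]): (Int, Int) =
    aggregate(partitions, (0, 0))(
      (acc, x) => (acc._1 + x, acc._2 + 1), // seqOp: fold one element into the pair
      (a, b) => (a._1 + b._1, a._2 + b._2)  // combOp: merge two partial pairs
    )

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(1, 2, 3), Seq(4, 5), Seq.empty[Int])
    println(sumAndCount(parts)) // prints (15,5)
  }
}
```

Because Spark merges partition results in whatever order tasks finish, combOp should be associative and commutative, which simple addition is.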
The tricky part is merging in combOp, since you need to add up the counts of the values observed in seqOp. I left it as an exercise because I have a plane to catch! Hopefully someone else can fill in the blanks if you run into problems.
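The core of that merge is combining two count maps: a key present in both maps gets the sum of its counts, and a key present in only one map is kept unchanged. A minimal sketch on plain Scala Maps; CountMerge and mergeCounts are hypothetical names, not part of any library.

```scala
// Merges two count maps by adding the counts of shared keys.
object CountMerge {
  // Fold every (key, count) of b into a; keys missing from a start at 0.
  def mergeCounts(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
    b.foldLeft(a) { case (acc, (key, n)) =>
      acc.updated(key, acc.getOrElse(key, 0) + n)
    }

  def main(args: Array[String]): Unit = {
    val left  = Map("US" -> 2, "FR" -> 1)
    val right = Map("US" -> 3, "DE" -> 4)
    // Result contains US -> 5, FR -> 1, DE -> 4
    println(mergeCounts(left, right))
  }
}
```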
```scala
import org.apache.spark.rdd.RDD

case class Row(id: Int, country: String, tpe: String)

def foo: Unit = {
  val rows: RDD[Row] = ???

  // seqOp: fold one Row into the accumulator, bumping the country and type counts for its id
  def seqOp(acc: Map[Int, (Map[String, Int], Map[String, Int])], r: Row) = {
    acc.get(r.id) match {
      case None => acc.updated(r.id, (Map(r.country -> 1), Map(r.tpe -> 1)))
      case Some((countries, types)) =>
        val countries_ = countries.updated(r.country, countries.getOrElse(r.country, 0) + 1)
        val types_ = types.updated(r.tpe, types.getOrElse(r.tpe, 0) + 1)
        acc.updated(r.id, (countries_, types_))
    }
  }

  val z = Map.empty[Int, (Map[String, Int], Map[String, Int])]

  // combOp: merge two accumulators; folding l into r (not z) keeps ids that only appear in r
  def combOp(l: Map[Int, (Map[String, Int], Map[String, Int])],
             r: Map[Int, (Map[String, Int], Map[String, Int])]) = {
    l.foldLeft(r) { case (acc, (id, (countries, types))) =>
      r.get(id) match {
        case None => acc.updated(id, (countries, types))
        case Some((otherCountries, otherTypes)) =>
          ??? // left as an exercise: add up the counts from both sides
      }
    }
  }

  // Collapse the whole RDD into a single map
  rows.aggregate(z)(seqOp, combOp)
}
```
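For anyone filling in the blanks, here is one possible completion of combOp. It is a sketch, not the original author's code: it runs on plain collections instead of an RDD, each inner Seq stands in for a partition, and RowAggregation, the Counts/Acc aliases and mergeCounts are hypothetical names. The missing branch simply adds the country counts and the type counts from both sides.

```scala
// One possible completion of combOp, exercised on plain collections.
object RowAggregation {
  case class Row(id: Int, country: String, tpe: String)

  type Counts = Map[String, Int]
  type Acc    = Map[Int, (Counts, Counts)] // id -> (country counts, type counts)

  val z: Acc = Map.empty

  // Adds the counts of b into a; keys missing from a start at 0.
  private def mergeCounts(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (k, n)) => acc.updated(k, acc.getOrElse(k, 0) + n) }

  // Same logic as the seqOp in the answer: bump this row's country and type counts.
  def seqOp(acc: Acc, r: Row): Acc = acc.get(r.id) match {
    case None => acc.updated(r.id, (Map(r.country -> 1), Map(r.tpe -> 1)))
    case Some((countries, types)) =>
      acc.updated(r.id, (
        countries.updated(r.country, countries.getOrElse(r.country, 0) + 1),
        types.updated(r.tpe, types.getOrElse(r.tpe, 0) + 1)))
  }

  // Fold l into r so ids seen only in r survive; shared ids get their counts added.
  def combOp(l: Acc, r: Acc): Acc =
    l.foldLeft(r) { case (acc, (id, (countries, types))) =>
      r.get(id) match {
        case None => acc.updated(id, (countries, types))
        case Some((otherCountries, otherTypes)) =>
          acc.updated(id, (mergeCounts(countries, otherCountries), mergeCounts(types, otherTypes)))
      }
    }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq(Row(1, "US", "a"), Row(1, "US", "b")),
      Seq(Row(1, "FR", "a"), Row(2, "DE", "c"))
    )
    // Plain-collection equivalent of rows.aggregate(z)(seqOp, combOp)
    val result = partitions.map(_.foldLeft(z)(seqOp)).foldLeft(z)(combOp)
    println(result)
  }
}
```

With a real RDD the same seqOp, combOp and z can be passed to rows.aggregate(z)(seqOp, combOp) unchanged.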