Scala: how to do a groupBy sum of String values?

I have an RDD[Row]:

    |---itemId----|----Country-------|---Type----------|
    | 11          | US               | Movie           |
    | 11          | US               | TV              |
    | 101         | France           | Movie           |

How can I group by itemId so that I can save the result as a JSON list, where each row is a separate JSON object (one object per row in the RDD)?

    {"itemId": 11, "Country": {"US": 2}, "Type": {"Movie": 1, "TV": 1}},
    {"itemId": 101, "Country": {"France": 1}, "Type": {"Movie": 1}}
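For reference, here is a minimal sketch of the grouping itself on plain Scala collections, using the three sample rows above. The Item case class, the helper names (countValues, jsonCounts, toJsonLines) and the hand-rolled JSON rendering (no escaping, keys sorted for stable output) are assumptions for illustration only. With Spark you would apply the same per-itemId logic to the RDD, for example via groupBy, and write each string out with saveAsTextFile.

```scala
// Plain Scala collections stand in for the RDD here; the grouping logic is the same.
// Item is a hypothetical stand-in for one (itemId, Country, Type) row.
final case class Item(itemId: Int, country: String, tpe: String)

object GroupCounts {
  // Count how often each distinct value occurs, e.g. Seq("US", "US") -> Map("US" -> 2)
  def countValues(values: Seq[String]): Map[String, Int] =
    values.groupBy(identity).map { case (v, occurrences) => v -> occurrences.size }

  // Render a count map as a JSON object, keys sorted for stable output.
  // Assumption: values contain no characters that need JSON escaping.
  def jsonCounts(counts: Map[String, Int]): String =
    counts.toSeq.sortBy(_._1)
      .map { case (k, n) => "\"" + k + "\": " + n }
      .mkString("{", ", ", "}")

  // One JSON object per itemId, ordered by itemId
  def toJsonLines(items: Seq[Item]): Seq[String] =
    items.groupBy(_.itemId).toSeq.sortBy(_._1).map { case (id, group) =>
      val countries = jsonCounts(countValues(group.map(_.country)))
      val types = jsonCounts(countValues(group.map(_.tpe)))
      s"""{"itemId": $id, "Country": $countries, "Type": $types}"""
    }

  def main(args: Array[String]): Unit = {
    val items = Seq(Item(11, "US", "Movie"), Item(11, "US", "TV"), Item(101, "France", "Movie"))
    toJsonLines(items).foreach(println)
  }
}
```

Running main prints one line per itemId in the shape asked for above. On a real cluster, grouping all rows of an id in one place is fine for small groups; for large ones a per-key aggregation avoids materialising each group.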


I tried:

    import com.mapping.data.model.MappingUtils
    import com.mapping.data.model.CountryInfo

    val mappingPath = "s3://.../"
    val input = sc.textFile(mappingPath)

The input is a list of JSON documents, one per line, which I map to the POJO class CountryInfo using MappingUtils, which does the JSON parsing and conversion:

    import org.apache.spark.sql.Row

    val MappingsList = input.map { x =>
      val countryInfo = MappingUtils.getCountryInfoString(x)
      (countryInfo.getItemId(), countryInfo)
    }.collectAsMap
    // MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo]

    def showCountryInfo(x: Option[CountryInfo]) = x match {
      case Some(s) => s
    }

    val events = sqlContext.sql("select itemId from EventList")
    val itemList = events.map { row =>
      val itemId = row.getAs[String](0) // itemId is the only selected column
      val countryInfo = showCountryInfo(MappingsList.get(itemId))
      val country = if (countryInfo.getCountry() == "unknown") "US" else countryInfo.getCountry()
      val tpe = countryInfo.getType() // "type" is a reserved word in Scala
      Row(itemId, country, tpe)
    }

Can someone let me know how I can achieve this?

Thanks!

1 answer

I don't have time to finish this, but it should get you started.

The idea is to aggregate the RDD[Row] down into a single Map that represents your JSON structure. RDD.aggregate is a fold that takes a zero value and two function parameters:

  • seqOp: how to fold one element into the target (accumulator) type
  • combOp: how to merge two values of the target type
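To make the roles of seqOp and combOp concrete, here is a minimal sketch in plain Scala that mimics RDD.aggregate(zeroValue)(seqOp, combOp) over a hand-made list of "partitions". The names aggregate, addOne, addCounts and countAll are hypothetical and only for illustration; Spark does the equivalent across real partitions. Because the zero value is used once per partition and once more when merging, it should be a neutral element (here an empty Map).

```scala
object AggregateSketch {
  // Mimics RDD.aggregate on a local Seq of "partitions":
  // each partition is folded with seqOp starting from zero,
  // then the per-partition results are merged with combOp (also starting from zero).
  def aggregate[T, U](partitions: Seq[Seq[T]], zero: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U =
    partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

  // seqOp: count one more occurrence of a single element
  def addOne(acc: Map[String, Int], value: String): Map[String, Int] =
    acc.updated(value, acc.getOrElse(value, 0) + 1)

  // combOp: add two partial count maps together, key by key
  def addCounts(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
    b.foldLeft(a) { case (m, (k, n)) => m.updated(k, m.getOrElse(k, 0) + n) }

  // Count country values spread over several partitions
  def countAll(partitions: Seq[Seq[String]]): Map[String, Int] =
    aggregate(partitions, Map.empty[String, Int])(addOne, addCounts)
}
```

For example, countAll(Seq(Seq("US", "US"), Seq("France", "US"))) counts within each partition first and then adds the two partial maps.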

The trickier part is combOp: when merging two partial results you need to add up the counts of the values observed by seqOp. I left that as an exercise because I have a plane to catch! Hopefully someone else can fill in the blanks if you run into problems.

    import org.apache.spark.rdd.RDD

    case class Row(id: Int, country: String, tpe: String)

    def foo: Unit = {
      val rows: RDD[Row] = ???

      def seqOp(acc: Map[Int, (Map[String, Int], Map[String, Int])], r: Row) = {
        acc.get(r.id) match {
          case None =>
            acc.updated(r.id, (Map(r.country -> 1), Map(r.tpe -> 1)))
          case Some((countries, types)) =>
            val countries_ = countries.updated(r.country, countries.getOrElse(r.country, 0) + 1)
            val types_ = types.updated(r.tpe, types.getOrElse(r.tpe, 0) + 1)
            acc.updated(r.id, (countries_, types_))
        }
      }

      val z = Map.empty[Int, (Map[String, Int], Map[String, Int])]

      def combOp(l: Map[Int, (Map[String, Int], Map[String, Int])],
                 r: Map[Int, (Map[String, Int], Map[String, Int])]) = {
        // start from r so that ids seen only in r are kept
        l.foldLeft(r) { case (acc, (id, (countries, types))) =>
          r.get(id) match {
            case None => acc.updated(id, (countries, types))
            case Some((otherCountries, otherTypes)) =>
              // todo - continue by merging countries with otherCountries
              // and types with otherTypes, then update acc
              ???
          }
        }
      }

      val summaryMap = rows.aggregate(z)(seqOp, combOp)
    }
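One possible way to fill in the todo, as a sketch rather than the answerer's own solution: mergeCounts (a hypothetical helper) adds two count maps key by key, and combOp folds l into r so that ids seen in only one of the two partial results are not lost. The type aliases Counts and Summary are added here for readability and are not part of the original code.

```scala
object CombOpSketch {
  type Counts = Map[String, Int]
  type Summary = Map[Int, (Counts, Counts)] // itemId -> (country counts, type counts)

  // Add two count maps key by key, e.g. Map("US" -> 1) and Map("US" -> 1) give Map("US" -> 2)
  def mergeCounts(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (k, n)) => acc.updated(k, acc.getOrElse(k, 0) + n) }

  // Merge two partial summaries; folding l into r keeps ids that appear only in r
  def combOp(l: Summary, r: Summary): Summary =
    l.foldLeft(r) { case (acc, (id, (countries, types))) =>
      r.get(id) match {
        case None => acc.updated(id, (countries, types))
        case Some((otherCountries, otherTypes)) =>
          acc.updated(id, (mergeCounts(countries, otherCountries), mergeCounts(types, otherTypes)))
      }
    }
}
```

With a combOp like this, rows.aggregate(z)(seqOp, combOp) returns one Map to the driver, which is fine for a modest number of itemIds. For many ids, a keyed operation such as aggregateByKey on an RDD of (itemId, row) pairs keeps the per-id summaries distributed instead.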
