I am trying to do some analysis on sets. I have some sample data that looks like this:
orders.json
{"items":[1,2,3,4,5]} {"items":[1,2,5]} {"items":[1,3,5]} {"items":[3,4,5]}
Each record has a single field, which is a list of numbers representing identifiers.
Here is the Spark script I'm trying to run:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("Dataframe Test")

val sc = new SparkContext(sparkConf)
val sql = new SQLContext(sc)

val dataframe = sql.read.json("orders.json")

val expanded = dataframe
  .explode[::[Long], Long]("items", "item1")(row => row)
  .explode[::[Long], Long]("items", "item2")(row => row)

val grouped = expanded
  .where(expanded("item1") !== expanded("item2"))
  .groupBy("item1", "item2")
  .count()

val recs = grouped
  .groupBy("item1")
The script builds expanded and grouped in order. In a nutshell, expanded is a list of all possible pairs of identifiers that appeared together in the same source set. grouped filters out the pairs where an identifier is paired with itself, then groups the remaining unique pairs and counts each one. Example schema and data for grouped:
root
 |-- item1: long (nullable = true)
 |-- item2: long (nullable = true)
 |-- count: long (nullable = false)

[1,2,2]
[1,3,2]
[1,4,1]
[1,5,3]
[2,1,2]
[2,3,1]
[2,4,1]
[2,5,2]
...
So my question is: how do I now group by the first element of each row so that each result carries a list of tuples? For the example above, I would expect something like this:
[1, [(2, 2), (3, 2), (4, 1), (5, 3)]]
[2, [(1, 2), (3, 1), (4, 1), (5, 2)]]
As you can see in my script with recs, I thought I would start with a groupBy on 'item1', the first item in each row. But that leaves you with a GroupedData object, which offers a very limited set of operations: essentially only aggregates such as sum, avg, and so on. I just want to collect the list of tuples for each result.
I could easily use the RDD functions at this point, but that moves away from using Dataframes. Is there any way to do this using Dataframe functions?
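For reference, here is roughly the RDD fallback I have in mind (just a sketch; it assumes the column positions match the grouped schema shown above):

// Fall back to the RDD API: key each row of `grouped` by item1,
// pair it with the (item2, count) tuple, then group by key.
val recsRdd = grouped.rdd
  .map(row => (row.getLong(0), (row.getLong(1), row.getLong(2))))
  .groupByKey()
// recsRdd: RDD[(Long, Iterable[(Long, Long)])]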