Turn a list of key/value pairs into a list of values per key in Spark

We need to efficiently convert large lists of key / value pairs, for example:

val providedData = List(
        (new Key("1"), new Val("one")),
        (new Key("1"), new Val("un")),
        (new Key("1"), new Val("ein")),
        (new Key("2"), new Val("two")),
        (new Key("2"), new Val("deux")),
        (new Key("2"), new Val("zwei"))
)

into lists of values for each key, for example:

val expectedData = List(
  (new Key("1"), List(
    new Val("one"), 
    new Val("un"), 
    new Val("ein"))),
  (new Key("2"), List(
    new Val("two"), 
    new Val("deux"), 
    new Val("zwei")))
)

The key/value pairs come from a large key/value store (Accumulo), so the keys are sorted, but they will usually cross the underlying partition boundaries. There can be millions of keys and hundreds of values per key.

I believe the right tool for this job is Spark's combineByKey operation, but I can only find brief examples using primitive types (for example, Int), which I have not been able to generalize to custom types such as the ones above.

Can anyone show how to call combineByKey (or a better alternative) in Scala with custom key and value types like these?
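
For reference, a minimal sketch of what the wrapper types could look like is shown below; these definitions are assumed, since the post does not include them. Case classes provide value-based equals/hashCode and serializability, both of which Spark relies on when grouping records by key.

// Assumed definitions; the real Key and Val classes are not shown in the question
case class Key(k: String)
case class Val(v: String)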

In Spark, parallelize the provided data into an RDD and then group it with combineByKey:

val rdd = sc.parallelize(providedData)

rdd.combineByKey(
    // createCombiner: add first value to a list
    (x: Val) => List(x),
    // mergeValue: add new value to existing list
    (acc: List[Val], x: Val) => x :: acc,
    // mergeCombiners: combine the two lists
    (acc1: List[Val], acc2: List[Val]) => acc1 ::: acc2
)
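
As a quick sanity check, the grouped result can be assigned and collected back to the driver. This is only a sketch, assuming the Key/Val case classes above; note that the order of values within each list is not guaranteed, because mergeValue prepends with :: and partition results are merged in arbitrary order.

// Assign the grouped result (an RDD[(Key, List[Val])]) and inspect it locally
val grouped = rdd.combineByKey(
    (x: Val) => List(x),
    (acc: List[Val], x: Val) => x :: acc,
    (acc1: List[Val], acc2: List[Val]) => acc1 ::: acc2
)
grouped.collect().foreach(println)
// prints pairs such as (Key(1),List(Val(ein), Val(un), Val(one)))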

The same grouping can also be written with aggregateByKey:

rdd.aggregateByKey(List[Val]())(
    (acc, x) => x :: acc,
    (acc1, acc2) => acc1 ::: acc2
)
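
Both calls produce an RDD[(Key, List[Val])]. aggregateByKey is effectively combineByKey with the initial combiner derived from the zero value (here the empty list), so for this use case the two behave the same. Collecting either result with .collect().toList gives data in the same shape as expectedData, apart from the ordering of values within each list.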