Spark: difference between reduceByKey vs groupByKey vs aggregateByKey vs combineByKey

Can someone explain the difference between reduceByKey, groupByKey, aggregateByKey and combineByKey? I have read the documentation about this, but could not understand the exact differences.

If you can explain it with examples, that would be great.

+7
apache-spark
5 answers

While both reduceByKey and groupByKey will produce the same answer, the reduceByKey example works much better on a large dataset. This is because Spark knows it can combine output with a common key on each partition before shuffling the data.

On the other hand, when groupByKey is called, all the key-value pairs are shuffled around. This is a lot of unnecessary data to be transferred over the network.
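Here is a minimal sketch (with assumed sample data) showing that both approaches give the same word counts, even though reduceByKey combines values within each partition before the shuffle:

 // Assumed sample data: each word paired with a count of 1
 val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a")).map(w => (w, 1))

 val viaReduce = words.reduceByKey(_ + _)            // combines (word, 1) pairs map-side before shuffling
 val viaGroup  = words.groupByKey().mapValues(_.sum) // shuffles every (word, 1) pair, then sums

 // Both collect() to the same counts: (a,3), (b,2), (c,1)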

For more detail, check the link below:

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

+7
  • groupByKey() just groups your dataset based on a key. It will result in data shuffling when the RDD is not already partitioned.
  • reduceByKey() is something like grouping + aggregation. We can say reduceByKey() is equivalent to dataset.group(...).reduce(...). It will shuffle less data than groupByKey().
  • aggregateByKey() is logically the same as reduceByKey(), but it lets you return the result as a different type. In other words, it lets you have input of type x and an aggregated result of type y. For example, (1,2), (1,4) as input and (1,"six") as output (see the sketch after the note below). It also takes a zero value that is applied at the beginning of each key.

Note: One similarity among them is that they are all wide transformations.
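A minimal sketch (with assumed data) of the type change described above: the input values are Ints, but the aggregated result per key is a String:

 val nums = sc.parallelize(Seq((1, 2), (1, 4)))

 // The zero value "" starts the accumulator for each key;
 // the seq op folds an Int into a String, the comb op merges two Strings
 val asText = nums.aggregateByKey("")(
   (acc: String, v: Int) => acc + v.toString,
   (a: String, b: String) => a + b
 )
 // asText.collect() gives something like Array((1, "24")): Int values in, String result out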

+4
source share

groupByKey:

Syntax:

 sparkContext.textFile("hdfs://")
   .flatMap(line => line.split(" "))
   .map(word => (word, 1))
   .groupByKey()
   .map { case (word, counts) => (word, counts.sum) }

groupByKey can cause out-of-disk problems because all the data is sent over the network and collected on the reduce workers.

reduceByKey:

Syntax:

 sparkContext.textFile("hdfs://")
   .flatMap(line => line.split(" "))
   .map(word => (word, 1))
   .reduceByKey((x, y) => x + y)

Data is combined at each partition, so there is only one output per key per partition to send over the network. reduceByKey requires combining all your values into another value of exactly the same type.
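Because the reduce function must return the same type as the values, building a different accumulator type, such as a (sum, count) pair from Int values, needs a map step first. A small sketch with assumed data:

 // Assumed data: key -> Int values; we want (sum, count) per key using reduceByKey
 val nums = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 5)))

 val sumAndCount = nums
   .map { case (k, v) => (k, (v, 1)) }                 // lift each value into the accumulator type first
   .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))  // now both sides of the reduce are (Int, Int)
 // sumAndCount.collect() => (a,(4,2)), (b,(5,1))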

aggregateByKey:

Same as reduceByKey, but it takes an initial value.

3 parameters as input: i. an initial (zero) value, ii. sequence op logic (seqOp: merges a value into the accumulator within a partition), iii. combiner logic (combOp: merges accumulators across partitions).

Example:

 val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
 val data = sc.parallelize(keysWithValuesList)

 // Create key-value pairs
 val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()

 val initialCount = 0
 val addToCounts = (n: Int, v: String) => n + 1
 val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2

 val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)

Output (count per key): bar -> 3, foo -> 5

combineByKey:

3 parameters as input:

  • Initial value (createCombiner): unlike aggregateByKey, we do not need to pass a constant; we can pass a function that returns a new accumulator from the first value seen for a key.
  • Merge function (mergeValue): merges a new value into the accumulator within a partition.
  • Combine function (mergeCombiners): merges two accumulators across partitions.

Example:

 // rdd is assumed to be an RDD of (key, Int) pairs; this computes the average value per key
 val result = rdd.combineByKey(
   (v: Int) => (v, 1),                                                            // createCombiner: first value -> (sum, count)
   (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),                         // mergeValue: within a partition
   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners: across partitions
 ).map { case (k, v) => (k, v._1 / v._2.toDouble) }

 result.collect.foreach(println)

reduceByKey, aggregateByKey, and combineByKey are preferred over groupByKey.

Link: Avoid groupByKey

+2

ReduceByKey - reduceByKey(func, [numTasks])

The data is combined so that at each partition there is at most one combined value for each key. Then the shuffle happens, and the combined data is sent over the network to a particular executor for an action such as reduce.

GroupByKey - groupByKey([numTasks])

It does not combine the values for a key; the shuffle happens directly, so a lot of data is sent to each partition, almost the same amount as the initial data.

The merging of values for each key is done only after the shuffle. A lot of data has to be stored on the final executor node, which leads to out-of-memory problems.

AggregateByKey - aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

It is similar to reduceByKey, but you can provide an initial value when performing the aggregation.
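A small sketch (with assumed data) of aggregateByKey where the zero value matters; here it computes the maximum per key, with Int.MinValue as the initial value:

 val scores = sc.parallelize(Seq(("a", 3), ("a", 9), ("b", 7)))

 val maxPerKey = scores.aggregateByKey(Int.MinValue)(
   (acc, v) => math.max(acc, v),  // seqOp: within a partition
   (m1, m2) => math.max(m1, m2)   // combOp: across partitions
 )
 // maxPerKey.collect() => (a,9), (b,7)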

Using reduceByKey

  • reduceByKey can be used when we run on a large dataset.

  • Use reduceByKey when the input and output value types are the same; aggregateByKey allows them to differ (see the sketch below).
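A brief sketch (with assumed data) contrasting the two: reduceByKey when the value type is unchanged, aggregateByKey when the result type differs:

 val pairs = sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 2)))

 // Same value type in and out (Int -> Int): reduceByKey is enough
 val sums = pairs.reduceByKey(_ + _)

 // Different result type (Set[Int] built from Int values): aggregateByKey fits better
 val distinctVals = pairs.aggregateByKey(Set.empty[Int])(_ + _, _ ++ _)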

Moreover, it is recommended that you avoid groupByKey and prefer reduceByKey. See here for more details.

You can also refer to this question to understand in more detail how reduceByKey and aggregateByKey work.

0

Though both of them will fetch the same results, there is a significant difference in the performance of the two functions. reduceByKey() works better on a large dataset compared to groupByKey().

In reduceByKey(), pairs on the same machine with the same key are combined (using the function passed to reduceByKey()) before the data is shuffled. Then the function is called again to reduce all the values from each partition to produce one final result.

In groupByKey() all key-value pairs are shuffled. This is a lot of unnecessary data for transmission over the network.

0
