I want to join the same DataFrame twice:
rdd1 = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['idx', 'val'])
rdd2 = spark.createDataFrame([(1, 2, 1), (1, 3, 0), (2, 3, 1)], ['key1', 'key2', 'val'])
res1 = rdd1.join(rdd2, on=[rdd1['idx'] == rdd2['key1']])
res2 = res1.join(rdd1, on=[res1['key2'] == rdd1['idx']])
res2.show()
Then I get this error:
pyspark.sql.utils.AnalysisException: Cartesian joins can be prohibitively expensive and disabled by default. To explicitly enable them, set spark.sql.crossJoin.enabled = true;
But I don't think this should be a cross join.
UPDATE:
res2.explain()

== Physical Plan ==
CartesianProduct
:- *SortMergeJoin [idx#0L, idx#0L], [key1#5L, key2#6L], Inner
:  :- *Sort [idx#0L ASC, idx#0L ASC], false, 0
:  :  +- Exchange hashpartitioning(idx#0L, idx#0L, 200)
:  :     +- *Filter isnotnull(idx#0L)
:  :        +- Scan ExistingRDD[idx#0L,val#1]
:  +- *Sort [key1#5L ASC, key2#6L ASC], false, 0
:     +- Exchange hashpartitioning(key1#5L, key2#6L, 200)
:        +- *Filter ((isnotnull(key2#6L) && (key2#6L = key1#5L)) && isnotnull(key1#5L))
:           +- Scan ExistingRDD[key1#5L,key2#6L,val#7L]
+- Scan ExistingRDD[idx#40L,val#41]
dataframe apache-spark pyspark apache-spark-sql spark-dataframe
Zhang Tong