How to perform a search operation on Spark data frames given several conditions

I am new to Spark (version 1.6.0) and am trying to solve the problem below:

Suppose there are two source files:

  • The first (A for short) is large; it contains columns named A1, B1, C1 plus about 80 other columns, with 230K rows.
  • The second (B for short) is a small lookup table that contains columns named A2, B2, C2, and D2, with 250 rows.

Now we need to insert a new column in A, given the following logic:

  • First search for A1, B1 and C1 in B (against the corresponding columns A2, B2 and C2); if a match is found, return D2 as the value of the new column. If nothing is found ...
  • Then search for A1 and B1 in B. If a match is found, return D2. If nothing is found ...
  • Use the default value "NA".
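Setting Spark aside for a moment, the three-step fallback can be sketched in plain Scala against a hypothetical in-memory copy of B; the row type and sample values below are illustrative, not from the original data:

```scala
// A row of the small lookup table B (hypothetical field names).
case class BRow(a2: String, b2: String, c2: String, d2: String)

val bTable = Seq(
  BRow("x", "y", "z", "FULL"),
  BRow("x", "y", "q", "OTHER")
)

// Tiered lookup: exact (A, B, C) match first, then (A, B), then the default.
def lookupD(a1: String, b1: String, c1: String): String =
  bTable.find(r => r.a2 == a1 && r.b2 == b1 && r.c2 == c1)
    .orElse(bTable.find(r => r.a2 == a1 && r.b2 == b1))
    .map(_.d2)
    .getOrElse("NA")
```

For example, `lookupD("x", "y", "z")` returns `"FULL"`, `lookupD("x", "y", "w")` falls back to the first two-column match, and `lookupD("p", "q", "r")` returns `"NA"`.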

I have already read in the files and converted them to data frames. For the first condition I got the result with a left outer join, but I cannot find a good way to handle the next step.

My current attempt is to build a new data frame by joining A and B on the less strict condition, but I do not know how to update the current data frame from the other one. Is there a more intuitive and efficient way to solve the whole problem?

Thanks for all the answers.

----------------------------- Update 2016-03-09 -----------------------------

In the end I accepted @mlk's answer. Many thanks also to @zero323 for his wonderful comments on UDFs vs. joins; Tungsten code generation is another problem we are facing right now. But since we need to perform several searches, with on average 4 conditions per search, the first solution is more suitable ...

The final solution looks like below:

```
import sqlContext.implicits._
import com.github.marklister.collections.io._

case class TableType(A: String, B: String, C: String, D: String)

val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("..."))

val lkupD = udf { (aStr: String, bStr: String, cStr: String) =>
  tableBroadcast.value.find {
    case TableType(a, b, c, _) =>
      (a == aStr && b == bStr && c == cStr) || (a == aStr && b == bStr)
  }.getOrElse(TableType("", "", "", "NA")).D
}

df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C"))
```
2 answers

Since B is small, I think the best way to do this is with a broadcast variable and a user-defined function.

```
// However you get the data...
case class BType(A2: Int, B2: Int, C2: Int, D2: String)

val B = Seq(BType(1, 1, 1, "B111"), BType(1, 1, 2, "B112"), BType(2, 0, 0, "B200"))

val A = sc.parallelize(Seq(
  (1, 1, 1, "DATA"),
  (1, 1, 2, "DATA"),
  (2, 0, 0, "DATA"),
  (2, 0, 1, "NONE"),
  (3, 0, 0, "NONE")
)).toDF("A1", "B1", "C1", "OTHER")

// Broadcast B so all nodes have a copy of it.
val Bbroadcast = sc.broadcast(B)

// A user-defined function to find the value for D2. I'm sure this could be
// improved by turning the lookups into maps, but this is a small example.
val findD = udf { (a: Int, b: Int, c: Int) =>
  Bbroadcast.value.find(x => x.A2 == a && x.B2 == b && x.C2 == c)
    .getOrElse(Bbroadcast.value.find(x => x.A2 == a && x.B2 == b)
      .getOrElse(BType(0, 0, 0, "NA")))
    .D2
}

// Use the UDF in a select
A.select($"A1", $"B1", $"C1", $"OTHER", findD($"A1", $"B1", $"C1").as("D")).show
```

Just for reference, a solution without UDF:

```
val b1 = broadcast(b.toDF("A2_1", "B2_1", "C2_1", "D_1"))
val b2 = broadcast(b.toDF("A2_2", "B2_2", "C2_2", "D_2"))

// Match on A, B and C
val expr1 = ($"A1" === $"A2_1") && ($"B1" === $"B2_1") && ($"C1" === $"C2_1")

// Match on A and B, mismatch on C
val expr2 = ($"A1" === $"A2_2") && ($"B1" === $"B2_2") && ($"C1" !== $"C2_2")

val toDrop = b1.columns ++ b2.columns

toDrop.foldLeft(a
  .join(b1, expr1, "leftouter")
  .join(b2, expr2, "leftouter")
  // If there is a match on A, B and C then D_1 is not NULL;
  // otherwise we fall back to D_2
  .withColumn("D", coalesce($"D_1", $"D_2"))
)((df, c) => df.drop(c))
```

This assumes that there is at most one match in each category (on all three columns, or on the first two); otherwise the output will contain duplicate rows.
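One way to satisfy that assumption is to deduplicate B on the key columns before joining (on a Spark 1.6 DataFrame, `dropDuplicates` on the key columns plays this role). A plain-Scala sketch of the idea, with illustrative field names and values:

```scala
// A row of the lookup table (hypothetical field names).
case class BRow(a2: String, b2: String, c2: String, d2: String)

val b = Seq(
  BRow("x", "y", "z", "D1"),
  BRow("x", "y", "z", "D2"), // duplicate (a2, b2, c2) key
  BRow("p", "q", "r", "D3")
)

// Keep a single row per (a2, b2, c2) key; head takes the first
// occurrence within each group.
val deduped = b.groupBy(r => (r.a2, r.b2, r.c2)).values.map(_.head).toSeq
```

After deduplication each key appears once, so the left outer joins above can no longer multiply rows.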

UDF vs JOIN:

There are several factors to consider, and there is no simple answer:

Cons:

  • Broadcast joins require the data to be transferred to the worker nodes twice. Currently, broadcast tables are not cached (SPARK-3863), and this is unlikely to change any time soon (Resolution: Later).
  • The join operation is applied twice, even if there is an exact full match.

Pros:

  • join and coalesce are transparent to the optimizer, while a UDF is not.
  • Working directly with SQL expressions can benefit from all the Tungsten optimizations, including code generation, while a UDF cannot.
