Apache Spark: working with Option / Some / None in an RDD

I map over an HBase table, generating one RDD element per HBase row. However, some rows contain bad data (throwing a NullPointerException in the parsing code), in which case I just want to skip them.

I have my original mapper return an Option, to indicate that it yields 0 or 1 elements; then I filter for Some, and then extract the contained value:

    // myRDD is RDD[(ImmutableBytesWritable, Result)]
    val output = myRDD.
      map( tuple => getData(tuple._2) ).
      filter( { case Some(y) => true; case None => false } ).
      map( _.get )
      // ... more RDD operations with the good data

    def getData(r: Result) = {
      val key = r.getRow
      var id = "(unk)"
      var x = -1L
      try {
        id = Bytes.toString(key, 0, 11)
        x = Long.MaxValue - Bytes.toLong(key, 11)
        // ... more code that might throw exceptions
        Some((
          id,
          (
            List(x)
            // more stuff ...
          )
        ))
      } catch {
        case e: NullPointerException =>
          logWarning("Skipping id=" + id + ", x=" + x + ";\n" + e)
          None
      }
    }

Is there a more idiomatic way to make this shorter? I feel this looks pretty dirty, both in getData() and in the map.filter.map dance that I do.

Maybe flatMap would work (generating 0 or 1 elements as a Seq), but I don't want it to flatten the tuples I create in the map function, only to drop the empty containers.
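To illustrate the behavior I'm after on a plain local List (not an RDD) — the parse-result type here is made up purely for the example:

    val parsed: List[Option[(String, List[Long])]] =
      List(Some(("row1", List(1L))), None, Some(("row2", List(2L))))

    // flatMap over an Option drops the Nones but does NOT split the tuples:
    val good: List[(String, List[Long])] = parsed.flatMap(_.toList)
    // good == List(("row1", List(1L)), ("row2", List(2L)))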

+5
3 answers

If you modify getData to return a scala.util.Try, you can simplify your transformations considerably. Something like this could work:

    def getData(r: Result) = {
      val key = r.getRow
      var id = "(unk)"
      var x = -1L
      val tr = util.Try {
        id = Bytes.toString(key, 0, 11)
        x = Long.MaxValue - Bytes.toLong(key, 11)
        // ... more code that might throw exceptions
        (
          id,
          (
            List(x)
            // more stuff ...
          )
        )
      }
      tr.failed.foreach(e => logWarning("Skipping id=" + id + ", x=" + x + ";\n" + e))
      tr
    }

Then your transformation can start like this:

    myRDD.
      flatMap( tuple => getData(tuple._2).toOption )

If the Try is a Failure, toOption converts it to None, which is then removed by the flatMap logic. From that point on, the next steps in your transformation only see the successful cases, already unwrapped to the underlying type returned from getData (i.e., no Option wrapper).
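The same Try / toOption / flatMap pattern, sketched on a plain local collection (a string-to-Int parse stands in for getData here, purely for illustration):

    import scala.util.Try

    val raw = Seq("42", "oops", "7")
    // Try wraps the exception-throwing parse; toOption turns Failure into None;
    // flatMap drops the Nones and unwraps the values in one step.
    val parsed: Seq[Int] = raw.flatMap(s => Try(s.toInt).toOption)
    // parsed == Seq(42, 7)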

+7

An alternative and often overlooked approach is collect(pf: PartialFunction), the overload of collect that takes a partial function and is designed to "select" or "collect" exactly those elements of the RDD on which the partial function is defined.

The code will look like this:

    import scala.util.{Try, Success}

    val output = myRDD.
      map( tuple => getData(tuple._2) ).
      collect { case Success(tuple) => tuple }

    def getData(r: Result): Try[(String, List[Long])] = Try {
      val key = r.getRow
      val id = Bytes.toString(key, 0, 11)
      val x = Long.MaxValue - Bytes.toLong(key, 11)
      (id, List(x))
    }
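To see the partial-function matching in isolation, here is a tiny local sketch (a division that can throw stands in for the parsing code):

    import scala.util.{Try, Success}

    val tries = Seq(Try(10 / 2), Try(10 / 0))
    // Only the Success cases match the partial function, and the matched
    // values come out already unwrapped.
    val good = tries.collect { case Success(v) => v }
    // good == Seq(5)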
+7

If you're fine with simply dropping the bad data, you can just use mapPartitions. Here is an example:

    import scala.util._

    val mixedData = sc.parallelize(List(1, 2, 3, 4, 0))
    mixedData.mapPartitions { x =>
      val foo = for (y <- x) yield Try(1 / y)
      for (goodVals <- foo.partition(_.isSuccess)._1) yield goodVals.get
    }

If you want to see the bad values, you can use an accumulator, or just log them as you were doing.
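For instance, a minimal sketch of the accumulator variant, assuming the Spark 2.x longAccumulator API (on Spark 1.x you would use sc.accumulator(0L) instead):

    import scala.util.Try

    val badRecords = sc.longAccumulator("badRecords")
    val clean = mixedData.mapPartitions { it =>
      it.flatMap { y =>
        val t = Try(1 / y)
        if (t.isFailure) badRecords.add(1) // count each skipped element
        t.toOption
      }
    }
    clean.count()    // run an action first; accumulators only update on execution
    badRecords.value // == 1 for the example data above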

Your code will look something like this:

    import scala.util.Try

    val output = myRDD.
      mapPartitions( tupleIter => getCleanData(tupleIter) )
      // ... more RDD operations with the good data

    def getCleanData(iter: Iterator[(ImmutableBytesWritable, Result)]) = {
      val triedData = getDataInTry(iter)
      for (goodVals <- triedData.partition(_.isSuccess)._1) yield goodVals.get
    }

    def getDataInTry(iter: Iterator[(ImmutableBytesWritable, Result)]) =
      for (r <- iter) yield Try {
        val key = r._2.getRow
        var id = "(unk)"
        var x = -1L
        id = Bytes.toString(key, 0, 11)
        x = Long.MaxValue - Bytes.toLong(key, 11)
        // ... more code that might throw exceptions
        (id, List(x)) // yield the successfully parsed record
      }
+1

Source: https://habr.com/ru/post/1215544/

