Does Spark use HBase's sorted key order when HBase is the data source?

I store time-series data in HBase. The row key consists of user_id and timestamp, for example:

{
    "userid1-1428364800" : {
        "columnFamily1" : {
            "val" : "1"
        }
    },
    "userid1-1428364803" : {
        "columnFamily1" : {
            "val" : "2"
        }
    },
    "userid2-1428364812" : {
        "columnFamily1" : {
            "val" : "abc"
        }
    }
}

Now I need to perform a per-user analysis. Here is the initialization of hbase_rdd (from here):

sc = SparkContext(appName="HBaseInputFormat")

conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

hbase_rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter=keyConv,
        valueConverter=valueConv,
        conf=conf)

The natural MapReduce-like way to process it would be:

(hbase_rdd
   .map(lambda row: (row[0].split('-')[0], (row[0].split('-')[1], row[1])))  # shift timestamp from key to value
   .groupByKey()
   .map(processUserData))  # process user data

While the first map runs (shifting the timestamp from the key to the value), it would be valuable to know when the current user's time series has ended, so that the groupByKey step for that user could start right away. Then we would not need to map over the entire table and store all the temporary data. This is possible because HBase stores row keys in sorted order.

With Hadoop streaming (no Spark), it could be done like this:

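import sys

# processUserData(userid, rows) is assumed to be defined elsewhere
current_user_data = []
last_userid = None
for line in sys.stdin:
    k, v = line.rstrip('\n').split('\t', 1)
    userid, timestamp = k.split('-')
    if userid != last_userid and current_user_data:
        # keys arrive sorted, so a new userid means the previous user is complete
        print(processUserData(last_userid, current_user_data))
        current_user_data = []
    last_userid = userid
    current_user_data.append((timestamp, v))

# flush the last user, which no later key change would trigger
if current_user_data:
    print(processUserData(last_userid, current_user_data))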

The question: how can I take advantage of the sorted order of HBase keys in Spark?


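I'm not very familiar with the guarantees you get from the way you are pulling data out of HBase, but if I understand correctly, I can answer in terms of plain Spark.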

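You have some RDD[X]. As far as Spark knows, the X's in that RDD are completely unordered. But you have outside knowledge, and you can guarantee that the data is in fact grouped by some field of X (and perhaps even sorted by another field).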

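In that case you can use mapPartitions to do much the same thing you did with Hadoop streaming. It lets you iterate over all the records in one partition, so you can look for blocks of records with the same key.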

val myRDD: RDD[X] = ...
val groupedData: RDD[Seq[X]] = myRDD.mapPartitions { itr =>
  var currentUserData = new scala.collection.mutable.ArrayBuffer[X]()
  var currentUser: X = null
  // itr is an iterator over *all* the records in one partition
  val completed = itr.flatMap { x =>
    if (currentUser != null && x.userId == currentUser.userId) {
      // same user as before -- add the data to our list
      currentUserData += x
      None
    } else {
      // it's a new user -- return all the data for the old user, and make
      // another buffer for the new user
      val userDataGrouped = currentUserData
      currentUserData = new scala.collection.mutable.ArrayBuffer[X]()
      currentUserData += x
      currentUser = x
      // the very first record has no previous user, so skip the empty buffer
      if (userDataGrouped.isEmpty) None else Some(userDataGrouped)
    }
  }
  // ++ takes its argument by name, so the last user's buffer is only read
  // after itr is exhausted; the filter drops it if the partition was empty
  completed ++ Iterator.single(currentUserData).filter(_.nonEmpty)
}
// now groupedData has all the data for one user grouped together, and we didn't
// need to do an expensive shuffle.  Also, the above transformation is lazy, so
// we don't necessarily even store all that data in memory -- we could still
// do more filtering on the fly, eg:
val usersWithLotsOfData = groupedData.filter { userData => userData.size > 10 }

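Sorry, I know you wanted Python and the above is Scala, but I think the concept is the same and hopefully you can translate it. (In particular, the flatMap over Some and None is probably not the best way to do it, but it should work.)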
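For what it's worth, here is a rough Python sketch of the same per-partition grouping idea. It is not from the original answer: the helper names `split_key` and `group_by_user` are made up for illustration, and it assumes each record is a `(row_key, value)` pair of strings with keys shaped like `userid-timestamp`, already sorted by row key within the partition. It uses `itertools.groupby`, which groups consecutive items with equal keys, rather than the hand-rolled buffer above.

```python
from itertools import groupby


def split_key(record):
    """Split a (row_key, value) pair into (userid, (timestamp, value)).

    Assumes row keys look like 'userid-timestamp' (hypothetical helper).
    """
    row_key, value = record
    userid, timestamp = row_key.split('-')
    return userid, (timestamp, value)


def group_by_user(records):
    """Yield (userid, [(timestamp, value), ...]) for each run of consecutive
    records sharing a userid.

    Relies on the input already being sorted by row key, and only builds
    one user's list at a time.
    """
    parsed = (split_key(r) for r in records)
    for userid, run in groupby(parsed, key=lambda kv: kv[0]):
        yield userid, [tv for _, tv in run]


# Plain-Python check, with a list standing in for one partition
partition = [
    ("userid1-1428364800", "1"),
    ("userid1-1428364803", "2"),
    ("userid2-1428364812", "abc"),
]
grouped = list(group_by_user(partition))
```

In Spark this function would be passed as `hbase_rdd.mapPartitions(group_by_user)`. Note that a user whose rows straddle a partition boundary would come out as two separate groups, so that case would still need its own handling.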
