Key-value database integration with Spark

I find it difficult to understand how Spark interacts with storage.

I would like to create a Spark cluster that retrieves data from a RocksDB database (or any other key-value store). However, at this point, the best I can do is load the entire data set from the database into memory on each node of the cluster (for example, into a map) and build the RDD from that object.

What do I need to do to fetch only the necessary data, the way Spark does with HDFS? I read about Hadoop InputFormat and RecordReader, but I don’t quite understand what I should implement.

I know this is a pretty broad question, but I would really appreciate help to get me started. Thank you in advance.

hadoop apache-spark rocksdb
1 answer

Here is one possible solution. I assume that you have a client library for the key-value store (RocksDB in your case) that you want to access.
KeyValuePair is a bean class that represents one key-value pair from your key-value store.

Classes

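    import java.util.Iterator;
    import org.apache.spark.api.java.function.FlatMapFunction;

    /* Lazy iterator to read from the key-value store */
    class KeyValueIterator implements Iterator<KeyValuePair> {
        public KeyValueIterator() {
            // TODO initialize your custom reader using the Java client library
        }

        @Override
        public boolean hasNext() {
            // TODO return true while the reader has more entries
            throw new UnsupportedOperationException("TODO");
        }

        @Override
        public KeyValuePair next() {
            // TODO read and return the next entry
            throw new UnsupportedOperationException("TODO");
        }
    }

    /* Expands a dummy input element into a lazy iterator over the store */
    class KeyValueReader implements FlatMapFunction<KeyValuePair, KeyValuePair> {
        @Override
        public Iterator<KeyValuePair> call(KeyValuePair keyValuePair) throws Exception {
            // ignore the empty 'keyValuePair' object
            return new KeyValueIterator();
        }
    }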
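To make the TODOs in the skeleton above more concrete, here is a minimal sketch of a lazy iterator. It is only an illustration under assumptions: a java.util.TreeMap stands in for the key-value store, and the String fields of KeyValuePair as well as the InMemoryKeyValueIterator name are hypothetical. With a real client library you would wrap that library's own cursor instead.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;

/* Hypothetical bean for one key-value pair (field types are an assumption). */
final class KeyValuePair {
    private final String key;
    private final String value;

    KeyValuePair(String key, String value) {
        this.key = key;
        this.value = value;
    }

    String getKey() { return key; }
    String getValue() { return value; }
}

/* Lazy iterator: each call to next() pulls exactly one entry from the cursor. */
final class InMemoryKeyValueIterator implements Iterator<KeyValuePair> {
    // Stand-in for a store cursor; a real client library would provide its own.
    private final Iterator<Map.Entry<String, String>> cursor;

    InMemoryKeyValueIterator(TreeMap<String, String> store) {
        this.cursor = store.entrySet().iterator();
    }

    @Override
    public boolean hasNext() {
        return cursor.hasNext();
    }

    @Override
    public KeyValuePair next() {
        if (!cursor.hasNext()) {
            throw new NoSuchElementException("no more entries in the store");
        }
        Map.Entry<String, String> entry = cursor.next();
        return new KeyValuePair(entry.getKey(), entry.getValue());
    }
}
```

Because nothing is copied up front, only the entries that the consumer actually requests are read from the underlying cursor.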

Create KeyValue RDD

    /* list with a dummy KeyValuePair instance */
    ArrayList<KeyValuePair> keyValuePairs = new ArrayList<>();
    keyValuePairs.add(new KeyValuePair());
    JavaRDD<KeyValuePair> keyValuePairRDD = javaSparkContext.parallelize(keyValuePairs);

    /* Read one key-value pair at a time lazily */
    keyValuePairRDD = keyValuePairRDD.flatMap(new KeyValueReader());

Note:

The above solution creates an RDD with two partitions by default (one of them will be empty). Increase the number of partitions before applying any transformation to keyValuePairRDD, so that the processing is distributed across the executors. Different ways to increase the number of partitions:

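    // repartition returns a new RDD, so keep the result
    keyValuePairRDD = keyValuePairRDD.repartition(partitionCounts);
    // OR, on a JavaPairRDD (partitionBy is not defined on a plain JavaRDD):
    // keyValuePairRDD.partitionBy(...)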
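To give a feel for what a hash-based partitioner passed to partitionBy does, here is a small stand-alone sketch. It does not use Spark at all; HashPartitionSketch, partitionFor and distribute are hypothetical names for illustration. It mirrors the usual scheme of taking the key's hashCode modulo the number of partitions and keeping the result non-negative.

```java
import java.util.ArrayList;
import java.util.List;

/* Illustration only: assigns keys to partitions by non-negative hash modulo. */
final class HashPartitionSketch {

    /* Maps a key to a partition index in [0, numPartitions). */
    static int partitionFor(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            return 0; // null keys all go to the first partition
        }
        // floorMod keeps the index non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /* Groups the given keys into numPartitions buckets. */
    static List<List<String>> distribute(List<String> keys, int numPartitions) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            buckets.add(new ArrayList<>());
        }
        for (String key : keys) {
            buckets.get(partitionFor(key, numPartitions)).add(key);
        }
        return buckets;
    }
}
```

How evenly the buckets fill depends on how well the keys' hash codes spread, which is why skewed keys can leave some partitions nearly empty.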