Spark Caching

I have a function defined for an RDD transformation, so the function is called once for each element in the RDD.

The function must call an external web service to look up reference data, passing data from the current RDD element as a parameter.
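
For concreteness, a minimal sketch of the current setup - the names Record, callWebService, and inputRdd are placeholders, not the actual code:

    // Placeholder record type and web-service client - just the shape of the problem.
    case class Record(foreignId: String, payload: String)
    def callWebService(key: String): String = ???   // external HTTP lookup

    // The function passed to the transformation runs once per element,
    // so as written the web service is called once per RDD element.
    val enriched = inputRdd.map { record =>
      (record, callWebService(record.foreignId))
    }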

Two questions:

  • Is there any problem with calling a web service from within a Spark transformation?

  • The web service data needs to be cached. What is the best way to store (and later reference) the cached data? An easy option would be to keep the cache in a collection inside the Scala class that contains the function passed to the RDD (sketched below). Would that be effective, or is there a better caching approach in Spark?
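
Roughly what I have in mind for that second question, again with placeholder names (ReferenceEnricher and callWebService are not real code):

    import scala.collection.mutable

    // Sketch of the "cache in a collection inside the class" idea.
    class ReferenceEnricher(callWebService: String => String) extends Serializable {
      // The cache is a plain mutable map held by the class that also holds the
      // function passed to the RDD.
      private val cache = mutable.Map.empty[String, String]

      // Whether each executor reuses one copy of this map, or each task gets its
      // own deserialized copy, is part of what I'm unsure about.
      def enrich(key: String): String =
        cache.getOrElseUpdate(key, callWebService(key))
    }

    // val enricher = new ReferenceEnricher(callWebService)
    // val enriched = inputRdd.map(r => (r, enricher.enrich(r.foreignId)))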

thanks

1 answer

Actually, there is no mechanism for "caching" in the sense you mean (Spark's cache() persists RDD contents, not the results of external calls). It seems the best approach would be to split this task into two phases:

  • Collect the distinct "keys" that require the external lookup, and perform the lookup once per key
  • Use the resulting mapping to look up the value for every record in the RDD

I assume that many records may share the same key (otherwise "caching" would not make any difference anyway), so making the external calls only once per distinct key should be much faster.

How to implement this?

  • If you know that the set of distinct keys is small enough to fit into your driver's memory:

    • map your data to the distinct keys whose values you want to fetch, and collect them, for example: val keys = inputRdd.map(/* get key */).distinct().collect()
    • perform the fetching on the driver side (not using Spark)
    • use the resulting Map[Key, FetchedValues] in any transformation on your original RDD - it will be serialized and sent to each worker, where you can perform the lookup. For example, if the input records have a foreignId field that is the lookup key:

        val keys = inputRdd.map(record => record.foreignId).distinct().collect()
        val lookupTable = keys.map(k => (k, fetchValue(k))).toMap
        val withValues = inputRdd.map(record => (record, lookupTable(record.foreignId)))
    • Alternatively, if this map is large (but still fits in the driver's memory), you can broadcast it before using it in the RDD transformation - see Broadcast Variables in the Spark Programming Guide (a short sketch follows this list)

  • Otherwise (if this map may be too large for the driver), you need to use a join if you want to keep the data in the cluster, while still avoiding fetching the same key twice:

      val byKeyRdd = inputRdd.keyBy(record => record.foreignId)
      val lookupTableRdd = byKeyRdd
        .keys
        .distinct()
        .map(k => (k, fetchValue(k))) // this time fetchValue is done in the cluster - concurrently for different keys
      val withValues = byKeyRdd.join(lookupTableRdd)
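
For the broadcast variant mentioned in the first branch above, a minimal sketch reusing inputRdd, foreignId, and fetchValue from the snippets above (sc is your existing SparkContext):

      // Build the lookup table on the driver, then broadcast it so each executor
      // receives it once instead of shipping it inside every task closure.
      val keys = inputRdd.map(record => record.foreignId).distinct().collect()
      val lookupTable = keys.map(k => (k, fetchValue(k))).toMap

      val lookupBc = sc.broadcast(lookupTable)

      val withValues = inputRdd.map { record =>
        (record, lookupBc.value(record.foreignId))
      }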