Actually, there is no mechanism for "caching" in the sense that you mean. The best approach would be to divide this task into two phases:
- Collect the distinct keys with which you need to query the external lookup, and perform the lookup once per key
- Use the resulting mapping to look up a value for every record in the RDD
I assume that many records can share the same key (otherwise "caching" would not make any difference anyway), so making one external call per distinct key is much faster.
How to implement this?
If you know that the set of distinct keys is small enough to fit into the memory of your driver:
- map your data to the distinct keys whose values you want to cache, and collect them, for example:
```scala
val keys = inputRdd.map(/* get key */).distinct().collect()
```
- perform the fetching on the driver side (not using Spark); a hypothetical `fetchValue` sketch is shown after the example below
- use the resulting Map[Key, FetchedValues] in any transformation on your original RDD - it will be serialized and sent to each worker, where you can perform the lookup. For example, if the input records have a foreignId field that is the lookup key:
```scala
val keys = inputRdd.map(record => record.foreignId).distinct().collect()
val lookupTable = keys.map(k => (k, fetchValue(k))).toMap
val withValues = inputRdd.map(record => (record, lookupTable(record.foreignId)))
```
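For completeness, here is one possible shape of the driver-side fetch. The `fetchValue` name, the service URL, and the use of a plain HTTP GET are assumptions for illustration only; substitute whatever client your external lookup actually requires.

```scala
import scala.io.Source

// Hypothetical driver-side fetch: one blocking HTTP GET per distinct key.
// The endpoint URL is an assumption; replace it with your real lookup service.
def fetchValue(key: String): String = {
  val source = Source.fromURL(s"http://lookup-service.example.com/values/$key", "UTF-8")
  try source.mkString.trim finally source.close()
}
```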
Alternatively, if this map is large (but can still fit into the driver's memory), you can broadcast it before using it in the RDD transformation - see Broadcast Variables in the Spark Programming Guide.
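A minimal sketch of that broadcast variant, assuming `sc` is your SparkContext and `lookupTable` was built as in the previous snippet:

```scala
// Broadcast the driver-side map once; each worker receives a read-only copy.
val lookupBroadcast = sc.broadcast(lookupTable)

// Reference the broadcast value inside the transformation instead of
// capturing the map in the closure directly.
val withValues = inputRdd.map(record => (record, lookupBroadcast.value(record.foreignId)))
```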
Otherwise (if this map may be too large), you need to use a join if you want to keep the data distributed in the cluster, while still avoiding fetching the same key twice:
```scala
val byKeyRdd = inputRdd.keyBy(record => record.foreignId)
val lookupTableRdd = byKeyRdd
  .keys
  .distinct()
  .map(k => (k, fetchValue(k)))
```
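To finish this variant, join the two RDDs on the key. The `withValues` name below is just an illustration of how the pieces fit together:

```scala
// Join each record (keyed by foreignId) with the fetched value for that key,
// then drop the key, keeping (record, fetchedValue) pairs.
val withValues = byKeyRdd
  .join(lookupTableRdd)
  .values
```

If you run multiple actions on the result, consider persisting `lookupTableRdd` (e.g. with `cache()`) so the external calls are not repeated.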