Make a Python dictionary accessible to all Spark partitions

I am trying to develop an algorithm in PySpark for which I am working with the linalg.SparseVector class. I need to create a dictionary of key-value pairs as input for each SparseVector object. The keys must be integers, since they serve as the vector indices (in my case, they represent user identifiers). I have a separate method that reads the input file and returns a dictionary where each user id (string) maps to an integer index. When I read the file again and do

FileRdd.map(lambda x: userid_idx[x[0]]), I get a KeyError. I think this is because my dict is not available on all partitions. Is there a way to make the userid_idx dict available to all partitions, like a distributed map in MapReduce? Also, apologies for the mess; I am posting this from my phone and will update it from my laptop later.
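
To illustrate, here is a toy reconstruction of the failing lookup; the ids and RDD contents are made up, and sc is an ordinary SparkContext (e.g. in the pyspark shell):

userid_idx = {"u1": 0, "u2": 1}   # stand-in for the id -> index dict built by my indexing method

# Each record is (user_id, item_id); I look up the integer index of the user.
FileRdd = sc.parallelize([("u1", "i9"), ("u3", "i7")])   # "u3" has no entry in userid_idx
FileRdd.map(lambda x: userid_idx[x[0]]).collect()        # the task fails, surfacing a KeyError for "u3"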

Code as promised:

from pyspark.mllib.linalg import SparseVector
from pyspark import SparkContext
import glob
import sys
import time
"""We create user and item indices starting from 0 to #users and 0 to #items respectively. This is done to store them in sparseVectors as dicts."""
def create_indices(inputdir):
    items=dict()
    user_id_to_idx=dict()
    user_idx_to_id=dict()
    item_idx_to_id=dict()
    item_id_to_idx=dict()
    item_idx=0
    user_idx=0
    for inputfile in glob.glob(inputdir+"/*.txt"):
        print inputfile
        with open(inputfile) as f:
            for line in f:
                toks=line.strip().split("\t")
                try:
                    user_id_to_idx[toks[1].strip()]
                except KeyError:
                    user_id_to_idx[toks[1].strip()]=user_idx
                    user_idx_to_id[user_idx]=toks[1].strip()
                    user_idx+=1
                try:
                    item_id_to_idx[toks[0].strip()]
                except KeyError:
                    item_id_to_idx[toks[0].strip()]=item_idx
                    item_idx_to_id[item_idx]=toks[0].strip()
                    item_idx+=1
    return user_idx_to_id,user_id_to_idx,item_idx_to_id,item_id_to_idx,user_idx,item_idx

# pass in the hdfs path to the input files and the spark context.
def runKNN(inputdir,sc,user_id_to_idx,item_id_to_idx):
    rdd_text=sc.textFile(inputdir)
    try:
        new_rdd = (rdd_text
                   .map(lambda x: (item_id_to_idx[str(x.strip().split("\t")[0])],
                                   {user_id_to_idx[str(x.strip().split("\t")[1])]: 1}))
                   .reduceByKey(lambda x, y: x.update(y)))
    except KeyError:
        sys.exit(1)
    new_rdd.saveAsTextFile("hdfs:path_to_output/user/hadoop/knn/output")

if __name__=="__main__":
    sc = SparkContext()
    u_idx_to_id,u_id_to_idx,i_idx_to_id,i_id_to_idx,u_idx,i_idx=create_indices(sys.argv[1])
    u_idx_to_id_b=sc.broadcast(u_idx_to_id)
    u_id_to_idx_b=sc.broadcast(u_id_to_idx)
    i_idx_to_idx_b=sc.broadcast(i_idx_to_id)
    i_id_to_idx_b=sc.broadcast(i_id_to_idx)
    num_users=sc.broadcast(u_idx)
    num_items=sc.broadcast(i_idx)
    runKNN(sys.argv[1],sc,u_id_to_idx_b.value,i_id_to_idx_b.value)
1 answer

In Spark, that dictionary will already be available to you inside all tasks, because variables referenced in a closure are automatically shipped to the executors along with the tasks. For example:

dictionary = {1:"red", 2:"blue"}
rdd = sc.parallelize([1,2])
rdd.map(lambda x: dictionary[x]).collect()
# Prints ['red', 'blue']

You will probably find that your problem is that your dictionary does not contain the key you are looking for!
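
One way to check that, sketched against the same toy dictionary (sc as above): using dict.get instead of direct indexing lets the missing ids surface instead of failing the task.

dictionary = {1: "red", 2: "blue"}
rdd = sc.parallelize([1, 2, 3])   # 3 has no entry in the dictionary

# dictionary[x] would fail the task with a KeyError for 3;
# dictionary.get(x) returns None instead, so the missing ids can be collected.
missing = rdd.filter(lambda x: dictionary.get(x) is None).collect()
# missing == [3]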

From the Spark documentation:

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program.

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.

Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
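
If the dictionary ever becomes large enough that shipping it with every task is a concern, a minimal sketch of the broadcast pattern looks like this (made-up ids; in your code this would mean passing the broadcast objects themselves into runKNN and reading .value inside the lambda, rather than passing .value from the driver):

user_id_to_idx = {"u1": 0, "u2": 1}          # made-up ids
u_id_to_idx_b = sc.broadcast(user_id_to_idx)

rdd = sc.parallelize([("i9", "u1"), ("i7", "u2")])   # (item_id, user_id) pairs
# Read .value inside the lambda so each executor uses its locally cached copy.
indexed = rdd.map(lambda x: (x[0], u_id_to_idx_b.value[x[1]]))
indexed.collect()   # [('i9', 0), ('i7', 1)]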
