Files not placed correctly in the distributed cache

I am adding the file to the distributed cache using the following code:

    Configuration conf2 = new Configuration();
    job = new Job(conf2);
    job.setJobName("Join with Cache");
    DistributedCache.addCacheFile(new URI("hdfs://server:port/FilePath/part-r-00000"), conf2);

Then I read the file in the mappers:

    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        URI[] cacheFile = DistributedCache.getCacheFiles(conf);
        FSDataInputStream in = FileSystem.get(conf).open(new Path(cacheFile[0].getPath()));
        BufferedReader joinReader = new BufferedReader(new InputStreamReader(in));
        String line;
        try {
            while ((line = joinReader.readLine()) != null) {
                String[] s = line.split("\t");
                // do stuff to s
            }
        } finally {
            joinReader.close();
        }
    }

The problem is that I read only one line, and it does not come from the file I put in the cache. Instead, it is "cm9vdA==", which is "root" encoded in base64.
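To double-check the base64 claim, a small standalone snippet can decode the value and re-encode "root". It uses the plain JDK 8+ `java.util.Base64` class, which is not part of Hadoop 0.20.2 itself, and the class name `Base64Check` is made up for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper class, only for checking the value seen in the mapper.
class Base64Check {
    // Decodes a base64 string and interprets the resulting bytes as UTF-8 text.
    static String decode(String encoded) {
        byte[] bytes = Base64.getDecoder().decode(encoded);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Encodes text as UTF-8 bytes, then as a base64 string.
    static String encode(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }
}
```

`Base64Check.decode("cm9vdA==")` returns "root", and `Base64Check.encode("root")` returns "cm9vdA==".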

Has anyone else had this problem, or can anyone see how I am using the distributed cache incorrectly? I am using Hadoop 0.20.2 in fully distributed mode.

hadoop distributed-cache
1 answer

This is a common mistake in your job configuration:

    Configuration conf2 = new Configuration();
    job = new Job(conf2);
    job.setJobName("Join with Cache");
    DistributedCache.addCacheFile(new URI("hdfs://server:port/FilePath/part-r-00000"), conf2);

Once you have created the Job object, you need to stop using the original Configuration object, because Job makes its own copy of it. Values you set in conf2 after creating the job will not affect the job. Try the following:

    job = new Job(new Configuration());
    Configuration conf2 = job.getConfiguration();
    job.setJobName("Join with Cache");
    DistributedCache.addCacheFile(new URI("hdfs://server:port/FilePath/part-r-00000"), conf2);
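The copy behaviour described above can be illustrated without Hadoop. This is a minimal sketch that only mimics the semantics the answer relies on: `SketchConf` and `SketchJob` are hypothetical stand-ins, not Hadoop classes, and `SketchJob` simply snapshots the configuration it is given in its constructor:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Hadoop's Configuration: a simple key/value store.
class SketchConf {
    private final Map<String, String> props = new HashMap<>();

    SketchConf() {
    }

    // Copy constructor: the new object gets its own, independent map.
    SketchConf(SketchConf other) {
        props.putAll(other.props);
    }

    void set(String key, String value) {
        props.put(key, value);
    }

    String get(String key) {
        return props.get(key);
    }
}

// Hypothetical stand-in for Job: it keeps a copy of the configuration it receives.
class SketchJob {
    private final SketchConf conf;

    SketchJob(SketchConf conf) {
        this.conf = new SketchConf(conf); // snapshot, not a shared reference
    }

    // Returns the job's own copy; values set here are the ones the job sees.
    SketchConf getConfiguration() {
        return conf;
    }
}
```

After `SketchConf conf2 = new SketchConf(); SketchJob job = new SketchJob(conf2); conf2.set("example.key", "v");`, the call `job.getConfiguration().get("example.key")` still returns null, which mirrors why the addCacheFile call in the question has no effect. Setting the value through `job.getConfiguration()` does reach the job.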

You should also check how many files are in the distributed cache. There may be more than one, and you may be opening an arbitrary file that happens to contain the value you are seeing.
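Instead of assuming the file you want is at index 0, you could log every entry and pick yours by name. Here is a minimal sketch using only `java.net.URI`. The class and method names are hypothetical, and in the mapper you would pass it the array returned by `DistributedCache.getCacheFiles(conf)`, which may be null if nothing was registered:

```java
import java.net.URI;
import java.util.Arrays;

// Hypothetical helper for inspecting the array of cache-file URIs.
class CacheFileSelector {
    // Returns the first URI whose path is fileName or ends with "/" + fileName,
    // or null if nothing matches (including when the array itself is null).
    static URI findByName(URI[] cacheFiles, String fileName) {
        if (cacheFiles == null) {
            return null;
        }
        for (URI uri : cacheFiles) {
            String path = uri.getPath();
            if (path != null && (path.equals(fileName) || path.endsWith("/" + fileName))) {
                return uri;
            }
        }
        return null;
    }

    // Builds a one-line summary suitable for logging from setup().
    static String describe(URI[] cacheFiles) {
        if (cacheFiles == null) {
            return "0 cache files";
        }
        return cacheFiles.length + " cache files: " + Arrays.toString(cacheFiles);
    }
}
```

Logging `CacheFileSelector.describe(...)` once per task shows whether the file you added is there at all. A null result from `findByName` is a clear signal that the configuration change never reached the job.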

I suggest using a symlink, which makes the file available in the task's local working directory under a known name:

    DistributedCache.createSymlink(conf2);
    DistributedCache.addCacheFile(new URI("hdfs://server:port/FilePath/part-r-00000#myfile"), conf2);

    // then in your mapper setup (BufferedReader needs a Reader, so wrap the stream):
    BufferedReader joinReader = new BufferedReader(new InputStreamReader(new FileInputStream("myfile")));
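Once the symlink gives the file a known local name, loading it for the join can be kept separate from the Hadoop plumbing. This minimal sketch assumes that each line of part-r-00000 is a tab-separated key and value, as the question's `split("\t")` suggests. `JoinTableLoader` is a hypothetical name, and `BufferedReader.lines()` needs Java 8 or later:

```java
import java.io.BufferedReader;
import java.util.HashMap;
import java.util.Map;

// Hypothetical loader for the small side of a map-side join.
class JoinTableLoader {
    // Reads tab-separated lines and maps the first field to the rest of the line.
    // Lines without a tab are skipped. The caller owns (and closes) the reader.
    static Map<String, String> load(BufferedReader reader) {
        Map<String, String> table = new HashMap<>();
        reader.lines().forEach(line -> {
            String[] fields = line.split("\t", 2); // key, then everything after the first tab
            if (fields.length == 2) {
                table.put(fields[0], fields[1]);
            }
        });
        return table;
    }
}
```

In the mapper's setup you could then write `try (BufferedReader r = new BufferedReader(new FileReader("myfile"))) { table = JoinTableLoader.load(r); }`, keeping `table` in a field for use in map(). Taking a reader rather than a file name also lets you test the parsing with a `StringReader`.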