I am new to Hadoop and tried to write a join using MapReduce. The algorithm joins three relations in two consecutive rounds, using the recursive method. The program works fine, but at runtime it keeps printing a warning like this:
14/12/02 10:41:16 WARN io.ReadaheadPool: Failed readahead on ifile
EBADF: Bad file descriptor
at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posix_fadvise(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posixFadviseIfPossible(NativeIO.java:263)
at org.apache.hadoop.io.nativeio.NativeIO$POSIX$CacheManipulator.posixFadviseIfPossible(NativeIO.java:142)
at org.apache.hadoop.io.ReadaheadPool$ReadaheadRequestImpl.run(ReadaheadPool.java:206)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
This is annoying, and I want to know the cause of the problem and how to get rid of it. My code is as follows:
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Recursive {
    // Relation file names in join order; the entry at index "round" is the
    // relation that joins on its first column in the current round.
    static String[] relationSequence;
    static int round;
    public static class joinMapper extends Mapper<Object, Text, IntWritable, Text> {
        @Override
        public void map(Object keyIn, Text valueIn, Context context)
                throws IOException, InterruptedException {
            String curValue = valueIn.toString();
            String[] values = curValue.split("\t");
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            int joinIndex;
            String others;
            if (fileName.compareTo(relationSequence[round]) == 0) {
                // Current round's relation: join on the first column and keep the rest
                // (assumes the join key is a single character plus the tab separator).
                joinIndex = 0;
                others = curValue.substring(2);
            } else {
                // Other relation: join on the last column and keep everything before it.
                joinIndex = values.length - 1;
                others = curValue.substring(0, curValue.length() - 2);
            }
            IntWritable joinValue = new IntWritable(Integer.parseInt(values[joinIndex]));
            // Tag each record with its source file so the reducer can tell the sides apart.
            context.write(joinValue, new Text(fileName + "|" + others));
        }
    }
    public static class joinReducer extends Reducer<IntWritable, Text, Text, Text> {
        @Override
        public void reduce(IntWritable keyIn, Iterable<Text> valueIn, Context context)
                throws IOException, InterruptedException {
            ArrayList<String> firstRelation = new ArrayList<String>();
            ArrayList<String> secondRelation = new ArrayList<String>();
            // Split the tagged values back into the two sides of the join.
            for (Text value : valueIn) {
                String[] values = value.toString().split("\\|");
                if (values[0].compareTo(relationSequence[round]) == 0) {
                    secondRelation.add(values[1]);
                } else {
                    firstRelation.add(values[1]);
                }
            }
            // Emit the cross product of the two sides for this join key.
            if (secondRelation.size() > 0) {
                for (String firstItem : firstRelation) {
                    for (String secondItem : secondRelation) {
                        context.write(new Text(firstItem),
                                new Text(keyIn.toString() + "\t" + secondItem));
                    }
                }
            }
        }
    }
    public static class joinPartitioner extends Partitioner<IntWritable, Text> {
        @Override
        public int getPartition(IntWritable key, Text value, int numReduceTasks) {
            // Low 7 bits of the key -> partition 0..127, one per reducer.
            return key.get() & 0x007F;
        }
    }
    public static void main(String[] args)
            throws IllegalArgumentException, IOException, InterruptedException, ClassNotFoundException {
        long s = System.currentTimeMillis();
        relationSequence = args[4].split(",");
        round = 1;
        int maxOfReducers = 128;
        int noReducers = maxOfReducers;
        Path firstRelation = new Path(args[0]);
        Path secondRelation = new Path(args[1]);
        Path thirdRelation = new Path(args[2]);
        Path temp = new Path("/temp");
        Path out = new Path(args[3]);

        // First round: join the first two relations into a temporary directory.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Recursive multi-way join (first round)");
        job.setNumReduceTasks(noReducers);
        job.setJarByClass(Recursive.class);
        job.setMapperClass(joinMapper.class);
        job.setPartitionerClass(joinPartitioner.class);
        job.setReducerClass(joinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(temp)) { fs.delete(temp, true); }
        if (fs.exists(out))  { fs.delete(out, true); }
        FileInputFormat.addInputPath(job, firstRelation);
        FileInputFormat.addInputPath(job, secondRelation);
        FileOutputFormat.setOutputPath(job, temp);
        boolean b = job.waitForCompletion(true); // submits the job and blocks

        if (b) {
            // Second round: join the intermediate result with the third relation.
            round++;
            Configuration conf2 = new Configuration();
            Job job2 = Job.getInstance(conf2, "Recursive multi-way join (second round)");
            job2.setNumReduceTasks(noReducers);
            job2.setJarByClass(Recursive.class);
            job2.setMapperClass(joinMapper.class);
            job2.setPartitionerClass(joinPartitioner.class);
            job2.setReducerClass(joinReducer.class);
            job2.setOutputKeyClass(Text.class);
            job2.setOutputValueClass(Text.class);
            job2.setMapOutputKeyClass(IntWritable.class);
            job2.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job2, temp);
            FileInputFormat.addInputPath(job2, thirdRelation);
            FileOutputFormat.setOutputPath(job2, out);
            b = job2.waitForCompletion(true);
            long e = System.currentTimeMillis() - s;
            System.out.println("Total: " + e);
            System.exit(b ? 0 : 1);
        }
        System.exit(1);
    }
}
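In case it matters, this is roughly how I run it (the jar name and paths are just placeholders). args[0] to args[2] are the three relation files, args[3] is the output directory, and args[4] is the comma-separated sequence of relation file names: the entry at index 1 must match the basename of the second relation's file (matched in round 1) and the entry at index 2 that of the third (matched in round 2), assuming each relation lives in a single file:

hadoop jar recursive-join.jar Recursive /data/R1 /data/R2 /data/R3 /output R1,R2,R3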