I am relatively new to Hadoop and trying to figure out how to program a job chain (several mappers and reducers) using ChainMapper and ChainReducer. I have found several partial examples, but not one that is complete and working.
My current test code:
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ChainJobs extends Configured implements Tool {

  // First mapper: tokenizes each input line and emits (word, 1)
  public static class Map extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        output.collect(word, one);
      }
    }
  }

  // Second mapper: consumes the first mapper's output and appends "Justatest"
  public static class Map2 extends MapReduceBase
      implements Mapper<Text, IntWritable, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Text key, IntWritable value,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken().concat("Justatest"));
        output.collect(word, one);
      }
    }
  }

  // Reducer: sums the counts per word
  public static class Reduce extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    JobConf job = new JobConf(conf);
    job.setJobName("TestforChainJobs");

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    JobConf map1Conf = new JobConf(false);
    ChainMapper.addMapper(job, Map.class, LongWritable.class, Text.class,
        Text.class, IntWritable.class, true, map1Conf);

    JobConf map2Conf = new JobConf(false);
    ChainMapper.addMapper(job, Map2.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, true, map2Conf);

    JobConf reduceConf = new JobConf(false);
    ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, true, reduceConf);

    JobClient.runJob(job);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new ChainJobs(), args);
    System.exit(res);
  }
}
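To make the intent clear, here is a plain-Java sketch (no Hadoop; class and method names are my own, just for illustration) of what I expect the chain to compute end to end: tokenize each line, tag each word with "Justatest" in the second map step, then sum the counts in the reduce step.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

public class ChainExpectation {

  // Models the chain Map -> Map2 -> Reduce on a list of input lines:
  // emit (word, 1), append "Justatest" to each word, then sum per word.
  public static Map<String, Integer> run(List<String> lines) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    for (String line : lines) {
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        String word = tokenizer.nextToken().concat("Justatest");
        counts.merge(word, 1, Integer::sum); // the "reduce" step: sum counts
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    // e.g. input "a b a" should yield {aJustatest=2, bJustatest=1}
    System.out.println(run(Arrays.asList("a b a")));
  }
}
```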
But it fails with:
MapAttempt TASK_TYPE="MAP" TASKID="task_201210162337_0009_m_000000" TASK_ATTEMPT_ID="attempt_201210162337_0009_m_000000_0" TASK_STATUS="FAILED" FINISH_TIME="1350397216365" HOSTNAME="localhost.localdomain"
ERROR="java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:72)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:389)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:327)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
Any hints or a very simple working example are greatly appreciated.