Hadoop ChainMapper, ChainReducer

I am relatively new to Hadoop and trying to figure out how to chain jobs (several mappers and reducers) using ChainMapper and ChainReducer. I found several partial examples, but not a single complete and working one.

My current test code

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ChainJobs extends Configured implements Tool {

    // First mapper: tokenizes each input line and emits (word, 1)
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    // Second mapper: takes the (word, 1) pairs from the first mapper
    // and appends "Justatest" to each word
    public static class Map2 extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            // transform the key emitted by the first mapper (not the IntWritable value)
            String line = key.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken().concat("Justatest"));
                output.collect(word, one);
            }
        }
    }

    // Reducer: sums the counts per word
    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf);
        job.setJobName("TestforChainJobs");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        JobConf map1Conf = new JobConf(false);
        ChainMapper.addMapper(job, Map.class, LongWritable.class, Text.class,
                Text.class, IntWritable.class, true, map1Conf);

        JobConf map2Conf = new JobConf(false);
        ChainMapper.addMapper(job, Map2.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, true, map2Conf);

        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, true, reduceConf);

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ChainJobs(), args);
        System.exit(res);
    }
}

But it fails with:

 MapAttempt TASK_TYPE="MAP" TASKID="task_201210162337_0009_m_000000" TASK_ATTEMPT_ID="attempt_201210162337_0009_m_000000_0" TASK_STATUS="FAILED" FINISH_TIME="1350397216365" HOSTNAME="localhost.localdomain"
 ERROR="java.lang.RuntimeException: Error in configuring object
     at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
     at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:72)
     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130)
     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:389)
     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:327)
     at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
     at java.security.AccessController.doPrivileged(Native Method)
     at javax.security.auth.Subject.doAs(Subject.java:396)

Any hints or a very simple working example are greatly appreciated.

+4
Tags: chaining, mapreduce, hadoop
2 answers


I coded a word count job based on ChainMapper. The code is written against the new API and it works well :)

 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;

 // implementing CHAIN MAPREDUCE without using a custom format

 // SPLIT MAPPER: tokenizes each line and emits (word, 1)
 class SplitMapper extends Mapper<Object, Text, Text, IntWritable> {
     private IntWritable dummyValue = new IntWritable(1);

     @Override
     public void map(Object key, Text value, Context context)
             throws IOException, InterruptedException {
         String[] tokens = value.toString().split(" ");
         for (String x : tokens) {
             context.write(new Text(x), dummyValue);
         }
     }
 }

 // UPPER CASE MAPPER: consumes the first mapper's output and upper-cases each key
 class UpperCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable> {
     @Override
     public void map(Text key, IntWritable value, Context context)
             throws IOException, InterruptedException {
         String val = key.toString().toUpperCase();
         context.write(new Text(val), value);
     }
 }

 // REDUCER: sums the counts for each word
 class ChainMapReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
     @Override
     public void reduce(Text key, Iterable<IntWritable> values, Context context)
             throws IOException, InterruptedException {
         int sum = 0; // local, so the count is reset for every key
         for (IntWritable value : values) {
             sum += value.get();
         }
         context.write(key, new IntWritable(sum));
     }
 }

 public class FirstClass extends Configured implements Tool {

     public int run(String[] args)
             throws IOException, InterruptedException, ClassNotFoundException {
         Configuration cf = getConf();
         Job j = Job.getInstance(cf);

         /************** CHAIN MAPPER AREA STARTS ********************************/
         // add the 1st mapper to the chain
         Configuration splitMapConfig = new Configuration(false);
         ChainMapper.addMapper(j, SplitMapper.class,
                 Object.class, Text.class, Text.class, IntWritable.class,
                 splitMapConfig);

         // add the 2nd mapper (the upper case mapper) to the chain
         Configuration upperCaseConfig = new Configuration(false);
         ChainMapper.addMapper(j, UpperCaseMapper.class,
                 Text.class, IntWritable.class, Text.class, IntWritable.class,
                 upperCaseConfig);
         /************** CHAIN MAPPER AREA FINISHES ******************************/

         // now proceeding with the normal job setup
         j.setJarByClass(FirstClass.class);
         j.setReducerClass(ChainMapReducer.class);   // sum the counts per key
         j.setCombinerClass(ChainMapReducer.class);  // the same class doubles as the combiner
         j.setOutputKeyClass(Text.class);
         j.setOutputValueClass(IntWritable.class);

         // set the input and output paths; delete the output path if it already exists
         Path p = new Path(args[1]);
         FileInputFormat.addInputPath(j, new Path(args[0]));
         FileOutputFormat.setOutputPath(j, p);
         p.getFileSystem(cf).delete(p, true);

         return j.waitForCompletion(true) ? 0 : 1;
     }

     public static void main(String[] args) throws Exception {
         int res = ToolRunner.run(new Configuration(), new FirstClass(), args);
         System.exit(res);
     }
 }
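Since the question also asks about ChainReducer: instead of setReducerClass(), the reducer (and any mappers that should run after it) can be wired into the chain with org.apache.hadoop.mapreduce.lib.chain.ChainReducer. This is not part of the answer above, just a minimal sketch of how that wiring inside run() could look:

 // Hypothetical alternative to j.setReducerClass(ChainMapReducer.class);
 // requires: import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
 Configuration reduceConf = new Configuration(false);
 ChainReducer.setReducer(j, ChainMapReducer.class,
         Text.class, IntWritable.class,   // reducer input key/value types
         Text.class, IntWritable.class,   // reducer output key/value types
         reduceConf);

 // Further mappers can run after the reducer ([MAP+ / REDUCE MAP*] pattern),
 // e.g. another UpperCaseMapper-style pass:
 ChainReducer.addMapper(j, UpperCaseMapper.class,
         Text.class, IntWritable.class,
         Text.class, IntWritable.class,
         new Configuration(false));

Packaged into a jar, the job is then launched in the usual way, e.g. hadoop jar wordcount.jar FirstClass <input> <output> (the jar and path names here are placeholders).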

Part of the output is shown below:

 A 619
 ACCORDING 636
 ACCOUNT 638
 ACROSS? 655
 ADDRESSES 657
 AFTER 674
 AGGREGATING, 687
 AGO, 704
 ALL 721
 ALMOST 755
 ALTERING 768
 AMOUNT 785
 AN 819
 ANATOMY 820
 AND 1198
 ANXIETY 1215
 ANY 1232
 APACHE 1300
 APPENDING 1313
 APPLICATIONS 1330
 APPLICATIONS. 1347
 APPLICATIONS.� 1364
 APPLIES 1381
 ARCHITECTURE, 1387
 ARCHIVES 1388
 ARE 1405
 AS 1422
 BASED 1439

You may see some special or unwanted characters, since I did not do any cleanup to remove punctuation; I just focused on getting the chain mapper working. Thanks :)
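If you do want cleaner tokens, one simple option (not part of the answer above) is to strip punctuation in SplitMapper before emitting each word. A minimal sketch, assuming plain ASCII input:

 // Hypothetical cleanup variant of SplitMapper.map(); not part of the
 // original answer. Keeps only letters and digits in each token.
 @Override
 public void map(Object key, Text value, Context context)
         throws IOException, InterruptedException {
     for (String x : value.toString().split("\\s+")) {
         String cleaned = x.replaceAll("[^a-zA-Z0-9]", "");
         if (!cleaned.isEmpty()) {
             context.write(new Text(cleaned), dummyValue);
         }
     }
 }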

+4

This answer on Stack Overflow covers it: https://stackoverflow.com/a/167269/

0
