The docs say to use org.apache.hadoop.mapreduce.lib.output.MultipleOutputs instead.
Below is a snippet of code that uses MultipleOutputs. Unfortunately, I did not write it and have not spent much time with it, so I do not know exactly why everything is the way it is. I hope it helps. :)
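Both snippets below assume roughly these imports; they were not part of the original code, so treat them as my reconstruction:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;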
Job setup
job.setJobName("Job Name");
job.setJarByClass(ETLManager.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(MyThing.class);
job.setMapperClass(MyThingMapper.class);
job.setReducerClass(MyThingReducer.class);

// Register the named output that the reducer will write through MultipleOutputs.
MultipleOutputs.addNamedOutput(job, Constants.MyThing_NAMED_OUTPUT,
        TextOutputFormat.class, NullWritable.class, Text.class);

job.setInputFormatClass(MyInputFormat.class);
FileInputFormat.addInputPath(job, new Path(conf.get("input")));
FileOutputFormat.setOutputPath(job,
        new Path(String.format("%s/%s", conf.get("output"), Constants.MyThing_NAMED_OUTPUT)));
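That setup assumes a surrounding driver. Here is a minimal sketch of one, assuming the Hadoop 2.x API (Job.getInstance, waitForCompletion); the LazyOutputFormat line is my addition, not part of the original code:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// ... all of the setup calls shown above go here ...

// Optional: since the reducer emits everything through MultipleOutputs,
// LazyOutputFormat suppresses the empty part-r-* files that the default
// output would otherwise create.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);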
Reducer setup
public class MyThingReducer extends Reducer<Text, MyThing, NullWritable, NullWritable> {

    // Assumption: EMPTY_KEY was not defined in the original snippet;
    // presumably it is the NullWritable singleton.
    private static final NullWritable EMPTY_KEY = NullWritable.get();

    private MultipleOutputs m_multipleOutputs;

    @Override
    public void setup(Context context) {
        m_multipleOutputs = new MultipleOutputs(context);
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        // Close MultipleOutputs, or the named-output files may be left incomplete.
        if (m_multipleOutputs != null) {
            m_multipleOutputs.close();
        }
    }

    @Override
    public void reduce(Text key, Iterable<MyThing> values, Context context)
            throws IOException, InterruptedException {
        for (MyThing myThing : values) {
            // generateData and generateFileName are the original author's
            // helpers; they are not shown in the answer.
            m_multipleOutputs.write(Constants.MyThing_NAMED_OUTPUT, EMPTY_KEY,
                    generateData(context, myThing), generateFileName(context, myThing));
            context.progress();
        }
    }
}
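For what it is worth, a hypothetical generateFileName might look like the sketch below. The baseOutputPath argument to MultipleOutputs.write is resolved relative to the job output directory and may contain subdirectories, which is what makes per-record file routing work; getCategory() is an accessor I am assuming MyThing has, purely for illustration:

// Hypothetical helper; the real generateFileName is not part of the original answer.
private String generateFileName(Context context, MyThing myThing) {
    // e.g. "someCategory/part" produces files like <output>/someCategory/part-r-00000
    return myThing.getCategory() + "/part";
}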
EDIT: Added link to MultipleOutputs.