MapReduce combiner

I have simple code that uses a mapper, a reducer, and a combiner. The output from the mapper is passed to the combiner. But the reducer receives the mapper's output instead of the combiner's output.

Any help would be appreciated.

The code:

```java
package Combiner;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class AverageSalary {

    public static class Map extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] empDetails = value.toString().split(",");
            Text unit_key = new Text(empDetails[1]);
            DoubleWritable salary_value = new DoubleWritable(Double.parseDouble(empDetails[2]));
            context.write(unit_key, salary_value);
        }
    }

    public static class Combiner extends Reducer<Text, DoubleWritable, Text, Text> {
        public void reduce(final Text key, final Iterable<DoubleWritable> values, final Context context) {
            String val;
            double sum = 0;
            int len = 0;
            while (values.iterator().hasNext()) {
                sum += values.iterator().next().get();
                len++;
            }
            val = String.valueOf(sum) + ":" + String.valueOf(len);
            try {
                context.write(key, new Text(val));
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(final Text key, final Text values, final Context context) {
            //String[] sumDetails = values.toString().split(":");
            //double average;
            //average = Double.parseDouble(sumDetails[0]);
            try {
                context.write(key, values);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: Main <in> <out>");
                System.exit(-1);
            }
            Job job = new Job(conf, "Average salary");
            //job.setInputFormatClass(KeyValueTextInputFormat.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            job.setJarByClass(AverageSalary.class);
            job.setMapperClass(Map.class);
            job.setCombinerClass(Combiner.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            System.exit(job.waitForCompletion(true) ? 0 : -1);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
```

4 answers

It seems that you forgot about an important property of a combiner:

The input key/value types and the output key/value types must be the same.

You cannot take in Text/DoubleWritable and return Text/Text. I suggest you use Text instead of DoubleWritable and do the proper parsing inside the Combiner.
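For illustration, a minimal sketch of a type-consistent combiner, assuming the mapper is changed to emit each salary as a Text of the form "salary:1" (the sum:count encoding and the class name Combine are assumptions for this sketch, not from the original post):

```java
// Hypothetical sketch: the mapper emits Text values of the form "sum:count"
// (e.g. "2500.0:1" for a single salary), so the combiner's input and output
// types are both Text/Text.
public static class Combine extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        int count = 0;
        for (Text value : values) {
            String[] parts = value.toString().split(":");
            sum += Double.parseDouble(parts[0]);
            count += Integer.parseInt(parts[1]);
        }
        // Emit the merged partial aggregate in the same "sum:count" encoding.
        context.write(key, new Text(sum + ":" + count));
    }
}
```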


Rule number 1 for combiners: do not assume that the combiner will run. Treat the combiner only as an optimization.

The combiner is not guaranteed to run over all of your data. In some cases, when the data does not need to be spilled to disk, MapReduce will skip the combiner entirely. Note also that the combiner may run multiple times over subsets of the data: it runs once per spill.

In your case, you are making this bad assumption. You should do the summing in the combiner AND the reducer.

In addition, you should follow @user987339's answer as well. The input and output of the combiner must be identical (Text/DoubleWritable → Text/DoubleWritable), and they must match the mapper's output and the reducer's input.
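Building on the sketch above (same assumed "sum:count" Text encoding), here is a sketch of a reducer that repeats the combiner's merge and only then computes the average, so the result is correct whether the combiner ran zero, one, or several times:

```java
public static class Reduce extends Reducer<Text, Text, Text, DoubleWritable> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        int count = 0;
        // Each value is a partial aggregate "sum:count"; merge them exactly
        // as the combiner does, because the combiner may not have run at all.
        for (Text value : values) {
            String[] parts = value.toString().split(":");
            sum += Double.parseDouble(parts[0]);
            count += Integer.parseInt(parts[1]);
        }
        context.write(key, new DoubleWritable(sum / count));
    }
}
```

With this version the driver would also need job.setMapOutputValueClass(Text.class) and job.setOutputValueClass(DoubleWritable.class), since the map output value type no longer matches the final output value type.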


If a combiner function is used, it has the same form as the reduce function (it is an implementation of Reducer), except that its output types are the intermediate key and value types (K2 and V2), so that they can feed the reduce function:

map: (K1, V1) → list(K2, V2)
combine: (K2, list(V2)) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)

Often the combine and reduce functions are the same, in which case K3 is the same as K2 and V3 is the same as V2.
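As a sketch of that last case (a hypothetical sum job, not the average job from the question): when the reduce function is associative and commutative and its input and output types coincide, the same class can be registered as both combiner and reducer.

```java
// Hypothetical sum job: input and output types are both Text/DoubleWritable,
// so this class works unchanged as combiner and reducer.
public static class SumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    @Override
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        for (DoubleWritable value : values) {
            sum += value.get();
        }
        context.write(key, new DoubleWritable(sum));
    }
}

// In the driver:
// job.setCombinerClass(SumReducer.class);
// job.setReducerClass(SumReducer.class);
```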


The combiner does not always run when you run a MapReduce job.

If there are at least three spill files (map output written to the local disk), the combiner will run, so that the files shrink and can be moved to the reduce node more easily.

The number of spills above which the combiner runs can be set via the min.num.spills.for.combine property.
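For illustration, a sketch of changing that threshold in the driver (the value 5 is arbitrary; the answer above states the default is three spill files):

```java
Configuration conf = new Configuration();
// Run the map-side combiner only once at least 5 spill files exist.
conf.setInt("min.num.spills.for.combine", 5);
```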

