I wrote a simple map-reduce job that reads data from DFS and runs a simple algorithm on it. While debugging, I decided to make the mappers emit one fixed set of keys and values and the reducers emit a completely different set. I am running this job on a single-node Hadoop 20.2 cluster. When the job finishes, the output contains only the values emitted by the mappers, which leads me to believe that the reducer is never run. I would really appreciate it if someone could explain why my code produces this output. I have tried setting outputKeyClass and outputValueClass to different things, and likewise setMapOutputKeyClass and setMapOutputValueClass. The sections of code that are currently commented out are the algorithm I actually run, but I changed the map and reduce methods to just emit fixed values. Once again, the job output contains only the values that the mapper emitted. Here is the class I used to run the job:
import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * @author redbeard
 */
public class CalculateHistogram {
/**
 * Mapper that tokenizes a record only while its internal counter is zero and
 * otherwise just advances that counter.
 *
 * <p>For a tokenized record the value is split on {@code ","} and the fixed pair
 * {@code (-2, "HI")} is written once for every token that is followed by another
 * token (i.e. every token except the last one).
 */
public static class HistogramMap extends Mapper<LongWritable, Text, LongWritable, Text> {
    /** Counter value at which the counter wraps back to zero. */
    private static final int R = 100;

    /** Running counter; a record is tokenized only when this is zero. */
    private int n = 0;

    /**
     * Processes one input record.
     *
     * @param key     input key (not read by this method)
     * @param value   record text, split on commas when the counter is zero
     * @param context sink for the emitted {@code (-2, "HI")} pairs
     */
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Skipping phase: advance the counter and wrap it exactly at R.
        if (n != 0) {
            n++;
            if (n == R) {
                n = 0;
            }
            return;
        }

        // Tokenizing phase: note the counter advances once per token, not once per record.
        StringTokenizer fields = new StringTokenizer(value.toString(), ",");
        while (fields.hasMoreTokens()) {
            fields.nextToken();
            boolean notLastToken = fields.hasMoreTokens();
            if (notLastToken) {
                context.write(new LongWritable(-2), new Text("HI"));
            }
            n++;
        }
    }
}
public static class HistogramReduce extends Reducer<LongWritable, Text, LongWritable, HistogramBucket> {
private final static int R = 10;
public void reduce(LongWritable key, Iterator<Text> values, Context context)
throws IOException, InterruptedException {
if (key.toString().equals("-1")) {
}
Text t = values.next();
for (char c : t.toString().toCharArray()) {
if (!Character.isDigit(c) && c != '.') {
}
}
context.setStatus("Building Histogram");
HistogramBucket i = new HistogramBucket(key);
i.add(new DoubleWritable(Double.parseDouble(t.toString())));
while (values.hasNext()) {
for (int j = 0; j < R; j++) {
t = values.next();
}
if (!i.contains(Double.parseDouble(t.toString()))) {
context.setStatus("Writing a value to the Histogram");
i.add(new DoubleWritable(Double.parseDouble(t.toString())));
}
}
context.write(new LongWritable(55555555), new HistogramBucket(new LongWritable(55555555)));
}
}
/**
 * Configures and runs the histogram job.
 *
 * <p>Expects exactly two arguments after the generic Hadoop options have been
 * stripped: the input path and the output path. Exits with status 2 on a usage
 * error, 0 when the job succeeds and 1 when it fails.
 *
 * @param args command-line arguments, including any generic Hadoop options
 * @throws Exception if the job cannot be configured or submitted
 */
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: CalculateHistogram <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "MRDT - Generate Histogram");
    job.setJarByClass(CalculateHistogram.class);
    job.setMapperClass(HistogramMap.class);
    job.setReducerClass(HistogramReduce.class);
    // Declare the intermediate and final types explicitly. The map-output types
    // must be set alongside the output types, because they otherwise default to
    // the job's output types, which differ from what the mapper emits.
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(HistogramBucket.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
source
share