Manipulating the iterator in MapReduce

I am trying to find the sum of any given points using Hadoop. The problem I am facing is getting all the values for a given key in a single reducer. It looks like this.

Reducer:

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, DoubleWritable> {

        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, DoubleWritable> output,
                           Reporter reporter) throws IOException {
            Text word = new Text();
            Iterator<IntWritable> tr = values;
            IntWritable v;
            while (tr.hasNext()) {
                v = tr.next();
                Iterator<IntWritable> td = values;
                while (td.hasNext()) {
                    IntWritable u = td.next();
                    double sum = u.get() + v.get();
                    word.set(u + " + " + v);
                    output.collect(word, new DoubleWritable(sum));
                }
            }
        }
    }

I am trying to create two copies of the iterator variable so that I can walk through all the values with the second iterator while holding one value from the first iterator (the two while loops), but the two iterators always point at the same value.

I am not sure if this is the right way to do this. Any help is really appreciated.

Thanks,

Tsegay

+4
4 answers

Iterators in a reducer are not as simple as you might think.

The problem is that the total number of items you are iterating through may not fit into memory. That means the iterator may be reading from disk. If you had two independent copies of the iterator, one of them could be far ahead of the other, which would mean that the data between the positions of the two iterators could not be discarded.

For simplicity of implementation, Hadoop does not support having more than one iterator over the reduce values.

The practical effect of this is that you cannot go through the same iterator twice. That is not nice, but it is how it is. If you know for certain that the number of items will fit in memory, you can copy all of the items into a list, as MrGomez suggested. If you do not know that, you may have to use secondary storage.

The better approach, though, is to redesign your program so that you do not need unbounded storage in the reducer. This can get a little tricky, but there are standard approaches to the problem.

For your particular problem, the output size grows quadratically with the largest reduce input set. This is usually a really bad idea. In most cases you do not need ALL of the pairs, only the most important ones. If you can trim the set of pairs in some way, then you are all set and you may be able to drop the all-pairs requirement.

For example, if you are trying to find the 100 pairs with the largest sum for each reduce set, you can keep one priority queue with the 100 largest inputs seen so far and a second priority queue with the 100 largest sums seen so far. For each new input, you form its sum with each of the 100 largest numbers seen so far and try to push those sums into the second queue. Finally, you push the new input into the first queue and trim both queues to 100 elements by deleting the smallest values (if necessary). In the close method of the reducer, you dump the contents of the sum queue. This approach guarantees that you only ever need min(n^2, 200) items of storage, which avoids the n^2 problem, and it avoids the double pass through the input by keeping the 100 largest items seen rather than all items seen.
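As a rough illustration of that idea, here is a minimal sketch (not the poster's code) that keeps the two bounded priority queues described above. It assumes the reduce values have already been unpacked to plain ints, and the class, method, and limit names are made up for the example:

    import java.util.PriorityQueue;

    public class TopPairSums {
        private static final int LIMIT = 100;

        // Min-heaps, so the smallest retained element is cheap to evict.
        private final PriorityQueue<Integer> topInputs = new PriorityQueue<>();
        private final PriorityQueue<Integer> topSums = new PriorityQueue<>();

        /** Called once per reduce value. */
        public void add(int value) {
            // Form sums of the new value with the largest inputs seen so far
            // and offer them to the sum queue.
            for (int kept : topInputs) {
                offerBounded(topSums, value + kept);
            }
            // Then admit the new value itself to the input queue.
            offerBounded(topInputs, value);
        }

        /** Called when the reduce group is finished: the largest sums seen. */
        public PriorityQueue<Integer> result() {
            return topSums;
        }

        private static void offerBounded(PriorityQueue<Integer> q, int x) {
            q.offer(x);
            if (q.size() > LIMIT) {
                q.poll(); // discard the smallest, keeping at most LIMIT items
            }
        }
    }

At no point does this hold more than 200 values, regardless of how many values the reduce group contains.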

+28

I'm not sure exactly what you are trying to accomplish, but I do know this much: the behavior of Hadoop's iterators is a little strange. Calling Iterator.next() always returns the SAME EXACT instance of IntWritable, with the contents of that instance replaced by the next value. So holding on to an IntWritable returned by Iterator.next() across calls is almost always a mistake. I believe this behavior is by design, to reduce object creation and GC overhead.

One way around this is to use WritableUtils.clone() to clone the instance you are trying to preserve across calls to Iterator.next().
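For instance, a minimal sketch of that pattern, assuming the old mapred API from the question and that the job Configuration has been saved somewhere (for example in configure(JobConf)); the helper name cacheValues is just for illustration:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.WritableUtils;

    // 'values' is the reducer's value iterator and 'conf' is the job Configuration.
    static List<IntWritable> cacheValues(Iterator<IntWritable> values, Configuration conf) {
        List<IntWritable> cached = new ArrayList<IntWritable>();
        while (values.hasNext()) {
            // clone() copies the current contents before next() reuses the instance
            cached.add(WritableUtils.clone(values.next(), conf));
        }
        return cached;
    }

The cached list can then be traversed as many times as needed.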

+12

Following on from your previous question, you seem to be stuck on the iterator problem piccolbo described. The wording of your reducer also suggests that you have abandoned the algorithms he proposed in favor of a naive approach... which will work, albeit suboptimally.

Let me clean up your code a bit with my answer:

    // Making use of Hadoop's Iterable reduce, assuming it's available to you
    //
    // The method signature is:
    //
    // protected void reduce(KEYIN key, java.lang.Iterable<VALUEIN> values,
    //     org.apache.hadoop.mapreduce.Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>.Context
    //     context) throws java.io.IOException, java.lang.InterruptedException
    //
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        // I assume you declare these here to save on GC
        Text outKey = new Text();
        IntWritable outVal = new IntWritable();

        // Since you've forgone piccolbo's approach, you'll need to maintain the
        // data structure yourself. Since we always walk the list forward and
        // wish to optimize insertion speed, we use LinkedList. Calls to
        // IntWritable.get() give us an int, which we copy into our list.
        LinkedList<Integer> valueList = new LinkedList<Integer>();

        // Here's why we changed the method signature: use of the Java for-each
        for (IntWritable iw : values) {
            valueList.add(iw.get());
        }

        // And from here, we construct each value pair as an O(n^2) operation
        for (Integer i : valueList) {
            for (Integer j : valueList) {
                outKey.set(i + " + " + j);
                outVal.set(i + j);
                context.write(outKey, outVal);
            }
        }

        // Do note: I've also changed your output value from DoubleWritable to
        // IntWritable, since you should always be performing integer operations
        // as defined. If your points are Doubles, supply DoubleWritable instead.
    }

This works, but it makes a few assumptions that limit performance when constructing your distance matrix, including requiring the combination to be performed in a single reduce operation.

Consider piccolbo's approach if you know the size and dimensionality of your input in advance. This should be obtainable, in the worst case, by walking the input lines in linear time.

(See this thread for why we cannot implement this by advancing a second iterator ahead of the first.)

+1

You cannot copy an Iterator simply by assigning it to a new variable. You have to "clone" it into a new variable of the iterator class. When iterator A is assigned to another iterator variable B, the two variables point at the same data.
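A minimal illustration of the aliasing problem and the usual workaround (caching the values first), using plain Java collections rather than Hadoop's value iterator:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class IteratorAliasDemo {
        public static void main(String[] args) {
            List<Integer> data = Arrays.asList(1, 2, 3);

            Iterator<Integer> a = data.iterator();
            Iterator<Integer> b = a;        // NOT a copy: b aliases the same iterator
            a.next();                       // advancing a also advances b
            System.out.println(b.next());   // prints 2, not 1

            // To traverse the values more than once, cache them first:
            List<Integer> cached = new ArrayList<Integer>();
            Iterator<Integer> c = data.iterator();
            while (c.hasNext()) {
                cached.add(c.next());
            }
            for (int u : cached) {
                for (int v : cached) {
                    System.out.println(u + " + " + v + " = " + (u + v));
                }
            }
        }
    }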

+1
