Sorry to cross-mail this message on the hadoop user mailing list here too, but this is becoming an urgent issue for me.
My problem is this: I have two input files and I want to define
- a) The number of lines that are found only in file 1
- b) The number of lines that are found only in file 2
- c) The number of lines common to both files (i.e., lines that compare equal as strings)
Example:
File 1:
a
b
c
File 2:
a
d
Desired output for each count:
lines_only_in_1: 2 (b, c)
lines_only_in_2: 1 (d)
lines_in_both: 1 (a)
My approach so far is this:
Using LineRecordReader, each mapper reads one input file line by line and knows which file it is reading (source id 0 or 1).
The mapper emits each line as the key with its source id as the value, so in the reducer I effectively receive
Map<Line, Iterable<SourceId>>
(where each SourceId is 0 or 1).
For every distinct line I can then check which source(s) it appeared in — e.g. (a, b, c) all appear in file 1 —
and count it toward the appropriate bucket ("only in 1", "only in 2", or "both").
With my current implementation the counts come out like this, for example:
lines_only_in_1 2531
lines_only_in_2 3190
lines_in_both 901
But these numbers are wrong (they do not add up to the totals I expect from the input files).
I suspect the problem is in how I combine/aggregate the values: the mapper emits
(line, sourceId) // sourceId is 0 or 1
and I accumulate the three counts in the reducer, emitting them only at the end.
Presumably something about how the framework invokes this class — e.g. if it also runs as a combiner on partial data — makes the "emit totals at the end" approach unreliable.
What am I doing wrong?
Here is my reducer code (version 1):
/**
 * Reducer that, for each distinct line (the key), inspects which source file
 * ids (0 and/or 1) the line was seen in, and accumulates three running totals:
 * lines present in file A (id 0), lines present in file B (id 1), and lines
 * present in both. The totals are written out exactly once, in cleanup().
 *
 * NOTE(review): countA/countB include lines common to both files — consistent
 * with the "in_a"/"in_b" output key names, but NOT the same as "only in A/B".
 * If exclusive counts are wanted, the conditions must also require
 * fileIds.size() == 1.
 *
 * NOTE(review): because the totals live in instance fields and are emitted in
 * cleanup(), this class is only correct when used as the Reducer. Do NOT
 * install it as a Combiner: a combiner may run zero or more times on partial
 * data, and each run would emit meaningless partial totals.
 */
public static class SourceCombiner
        extends Reducer<Text, ByteWritable, Text, LongWritable> {
    private long countA = 0; // distinct lines seen in file 0 (incl. common)
    private long countB = 0; // distinct lines seen in file 1 (incl. common)
    private long countC = 0; // distinct lines seen in both files

    @Override
    public void reduce(Text key, Iterable<ByteWritable> values, Context context)
            throws IOException, InterruptedException {
        Set<Byte> fileIds = new HashSet<>();
        for (ByteWritable val : values) {
            fileIds.add(val.get());
            if (fileIds.size() >= 2) {
                break; // both sources already seen; no need to scan the rest
            }
        }
        if (fileIds.contains((byte) 0)) { ++countA; }
        if (fileIds.contains((byte) 1)) { ++countB; }
        if (fileIds.size() >= 2)        { ++countC; }
    }

    @Override // was missing: a signature typo would otherwise silently fail to override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // Emit the three aggregate totals once, after all keys are reduced.
        context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA));
        context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB));
        context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC));
    }
}