In many real-world scenarios, the cardinality of the values in the data set will be relatively small. In such cases, the problem can be solved efficiently with two MapReduce jobs:
- Calculate the frequency of each value in your dataset (basically, a word-count job)
- An identity mapper plus a reducer that calculates the median from the <value, frequency> pairs
Job 1 drastically reduces the amount of data and can be executed fully in parallel. The reducer of job 2 only has to process n <value, frequency> pairs (n = cardinality of your value set) instead of all the individual values, as it would with the naive approach.
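For reference, job 1 is an ordinary word-count style job. A minimal Hadoop Streaming sketch of it might look as follows; the file names job1_mapper.py and job1_reducer.py are just illustrative, not part of the original answer.

```python
#!/usr/bin/env python
# job1_mapper.py -- emit "<value>\t1" for every value in the input
import sys

for line in sys.stdin:
    value = line.strip()
    if value:
        print("%s\t%d" % (value, 1))
```

```python
#!/usr/bin/env python
# job1_reducer.py -- sum the counts for each value; Hadoop Streaming delivers
# the mapper output sorted by key, so all lines for one value arrive together
import sys

current_value = None
current_count = 0

for line in sys.stdin:
    value, count = line.strip().split("\t", 1)
    if value == current_value:
        current_count += int(count)
    else:
        if current_value is not None:
            print("%s\t%d" % (current_value, current_count))
        current_value = value
        current_count = int(count)

if current_value is not None:
    print("%s\t%d" % (current_value, current_count))
```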
Below is an example reducer for job 2. It is a Python script that can be used directly with Hadoop Streaming. It assumes that the values in your dataset are ints, but it can easily be adapted to doubles.
```python
import sys

# NOTE: this reducer expects "<value>\t<count>" lines sorted by value in
# ascending numeric order, and job 2 must run with a single reducer so that
# it sees every pair.
item_to_index_range = []
total_count = 0

# Store in memory a mapping of each value to the range of indexes it
# occupies in a sorted list of all values
for line in sys.stdin:
    item, count = line.strip().split("\t", 1)
    new_total_count = total_count + int(count)
    item_to_index_range.append((int(item), (total_count + 1, new_total_count + 1)))
    total_count = new_total_count

# Calculate index(es) of the middle item(s)
middle_items_indexes = [(total_count // 2) + 1]
if total_count % 2 == 0:
    middle_items_indexes += [total_count // 2]

# Retrieve the middle item(s)
middle_items = []
for i in middle_items_indexes:
    for item, index_range in item_to_index_range:
        if index_range[0] <= i < index_range[1]:
            middle_items.append(item)
            break

print(sum(middle_items) / float(len(middle_items)))
```
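As a quick sanity check, the reducer's arithmetic can be verified locally against a brute-force expansion of the frequencies. The toy data below is made up for illustration only:

```python
# Brute-force check of the median-from-frequencies idea on a toy data set.
# The pairs below stand in for job 1's output for the values
# 1, 1, 2, 3, 3, 3, 7 (already sorted by value).
pairs = [(1, 2), (2, 1), (3, 3), (7, 1)]  # (value, count)

expanded = []
for value, count in pairs:
    expanded.extend([value] * count)
expanded.sort()

n = len(expanded)
if n % 2 == 1:
    median = float(expanded[n // 2])
else:
    median = (expanded[n // 2 - 1] + expanded[n // 2]) / 2.0

print(median)  # prints 3.0, matching what the streaming reducer would output
```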
This approach builds on a suggestion from Chris White's answer, which proposes using a combiner as a means of calculating the frequencies of the values. However, in MapReduce, combiners are not guaranteed to run. This has some side effects:
- the reducer must first compute the final <value, frequency> pairs itself and only then calculate the median (see the sketch after this list).
- In the worst case, the combiners are never executed, and the reducer still has to process all of the individual values, just as with the naive approach.
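To illustrate the first point: since combiners may or may not have run, a median reducer in the combiner-based variant has to aggregate partial counts per value before it can locate the middle element. A minimal sketch of such a defensive reducer (structure and names are my own, not taken from the referenced answer):

```python
#!/usr/bin/env python
# Reducer for the combiner-based variant: input lines are "<value>\t<count>",
# and several partial counts may appear for the same value because the
# combiner is not guaranteed to run.
import sys
from collections import defaultdict

counts = defaultdict(int)

# First, collapse partial counts into final <value, frequency> pairs
for line in sys.stdin:
    value, count = line.strip().split("\t", 1)
    counts[int(value)] += int(count)

# Then walk the values in sorted order to find the middle index(es)
total = sum(counts.values())
middle = [(total // 2) + 1]
if total % 2 == 0:
    middle.append(total // 2)

middle_items = []
seen = 0
for value in sorted(counts):
    lo, hi = seen + 1, seen + counts[value]
    for i in middle:
        if lo <= i <= hi:
            middle_items.append(value)
    seen = hi

print(sum(middle_items) / float(len(middle_items)))
```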