In your example, the FOREACH contains several nested DISTINCT statements. Each of them is executed in the reducer and relies on RAM to compute the unique values, and the whole query compiles into just one MapReduce job. If a group contains too many unique elements, you can get memory exceptions.
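For reference, the pattern I mean looks roughly like this (a sketch reconstructed from your description, not your exact script; it assumes the same schema as the rewrite below):

-- everything happens inside one job; each nested DISTINCT is
-- materialized in the reducer's RAM for every group
records = LOAD '....' USING PigStorage(',') AS (g, a, b, c, d, fd, s, w);
grouped = GROUP records BY g;
out = FOREACH grouped {
    ua = DISTINCT records.a;
    ub = DISTINCT records.b;
    uc = DISTINCT records.c;
    ud = DISTINCT records.d;
    GENERATE FLATTEN(group) AS g, COUNT(ua) AS a_count, COUNT(ub) AS b_count,
             COUNT(uc) AS c_count, COUNT(ud) AS d_count;
};
STORE out INTO '....' USING PigStorage(',');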
Fortunately, Pig Latin is a dataflow language, and what you write is essentially an execution plan. To use more of the cluster's processors, you can restructure the code so that it compiles into MapReduce jobs that can run in parallel. To do that, we rewrite the query without the nested DISTINCTs: the trick is to run each DISTINCT and GROUP separately, as if you had only one column, and then merge the results. It is rather SQL-like, but it works. Here it is:
records = LOAD '....' USING PigStorage(',') AS (g, a, b, c, d, fd, s, w);
selected = FOREACH records GENERATE g, a, b, c, d;

-- count the distinct values of a per group key g
grouped_a = FOREACH selected GENERATE g, a;
grouped_a = DISTINCT grouped_a;
grouped_a_count = GROUP grouped_a BY g;
grouped_a_count = FOREACH grouped_a_count GENERATE FLATTEN(group) AS g, COUNT(grouped_a) AS a_count;

-- the same pipeline for b
grouped_b = FOREACH selected GENERATE g, b;
grouped_b = DISTINCT grouped_b;
grouped_b_count = GROUP grouped_b BY g;
grouped_b_count = FOREACH grouped_b_count GENERATE FLATTEN(group) AS g, COUNT(grouped_b) AS b_count;

-- ... and for c
grouped_c = FOREACH selected GENERATE g, c;
grouped_c = DISTINCT grouped_c;
grouped_c_count = GROUP grouped_c BY g;
grouped_c_count = FOREACH grouped_c_count GENERATE FLATTEN(group) AS g, COUNT(grouped_c) AS c_count;

-- ... and for d
grouped_d = FOREACH selected GENERATE g, d;
grouped_d = DISTINCT grouped_d;
grouped_d_count = GROUP grouped_d BY g;
grouped_d_count = FOREACH grouped_d_count GENERATE FLATTEN(group) AS g, COUNT(grouped_d) AS d_count;

-- merge the per-column counts back together on g
mrg = JOIN grouped_a_count BY g, grouped_b_count BY g, grouped_c_count BY g, grouped_d_count BY g;
out = FOREACH mrg GENERATE grouped_a_count::g, grouped_a_count::a_count, grouped_b_count::b_count, grouped_c_count::c_count, grouped_d_count::d_count;
STORE out INTO '....' USING PigStorage(',');
After execution I got the following summary, which shows that the individual GROUP BY jobs were not affected by the skew in the data, since that skew is absorbed by the first job:
Job Stats (time in seconds):
JobId   Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime      MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
job_201206061712_0244   669     45      75      8       13      376     18      202     grouped_a,grouped_b,grouped_c,grouped_d,records,selected       DISTINCT,MULTI_QUERY
job_201206061712_0245   1       1       3       3       3       12      12      12      grouped_c_count GROUP_BY,COMBINER
job_201206061712_0246   1       1       3       3       3       12      12      12      grouped_b_count GROUP_BY,COMBINER
job_201206061712_0247   5       1       48      27      33      30      30      30      grouped_a_count GROUP_BY,COMBINER
job_201206061712_0248   1       1       3       3       3       12      12      12      grouped_d_count GROUP_BY,COMBINER
job_201206061712_0249   4       1       3       3       3       12      12      12      mrg,out HASH_JOIN       ...

Input(s):
Successfully read 52215768 records (44863559501 bytes) from: "...."

Output(s):
Successfully stored 9 records (181 bytes) in: "..."
From the job DAG, we can see that the GROUP BY operations were executed in parallel:
Job DAG:
job_201206061712_0244 -> job_201206061712_0248,job_201206061712_0246,job_201206061712_0247,job_201206061712_0245,
job_201206061712_0248 -> job_201206061712_0249,
job_201206061712_0246 -> job_201206061712_0249,
job_201206061712_0247 -> job_201206061712_0249,
job_201206061712_0245 -> job_201206061712_0249,
job_201206061712_0249
It works great on my datasets, where one of the group key values (in column g) accounts for 95% of the data, and it also gets rid of the memory-related exceptions.