How do I optimize a GROUP BY with a nested FOREACH in Pig Latin?

I have a skewed dataset, and I need to do a GROUP BY followed by a nested FOREACH. Because of the skew, a few reducers take a very long time while the rest finish quickly. I know there is a skewed join, but is there anything similar for GROUP BY and nested FOREACH? Here is my Pig code (variable names changed):

    foo_grouped = GROUP foo_grouped BY FOO;
    FOO_stats = FOREACH foo_grouped {
        a_FOO_total = foo_grouped.ATTR;
        a_FOO_total = DISTINCT a_FOO_total;
        bar_count = foo_grouped.BAR;
        bar_count = DISTINCT bar_count;
        a_FOO_type1 = FILTER foo_grouped BY COND1 == 'Y';
        a_FOO_type1 = a_FOO_type1.ATTR;
        a_FOO_type1 = DISTINCT a_FOO_type1;
        a_FOO_type2 = FILTER foo_grouped BY COND2 == 'Y' OR COND3 == 'HIGH';
        a_FOO_type2 = a_FOO_type2.ATTR;
        a_FOO_type2 = DISTINCT a_FOO_type2;
        GENERATE group AS FOO,
                 COUNT(a_FOO_total) AS a_FOO_total,
                 COUNT(a_FOO_type1) AS a_FOO_type1,
                 COUNT(a_FOO_type2) AS a_FOO_type2,
                 COUNT(bar_count) AS bar_count;
    }
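(For reference, the skewed join mentioned in the question is Pig's USING 'skewed' clause on a JOIN; a minimal sketch of that syntax, with made-up relation and field names:)

    -- Sketch only: 'big_rel', 'small_rel' and 'key' are illustrative names.
    joined = JOIN big_rel BY key, small_rel BY key USING 'skewed';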
2 answers

In your example, the FOREACH contains many nested DISTINCT operators that are executed in the reducer. The reducer relies on RAM to compute the unique values, and the whole query produces just one MapReduce job. If a group contains too many unique elements, you can also get memory-related exceptions.

Fortunately, Pig Latin is a dataflow language, and what you write is a kind of execution plan. To use more CPUs, you can change your code so that it forces more MapReduce jobs that can run in parallel. To do that, we rewrite the query without the nested DISTINCTs: the trick is to do the DISTINCT operations and then the GROUP BYs as if you had only one column, and then merge the results. It is very SQL-ish, but it works. Here it is:

    records = LOAD '....' USING PigStorage(',') AS (g, a, b, c, d, fd, s, w);
    selected = FOREACH records GENERATE g, a, b, c, d;

    grouped_a = FOREACH selected GENERATE g, a;
    grouped_a = DISTINCT grouped_a;
    grouped_a_count = GROUP grouped_a BY g;
    grouped_a_count = FOREACH grouped_a_count GENERATE FLATTEN(group) AS g, COUNT(grouped_a) AS a_count;

    grouped_b = FOREACH selected GENERATE g, b;
    grouped_b = DISTINCT grouped_b;
    grouped_b_count = GROUP grouped_b BY g;
    grouped_b_count = FOREACH grouped_b_count GENERATE FLATTEN(group) AS g, COUNT(grouped_b) AS b_count;

    grouped_c = FOREACH selected GENERATE g, c;
    grouped_c = DISTINCT grouped_c;
    grouped_c_count = GROUP grouped_c BY g;
    grouped_c_count = FOREACH grouped_c_count GENERATE FLATTEN(group) AS g, COUNT(grouped_c) AS c_count;

    grouped_d = FOREACH selected GENERATE g, d;
    grouped_d = DISTINCT grouped_d;
    grouped_d_count = GROUP grouped_d BY g;
    grouped_d_count = FOREACH grouped_d_count GENERATE FLATTEN(group) AS g, COUNT(grouped_d) AS d_count;

    mrg = JOIN grouped_a_count BY g, grouped_b_count BY g, grouped_c_count BY g, grouped_d_count BY g;
    out = FOREACH mrg GENERATE grouped_a_count::g, grouped_a_count::a_count, grouped_b_count::b_count, grouped_c_count::c_count, grouped_d_count::d_count;

    STORE out INTO '....' USING PigStorage(',');
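If an individual GROUP BY job still needs more reducers, Pig also lets you raise the reducer count per operator with the PARALLEL clause, or globally with the default_parallel property. A hedged sketch of how that could be added to the script above (the reducer counts are arbitrary examples, not tuned values):

    -- Sketch only: 5 and 10 are illustrative reducer counts.
    SET default_parallel 5;                              -- default for all blocking operators
    grouped_a_count = GROUP grouped_a BY g PARALLEL 10;  -- override for one GROUP BY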

After execution I got the following summary, which shows that the DISTINCT operations did not suffer from the skew in the data processed by the first job:

    Job Stats (time in seconds):
    JobId                  Maps  Reduces  MaxMapTime  MinMapTIme  AvgMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  Alias                                                      Feature               Outputs
    job_201206061712_0244  669   45       75          8           13          376            18             202            grouped_a,grouped_b,grouped_c,grouped_d,records,selected  DISTINCT,MULTI_QUERY
    job_201206061712_0245  1     1        3           3           3           12             12             12             grouped_c_count                                            GROUP_BY,COMBINER
    job_201206061712_0246  1     1        3           3           3           12             12             12             grouped_b_count                                            GROUP_BY,COMBINER
    job_201206061712_0247  5     1        48          27          33          30             30             30             grouped_a_count                                            GROUP_BY,COMBINER
    job_201206061712_0248  1     1        3           3           3           12             12             12             grouped_d_count                                            GROUP_BY,COMBINER
    job_201206061712_0249  4     1        3           3           3           12             12             12             mrg,out                                                    HASH_JOIN             ...

    Input(s):
    Successfully read 52215768 records (44863559501 bytes) from: "...."

    Output(s):
    Successfully stored 9 records (181 bytes) in: "..."

From the job DAG we can see that the GROUP BY operations were executed in parallel:

    Job DAG:
    job_201206061712_0244 -> job_201206061712_0248,job_201206061712_0246,job_201206061712_0247,job_201206061712_0245,
    job_201206061712_0248 -> job_201206061712_0249,
    job_201206061712_0246 -> job_201206061712_0249,
    job_201206061712_0247 -> job_201206061712_0249,
    job_201206061712_0245 -> job_201206061712_0249,
    job_201206061712_0249

It works fine on my datasets, where one of the group key values (in column g) accounts for 95% of the data. It also gets rid of the memory-related exceptions.


I recently ran into a bug with this join: if the group key is null, the corresponding rows get dropped from the result entirely.
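If those groups need to be kept, one possible workaround (not from the original answer) is to substitute a sentinel for the null key before the per-column DISTINCT/GROUP BY steps, so the final inner JOIN has nothing to drop. A minimal sketch, reusing the relation names from the answer above; the 'NULL_KEY' sentinel and the chararray cast are illustrative assumptions:

    -- Sketch only: replace null group keys with a sentinel so the inner JOIN keeps them.
    selected = FOREACH records GENERATE
                   (g IS NULL ? 'NULL_KEY' : (chararray) g) AS g,
                   a, b, c, d;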

