教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

GroupingComparator應(yīng)用案例

更新時(shí)間:2015年12月29日14時(shí)58分 來源:傳智播客云計(jì)算學(xué)科 瀏覽次數(shù):

MapReduce中的GroupingComparator應(yīng)用案例
在日常的數(shù)據(jù)統(tǒng)計(jì)分析中,常常會有類似如下的求分組最大值統(tǒng)計(jì)需求,用到的數(shù)據(jù)示例如下:
itemid    amount    date    …
10001    136.6    2015-1-12    …
10001    165.5    2015-1-12    …
10002    122.5    2015-1-12    …
10002    166.88    2015-1-12    …
10003    189.65    2015-1-12    …
10003    198.62    2015-1-13    …
10001    278.6    2015-1-13    …
10001    143.6    2015-1-13    …

需求是求出整個(gè)數(shù)據(jù)集中每一種商品銷售額最大的單筆訂單,結(jié)果如下:
10001    278.60
10002    166.88
10003    198.62
... ...
如果用傳統(tǒng)sql來求解,這是極其簡單的:
select itemid,max(amount) from t_order group by itemid;

而用mapreduce程序,該如何實(shí)現(xiàn)呢?最簡單的辦法是:
1、在mapper中將日志的每一行解析成鍵值對: “key: itemid ,value:amount”
2、經(jīng)過shuffle之后,相同itemid的數(shù)據(jù)會發(fā)送給同一個(gè)reducer
3、然后,我們就可以在reducer中遍歷某個(gè)item的一組values,
4、這一組values對于amount來說是無序的,進(jìn)而需要在reducer中緩存這一組values,然后排序從而取到這一組values中的最大值。

這個(gè)辦法固然可行,但是效率不是很高,因?yàn)樵趓educer中針對一組values取最大amount,需要在內(nèi)存中進(jìn)行緩存并排序,在數(shù)據(jù)量大的情況下,會耗費(fèi)相當(dāng)多的內(nèi)存空間和cpu運(yùn)算資源,甚至可能會內(nèi)存溢出。

現(xiàn)在,就讓我們來思考另一種實(shí)現(xiàn)方式,如果能讓數(shù)據(jù)到達(dá)reducer時(shí)的次序是針對amount的倒序,則我們可以直接取改組values的第一個(gè)值即可,如何實(shí)現(xiàn)呢?
1、首先,我們構(gòu)造一個(gè)bean<itemid,amount> implements WritableComparable作為mapper輸出的key來傳遞數(shù)據(jù),在其compareTo()方法中定義邏輯:按照itemid升序及amount降序,這樣一來,mapper輸出的數(shù)據(jù)就會按照amount降序排列,示例如下:
<10001,278.60>
<10001,165.50>
<10001,136.60>
<10002,166.88>
<10002,122.5>
.......
2、但是,這樣一來,又帶來一個(gè)棘手的問題——相同item的bean在shuffle時(shí)不一定發(fā)往同一個(gè)reducer!因?yàn)槊恳粋€(gè)bean(就算是相同itemid)都是一個(gè)不同的對象,而默認(rèn)HashPartitioner分區(qū)的邏輯是用bean的hashcode計(jì)算分區(qū)號。從而,需要自定義一個(gè)ItemPartitioner,實(shí)現(xiàn)將相同itemid的bean發(fā)往同一個(gè)reducer,代碼如下所示:
class ItemPartitioner extends Partitioner{
int getPartition(bean,numreducertasks){
      return bean.getItemid.hashCode() % numreducertasks;
}
}

這樣一來,可以保證相同item的數(shù)據(jù)會到達(dá)同一個(gè)reducer,并且是按照amount降序排序,如下所示:
<10001,278.60>
<10001,165.50>
<10001,136.60>
.......

3、接下來,就是如何取到這一組values中的最大值。
在默認(rèn)情況下,reducer會將拿到的數(shù)據(jù)按照相同key進(jìn)行聚合,然后對聚合起來的每一組數(shù)據(jù)調(diào)用一次reduce方法,此處麻煩的問題是,這里的每一個(gè)key都是一個(gè)對象,從而,就算是相同itemid的數(shù)據(jù),也不會聚合到一組,而是會逐一地調(diào)用reduce()方法進(jìn)行處理,這樣一來,我們也就沒辦法取到最大值了;
4、要解決這個(gè)問題,就得借助GroupingComparator了,其工作機(jī)制是這樣:
當(dāng)mapper輸出的相同partition的kv數(shù)據(jù)到達(dá)一個(gè)Reducer后,會有一個(gè)聚合的過程,即將“相同”key的kv聚合到一起(其實(shí)質(zhì)是利用GroupingComparator來對key進(jìn)行比較),然后將這一組聚合好的kv中最前面的一個(gè)kv的key傳給reduce方法的入?yún)ey,將一個(gè)用來遍歷這一組kv數(shù)據(jù)的values的迭代器iterator傳給reduce方法的入?yún)terator。

5、從而,我們可以自定義一個(gè)GroupingComparator來定義哪些kv可以聚合成一組,代碼示例如下:
public class GroupingComparator extends WritableComparator{
protected  GroupingComparator() {
super(Bean.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Bean kv1 = (Bean) a;
Bean kv2 = (Bean) b;
int cmp = kv1.getItemid().compareTo(kv2.getItemid());
return cmp;
}
}


6、這樣一來,雖然不同的bean是不同的對象,但是在進(jìn)行聚合的時(shí)候,根據(jù)GroupingComparator ,只要是itemid相同的bean都會算成一組聚合kv,然后這一組聚合kv的最前面一個(gè)kv(也就是amount值最大的那一個(gè))會傳入reduce方法的入?yún)ey,從而,在我們的reduce方法中,只要直接輸出這個(gè)key就ok了:
@Override
protected void reduce(Bean bean,Iterable<NullWritable> arg1,Context context)throws IOException, InterruptedException {
context.write(bean, NullWritable.get());
}

當(dāng)然,要想讓這個(gè)GroupingComparator 生效,還需要在job中進(jìn)行注冊:
job.setGroupingComparatorClass(GroupingComparator.class);


綜上所述,該案例需要自定義這幾個(gè)元素:
     自定義的復(fù)合key
     自定義的partitioner
     自定義的GroupingComparator
0 分享到:
和我們在線交談!