中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

Apache Beam 實(shí)戰(zhàn)指南 | 大數(shù)據(jù)管道 (pipeline) 設(shè)計(jì)及實(shí)戰(zhàn)

2019-08-27    來源:raincent

容器云強(qiáng)勢(shì)上線!快速搭建集群,上萬(wàn)Linux鏡像隨意使用

作者:張海濤

關(guān)于 Apache Beam 實(shí)戰(zhàn)指南系列文章

隨著大數(shù)據(jù) 2.0 時(shí)代悄然到來,大數(shù)據(jù)從簡(jiǎn)單的批處理擴(kuò)展到了實(shí)時(shí)處理、流處理、交互式查詢和機(jī)器學(xué)習(xí)應(yīng)用。近年來涌現(xiàn)出諸多大數(shù)據(jù)應(yīng)用組件,如 HBase、Hive、Kafka、Spark、Flink 等。開發(fā)者經(jīng)常要用到不同的技術(shù)、框架、API、開發(fā)語(yǔ)言和 SDK 來應(yīng)對(duì)復(fù)雜應(yīng)用的開發(fā),這大大增加了選擇合適工具和框架的難度,開發(fā)者想要將所有的大數(shù)據(jù)組件熟練運(yùn)用幾乎是一項(xiàng)不可能完成的任務(wù)。

面對(duì)這種情況,Google 在 2016 年 2 月宣布將大數(shù)據(jù)流水線產(chǎn)品(Google DataFlow)貢獻(xiàn)給 Apache 基金會(huì)孵化,2017 年 1 月 Apache 對(duì)外宣布開源 Apache Beam,2017 年 5 月迎來了它的第一個(gè)穩(wěn)定版本 2.0.0。在國(guó)內(nèi),大部分開發(fā)者對(duì)于 Beam 還缺乏了解,社區(qū)中文資料也比較少。

一.概述

其他行業(yè)問咱們 IT 具體干什么的,很多 IT 人員會(huì)自嘲自己就是“搬磚”(此處將復(fù)制代碼稱為搬磚)的民工。過了兩天 GitHub 出現(xiàn)自動(dòng)寫代碼的人工智能,IT 程序員深深嘆了一口氣說道“完了要失業(yè)了,代碼沒得搬了”。其實(shí)從入行 IT 那一刻起,不管我們做前端、服務(wù)端、底層架構(gòu)等任何崗位,其實(shí)我們都是為數(shù)據(jù)服務(wù)的服務(wù)人員(注:不是說從民工轉(zhuǎn)崗到服務(wù)員了):把數(shù)據(jù)從后端搬到前端,把前端數(shù)據(jù)再寫入數(shù)據(jù)庫(kù)。盡管編程語(yǔ)言從 C、C++、C#、JAVA、Python 不停變化,為了適應(yīng)時(shí)代背景框架也是千變?nèi)f化,我們拼命從“亞馬遜熱帶雨林”一直學(xué)到“地中海”。

然后 Apache Beam 這個(gè)一統(tǒng)“地中海”的框架出現(xiàn)了。Apache Beam 不光統(tǒng)一了數(shù)據(jù)源,還統(tǒng)一了流批計(jì)算。在這個(gè)數(shù)據(jù)傳輸過程中有一條核心的技術(shù)就是管道(Pipeline),不管是 Strom,F(xiàn)link ,Beam 它都是核心。在這條管道中可以對(duì)數(shù)據(jù)進(jìn)行過濾、凈化、清洗、合并、分流以及各種實(shí)時(shí)計(jì)算操作。

本文會(huì)詳細(xì)介紹如何設(shè)計(jì) Apache Beam 管道、管道設(shè)計(jì)工具介紹、源碼和案例分析,普及和提升大家對(duì) Apache Beam 管道的認(rèn)知。

二.怎樣設(shè)計(jì)好自己的管道?

設(shè)計(jì)管道注意事項(xiàng)

 

 

圖 2-1 簡(jiǎn)單管道

1. 你輸入的數(shù)據(jù)存儲(chǔ)在那里?

首先要確定你要構(gòu)造幾條數(shù)據(jù)源,在 Beam 可以構(gòu)建多條,構(gòu)建之前可以選擇自己的 SDK 的 IO。

2. 你的數(shù)據(jù)類型是什么樣的?

Beam 提供的是鍵值對(duì)的數(shù)據(jù)類型,你的數(shù)據(jù)可能是日志文本、格式化設(shè)備事件、數(shù)據(jù)庫(kù)的行,所以在 PCollection 就應(yīng)該確定數(shù)據(jù)集的類型。

3. 你想怎么處理數(shù)據(jù)?

對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換、過濾處理、窗口計(jì)算、SQL 處理等。 在管道中提供了通用的 ParDo 轉(zhuǎn)換類,算子計(jì)算以及 BeamSQL 等操作。

4. 你打算把數(shù)據(jù)最后輸出到哪里去?

在管道末尾進(jìn)行 Write 寫入操作,把數(shù)據(jù)最后寫入你自己想存放或最后流向的地方。

管道的幾種玩法

1. 分支管道:多次轉(zhuǎn)換,處理相同的數(shù)據(jù)集

 

 

圖 2-2-1 多次轉(zhuǎn)換處理相同數(shù)據(jù)示意圖

描述:例如上圖 2-1-1 圖所示,從一個(gè)數(shù)據(jù)庫(kù)的表讀取或轉(zhuǎn)換數(shù)據(jù)集,然后從數(shù)據(jù)集中分別找找以字母“A”開頭的數(shù)據(jù)放入一個(gè)分支數(shù)據(jù)集中,如果以字母“B”開頭的數(shù)據(jù)放入另一個(gè)分支數(shù)據(jù)集中,最終兩個(gè)數(shù)據(jù)集進(jìn)行隔離處理。

數(shù)據(jù)集:

// 為了演示顯示內(nèi)存數(shù)據(jù)集
final List LINES = Arrays.asList(
"Aggressive",
"Bold",
"Apprehensive",
"Brilliant");

示例代碼:

PCollection dbRowCollection = ...;// 這個(gè)地方可以讀取任何數(shù)據(jù)源。
PCollection aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("A")){// 查找以 "A" 開頭的數(shù)據(jù)
c.output(c.element());
System.out.append("A 開頭的單詞有:"+c.element()+"\r");
}
}
}));
PCollection bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn(){
@ProcessElement
public void processElement(ProcessContext c) {
if(c.element().startsWith("B")){// 查找以 "A" 開頭的數(shù)據(jù)
c.output(c.element());
System.out.append("B 開頭的單詞有:"+c.element()+"\r");
}
}
}));

最終結(jié)果展示:

A 開頭的單詞有:Aggressive
B 開頭的單詞有:Bold
A 開頭的單詞有:Apprehensive
B 開頭的單詞有:Brilliant

原示例代碼地址 : pipelineTest2_1

2. 分支管道:一次轉(zhuǎn)換,輸出多個(gè)數(shù)據(jù)集

 

 

圖 2-2-2 一次轉(zhuǎn)換多個(gè)輸出示意圖

描述:根據(jù)圖 2-2-1 和圖 2-2-2 圖中可以看出,他們以不同的方式執(zhí)行著相同的操作,圖 2-2-1 中的管道包含兩個(gè)轉(zhuǎn)換,用于處理同一輸入中的元素 PCollection。一個(gè)轉(zhuǎn)換使用以下邏輯:

if(以'A'開頭){outputToPCollectionA}

另一個(gè)轉(zhuǎn)換為

if(以'B'開頭){outputToPCollectionB}

因?yàn)槊總(gè)轉(zhuǎn)換讀取整個(gè)輸入 PCollection,所以輸入中的每個(gè)元素都會(huì) PCollection 被處理兩次。

圖 2-2-2 中的管道以不同的方式執(zhí)行相同的操作 - 只有一個(gè)轉(zhuǎn)換使用以下邏輯:

if(以'A'開頭){outputToPCollectionA} else if(以'B'開頭){outputToPCollectionB}

其中輸入中的每個(gè)元素都 PCollection 被處理一次。

數(shù)據(jù)集:同 2-1-1 數(shù)據(jù)集

示例代碼:

// 定義兩個(gè) TupleTag,每個(gè)輸出一個(gè)。
final TupleTag startsWithATag = new TupleTag(){};
final TupleTag startsWithBTag = new TupleTag(){};
PCollectionTuple mixedCollection =
dbRowCollection.apply(ParDo
.of(new DoFn() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().startsWith("A")) {
// 返回首字母帶有 "A" 的數(shù)據(jù)集。
c.output(c.element());
} else if(c.element().startsWith("B")) {
// // 返回首字母帶有 "B" 的數(shù)據(jù)集。
c.output(startsWithBTag, c.element());
}
}
})
// Specify main output. In this example, it is the output
// with tag startsWithATag.
.withOutputTags(startsWithATag,
// Specify the output with tag startsWithBTag, as a TupleTagList.
TupleTagList.of(startsWithBTag)));
// Get subset of the output with tag startsWithATag.
mixedCollection.get(startsWithATag).apply(...);
// Get subset of the output with tag startsWithBTag.
mixedCollection.get(startsWithBTag).apply(...);

如果每個(gè)元素的轉(zhuǎn)換計(jì)算非常耗時(shí),則使用其他輸出會(huì)更有意義,因?yàn)橐淮涡赃^濾全部數(shù)據(jù),比全部數(shù)據(jù)過濾兩次從性能上和轉(zhuǎn)換上都存在一定程度上提升,數(shù)據(jù)量越大越明顯。

最終結(jié)果展示:

復(fù)制代碼A 開頭的單詞有:ApprehensiveA 開頭的單詞有:AggressiveB 開頭的單詞有:BrilliantB 開頭的單詞有:Bold

原示例代碼地址 : pipelineTest2_2

3. 合并管道:多個(gè)數(shù)據(jù)集,合并成一個(gè)管道輸出

 

 

圖 2-2-3 多數(shù)據(jù)集合并輸出圖

描述:

上圖 2-2-3 是接圖 2-2-1 的繼續(xù),把帶“A” 的數(shù)據(jù)和帶“B” 字母開頭的數(shù)據(jù)進(jìn)行合并到一個(gè)管道。

這個(gè)地方注意點(diǎn)是 Flatten 用法必須兩個(gè)數(shù)據(jù)的數(shù)據(jù)類型相同。

數(shù)據(jù)集:

// 為了演示顯示內(nèi)存數(shù)據(jù)集
final List LINESa = Arrays.asList(
"Aggressive",
"Apprehensive");
final List LINESb = Arrays.asList(
"Bold",
"Brilliant");

示例代碼:

// 將兩個(gè) PCollections 與 Flatten 合并
PCollectionList collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection mergedCollectionWithFlatten = collectionList
.apply(Flatten.pCollections());
// 繼續(xù)合并新的 PCollection
mergedCollectionWithFlatten.apply(...);

結(jié)果展示:

合并單詞單詞有:
Aggressive
Brilliant
Apprehensive
Bold

原示例代碼地址 : pipelineTest2_3

4. 合并管道:多個(gè)數(shù)據(jù)源,鏈接合并一個(gè)管道輸出

 

 

圖 2-2-4 多數(shù)據(jù)源合并輸出圖

描述:

你的管道可以從一個(gè)或多個(gè)源讀取或輸入。如果你的管道從多個(gè)源讀取并且這些源中的數(shù)據(jù)相關(guān)聯(lián),則將輸入連接在一起會(huì)很有用。在上面的圖 2-2-4 所示的示例中,管道從數(shù)據(jù)庫(kù)表中讀取名稱和地址,并從 Kafka 主題中讀取名稱和訂單號(hào)。然后管道 CoGroupByKey 用于連接此信息,其中鍵是名稱 ; 結(jié)果 PCollection 包含名稱,地址和訂單的所有組合。

示例代碼:

PCollection> userAddress = pipeline.apply(JdbcIO.>read()...);
PCollection> userOrder = pipeline.apply(KafkaIO.read()...);
final TupleTag addressTag = new TupleTag();
final TupleTag orderTag = new TupleTag();
// 將集合值合并到 CoGbkResult 集合中。
PCollection> joinedCollection =
KeyedPCollectionTuple.of(addressTag, userAddress)
.and(orderTag, userOrder)
.apply(CoGroupByKey.create());
joinedCollection.apply(...);

管道的設(shè)計(jì)工具

對(duì)于管道的設(shè)計(jì)不光用代碼去實(shí)現(xiàn),也可以用視圖工具,F(xiàn)在存在的有兩種一種是拓藍(lán)公司出品叫 Talend Big Data Studio,另一種就是免費(fèi)開源的視圖設(shè)計(jì)工具 kettle-beam 。

 

 

三.怎樣創(chuàng)建你的管道

Apache Beam 程序從頭到尾就是處理數(shù)據(jù)的管道。本小節(jié)使用 Apache Beam SDK 中的類構(gòu)建管道,一個(gè)完整的 Apache Beam 管道構(gòu)建流程如下:

首先創(chuàng)建一個(gè) Pipeline 對(duì)象。

不管是數(shù)據(jù)做任何操作,如“ 讀取”或“ 創(chuàng)建”及轉(zhuǎn)換都要為管道創(chuàng)建 PCollection 一個(gè)或多個(gè)的數(shù) 據(jù)集(PCollection****)。

在 Apache Beam 的管道中你可以對(duì)數(shù)據(jù)集 PCollection 做任何操作,例如轉(zhuǎn)換數(shù)據(jù)格式,過濾,分組,分析或以其他方式處理數(shù)據(jù)中的每一個(gè)元素。每個(gè)轉(zhuǎn)換都會(huì)創(chuàng)建一個(gè)新輸出數(shù)據(jù)集 PCollection,當(dāng)然你可以在處理完成之前進(jìn)行做任何的轉(zhuǎn)換處理。

把你認(rèn)為最終處理完成的數(shù)據(jù)集寫或以其他方式輸出最終的存儲(chǔ)地方。

最后運(yùn)行管道。

創(chuàng)建管道對(duì)象

每一個(gè) Apache Beam 程序都會(huì)從創(chuàng)建管道(Pipeline)對(duì)象開始。

在 Apache Beam SDK,每一個(gè)管道都是一個(gè)獨(dú)立的實(shí)體,管道的數(shù)據(jù)集也都封裝著它的數(shù)據(jù)和對(duì)應(yīng)的數(shù)據(jù)類型(在 Apache Beam 中有對(duì)應(yīng)的數(shù)據(jù)轉(zhuǎn)換類型包)。最后把數(shù)據(jù)進(jìn)行用于各種轉(zhuǎn)換操作。

在創(chuàng)建的管道的時(shí)候需要設(shè)置管道選項(xiàng) PipelineOptions,有兩種創(chuàng)建方式第一種是無參數(shù)和一種有參數(shù)的。具體兩種有什么不同呢? 無參數(shù)的可以在程序中指定相應(yīng)的管道選項(xiàng)參數(shù),如顯示設(shè)置執(zhí)行大數(shù)據(jù)引擎參數(shù)。有參數(shù)的就可以在提交 Apache Beam jar 程序的時(shí)候進(jìn)行用 Shell 腳本的方式后期設(shè)置管道對(duì)應(yīng)的參數(shù)。

具體示例如下:

無參數(shù)

// 首先定義管道的選項(xiàng)
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class); // 顯示設(shè)置執(zhí)行大數(shù)據(jù)引擎
// 創(chuàng)建管道實(shí)體對(duì)象
Pipeline p = Pipeline.create(options);

有參數(shù)

PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().create();

提交設(shè)置參數(shù)的格式如下:

--

標(biāo)簽: 大數(shù)據(jù)管道 機(jī)器學(xué)習(xí)

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請(qǐng)聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點(diǎn)!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請(qǐng)與原作者聯(lián)系。

上一篇:Caffe作者賈揚(yáng)清:AI,從大數(shù)據(jù)演進(jìn)到高性能計(jì)算

下一篇:Apache Flink 1.9 重磅發(fā)布:正式合并阿里內(nèi)部版本Blink重要功能