中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

Uber開(kāi)源Marmaray:基于Hadoop的通用數(shù)據(jù)攝取和分散框架

2018-09-30    來(lái)源:raincent

容器云強(qiáng)勢(shì)上線(xiàn)!快速搭建集群,上萬(wàn)Linux鏡像隨意使用

作者:Uber 工程博客

翻譯:無(wú)明

 

 

三年前,Uber 采用 Apache Hadoop 作為數(shù)據(jù)平臺(tái),從而可以跨計(jì)算機(jī)集群管理數(shù) PB 的數(shù)據(jù)。但是,因?yàn)槲覀冇泻芏鄨F(tuán)隊(duì)、工具和數(shù)據(jù)源,所以需要一種可靠的方式來(lái)攝取和分散數(shù)據(jù)。Marmaray 是 Uber 開(kāi)源的 Apache Hadoop 數(shù)據(jù)提取和分散框架。Marmaray 由我們的 Hadoop 平臺(tái)團(tuán)隊(duì)設(shè)計(jì)和開(kāi)發(fā),是一個(gè)建立在 Hadoop 生態(tài)系統(tǒng)之上的基于插件的框架。用戶(hù)可以新增插件以便從任何來(lái)源攝取數(shù)據(jù),并利用 Apache Spark 將數(shù)據(jù)分散到接收器上。

Marmaray 這個(gè)名字源于土耳其的一條連接歐洲和亞洲的隧道,在 Uber 內(nèi)部,我們將 Marmaray 設(shè)想為根據(jù)客戶(hù)偏好將數(shù)據(jù)從任何來(lái)源連接到任何接收器的管道。數(shù)據(jù)湖的數(shù)據(jù)通常在質(zhì)量方面存在很大差異。Marmaray 可以確保所有攝取的原始數(shù)據(jù)都符合適當(dāng)?shù)脑茨J,保持高質(zhì)量水平,從而帶來(lái)可靠的分析結(jié)果。數(shù)據(jù)科學(xué)家可以將時(shí)間花在從這些數(shù)據(jù)中提取有用的見(jiàn)解上,而不是用來(lái)處理數(shù)據(jù)質(zhì)量問(wèn)題。

在 Uber,Marmaray 以聚合的方式連接各種系統(tǒng)和服務(wù)

通過(guò)我們的模式管理庫(kù)和服務(wù)生成高質(zhì)量的模式化數(shù)據(jù)。

通過(guò) Marmaray 的攝取組件將多個(gè)數(shù)據(jù)存儲(chǔ)中的數(shù)據(jù)攝取到我們的 Hadoop 數(shù)據(jù)湖中。

使用 Uber 的內(nèi)部工作流程編排服務(wù)來(lái)構(gòu)建管道,用以處理攝取的數(shù)據(jù),以及根據(jù)這些數(shù)據(jù)保存和計(jì)算業(yè)務(wù)指標(biāo)。

將處理的結(jié)果提供給在線(xiàn)數(shù)據(jù)存儲(chǔ),內(nèi)部客戶(hù)可以通過(guò) Marmaray 查詢(xún)數(shù)據(jù)并獲得近乎實(shí)時(shí)的結(jié)果。

 

 

圖 1:Marmaray 將數(shù)據(jù)攝取到 Hadoop 數(shù)據(jù)湖中,并分散到數(shù)據(jù)存儲(chǔ)中。

雖然 Marmaray 實(shí)現(xiàn)了任意數(shù)據(jù)源到任意數(shù)據(jù)槽的連接,但我們還需要構(gòu)建一個(gè)自助服務(wù)平臺(tái),為來(lái)自不同背景、團(tuán)隊(duì)和技術(shù)專(zhuān)業(yè)知識(shí)的用戶(hù)提供無(wú)縫的使用體驗(yàn)。

大規(guī)模數(shù)據(jù)攝入的挑戰(zhàn)

Uber 的業(yè)務(wù)生成大量的原始數(shù)據(jù),并將它們存儲(chǔ)在各種數(shù)據(jù)源中,例如 Kafka、Schemaless 和 MySQL。我們需要將這些數(shù)據(jù)攝取到 Hadoop 數(shù)據(jù)湖中,以進(jìn)行業(yè)務(wù)分析。隨著 Uber 業(yè)務(wù)的垂直增長(zhǎng),數(shù)據(jù)攝取的規(guī)模呈現(xiàn)出指數(shù)級(jí)的增長(zhǎng)。面對(duì)大規(guī)模數(shù)據(jù)可靠性的需求,我們不得不重新構(gòu)建我們的攝取平臺(tái),以確保能夠跟上公司增長(zhǎng)的步伐。

 

 

圖 2:隨著 Uber 繼續(xù)擴(kuò)展全球業(yè)務(wù),存儲(chǔ)在 Hadoop 數(shù)據(jù)湖中的原始數(shù)據(jù)呈指數(shù)級(jí)增長(zhǎng)。

之前的數(shù)據(jù)架構(gòu)需要運(yùn)行和維護(hù)多個(gè)數(shù)據(jù)管道,每個(gè)管道對(duì)應(yīng)不同的生產(chǎn)代碼庫(kù),隨著數(shù)據(jù)量的增加,這些管道變得越來(lái)越繁瑣。MySQL、Kafka 和 Schemaless 等數(shù)據(jù)源包含需要被攝取到 Hive 中的原始數(shù)據(jù),以支持整個(gè)公司團(tuán)隊(duì)的各種分析需求。每個(gè)數(shù)據(jù)源都有自己的代碼庫(kù)和相關(guān)的復(fù)雜性,以及一組獨(dú)有的配置、圖表和警報(bào)。添加新的攝取源迫在眉睫,維護(hù)的開(kāi)銷(xiāo)要求我們的大數(shù)據(jù)生態(tài)系統(tǒng)支持所有這些系統(tǒng)。輪班待命的負(fù)擔(dān)十分繁重,有時(shí)每周會(huì)收到 200 多個(gè)警報(bào)。

 

 

圖 3:Hadoop 平臺(tái)團(tuán)隊(duì)的輪班待命警報(bào)圖表,可見(jiàn)維護(hù)系統(tǒng)需要多大的開(kāi)銷(xiāo)。

隨著 Marmaray 的推出,我們將攝取管道整合到一個(gè)與源無(wú)關(guān)的管道和代碼庫(kù)中,以此來(lái)提高可維護(hù)性和資源利用率。

無(wú)論源數(shù)據(jù)存儲(chǔ)是什么,單一攝取管道都將執(zhí)行相同的有向非循環(huán)圖作業(yè)(DAG)。在運(yùn)行時(shí),攝取行為將根據(jù)特定源(類(lèi)似于策略設(shè)計(jì)模式)而有所變化,以協(xié)調(diào)攝取過(guò)程,并使用一種通用的靈活配置來(lái)適應(yīng)未來(lái)不同的需求和用例。

Uber 的數(shù)據(jù)分散需求

我們的很多內(nèi)部數(shù)據(jù)用戶(hù),例如 Uber Eats 和 Michelangelo 機(jī)器學(xué)習(xí)平臺(tái)團(tuán)隊(duì),他們使用 Hadoop 與其他工具來(lái)構(gòu)建和訓(xùn)練機(jī)器學(xué)習(xí)模型,以生成有價(jià)值的衍生數(shù)據(jù)集,從而提高效率并改善用戶(hù)體驗(yàn)。為了最大化這些衍生數(shù)據(jù)集的價(jià)值,需要將這些數(shù)據(jù)分散到在線(xiàn)數(shù)據(jù)存儲(chǔ)中,這些數(shù)據(jù)存儲(chǔ)通常要求比 Hadoop 生態(tài)系統(tǒng)低得多的延遲。

在引入 Marmaray 之前,每個(gè)團(tuán)隊(duì)都需要構(gòu)建自己的臨時(shí)擴(kuò)散系統(tǒng)。這些重復(fù)性工作和不具有通用性的功能通常導(dǎo)致工程資源的極度浪費(fèi)。Marmaray 于 2017 年底發(fā)布,以滿(mǎn)足對(duì)靈活的通用分散平臺(tái)的需求,它將 Hadoop 數(shù)據(jù)傳輸?shù)饺魏我粋(gè)在線(xiàn)數(shù)據(jù)存儲(chǔ),以此來(lái)完善 Hadoop 生態(tài)系統(tǒng)。

跟蹤端到端數(shù)據(jù)傳輸

我們的很多內(nèi)部用戶(hù)需要保證將數(shù)據(jù)源的數(shù)據(jù)傳輸?shù)侥繕?biāo)接收器,他們還需要完整性指標(biāo),包括數(shù)據(jù)傳輸?shù)阶罱K接收器的可靠性。從理論上講,這意味著數(shù)據(jù)被 100%傳輸?shù)侥康牡,但?shí)際上我們的目標(biāo)是提供 99.99% 到 99.999%的可靠性。當(dāng)記錄數(shù)量非常少時(shí),可以直接對(duì)源系統(tǒng)和接收器系統(tǒng)運(yùn)行查詢(xún)來(lái)驗(yàn)證數(shù)據(jù)是否已送達(dá)。

在 Uber,我們每天攝取數(shù) PB 的數(shù)據(jù)和超過(guò) 1000 億條消息,因此無(wú)法通過(guò)查詢(xún)的方式進(jìn)行數(shù)據(jù)驗(yàn)證。面對(duì)這么大規(guī)模的數(shù)據(jù),我們需要一個(gè)能夠跟蹤數(shù)據(jù)傳輸而不會(huì)顯著增加延遲的系統(tǒng)。Marmaray 通過(guò)自定義 Spark 累加器來(lái)桶分化記錄,讓用戶(hù)能夠以最小的開(kāi)銷(xiāo)監(jiān)控?cái)?shù)據(jù)傳輸。

Marmaray 的架構(gòu)

下面的架構(gòu)圖說(shuō)明了 Marmaray 的基本構(gòu)建塊和抽象。我們可以通過(guò)這些通用組件向 Marmaray 添加擴(kuò)展,以便支持新的數(shù)據(jù)源和接收器。

 

 

圖 4:Marmaray 的架構(gòu),包含各種組件。

DataConverters

攝取和分散作業(yè)主要對(duì)來(lái)自數(shù)據(jù)源的記錄執(zhí)行轉(zhuǎn)換,以確保在將數(shù)據(jù)寫(xiě)入目標(biāo)接收器之前具備所需格式。Marmaray 將多個(gè)轉(zhuǎn)換器鏈接在一起執(zhí)行多次轉(zhuǎn)換,并且可以寫(xiě)入多個(gè)接收器。

DataConverters 的一個(gè)關(guān)鍵的作用是在轉(zhuǎn)換時(shí)生成錯(cuò)誤記錄。為了保證分析結(jié)果更準(zhǔn)確,所有原始數(shù)據(jù)在被攝入到 Hadoop 數(shù)據(jù)湖之前必須符合一定的模式。任何包含錯(cuò)誤格式、缺少必需字段或被視為有問(wèn)題的數(shù)據(jù)都將被過(guò)濾掉并寫(xiě)入錯(cuò)誤表。

WorkUnitCalculator

Marmaray 按照批次來(lái)移動(dòng)數(shù)據(jù),批次大小是可配的。為了計(jì)算需要處理的數(shù)據(jù)量,我們引入了 WorkUnitCalculator 的概念。WorkUnitCalculator 將檢查輸入源的類(lèi)型和之前的檢查點(diǎn),然后計(jì)算下一個(gè)工作單元或批次。工作單元可以是 Kafka 的偏移范圍或 Hive/HDFS 的 HDFS 文件集合。

在計(jì)算下一批數(shù)據(jù)時(shí),WorkUnitCalculator 還可以應(yīng)用限流信息,例如,要讀取的最大數(shù)據(jù)量或要從 Kafka 讀取的消息數(shù)。這些限流信息可根據(jù)具體用例進(jìn)行靈活配置,從而確保工作單元的大小不會(huì)壓垮源系統(tǒng)或接收系統(tǒng)。

Metadata Manager

 

 

圖 5:Marmaray 的 Metadata Manager 用于存儲(chǔ)運(yùn)行中作業(yè)的相關(guān)元數(shù)據(jù)。

所有 Marmaray 作業(yè)都需要一個(gè)持久存儲(chǔ)(我們稱(chēng)之為 Metadata Manager)來(lái)緩存作業(yè)的元數(shù)據(jù)信息。作業(yè)可以在執(zhí)行期間更新?tīng)顟B(tài),并在作業(yè)執(zhí)行成功后替換舊狀態(tài),否則將無(wú)法修改狀態(tài)。在 Uber,我們使用 Metadata Manager 存儲(chǔ)檢查點(diǎn)信息(或 Kafka 的分區(qū)偏移量)、平均記錄大小和平均消息數(shù)等元數(shù)據(jù)。元數(shù)據(jù)存儲(chǔ)是通用的,可以存儲(chǔ)任何相關(guān)的度量指標(biāo),這些度量指標(biāo)可用于根據(jù)用例和用戶(hù)需求跟蹤、描述或收集作業(yè)的狀態(tài)。

ForkOperator 和 ForkFunction

ForkOperator 使用 ForkFunction 將輸入的記錄流拆分為多個(gè)輸出流,并驗(yàn)證符合模式的記錄和錯(cuò)誤的記錄,然后可以單獨(dú)處理這些記錄。

 

 

圖 6:ForkOperator 和 ForkFunction 用于將原始數(shù)據(jù)記錄拆分為符合模式的記錄和錯(cuò)誤的記錄。

ISource 和 ISink

ISource 包含工作單元所需的源數(shù)據(jù)信息,ISink 包含有關(guān)如何寫(xiě)入接收器的必要信息。例如,Cassandra 接收器可能需要包含集群、表、分區(qū)鍵和集群鍵的信息。Kafka 源需要包含主題名稱(chēng)、要讀取的最大消息數(shù)、群集信息和偏移量初始化策略以及其他元數(shù)據(jù)信息。

數(shù)據(jù)模型和工作流

AvroPayload 是 Marmaray 架構(gòu)的核心組件,它是 Avro GenericRecord 二進(jìn)制編碼格式的包裝器,包含了需要處理的數(shù)據(jù)的元數(shù)據(jù)信息。

 

 

圖 7:AvroPayload 使用元數(shù)據(jù)包裝 GenericRecord。

Avro 數(shù)據(jù)(GenericRecord)的主要優(yōu)點(diǎn)是它在內(nèi)存存儲(chǔ)和網(wǎng)絡(luò)傳輸方面都很高效,與 JSON 相比,二進(jìn)制編碼數(shù)據(jù)通過(guò)線(xiàn)路發(fā)送需要更小的模式開(kāi)銷(xiāo)。在 Spark 架構(gòu)之上使用 Avro 數(shù)據(jù)意味著我們還可以利用 Spark 的數(shù)據(jù)壓縮和加密功能。這些優(yōu)勢(shì)有助于我們的 Spark 作業(yè)更有效地處理大規(guī)模數(shù)據(jù)。

為了支持連接任何數(shù)據(jù)源和任意接收器的架構(gòu),我們要求所有攝取源將轉(zhuǎn)換器定義為從自己的格式到 Avro,所有分散接收器將轉(zhuǎn)換器定義為從 Avro 到本地?cái)?shù)據(jù)模型(對(duì)于 Cassandra 來(lái)說(shuō)就是 ByteBuffers)。

要求所有轉(zhuǎn)換器將數(shù)據(jù)轉(zhuǎn)換為 AvroPayload 格式或從 AvroPayload 格式轉(zhuǎn)換為數(shù)據(jù)為形成了松散耦合的數(shù)據(jù)模型。在定義了數(shù)據(jù)源及其相關(guān)的轉(zhuǎn)換器后,就可以將數(shù)據(jù)分散到任何受支持的接收器,因?yàn)樗薪邮掌鞫际窃床豢芍,它們只關(guān)心 AvroPayload 格式的數(shù)據(jù)。下圖描繪了 Marmaray 的數(shù)據(jù)模型:

 

 

圖 8:對(duì)于攝取和分散,Marmaray 要求將數(shù)據(jù)轉(zhuǎn)換為 AvroPayload,這是一個(gè)基于 Avro 的 GenericRecord 格式的包裝器。

下圖描繪了 Marmaray 作業(yè)是如何獨(dú)立于特定的數(shù)據(jù)源或接收器進(jìn)行編排的。

 

 

圖 9:Marmaray 獨(dú)立于數(shù)據(jù)源或接收器運(yùn)行攝取和分散作業(yè)。

在這個(gè)過(guò)程中,每個(gè)數(shù)據(jù)源和接收器的屬性配置將指向作業(yè)的下一個(gè)步驟,包括計(jì)算需要處理的數(shù)據(jù)量(即其工作單元)、應(yīng)用 ForkFunction 將原始數(shù)據(jù)拆分為“有效”和“錯(cuò)誤”記錄以確保數(shù)據(jù)的質(zhì)量、將數(shù)據(jù)轉(zhuǎn)換為適當(dāng)?shù)哪繕?biāo)格式、更新元數(shù)據(jù)并報(bào)告指標(biāo)以便跟蹤進(jìn)度。在 Uber,所有 Marmaray 作業(yè)都運(yùn)行在 Apache Spark 上,并使用 YARN 作為資源管理器。

自助服務(wù)平臺(tái)

由于我們的很多數(shù)據(jù)平臺(tái)用戶(hù)不熟悉我們的技術(shù)棧中所使用的語(yǔ)言(如 Python 和 Java),因此我們的團(tuán)隊(duì)必須構(gòu)建一個(gè)自助服務(wù)平臺(tái),用戶(hù)只需通過(guò)鼠標(biāo)點(diǎn)擊就可以建立端到端的管道,確保來(lái)自所需源的數(shù)據(jù)最終能夠到達(dá)目標(biāo)接收器,以便進(jìn)行后續(xù)的分析工作和查詢(xún)。

 

 

圖 10:我們的自助服務(wù) UI 讓數(shù)據(jù)科學(xué)家和其他用戶(hù)能夠?qū)?shù)據(jù)從任何源移動(dòng)到任何接收器,而無(wú)需了解特定的數(shù)據(jù)格式。

在上線(xiàn)之后的七個(gè)月時(shí)間里,已經(jīng)有超過(guò) 3300 個(gè)作業(yè)通過(guò)我們的自助服務(wù)平臺(tái)加入到我們的系統(tǒng)中。

數(shù)據(jù)刪除

在 Uber,所有的 Kafka 數(shù)據(jù)都是以追加的方式進(jìn)行存儲(chǔ),并使用了日期分區(qū)。用戶(hù)數(shù)據(jù)可以跨越多個(gè)日期分區(qū),并且每個(gè)分區(qū)通常會(huì)包含很多 Kafka 記錄。如果底層存儲(chǔ)沒(méi)有內(nèi)置的索引和更新支持,那么掃描和更新這些分區(qū)以便進(jìn)行用戶(hù)數(shù)據(jù)的更正、更新或刪可能會(huì)非常耗費(fèi)資源。Hadoop 使用的 Parquet 數(shù)據(jù)存儲(chǔ)不支持索引,所以我們根本無(wú)法直接更新 Parquet 文件。為了便于索引和更新,Marmaray 使用了 Hadoop Updates 和 Incremental(Hudi),Uber 開(kāi)發(fā)的另一個(gè)開(kāi)源庫(kù),用于管理大型分析數(shù)據(jù)集的存儲(chǔ),將原始數(shù)據(jù)存儲(chǔ)在 Hive 中。

數(shù)據(jù)生產(chǎn)者使用 Hive 來(lái)掃描數(shù)據(jù)表,識(shí)別要?jiǎng)h除的記錄,并將它們發(fā)布到 Kafka 集群。Marmaray 的 Kafka 攝取管道依次從 Kafka 集群中讀取它們。然后,Marmaray 使用 Hudi 的批量插入功能攝取新記錄,保持較低的攝取延遲,并使用 Hudi 的 upsert 功能處理更新的記錄,將 Kafka 的舊記錄替換為更新過(guò)的數(shù)據(jù)。

 

 

圖 11:Marmaray 利用 Hudi 存儲(chǔ)格式來(lái)支持?jǐn)?shù)據(jù)刪除。

Marmaray 的下一個(gè)篇章

Marmaray 對(duì)任意源到任意接收器數(shù)據(jù)管道的支持適用于 Hadoop 生態(tài)系統(tǒng)(主要針對(duì)使用了 Hive 的場(chǎng)景)以及數(shù)據(jù)遷移的各種用例。我們已經(jīng)向開(kāi)源社區(qū)發(fā)布了 Marmary,并期待收到更多反饋,然后不斷改進(jìn) Marmaray 平臺(tái)。與此同時(shí),我們正在棄用傳統(tǒng)管道,并將我們所有的工作流程遷移到 Marmaray 平臺(tái)上,以簡(jiǎn)化我們的整體數(shù)據(jù)架構(gòu),并確保隨著數(shù)據(jù)需求的增長(zhǎng),我們能夠輕松地?cái)U(kuò)展。

Marmaray GitHub 地址:https://github.com/uber/marmaray

英文原文:https://eng.uber.com/marmaray-hadoop-ingestion-open-source/

標(biāo)簽: Mysql 大數(shù)據(jù) 代碼 網(wǎng)絡(luò)

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請(qǐng)聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀(guān)點(diǎn)!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請(qǐng)與原作者聯(lián)系。

上一篇:全球10大智慧城市大數(shù)據(jù)應(yīng)用案例

下一篇:程序員必備的數(shù)據(jù)結(jié)構(gòu)與算法書(shū)單