中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

容錯(cuò)和高性能如何兼得: Flink創(chuàng)始人談流計(jì)算核心架構(gòu)演化和現(xiàn)狀

2018-07-28    來(lái)源:raincent

容器云強(qiáng)勢(shì)上線!快速搭建集群,上萬(wàn)Linux鏡像隨意使用

英文原文鏈接:

https://data-artisans.com/blog/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink

前言

當(dāng)前,流數(shù)據(jù)平臺(tái)的普及率正在飆升。為了解決日益增長(zhǎng)的實(shí)時(shí)數(shù)據(jù)處理需求,一些公司正在將其部分大數(shù)據(jù)基礎(chǔ)架構(gòu)轉(zhuǎn)換為流式處理模型。 基于流數(shù)據(jù)的基礎(chǔ)架構(gòu)不僅能夠更好地解決延遲敏感的數(shù)據(jù)處理業(yè)務(wù)需求,同時(shí)提供更多深入業(yè)務(wù)洞察 ; 另外,流式數(shù)據(jù)處理平臺(tái)讓傳統(tǒng)的數(shù)據(jù)倉(cāng)庫(kù)建設(shè)更加簡(jiǎn)單靈活。

流式基礎(chǔ)架構(gòu)的關(guān)鍵部分是流計(jì)算引擎。優(yōu)秀的流式計(jì)算引擎可以讓業(yè)務(wù)即使在有狀態(tài)計(jì)算的情況下,也能提供低延遲、高吞吐、強(qiáng)一致性。

在本文中,我們將深入探討 Flink 的檢查點(diǎn)機(jī)制如何工作,以及它如何取代舊架構(gòu)以實(shí)現(xiàn)流容錯(cuò)和恢復(fù)。 我們測(cè)量 Flink 在各種類型的流媒體應(yīng)用程序中的性能,并通過(guò)在 Apache Storm(一種廣泛使用的低延遲流處理器)上運(yùn)行相同系列的實(shí)驗(yàn)來(lái)進(jìn)行效果對(duì)比。

流式處理架構(gòu)演化

在流式計(jì)算領(lǐng)域,同一套系統(tǒng)需要同時(shí)兼具容錯(cuò)和高性能其實(shí)非常難。 在傳統(tǒng)的批處理中,當(dāng)作業(yè)失敗時(shí),可以簡(jiǎn)單地重新運(yùn)行作業(yè)的失敗部分以修復(fù)由于之前失敗導(dǎo)致的數(shù)據(jù)丟失。 這對(duì)于批處理是完全可行的,因?yàn)榕幚淼臄?shù)據(jù)是靜態(tài)的,可以從頭到尾重放。 在連續(xù)的流式處理模型中,這種處理思路是完全不可行的。

原則上,數(shù)據(jù)流是無(wú)窮無(wú)盡的,不具有開(kāi)始點(diǎn)和結(jié)束點(diǎn)。 一個(gè)帶有 Buffer 緩存的數(shù)據(jù)流或許可以進(jìn)行一小段的數(shù)據(jù)重放、重新計(jì)算 (即: 如果系統(tǒng)出錯(cuò),系統(tǒng)可以嘗試從在 Buffer 中緩存的數(shù)據(jù)流進(jìn)行重新計(jì)算),但出錯(cuò)時(shí)希望從數(shù)據(jù)流最開(kāi)始點(diǎn)進(jìn)行重新計(jì)算是不切實(shí)際的(例如,一個(gè)流作業(yè)可以運(yùn)行數(shù)月之久,當(dāng)出現(xiàn)系統(tǒng)故障時(shí)候?qū)е聰?shù)據(jù)計(jì)算出錯(cuò)不可能參考批處理系統(tǒng),從幾個(gè)月前的數(shù)據(jù)開(kāi)始計(jì)算)。 此外,與僅具有輸入和輸出的批處理作業(yè)相比,流式計(jì)算是有狀態(tài)的。 這意味著除了輸出之外,系統(tǒng)還需要備份和恢復(fù)部分計(jì)算 (我們稱之為 Operator,下同) 狀態(tài)。

由于這些問(wèn)題帶來(lái)的諸多復(fù)雜性,開(kāi)源生態(tài)系統(tǒng)多個(gè)系統(tǒng)都在嘗試多種方式來(lái)解決容錯(cuò)問(wèn)題。容錯(cuò)機(jī)制的設(shè)計(jì)將對(duì)框架設(shè)計(jì)預(yù)計(jì)編程模型都有深遠(yuǎn)的影響,導(dǎo)致難以在現(xiàn)有的流式框架上類似插件機(jī)制一樣擴(kuò)展實(shí)現(xiàn)不一樣的容錯(cuò)策略。因此,當(dāng)我們選擇流式計(jì)算內(nèi)框架時(shí),容錯(cuò)策略非常重要。

接下來(lái),我們將討論了容錯(cuò)流式架構(gòu)的幾種方法,從記錄確認(rèn) (record-acknowledgements) 到微批處理 (micro-batching),事務(wù)更新(transactional updates)和分布式快照(distributed snapshots)。 我們將從以下幾個(gè)維度討論各個(gè)系統(tǒng)的優(yōu)缺點(diǎn),同時(shí)最終選出一個(gè)適合流式處理的最優(yōu) Feature 組合。 我們將討論:

• 完全一次保證:故障后應(yīng)正確恢復(fù)有狀態(tài)運(yùn)算符中的狀態(tài)

• 低延遲:越低越好。 許多應(yīng)用程序需要亞秒級(jí)延遲

• 高吞吐量:隨著數(shù)據(jù)速率的增長(zhǎng),通過(guò)管道推送大量數(shù)據(jù)至關(guān)重要

• 強(qiáng)大的計(jì)算模型:框架應(yīng)該提供一種編程模型,該模型不限制用戶并允許各種各樣的應(yīng)用程序在沒(méi)有故障的情況下,容錯(cuò)機(jī)制的開(kāi)銷很低

• 流量控制:來(lái)自慢速操作員的背壓應(yīng)該由系統(tǒng)和數(shù)據(jù)源自然吸收,以避免因消費(fèi)者緩慢而導(dǎo)致崩潰或降低性能

我們遺漏了一個(gè)共同特征,即失敗后的快速恢復(fù),不是因?yàn)樗恢匾且驗(yàn)?1)所有討論的系統(tǒng)都是基于完全并行的分布式處理系統(tǒng),恢復(fù)是基礎(chǔ)能力;以及(2)在有狀態(tài)的應(yīng)用程序中,狀態(tài)恢復(fù)的瓶頸通常在于存儲(chǔ)而非計(jì)算框架。

Record acknowledgements (記錄確認(rèn),代表系統(tǒng) Apache Storm)

雖然流處理已經(jīng)在諸如金融等行業(yè)中廣泛使用多年,但直到最近流式處理才能為大數(shù)據(jù)的基礎(chǔ)設(shè)施的一部分。 這些都得益于開(kāi)源的流式大數(shù)據(jù)處理引擎成熟和發(fā)展。 Apache Storm 是開(kāi)源生態(tài)中第一個(gè)廣泛使用的大規(guī)模流處理框架。 Storm 使用上游備份機(jī)制和記錄確認(rèn)機(jī)制來(lái)保證在失敗后重新處理消息。 請(qǐng)注意,Storm 不保證狀態(tài)一致性,任何可變狀態(tài)處理都委托給用戶來(lái)處理(Storm 的 Trident API 確保狀態(tài)一致性,將在下一節(jié)中介紹)。

譯者注: 以下內(nèi)容理解需要讀者一定的 Apache Storm 基礎(chǔ),請(qǐng)參看 Apache Storm 官方文檔有關(guān) Storm 關(guān)鍵概念的描述。

記錄確認(rèn)的容錯(cuò)方式如下:當(dāng)前 Operator 處理完成每條記錄時(shí)都會(huì)向前一個(gè) Operator 發(fā)回針對(duì)這條記錄處理過(guò)的確認(rèn)。

Topology 的 Source(譯者注: Storm 的 Source 節(jié)點(diǎn)指 Storm 一個(gè)作業(yè)中負(fù)責(zé)從流式源頭讀取數(shù)據(jù)的 Operator) 會(huì)保留其產(chǎn)生的所有記錄備份用來(lái)處理 Fail 情況。 當(dāng)源頭一條記錄的所有派生記錄都被整個(gè) Topology 處理完成,Source 節(jié)點(diǎn)就可以刪除其備份;當(dāng)系統(tǒng)出現(xiàn)部分 Fail 情況,例如一條記錄并沒(méi)有收到其下游的派生記錄的確認(rèn),Source 就會(huì)重新發(fā)送該記錄到下游的 Topology 以便重新進(jìn)行計(jì)算。 這種處理機(jī)制可以保證整個(gè)處理過(guò)程不會(huì)丟失數(shù)據(jù),但很有可能導(dǎo)致同一條記錄被多次發(fā)送到下游進(jìn)行處理(我們稱之為“at least once”)。 Storm 使用一種巧妙的機(jī)制來(lái)實(shí)現(xiàn)這種容錯(cuò)方式,每個(gè)源記錄只需要幾個(gè)字節(jié)的存儲(chǔ)來(lái)跟蹤確認(rèn)。 Twitter Heron 保持與 Storm 相同的確認(rèn)機(jī)制,但提高了記錄重放的效率(從而提高了恢復(fù)時(shí)間和整體吞吐量)。

單獨(dú)的記錄確認(rèn)容錯(cuò)體系結(jié)構(gòu),無(wú)論其性能如何,都無(wú)法提供 exactly-once(精確一次) 的保證,Storm 將規(guī)避重復(fù)數(shù)據(jù)的問(wèn)題交給了流式處理應(yīng)用開(kāi)發(fā)者去處理。 當(dāng)然,對(duì)于某些應(yīng)用程序而言,數(shù)據(jù)小部分重復(fù)可以接受的,但仍然有更多的場(chǎng)景無(wú)法接受數(shù)據(jù)不準(zhǔn)確的情況。另外,Storm 的容錯(cuò)機(jī)制還帶來(lái)了吞吐不夠以及流控問(wèn)題, 特別是在 backpressure(反壓) 情況下,記錄確認(rèn)的容錯(cuò)方式會(huì)導(dǎo)致上游節(jié)點(diǎn)錯(cuò)誤地認(rèn)為數(shù)據(jù)處理出現(xiàn)了 Fail(實(shí)際上僅僅是由于 backpressure 導(dǎo)致記錄處理不及時(shí),而無(wú)法 ack)。上述 Storm 的種種問(wèn)題最終演化出基于微批處理的流式架構(gòu)。

Micro batches(微批處理,代表系統(tǒng) Apache Storm Trident,Apache Spark Streaming)

上節(jié)討論到,Storm 和以及更早前的流式傳輸系統(tǒng)無(wú)法提供對(duì)大規(guī)模應(yīng)用程序至關(guān)重要的一些 Feature,特別是高吞吐量,快速并行恢復(fù),以及托管狀態(tài)的一次性語(yǔ)義。 這導(dǎo)致了下一階段的流式系統(tǒng)演化。

之后,具備容錯(cuò)能力的下一個(gè)發(fā)展階段到了微批處理,或者說(shuō)流離散化 (stream discretization,即將連續(xù)的流切分為一個(gè)個(gè)離散的、小批次的微批進(jìn)行處理)。這個(gè)出發(fā)點(diǎn)非常簡(jiǎn)單:流式處理系統(tǒng)中的算子都是在 record 級(jí)別進(jìn)行計(jì)算同步和容錯(cuò),由此帶來(lái)了在 record 如此低層次上進(jìn)行處理的復(fù)雜和開(kāi)銷。很簡(jiǎn)單嘛,我們就把連續(xù)的數(shù)據(jù)流不要切分到 record 級(jí)別,而是收斂切分為一批一批微批的、原子的數(shù)據(jù)進(jìn)行類似 Batch 的計(jì)算。這樣,每個(gè) batch 的數(shù)據(jù)可能會(huì)成功或者失敗處理,我們就對(duì)當(dāng)前失敗的這一小批數(shù)據(jù)進(jìn)行處理即可。

 

 

微批處理本質(zhì)上一種批處理模型,顯然可以利用現(xiàn)有的批處理引擎就可以完成流式計(jì)算。例如,可以在批處理引擎(Spark)提供流功能(這是 Spark Streaming 背后的基本機(jī)制),當(dāng)前它也可以應(yīng)用于流引擎之上(例如, Storm)提供一次性保證和狀態(tài)恢復(fù)(這是 Storm Trident 背后的想法)。 在 Spark Streaming 中,每次的微批量計(jì)算都是一個(gè) Spark 作業(yè),而在 Trident 中,每個(gè)微批次都是一個(gè)大型記錄,微批次中的所有記錄都會(huì)合并進(jìn)入一個(gè)大型記錄。

基于微批處理的系統(tǒng)可以實(shí)現(xiàn)上面列出的相當(dāng)多的需求(確切一次保證,高吞吐量),但它們還有很多不足之處:

編程模型:為了實(shí)現(xiàn)其目標(biāo),例如,Spark Streaming 將編程模型從流式更改為微批處理。 這意味著用戶不能再在檢查點(diǎn)間隔的倍數(shù)之外的時(shí)段中窗口數(shù)據(jù),并且模型不能支持許多應(yīng)用程序所需的基于計(jì)數(shù)或會(huì)話窗口。 這些都是應(yīng)用程序開(kāi)發(fā)人員需要的需求。具有可以改變狀態(tài)的連續(xù)運(yùn)算符的純流模型為用戶提供了更大的靈活性。

流量控制:使用基于時(shí)間的數(shù)據(jù)切分為微批的處理方式仍然具有 backpressure 固有問(wèn)題。 如果某個(gè)下游的 Operator 處理較慢(例如,計(jì)算密集型 Operator 處理性能跟不上或者向外部存儲(chǔ)寫出數(shù)據(jù)較慢),此時(shí)如果負(fù)責(zé)數(shù)據(jù)流切分的 Operator 速度快于下游的阻塞節(jié)點(diǎn),就會(huì)導(dǎo)致數(shù)據(jù)切分比原有的配置時(shí)間更長(zhǎng)。 這導(dǎo)致越來(lái)越多的批次在內(nèi)存排隊(duì)等待被處理,最終內(nèi)存 OOM,或者微批的時(shí)間間隔增大導(dǎo)致數(shù)據(jù)不精確。

延遲:微批處理顯然加大了流計(jì)算延遲,一個(gè)微批作業(yè)的延遲最好情況也只能到微批的間隔時(shí)間。 通常情況下,亞秒級(jí)別的延遲對(duì)于一些簡(jiǎn)單應(yīng)用程序足夠,但一個(gè)較為復(fù)雜的流式處理任務(wù),例如單個(gè)作業(yè)內(nèi)部存在多個(gè)階段,每個(gè)階段存在大量分布式數(shù)據(jù) shuffle 情況,很容易將整個(gè)作業(yè)延遲拉長(zhǎng)的數(shù)秒甚至數(shù)十秒。

微批處理模型的最大限制可能是它連接了兩個(gè)不應(yīng)連接的概念:應(yīng)用程序定義的窗口大小和系統(tǒng)內(nèi)部恢復(fù)間隔。 假設(shè)一個(gè)程序(下面是示例 Flink 代碼)每 5 秒聚合一次記錄:

dataStream .map(transformRecords) .groupBy(“sessionId”) .window(Time.of(5, TimeUnit.SECONDS)) .sum(“price”)

這些應(yīng)用非常適合微批量模型。 系統(tǒng)累積 5 秒的數(shù)據(jù),對(duì)它們求和,并在對(duì)流進(jìn)行一些轉(zhuǎn)換后聚合計(jì)算。 下游數(shù)據(jù)應(yīng)用程序可以直接使用上述 5 秒聚合的結(jié)果進(jìn)行數(shù)據(jù)消費(fèi),例如在儀表板上顯示。 但是,現(xiàn)在假設(shè) backpressure 效應(yīng)開(kāi)始起作用(例如,由于計(jì)算密集型的 transformRecords 函數(shù)),或者 devops 團(tuán)隊(duì)決定通過(guò)將間隔增加到 10 秒來(lái)控制作業(yè)的吞吐量。 然后,在出現(xiàn) backpressure 情況下,微批量大小不受控制地動(dòng)態(tài)進(jìn)行改變,或者直接變?yōu)?10 秒。 這意味著下游應(yīng)用程序(例如,包含最近 5 秒統(tǒng)計(jì)的 Web 儀表板)讀取的聚合數(shù)據(jù)是錯(cuò)誤的,下游應(yīng)用程序需要自己處理此問(wèn)題。 這樣,流計(jì)算系統(tǒng)由于性能或者吞吐問(wèn)題,直接導(dǎo)致了運(yùn)行數(shù)據(jù)錯(cuò)誤。

微批處理可以實(shí)現(xiàn)高吞吐量和一次性保證,但這些功能室以喪失低延遲,流量控制和純流式編程模型為代價(jià)滴。 顯然,我們需要思考清楚的是,是否有可能實(shí)現(xiàn)兩全其美:在保持持續(xù)計(jì)算 (continuous process) 的運(yùn)算符模型的所有優(yōu)勢(shì),同時(shí)兼?zhèn)湟恢滦、高吞吐量等?yōu)勢(shì)。 后面討論的后續(xù)流式架構(gòu)實(shí)現(xiàn)了這種 Feature 的組合,并將微批處理作為流式處理的基本模型。

注意:通常微批處理被認(rèn)為是一次處理一條記錄的替代方法。 這是見(jiàn)文生義的做法:所謂的連續(xù)計(jì)算并不是連續(xù)地一次處理一條記錄。 實(shí)際上,所有精心設(shè)計(jì)的流計(jì)算系統(tǒng)(包括下面討論的 Flink 和 Google Dataflow)在通過(guò)網(wǎng)絡(luò)傳輸之前會(huì)緩沖許多記錄,同時(shí)又具備流式連續(xù)處理能力。

Transactional updates (代表系統(tǒng) Google Cloud Dataflow)

如何做到魚(yú)和熊掌兼得?在保持連續(xù)計(jì)算模型(低延遲,反壓流控,狀態(tài)管理等)的好處,同時(shí)保證做到數(shù)據(jù)處理的準(zhǔn)確一致。一種強(qiáng)大而不失優(yōu)雅的方式是原子地記錄數(shù)據(jù)的處理以及狀態(tài)的更新 (譯者注: 類似數(shù)據(jù)的 WAL 日志)。 一旦系統(tǒng)出現(xiàn) Fail,可從記錄的日志中恢復(fù)我們需要的中間計(jì)算狀態(tài)和需要處理數(shù)據(jù)。

在 Google Cloud Dataflow 中實(shí)現(xiàn)類似的模型。 系統(tǒng)將計(jì)算模型抽象為一次部署并長(zhǎng)期運(yùn)行持續(xù)計(jì)算的 Operator DAG。 在 Dataflow 中,數(shù)據(jù)的 shuffle 是流式的而非批模式,同時(shí)計(jì)算結(jié)果亦不需要物化 (數(shù)據(jù)的計(jì)算結(jié)果放在內(nèi)存中)。 這種模型不僅解決了流式計(jì)算低延遲問(wèn)題,同時(shí)還天然支持自然流量控制機(jī)制,因?yàn)?DAG 不同階段的 Operator 之間存有中間結(jié)果的 Buffer,這些中間緩沖區(qū)可以緩解反壓,直到反壓惡化到最源頭的 Operator,即 DataFlow Source 節(jié)點(diǎn)。而基于 Pull 模型的流式數(shù)據(jù)源,如 Kafka 消費(fèi)者可以處理這個(gè)問(wèn)題,即 Source 節(jié)點(diǎn)的中間結(jié)果 Buffer 會(huì)出現(xiàn)積壓導(dǎo)致讀取 Kafka 變慢,但不會(huì)影響上游的流數(shù)據(jù)采集。 系統(tǒng)還支持一套干凈的流編程模型,支持復(fù)雜的窗口,同時(shí)還提供對(duì)狀態(tài)的更新操作。 值得一提的是,這套流編程模型包含微批量模型。

例如,下面 Google Cloud Dataflow 程序(請(qǐng)參閱:https://cloud.google.com/dataflow/model/windowing )會(huì)創(chuàng)建一個(gè)會(huì)話窗口,如果針對(duì)某個(gè) Key 在 10 分鐘內(nèi)都沒(méi)有數(shù)據(jù)達(dá)到,則會(huì)觸發(fā)該會(huì)話窗口 (譯者注: 例如某個(gè)用戶在訪問(wèn) APP 期間中斷了 10 分鐘沒(méi)有操作)。 而間隔 10 分鐘后,如果新的數(shù)據(jù)到達(dá),系統(tǒng)將創(chuàng)建一個(gè)新的會(huì)話窗口。

PCollection items = ...; PCollection session_windowed_items = items.apply( Window.into(Sessions.withGapDuration(Duration.standardMinutes(10))))

這種數(shù)據(jù)的處理方式在流式模型中很容易實(shí)現(xiàn),但在微批量模型中很難實(shí)現(xiàn),因?yàn)閿?shù)據(jù)窗口的定義不對(duì)應(yīng)于固定的微批量大小。

這種架構(gòu)中的容錯(cuò)設(shè)計(jì)如下:通過(guò) Operator 的每個(gè)中間記錄,和本 Operator 計(jì)算帶來(lái)的狀態(tài)更新,以及有本條記錄派生的新記錄,一起做一次原子事務(wù)并提交到事務(wù)性日志系統(tǒng)或者數(shù)據(jù)庫(kù)系統(tǒng)。 在系統(tǒng)出現(xiàn)失敗的情況下,之前數(shù)據(jù)庫(kù)的記錄將被重放,用于恢復(fù)計(jì)算的中間狀態(tài),同時(shí)將丟失沒(méi)有來(lái)得及計(jì)算的數(shù)據(jù)重新讀取進(jìn)行計(jì)算。

Apache Samza 遵循類似的方法,但只能提供 at-least-once 保證 ,因?yàn)樗褂?Apache Kafka 作為后臺(tái)存儲(chǔ)。 Kafka(現(xiàn)在)不提供事務(wù),因此對(duì)狀態(tài)和派生流記錄的更新不能作為原子事務(wù)一起提交。

事務(wù)更新體系結(jié)構(gòu)具有許多優(yōu)點(diǎn)。 事實(shí)上,它實(shí)現(xiàn)了我們?cè)诒疚拈_(kāi)頭提出的所有需求。 該體系結(jié)構(gòu)的基礎(chǔ)是能夠頻繁地寫入具有高吞吐量的分布式容錯(cuò)存儲(chǔ)。 分布式快照(在下一節(jié)中進(jìn)行了解釋)將拓?fù)涞臓顟B(tài)作為一個(gè)整體進(jìn)行快照,從而減少了對(duì)分布式存儲(chǔ)的寫入量和頻率。

Distributed Snapshots (代表系統(tǒng) Apache Flink™)

提供 exactly-once 流式處理語(yǔ)義保證的核心問(wèn)題就是 確定當(dāng)前流式計(jì)算的狀態(tài) (包括正在處理的數(shù)據(jù),以及 Operator 狀態(tài)),生成該狀態(tài)的一致快照,并存儲(chǔ)在持久存儲(chǔ)中。如果可以經(jīng)常執(zhí)行狀態(tài)保存的操作,則從故障恢復(fù)意味著僅從持久存儲(chǔ)中恢復(fù)最新快照,將源頭 Source 回退到快照生成時(shí)刻再次進(jìn)行”播放”。Flink 的狀態(tài)算法在這篇論文有詳細(xì)說(shuō)明,以下我們給出一個(gè)簡(jiǎn)單總結(jié)。

Flink 的快照機(jī)制基于 Chandy 和 Lamport 于 1985 年設(shè)計(jì)的算法,用于生成分布式系統(tǒng)當(dāng)前狀態(tài)的一致快照(請(qǐng)參閱此處的詳細(xì)介紹 ),不會(huì)丟失信息且不記錄重復(fù)項(xiàng)。 Flink 使用的是 Chandy Lamport 算法的一個(gè)變種,定期對(duì)正在運(yùn)行的流拓?fù)涞臓顟B(tài)做快照,并將這些快照存儲(chǔ)到持久存儲(chǔ)(例如,存儲(chǔ)到 HDFS 或內(nèi)存中文件系統(tǒng))。 這些做快照的頻率是可配置的。

這類似于微批處理方法,其中兩個(gè)檢查點(diǎn)之間的所有計(jì)算都作為一個(gè)整體原子地成功或失敗。 然而,這個(gè)就是兩者唯一的類似點(diǎn)。 Chandy Lamport 算法的一個(gè)重要特點(diǎn)是我們永遠(yuǎn)不必按流處理中的“暫停”按鈕,用來(lái)等待檢查點(diǎn)完成后安排下一次 Batch 數(shù)據(jù)處理。 相反,常規(guī)數(shù)據(jù)處理始終保持運(yùn)行,而狀態(tài)持久化僅在后臺(tái)發(fā)生。 以下引用原始論文,

全局狀態(tài)檢測(cè)算法應(yīng)該被設(shè)計(jì)在基礎(chǔ) (業(yè)務(wù)) 計(jì)算之上:它必須與此基礎(chǔ) (業(yè)務(wù)) 計(jì)算同時(shí)并行進(jìn)行,但不能侵入改變基礎(chǔ) (業(yè)務(wù)) 計(jì)算。

因此,該架構(gòu)結(jié)合了遵循真正的持續(xù)計(jì)算模型(低延遲,流量控制和真正的流編程模型)和高吞吐量的優(yōu)點(diǎn),并且也是 Chandy-Lamport 算法可證明的一次性保證。 除了持久化有狀態(tài)計(jì)算的狀態(tài)(每個(gè)其他容錯(cuò)機(jī)制也需要這樣做)之外,這種容錯(cuò)機(jī)制幾乎沒(méi)有開(kāi)銷。 對(duì)于小狀態(tài)(例如,計(jì)數(shù)或其他統(tǒng)計(jì)摘要),這種持久化開(kāi)銷通常可忽略不計(jì),而對(duì)于大狀態(tài),狀態(tài)持久化間隔需要流計(jì)算應(yīng)用開(kāi)發(fā)者在吞吐量和恢復(fù)時(shí)間之間進(jìn)行權(quán)衡。

最重要的是,該架構(gòu)將應(yīng)用程序開(kāi)發(fā)與流量控制、吞吐量控制分開(kāi)。 更改快照持久化的間隔時(shí)間對(duì)流作業(yè)的結(jié)果完全沒(méi)有影響,因此下游應(yīng)用程序可以安全地依賴于接收正確的結(jié)果。

Flink 的檢查點(diǎn)機(jī)制基于 stream barriers(可以理解為Chandy Lamport 中的“標(biāo)記”),這些 barrier 像正常的業(yè)務(wù)數(shù)據(jù)一樣在 Operator 和 Operator 之間的數(shù)據(jù)管道中流動(dòng)。 Flink 的檢查點(diǎn)的描述來(lái)自于 Flink 社區(qū)文檔 ) 。

Barrier 在 Source 節(jié)點(diǎn)中被注入到普通流數(shù)據(jù)中(例如,如果使用 Apache Kafka 作為源,Barrier 將與 Kafka 的讀取偏移對(duì)齊),并且作為數(shù)據(jù)流的一部分與數(shù)據(jù)記錄一起流過(guò)下游的 DAG。 Barrier 將業(yè)務(wù)數(shù)據(jù)流分為兩組:當(dāng)前快照的一部分(Barrier 表示檢查點(diǎn)的開(kāi)始),以及屬于下一個(gè)快照的那些組。

 

 

Barrier 流向下游并在通過(guò) Operator 時(shí)觸發(fā)狀態(tài)快照。 Operator 首先將 Barrier 與所有傳入的流分區(qū)(通常 Barrier 具有多個(gè)輸入)對(duì)齊,上游來(lái)源較快的流分區(qū)將被緩沖數(shù)據(jù)以等待來(lái)源較慢的流分區(qū)。 當(dāng) Operator 從每個(gè)輸入流分區(qū)都收到 Barrier 時(shí),它會(huì)檢查其狀態(tài)(如果有)并寫入持久存儲(chǔ),這個(gè)過(guò)程我們稱之為狀態(tài)寫檢查。一旦完成狀態(tài)檢查點(diǎn),Operator 就將 Barrier 向下游轉(zhuǎn)發(fā)。 請(qǐng)注意,在此機(jī)制中,如果 Operator 支持,則狀態(tài)檢查點(diǎn)既可以是異步(在寫入狀態(tài)時(shí)繼續(xù)處理),也可以是增量(僅寫入更改)。

 

 

一旦所有數(shù)據(jù)寫出端 (即 Flink Sink 節(jié)點(diǎn)) 都收到 Barrier,當(dāng)前檢查點(diǎn)就完成了。 故障恢復(fù)意味著只需恢復(fù)最新的檢查點(diǎn)狀態(tài),并從最新的 Barrier 記錄的偏移量重新啟動(dòng) Source 節(jié)點(diǎn)。 分布式快照在我們?cè)诒疚拈_(kāi)頭所要達(dá)到的所有需求中得分很高。 它們實(shí)現(xiàn)了高吞吐量、一次性保證,同時(shí)保留了連續(xù)的 Operator 模型、低延遲以及自然流量控制。

總覽

我們從這篇文章開(kāi)始,列出了來(lái)自分布式流體系結(jié)構(gòu)的需求。下表總結(jié)了我們討論的每個(gè)體系結(jié)構(gòu)如何支持這些功能。

 

實(shí)驗(yàn)

為了進(jìn)一步說(shuō)明 Apache Flink™的性能,我們?cè)O(shè)計(jì)了一系列實(shí)驗(yàn),用以研究 Flink 的吞吐,延遲以及容錯(cuò)機(jī)制的影響。下面所有實(shí)驗(yàn)均在 Google Compute Engine 上進(jìn)行,使用 30 個(gè)實(shí)例,每個(gè)實(shí)例包含 4 核和 15GB 內(nèi)存。所有 Flink 測(cè)試均使用截至 7 月 24 日的最新代碼修訂版進(jìn)行,所有 Storm 測(cè)試均使用 0.9.3 版。如有需要,大家可以在此處找到用于評(píng)估的所有代碼。

同時(shí),為了更好進(jìn)行橫向比較,我們也提供了在 Apache Storm 上面運(yùn)行相同程序的結(jié)果。如前面文章所介紹,Apache Storm 曾經(jīng)是最廣泛使用的流式處理系統(tǒng)之一,它核心機(jī)制是實(shí)現(xiàn)了"記錄確認(rèn) (譯注: record acknowledgements)"以及"微批處理 (譯注: mini-batch)"。(后者是通過(guò) Storm Trident 實(shí)現(xiàn))

吞吐

我們?cè)?Google 云上,組建 30 臺(tái)機(jī)器總計(jì) 120Core 的集群,用來(lái)測(cè)量 Flink 和 Storm 對(duì)兩個(gè)不同程序的吞吐量。 第一個(gè)程序是并行流式 grep 任務(wù),它在流中搜索包含與正則表達(dá)式匹配的字符串的事件。grep 應(yīng)用程序具備的特征能夠讓 grep 非常容易做到并行處理,并且基于流分區(qū)進(jìn)行伸縮。

 

 

在 Flink 集群上,我們可以看到 Flink 每核每秒平均有 150 萬(wàn)條記錄的持續(xù)吞吐量,這使 Flink 集群總吞吐量達(dá)到每秒 1.82 億記錄。Flink 的計(jì)算延遲為零,因?yàn)樽鳂I(yè)不涉及網(wǎng)絡(luò),也不涉及微批處理。而當(dāng)打開(kāi) Flink 的容錯(cuò)機(jī)制,并設(shè)定每 5 秒做一次快照,可以看到的是 Flink 吞吐量有輕微降低(小于 2%)?梢哉f(shuō),F(xiàn)link 優(yōu)秀的容錯(cuò)機(jī)制并不會(huì)引入任何計(jì)算延遲。

在 Storm 集群上,當(dāng)我們關(guān)閉記錄確認(rèn)機(jī)制(即沒(méi)有任何數(shù)據(jù)準(zhǔn)確性保證),Storm 處理吞吐能力是每核每秒約 82000 條記錄,99% 的處理延遲在 10 毫秒以內(nèi),因此整個(gè) Storm 集群的總吞吐量為每秒 57 萬(wàn)條記錄。當(dāng)啟用記錄確認(rèn)(即保證數(shù)據(jù)至少處理一次,at-least-once)時(shí),Storm 的吞吐量降至每核每秒 4700 條記錄,同時(shí) Storm 的延遲也增加到 30-120 毫秒。 接下來(lái),我們使用 Storm Trident,其微批量大小為 200000 個(gè)元組。 Trident 實(shí)現(xiàn)了每核每秒 75000 條記錄的吞吐量(集群總吞吐量與 Storm 原生處理機(jī)制在關(guān)閉容錯(cuò)機(jī)制情況下整體吞吐量大致相同)。然而,這個(gè)集群吞吐性能是以 3000 毫秒的延遲(99% 的百分位數(shù)是 3000ms)為代價(jià)換來(lái)的。

我們看到 Flink 的吞吐量比 Trident 高出 20 倍以上,吞吐量比 Storm 高 300 倍,在如此高吞吐情況下,F(xiàn)link 還保證了計(jì)算延遲幾乎為零。另外,我們還看到,F(xiàn)link 規(guī)避了微批處理模型,因此 Flink 的高吞吐量并不會(huì)以犧牲延遲為代價(jià) 。 Flink 可以將 Source 節(jié)點(diǎn)和 Sink 節(jié)點(diǎn)鏈接 (Chain) 在一起,從而將數(shù)據(jù)在 Flink 內(nèi)部傳遞優(yōu)化為在單個(gè) JVM 里面交換下數(shù)據(jù)記錄的句柄而已。

之后,我們還進(jìn)行了如下實(shí)驗(yàn),將計(jì)算集群核心數(shù)量從 40 個(gè)擴(kuò)展到 120 個(gè)。因?yàn)?Grep 程序是一個(gè)易于并行處理的邏輯,因此所有框架處理能力理論上都能夠做到線性擴(kuò)展,F(xiàn)在讓我們?cè)僮鲆粋(gè)稍加不同的實(shí)驗(yàn),它按數(shù)據(jù)業(yè)務(wù)主鍵執(zhí)行流分組,從而實(shí)現(xiàn)通過(guò)網(wǎng)絡(luò)對(duì)數(shù)據(jù)流進(jìn)行混洗 (Shuffle)。同樣,我們?nèi)匀辉?30 臺(tái)機(jī)器的集群中運(yùn)行此作業(yè),集群硬件系統(tǒng)配置與以前完全相同。Flink 集群的吞吐能力如下,當(dāng)關(guān)閉快照檢查點(diǎn)是每核每秒大約 720000 條記錄,當(dāng)打開(kāi)快照檢查點(diǎn)后降至 690000 條記錄每秒。 請(qǐng)注意,F(xiàn)link 在每個(gè)檢查點(diǎn)均備份所有 Operator 的狀態(tài),而 Storm 則完全不支持這種功能。 此示例中的狀態(tài)相對(duì)較小(狀態(tài)主要是計(jì)數(shù)和摘要,每個(gè)檢查點(diǎn)每個(gè)運(yùn)算符的數(shù)量小于 1MB)。 Storm 在打開(kāi)記錄確認(rèn)情況下,具有每核心每秒約 2600 條記錄的吞吐能力。

 

 

延遲

一個(gè)大數(shù)據(jù)系統(tǒng)能否處理大規(guī)模數(shù)據(jù)量肯定至關(guān)重要。 但在流式處理系統(tǒng)中,另外一個(gè)特別重要的點(diǎn)在于計(jì)算延遲。 對(duì)于欺詐檢測(cè)或 IT 安全等應(yīng)用程序,在毫秒級(jí)別能夠進(jìn)行事件處理意味著可以避免業(yè)務(wù)損失,一套流式處理系統(tǒng)最低只能優(yōu)化到 100 毫秒的延遲通常意味著前述問(wèn)題只能在業(yè)務(wù)損失發(fā)生的事后才能發(fā)現(xiàn),而此時(shí)的問(wèn)題發(fā)現(xiàn)對(duì)于我們避免業(yè)務(wù)損失實(shí)際上意義已經(jīng)不大了。

當(dāng)應(yīng)用程序開(kāi)發(fā)人員評(píng)估一套流式處理系統(tǒng)性能延遲時(shí),他們通常需要一個(gè)底層處理系統(tǒng)告之他們延遲分布情況。我們?cè)O(shè)計(jì)一個(gè)實(shí)驗(yàn),測(cè)量業(yè)務(wù)主鍵聚合場(chǎng)景下作業(yè)的延遲分布情況,該作業(yè)由于存在主鍵聚合,因此需要流式系統(tǒng)內(nèi)部數(shù)據(jù)跨網(wǎng)絡(luò)混洗。下圖顯示了延遲分布情況,包括延遲中位數(shù)、延遲第 90% 位數(shù)、第 95% 分位數(shù)、第 99% 分位數(shù)(所謂第 99% 分位數(shù)的 50 毫秒延遲,意味著 99%的元素延遲不超過(guò) 50 毫秒)。

 

 

值得注意的是,F(xiàn)link 在以最大吞吐量運(yùn)行時(shí),其處理中值延遲為 26 毫秒,第 99 百分位延遲為 51 毫秒,這意味著 99%的延遲都低于 51 毫秒。 當(dāng)我們打開(kāi) Flink 的檢查點(diǎn)機(jī)制(打開(kāi) exactly-once 的狀態(tài)更新保證)并沒(méi)有增加明顯的延遲。但此時(shí),我們確實(shí)看到處于較高百分位數(shù)的延遲增加,有觀察到的延遲大約為 150 毫秒。這類情況主要原因是流在對(duì)齊所消耗的延遲,此時(shí)的 Operator 在等待接收所有輸入的 Barrier(譯注: 關(guān)于 Barrier 部分,請(qǐng)參考 https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html 這篇文章)。Storm 具有非常低的中值延遲(1 毫秒),并且第 99 百分位延遲也是 51 毫秒。

對(duì)于大多數(shù)應(yīng)用程序而言,重要的是能夠在可接受的延遲中維持較高吞吐量,延遲的具體需求取決于特定應(yīng)用程序的業(yè)務(wù)要求。 在 Flink 中,用戶可以使用稱之為 Buffer Timeout 的機(jī)制來(lái)調(diào)整延遲。為了提高性能,F(xiàn)link 的 Operator 在將數(shù)據(jù)發(fā)送到下一個(gè) Operator 之前會(huì)將數(shù)據(jù)暫存在緩沖區(qū)。通過(guò)指定緩沖區(qū)超時(shí)時(shí)間,例如設(shè)定 10 毫秒,我們可以告訴 Flink 當(dāng)面臨 1) 緩沖區(qū)已滿 2)10 毫秒已過(guò) 的情況下發(fā)送當(dāng)前緩沖區(qū)所有的數(shù)據(jù)。 通常來(lái)說(shuō)設(shè)定一個(gè)較低的緩沖區(qū)超時(shí)間將優(yōu)化流式處理的延遲,但隨之而來(lái)的是會(huì)降低相應(yīng)的計(jì)算吞吐量。 在上面的實(shí)驗(yàn)中,我們將 Flink 緩沖區(qū)超時(shí)設(shè)置為 50 毫秒,這解釋了為什么第 99 百分位的是 50 毫秒。

我們?cè)龠M(jìn)一步解釋下延遲是如何影響 Flink 中的吞吐量。因?yàn)樵O(shè)定較低的延遲時(shí)間將不可避免地意味著緩存數(shù)據(jù)的減少,因此必然會(huì)產(chǎn)生一定的吞吐量成本。 下圖顯示了不同緩沖區(qū)超時(shí)時(shí)間設(shè)置下的 Flink 吞吐量情況。 該實(shí)驗(yàn)再次使用流記錄分組聚合的作業(yè)。

 

 

如果指定緩沖區(qū)超時(shí)時(shí)間為零,處理的記錄會(huì)立即轉(zhuǎn)發(fā)到下游的 Operator 而不會(huì)進(jìn)行緩沖。 如此的延遲優(yōu)化,F(xiàn)link 可以實(shí)現(xiàn) 0 毫秒的中值延遲,以及 99% 延遲在 20 毫秒以下。當(dāng)然,隨之帶來(lái)相應(yīng)的吞吐量是每核每秒 24500 記錄處理能力。當(dāng)我們?cè)黾泳彌_區(qū)超時(shí)時(shí)間,我們會(huì)看到延遲增加,吞吐量會(huì)同時(shí)增加,直到達(dá)到吞吐量峰值,緩沖區(qū)填充速度超過(guò)緩沖區(qū)超時(shí)到期時(shí)間。例如,設(shè)置 50 毫秒的緩沖區(qū)超時(shí)時(shí)間,F(xiàn)link 系統(tǒng)將達(dá)到每核每秒 750000 條記錄的峰值吞吐量,99% 的處理延遲在 50 毫秒以下。

正確性與恢復(fù)開(kāi)銷

我們的最后一個(gè)實(shí)驗(yàn)開(kāi)始測(cè)試做快照檢查點(diǎn)機(jī)制的正確性保證以及故障恢復(fù)的開(kāi)銷。我們需要運(yùn)行一個(gè)需要強(qiáng)一致性的流式程序,并定期殺死工作節(jié)點(diǎn)。

這個(gè)實(shí)驗(yàn)的測(cè)試程序受到網(wǎng)絡(luò)安全 / 入侵檢測(cè)等用例的啟發(fā),并使用規(guī)則來(lái)檢查事件序列的有效性(例如,身份驗(yàn)證令牌,登錄,服務(wù)交互)。該程序從 Kafka 并行地讀取事件流,并通過(guò)生成一些實(shí)體標(biāo)識(shí)(例如,IP 地址或用戶 ID)作為主鍵進(jìn)行分組。 對(duì)于每個(gè)事件,流式處理程序會(huì)根據(jù)一些業(yè)務(wù)規(guī)則校驗(yàn)事件的順序性(例如,“服務(wù)交互”必須在“登錄”之前)。 對(duì)于亂序,或者說(shuō)無(wú)效的事件序列,程序會(huì)發(fā)布警報(bào)。如果沒(méi)有 exactly-once 的語(yǔ)義保證,人為制造的故障將直接產(chǎn)生無(wú)效的事件序列并導(dǎo)致程序發(fā)布錯(cuò)誤警報(bào)。

同樣,我們?cè)谝粋(gè) 30 節(jié)點(diǎn)的集群中運(yùn)行這個(gè)程序,其中“YARN chaos monkey”進(jìn)程每 5 分鐘將隨機(jī)殺死一個(gè)的 YARN 容器。 我們保留備用 worker(即 Flink 中的 TaskManagers),這樣系統(tǒng)可以在發(fā)生故障后立即獲取到新資源并運(yùn)行作業(yè),而無(wú)需等待 YARN 啟動(dòng)新的容器。接著,F(xiàn)link 將重新啟動(dòng)失敗的 worker 并在后臺(tái)將其加入到 Flink 調(diào)度集群中,以確保備用 worker 始終可用。

為了保證能夠模擬出我們期待的效果,我們開(kāi)發(fā)了并發(fā)的數(shù)據(jù)生成器,這些生成器將以每核每秒 30000 的速率生成數(shù)據(jù),并將數(shù)據(jù)推送到 Kafka。 下圖顯示了數(shù)據(jù)生成器的速率(紅線),以及從 Kafka 讀取事件并使用規(guī)則驗(yàn)證事件序列的 Flink 作業(yè)的吞吐量(藍(lán)線)。

 

 

后續(xù)規(guī)劃

在 dataArtisans 公司,我們正在研究 Flink 流處理的幾個(gè)重大功能,并希望很快將它們作為下一個(gè) Flink 版本的一部分提供。(譯注: 這篇文章寫于 2015 年,因此下面作者提到的 Flink Feature 實(shí)際上已經(jīng)全部實(shí)現(xiàn))。

高可用性

現(xiàn)在,F(xiàn)link 的主節(jié)點(diǎn)(稱為 JobManager)是單點(diǎn)故障。 我們正在引入具有備用主節(jié)點(diǎn)的主高可用性,該節(jié)點(diǎn)使用 Apache Zookeeper 進(jìn)行主 / 備用協(xié)調(diào)。

EventTime 和 Watermark

我們正在向 Flink 添加按事件時(shí)間處理亂序事件的能力,即創(chuàng)建記錄時(shí)的時(shí)間戳而不是處理時(shí)的時(shí)間戳,以及 Watermark 的引入。

改進(jìn)運(yùn)行作業(yè)的監(jiān)控

我們正在開(kāi)發(fā)一個(gè)完全重新設(shè)計(jì)的管理接口,該接口提供用戶可以在運(yùn)行時(shí)觀察底層運(yùn)行細(xì)節(jié),并獲取統(tǒng)計(jì)信息,例如累加器 (accumulators)。 如果您對(duì)此感興趣并希望了解有關(guān) Apache Flink™,Google Cloud Dataflow 以及其他技術(shù)和實(shí)際用例的更多信息,請(qǐng)注冊(cè) Flink Forward 2015。

譯者簡(jiǎn)介

陳守元(花名:巴真)阿里巴巴產(chǎn)品專家。阿里巴巴實(shí)時(shí)計(jì)算團(tuán)隊(duì)產(chǎn)品負(fù)責(zé)人,2010 年畢業(yè)即加入阿里集團(tuán)參與淘寶數(shù)據(jù)平臺(tái)建設(shè),近 10 年的大數(shù)據(jù)從業(yè)經(jīng)驗(yàn),開(kāi)源項(xiàng)目 Alibaba DataX 發(fā)起人,當(dāng)前負(fù)責(zé)阿里實(shí)時(shí)計(jì)算產(chǎn)品 Flink 的規(guī)劃與設(shè)計(jì),致力于推動(dòng) Flink 成為下一代大數(shù)據(jù)處理標(biāo)準(zhǔn)。

標(biāo)簽: Google 安全 大數(shù)據(jù) 大數(shù)據(jù)處理 大數(shù)據(jù)基礎(chǔ) 大數(shù)據(jù)系統(tǒng) 代碼 金融 開(kāi)發(fā)者 媒體 數(shù)據(jù)庫(kù) 搜索 網(wǎng)絡(luò) 網(wǎng)絡(luò)安全

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請(qǐng)聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點(diǎn)!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請(qǐng)與原作者聯(lián)系。

上一篇:報(bào)告 | 自然語(yǔ)言處理到底哪家強(qiáng)?這些企業(yè)上榜了

下一篇:全球18家頂級(jí)企業(yè)數(shù)據(jù)存儲(chǔ)供應(yīng)商