中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

使用Apache Kafka和KSQL實現(xiàn)流處理普及化——第二部分

2018-09-24    來源:raincent

容器云強勢上線!快速搭建集群,上萬Linux鏡像隨意使用

本文要點


  • 針對客戶操作、操作儀表板、在線分析等應用場景,使用Apache Kafka和KSQL構建數(shù)據(jù)集成和處理應用程序。
  • 流處理的主要好處包括:數(shù)據(jù)增強一次性完成、低延遲處理、向客戶Ops團隊實時發(fā)送通知。
  • 你可以使用變化數(shù)據(jù)捕獲(CDC)工具把數(shù)據(jù)庫的數(shù)據(jù)以及任何后續(xù)的變化鏡像到Kafka主題。
  • 使用KSQL,很容易把業(yè)務數(shù)據(jù)流和源自數(shù)據(jù)庫、在Kafka主題中維護的相關信息合流。
  • 擴展應用程序使其可以處理更多的通知,而不必修改過濾邏輯。

這是文章“使用Apache Kafka和KSQL實現(xiàn)流處理普及化”的第二部分。第一部分在這里。

在本文中,我們將介紹如何使用Apache Kafka®和KSQL構建數(shù)據(jù)集成和處理應用程序。這是一個來自電商領域的簡單示例:在一個網(wǎng)站上,通過一系列事件跟蹤用戶評論。關于這些用戶的信息,如姓名、聯(lián)系方式、尊貴客戶俱樂部資深會員,保存在數(shù)據(jù)庫的某個地方。對于這類評論數(shù)據(jù),至少有三種用途:

  • 客戶操作——如果一個尊貴客戶俱樂部資深會員留下了差評,我們希望可以馬上做些事情,降低流失這類客戶的風險。我們希望應用程序在出現(xiàn)滿足此條件的評論時立即通知我們。這樣,我們就可以馬上為客戶提供服務,這遠遠好于我們等待一段時間后才運行的批處理為我們標記出需要聯(lián)系的用戶。
  • 操作儀表板實時展示評論輸入,滾動聚合,計算數(shù)量和平均分數(shù),等等,并按用戶區(qū)域劃分。
  • 結合其他數(shù)據(jù)(不論在數(shù)據(jù)湖中,還是在數(shù)據(jù)倉庫中)在線分析分析評論數(shù)據(jù)。這可以擴展到更廣泛的數(shù)據(jù)科學實踐和機器學習應用。所有這些都需要訪問評論信息及用戶的詳細信息。

我們將介紹下如何使用一種更為現(xiàn)代化的模式基于流平臺實現(xiàn)上述功能。我們將使用開源項目Apache Kafka和KSQL來實現(xiàn)。KSQL是一個面向Apache Kafka的流SQL引擎,基于Kafka Streams API實現(xiàn),后者是Apache Kafka的一個組成部分。

下圖展示了流應用程序示例的工作原理。


圖1.流數(shù)據(jù)應用程序

事件是用戶提交到網(wǎng)站的評論,它們被以流的方式直接傳遞給Kafka。從這里,它們可以實時和用戶信息聯(lián)系起來,經(jīng)過充實的結果數(shù)據(jù)會寫回到Kafka。轉(zhuǎn)換完成后,這些數(shù)據(jù)就可以用于驅(qū)動上述應用和目標了。轉(zhuǎn)換邏輯只需要執(zhí)行一次。數(shù)據(jù)一次性從源系統(tǒng)提取。轉(zhuǎn)換后的數(shù)據(jù)可以供不相關的應用程序多次使用。不用對現(xiàn)有組件做任何修改,就可以添加新的源和目標。所有這些操作的延遲都非常低。

因此,高層設計是這樣的:

  • Web應用直接向Kafka發(fā)送評論;
  • Kafka Connect把數(shù)據(jù)庫用戶數(shù)據(jù)快照以流的方式發(fā)送給Kafka,并且直接與CDC保持同步;
  • 流處理把用戶數(shù)據(jù)添加到評論事件,并寫回到一個新的Kafka主題;
  • 流處理會針對VIP用戶的差評篩選出充實后的Kafka主題,并寫入一個新的Kafka主題;
  • 事件驅(qū)動應用會監(jiān)聽Kafka主題,在VIP用戶留下差評后立即推送通知;
  • Kafka Connect把數(shù)據(jù)以流的方式傳入Elasticsearch,供操作儀表板使用;
  • Kafka Connect把數(shù)據(jù)以流的方式傳入S3,供長期在線分析使用以及和其他數(shù)據(jù)集一起使用。

其中,主要好處包括:

  • 數(shù)據(jù)增強一次性完成,可供任何應用程序消費;
  • 數(shù)據(jù)處理延遲低;
  • 可以在VIP客戶留下差評后立即通知客戶Ops團隊——提供更好的客戶體驗,增加業(yè)務保留機會;
  • 容易擴展,可以按需增加新節(jié)點,實現(xiàn)更大的吞吐量。


實現(xiàn)


讓我們看一下構建這個應用程序的詳細過程。GitHub上提供了所有示例的代碼以及docker-compose文件。
 

把數(shù)據(jù)寫入Kafka


Web應用程序有多種方式可以使事件流入Kafka。

  • 許多客戶端庫都提供了Producer API,面向的語言包括Java、.NET、Python、C/C++、Go、node.js等;
  • 開源REST代理,可以發(fā)起HTTP調(diào)用把數(shù)據(jù)發(fā)送到Kafka。

在我們的例子中,應用程序使用了Producer API。

Web應用程序發(fā)送給Kafka主題“評級(ratings)”的消息格式如下:

 

使Kafka可以訪問數(shù)據(jù)庫中的數(shù)據(jù)


在構建應用程序的時候,經(jīng)常需要使用存儲在數(shù)據(jù)庫中的數(shù)據(jù)。在我們的例子中,用戶數(shù)據(jù)保存在MySQL中,不過,設計模式都是一樣的,與采用哪種具體的RDBMS技術無關。

在使用Kafka編寫流處理應用程序時,集成保存在數(shù)據(jù)庫中的數(shù)據(jù)的標準方法是,確保數(shù)據(jù)本身在Kafka中保存和維護。這比聽上去簡單——我們只需要使用數(shù)據(jù)變化捕獲(CDC)工具把數(shù)據(jù)庫中的數(shù)據(jù)和任何后續(xù)的變化鏡像到一個Kafka主題。

這樣做的好處是隔離了數(shù)據(jù)庫和流處理。這主要有兩個好處:數(shù)據(jù)庫不會因為我們的請求增加開銷,我們可以自由使用我們選取的數(shù)據(jù),而又不會使我們的開發(fā)和部署流程和數(shù)據(jù)庫所有者的相耦合。

CDC技術和工具不止一種,我們這里就不介紹了。由于數(shù)據(jù)在MySQL中,我們使用Debezium項目作為我們的CDC工具。它會把用戶表的內(nèi)容快照到Kafka,并使用MySQL的binlog即時檢測后續(xù)MySQL中數(shù)據(jù)的變化并復制到Kafka。

圖2詳細展示了數(shù)據(jù)變化捕獲過程的數(shù)據(jù)流動。


圖2.流應用程序變化數(shù)據(jù)捕獲

從數(shù)據(jù)庫流出、流入Kafka主題asgard.demo.CUSTOMERS的消息格式如下:

 
{ 

使用數(shù)據(jù)庫信息充實事件流

使用KSQL,很容易就可以把源于數(shù)據(jù)庫、在Kafka主題中維護的相關信息合并到評級中。

合并細節(jié)如圖3所示:

第一步是確?蛻糁黝}中的消息以關聯(lián)列為鍵,在這個例子中是客戶ID。我們實際上可以使用KSQL進行重新分區(qū)。KSQL CREATE STREAM的輸出被寫入一個Kafka主題,在默認情況下,會以流本身的名稱命名:

 

現(xiàn)在,到達asgard.demo.CUSTOMERS主題的每條信息都將寫入正確設置了消息鍵的Kafka主題CUSTOMERS_SRC_REKEY。注意,我們不一定要聲明任何模式,因為我們在使用Avro。KSQL和Kafka Connect都無縫集成了開源的Confluent Schema Registry,序列化/反序列化Avro數(shù)據(jù),并在Schema Registry中保存/檢索模式。

為了進行合并,我們使用標準的SQL聯(lián)合查詢語法:

 

我們可以查看這條查詢處理的消息數(shù)量:

 

實際上,這條SQL語句本身就是一個應用程序,就像我們在Java、Python、C……中編寫的代碼一樣。它不斷地執(zhí)行,接收輸入數(shù)據(jù)、處理數(shù)據(jù)、輸出數(shù)據(jù)。我們在上面看到的輸出是該應用程序的運行時指標。
 

使用KSQL過濾數(shù)據(jù)流


我們前面創(chuàng)建的JOIN查詢其輸出是一個Kafka主題,在源自源主題ratings的事件的驅(qū)動下實時填充,如下圖4所示:

我們可以構建第二個KSQL應用程序,由這個派生主題所驅(qū)動,并對數(shù)據(jù)做進一步地處理。這里,我們將簡單地過濾所有評級流,識別那些同時滿足如下兩個條件的評級:

  • 差評(評級范圍1到5,小于3即為差評)
  • “鉑金”客戶留下的評級

SQL給出的語義幾乎可以從字面上表達上述需求。我們可以首先使用KSQL CLI驗證該查詢:

 

然后,和以前一樣,這個持續(xù)查詢的結果可以持久化到一個Kafka主題,只需為語句加上CREATE STREAM ... AS(通常使用縮寫CSAS)前綴。注意,我們可以選擇所有的源列(SELECT *),或者創(chuàng)建一個可用字段的子集(SELECT COL1, COL2),使用哪一個取決于創(chuàng)建流的目的。此外,我們將把目標消息寫成JSON格式:

 

查看生成的Kafka主題,我們可以看到,它只包含我們感興趣的事件。再次強調(diào)一下,這是一個Kafka主題——我們可以使用KSQL查詢它——這里,我將跳過KSQL,使用流行的kafkacat工具查看它:

 

在離開KSQL之前,我們給自己提個醒,我們實際上僅寫了三個流應用程序:

 



由Kafka主題驅(qū)動的推送通知


我們在上面創(chuàng)建的主題UNHAPPY_PLATINUM_CUSTOMERS可以用于驅(qū)動一個應用程序,如果有重要客戶留下了差評,它就會給客戶運營團隊發(fā)送警報。這里的關鍵是,我們基于一個剛剛發(fā)生的事件驅(qū)動了一個實時的動作;谂幚淼姆治鱿轮懿鸥嬖V我們,上周我們讓一位客戶失望了,這就沒用了。我們希望現(xiàn)在就知道,以便我們現(xiàn)在就可以采取行動,向那位客戶提供更好的體驗。

Kafka客戶端庫有面向各種語言的——你幾乎可以選擇任何語言。這里,我們使用面向Python的開源Confluent Kafka庫。這是一個構建事件驅(qū)動應用程序的簡單例子。它在一個Kafka主題上監(jiān)聽事件,然后生成一個推送通知。我們將使用Slack作為我們的通知發(fā)送平臺。為了簡化說明,下面的代碼片段刪除了所有的錯誤處理代碼。我們可以把一個API(如Slack的API)和一個Kafka主題集成,在這個主題上監(jiān)聽事件,從而觸發(fā)一個動作。

 

下圖5展示了使用Slack API發(fā)送用戶通知。

[點擊查看大圖]

這里有必要重申一下,我們正在構建的應用程序(如果你愿意,可以把它稱為微服務)是事件驅(qū)動的。就是說,該應用程序會等待一個事件,然后執(zhí)行動作。它不是嘗試處理所有數(shù)據(jù)并查找特定的條件,也不是一個響應某個命令的同步請求-響應服務。我們已經(jīng)分離出了這些職責:

  • 根據(jù)確定的條件過濾實時事件流是由KSQL完成的(使用我們前面介紹的CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS語句),匹配的事件被寫入一個Kafka主題;
  • 通知服務有一個唯一的職責,它負責從Kafka主題獲得事件,并基于它生成一個推送通知。這是異步完成的。

這樣做的好處很明顯:

  • 我們可以橫向擴展應用程序,使其處理更多通知,而不必修改過濾邏輯;
  • 我們可以使用可選的其他應用程序替換這個應用程序,而不必修改過濾邏輯;
  • 我們可以替換或修改過濾邏輯,而不必觸及通知應用程序。


Kafka和請求/響應模式


對于基于Kafka平臺編寫應用程序,有一種常見的質(zhì)疑,就是事件驅(qū)動模式不適用于應用程序的流程,并由此推論,Kafka也不適合。這種觀點是錯誤的,有兩個關鍵點需要記住:

  • 事件驅(qū)動模式和請求/響應模式都完全可以使用——它們不是互斥的,有些需求需要使用請求/響應模式;
  • 決定因素應該是需求;應該挑戰(zhàn)現(xiàn)有方法的慣性。在部分或全部應用程序的消息傳遞中使用事件驅(qū)動架構,你可以從它帶來的異步性、可擴展性以及與Kafka的集成中受益,其他所有使用Kafka的系統(tǒng)和應用程序也是如此。

要了解有關這個問題的進一步討論,可以查閱Ben Stopford的系列文章及其最新著作《事件驅(qū)動系統(tǒng)設計》。
 

使數(shù)據(jù)從Kafka流入Elasticsearch,用于操作分析


使用Kafka Connect很容易就可以使數(shù)據(jù)從Kafka流入Elasticsearch。它提供了一個由配置文件控制的可擴展的流集成。有一個開源的Elasticsearch連接器,既可以單獨存在,也可以作為Confluent平臺的一部分。這里,我們將使原始評級及警告信息流入Elasticsearch:

 

在從Kafka Connect到Elasticsearch的數(shù)據(jù)流上,使用Kibana很容易在經(jīng)過充實、過濾的數(shù)據(jù)上構建一個實時儀表板,如圖6所示。

[點擊查看大圖]

 

使數(shù)據(jù)從Kafka流入數(shù)據(jù)湖


最后,我們將使充實后的評級流入數(shù)據(jù)湖。在這里,它可以用于在線分析、訓練機器學習模型和數(shù)據(jù)科學項目,等等。

Kafka中的數(shù)據(jù)可以流入使用Kafka Connect的各種類型的目標。這里,我們將看下S3和BigQuery,但是,使用HDFS、GCS、Redshift、Snowflake DB等也同樣簡單。

就像前面介紹的使數(shù)據(jù)從Kafka流入Elasticsearch一樣,針對每項目標技術的設置只是一個簡單的配置文件設置:


數(shù)據(jù)流入S3后,我們可以在桶里查看,如圖7所示。

[點擊查看大圖]

我們還可以使同樣的數(shù)據(jù)流入谷歌的BigQuery:

 

[點擊查看大圖]

谷歌的Data Studio是眾多可以用于分析這些來自云對象存儲的數(shù)據(jù)的應用程序之一:

[點擊查看大圖]

這里的重點不是上面介紹的具體技術,不管你選擇使用什么樣的數(shù)據(jù)湖技術,使用Kafka Connect都很容易使數(shù)據(jù)流入它。
 

和KSQL及流平臺一起走向未來


在這篇文章中,我們已經(jīng)看了把流平臺作為數(shù)據(jù)架構核心組成部分的其中多個有力的論據(jù)。它提供了一個可擴展的基礎,由于其解耦特性,使系統(tǒng)可以靈活地集成和演進。分析工作可以從流平臺的強大集成能力中獲益。它是流平臺,因此,實時不是其主要動因。應用程序可以從流平臺獲益,因為它是實時的,而且也因為它的集成能力。

借助KSQL,可以使用許多開發(fā)人員都熟悉的語言編寫流處理應用程序。這些應用程序可以是簡單的Kafka事件流過濾器,也可以是復雜的充實模式,從包括數(shù)據(jù)庫在內(nèi)的其他系統(tǒng)獲取數(shù)據(jù)。

要了解更多有關KSQL的信息,你可以觀看教程并自己試一下。文檔中介紹了調(diào)整和部署實踐。在Confluent Community Slack群組中,有一個與此相關的活躍社區(qū)。GitHub上提供了本文的示例。
 

關于作者

Robin Moffatt 是Confluent的一名開發(fā)大使,該公司由Apache Kafka的創(chuàng)建者發(fā)起成立。他還是Oracle ACE總監(jiān)和開發(fā)冠軍。職業(yè)生涯至今,他一直在跟數(shù)據(jù)打交道,從以前的COBOL和DB2到Oracle和Hadoop,再到如今的Kafka。他的主要研究領域是分析、系統(tǒng)架構、性能測試和優(yōu)化。

查看英文原文:Democratizing Stream Processing with Apache Kafka® and KSQL - Part 2

 

標簽: Mysql 代碼 電商 谷歌 數(shù)據(jù)庫

版權申明:本站文章部分自網(wǎng)絡,如有侵權,請聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點!
本站所提供的圖片等素材,版權歸原作者所有,如需使用,請與原作者聯(lián)系。

上一篇:不公正的AI算法,在質(zhì)疑中邁向透明化

下一篇:寫作比寫代碼難多了?給數(shù)據(jù)科學家的寫作指南