中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

使用 Redis 流和 Apache Spark 處理實時數(shù)據(jù)

2019-06-04    來源:raincent

容器云強勢上線!快速搭建集群,上萬Linux鏡像隨意使用

本文要點

Apache Spark 的流框架(Structured Streaming)為數(shù)據(jù)流帶來了 SQL 查詢功能,讓用戶可以實時、可擴展地處理數(shù)據(jù)。

Redis 流(Redis Stream)是 Redis 5.0 新引入的數(shù)據(jù)結(jié)構(gòu),能夠以亞毫秒級的延遲高速收集、保存和分發(fā)數(shù)據(jù)。

用戶集成 Redis 流和流框架后就能簡化連續(xù)應(yīng)用程序(continuous application)的擴展工作。

開源的 Spark-Redis 庫將 Apache  Spark 與 Redis 連接起來。該庫為 Redis 數(shù)據(jù)結(jié)構(gòu)提供 RDD 和數(shù)據(jù)幀 API,使用戶可以將 Redis 流用作流框架的數(shù)據(jù)源。

流框架是 Apache Spark 2.0 新引入的一項功能,在業(yè)界和數(shù)據(jù)工程社區(qū)中引起了很大關(guān)注。流框架 API 構(gòu)建于 Spark SQL 引擎之上,為流數(shù)據(jù)提供類似 SQL 的界面。

早期的 Apache Spark 以微批處理方式處理流框架查詢,延遲大約為 100 毫秒。

去年的 2.3 版本引入了低延遲(1 毫秒)的“連續(xù)處理”,進一步推動了流框架的應(yīng)用。

為了讓 Spark 保持高速的連續(xù)處理狀態(tài),你需要使用像 Redis 這樣的高速流數(shù)據(jù)庫來支持它。

Redis 開源內(nèi)存數(shù)據(jù)庫以其高速度和亞毫秒級延遲聞名于世。最近 Redis 5.0 新推出了一種名為 Redis 流的數(shù)據(jù)結(jié)構(gòu),使 Redis 能夠在多個生產(chǎn)者和消費者之間消費、保存和分發(fā)流數(shù)據(jù)。

現(xiàn)在的問題是,將 Redis 流作為流數(shù)據(jù)庫,Apache Spark 作為數(shù)據(jù)處理引擎,兩者共同部署,怎樣才能做到最佳搭配?

用 Scala 編寫的Spark-Redis庫就集成了 Apache Spark 和 Redis,使用它可以:

在 Redis 中以 RDD 的形式讀寫數(shù)據(jù)
在 Redis 中以數(shù)據(jù)幀的形式讀寫數(shù)據(jù)(例如,它允許將 Spark SQL 表映射到 Redis 數(shù)據(jù)結(jié)構(gòu))
使用 Redis 流作為流框架的數(shù)據(jù)源
在流框架之后將 Redis 實現(xiàn)為接收器

本文中我將介紹一個真實場景,并指導(dǎo)你如何使用 Redis 和 Apache Spark 實時處理流數(shù)據(jù)。

模擬場景:計算實時點擊

假設(shè)我們是一家廣告公司,在熱門網(wǎng)站上投放廣告。我們根據(jù)社交媒體上的熱門圖片制作包含流行話題梗的動圖,并將其作為廣告投放出去。為了最大化利潤,我們必須識別出能獲得病毒式傳播或贏得更多點擊次數(shù)的資產(chǎn),這樣就能加大它們的投放力度了。

我們的大部分資產(chǎn)傳播期很短,所以能實時處理點擊的話,我們就能快速生成傳播趨勢圖,這對業(yè)務(wù)至關(guān)重要。我們理想中的流數(shù)據(jù)解決方案必須記錄所有廣告點擊并實時處理,然后計算每項資產(chǎn)的實時點擊次數(shù)。以下是設(shè)計思路:

 

 

圖 1 實時計算點擊次數(shù)的流程示意

輸入

對于每次點擊,我們的數(shù)據(jù)提取方案(圖 1 中的方框 1)將資產(chǎn) ID 和廣告費用放在 Redis 流中:

例如:

輸出

在圖 1 中的方框 2 部分處理數(shù)據(jù)之后,我們的結(jié)果會存儲在數(shù)據(jù)存儲區(qū)中。數(shù)據(jù)查詢方案(圖 1 中的方框 3)為數(shù)據(jù)提供了一個 SQL 接口,我們可以用它查詢最近幾分鐘的最高點擊次數(shù):

構(gòu)建解決方案

現(xiàn)在我們已經(jīng)定義好了業(yè)務(wù)需求,接下來探討如何使用 Redis 5.0 和 Apache Spark 2.4 構(gòu)建其解決方案。在本文中我用的是 Scala 編程語言,但你也可以在 Java 或 Python 中使用 Spark-Redis 庫。

 

 

圖 2 解決方案架構(gòu)

這張流程圖看起來非常簡單:首先系統(tǒng)將數(shù)據(jù)提取到 Redis 流,然后 Redis 流將數(shù)據(jù)作為 Spark 進程消費,并將結(jié)果聚合傳回 Redis,最后使用 Spark-SQL 接口在 Redis 中查詢結(jié)果。

數(shù)據(jù)提取:我選擇用 Redis 流提取數(shù)據(jù),因為它是 Redis 中的內(nèi)置數(shù)據(jù)結(jié)構(gòu),每秒可處理超過一百萬次讀寫操作。此外它還可以根據(jù)時間自動對數(shù)據(jù)排序,并支持簡化數(shù)據(jù)讀取方式的消費者組。Spark-Redis 庫支持將 Redis 流作為數(shù)據(jù)源,因此它完全符合我們對流式數(shù)據(jù)庫使用 Apache Spark 引擎的需求。

數(shù)據(jù)處理:Apache Spark 中的流框架 API 是我們處理數(shù)據(jù)的絕佳選擇,而 Spark-Redis 庫使我們能夠?qū)⒌竭_ Redis 流的數(shù)據(jù)轉(zhuǎn)換為數(shù)據(jù)幀。使用流框架時,我們可以用微批處理或 Spark 的連續(xù)處理模式運行查詢。我們還可以開發(fā)一個自定義的“編寫器”來將數(shù)據(jù)寫入指定目的地。如圖 2 所示,我們將使用哈希數(shù)據(jù)結(jié)構(gòu)將輸出寫入 Redis。

數(shù)據(jù)查詢:Spark-Redis 庫允許你將本機 Redis 數(shù)據(jù)結(jié)構(gòu)映射為數(shù)據(jù)幀。我們可以聲明一個將列映射到哈希數(shù)據(jù)結(jié)構(gòu)特定鍵的“臨時表”,并且由于 Redis 的速度非?,延遲在亞毫秒級別,我們可以使用 Spark-SQL 獲得實時查詢能力。

之后我將逐個介紹如何開發(fā)并運行解決方案的各個組件。在那之前,我們先用適當(dāng)?shù)墓ぞ邅沓跏蓟_發(fā)環(huán)境。

尋找合適的開發(fā)工具

在我們的示例中,我們將使用 Homebrew 包管理器在 macOS 上下載和安裝軟件,你也可以根據(jù)你操作系統(tǒng)的情況選擇其他包管理器。

Redis 5.0或更高版本:首先,我們需要在環(huán)境中下載并安裝 Redis 5.x。舊版本的 Redis 不支持 Redis 流。

在 Homebrew 上,我們用下面的命令安裝并啟動 Redis 5.0:

$ brew install Redis
$ brew services start Redis

如果你用的還是舊版 Redis,可以用下面的命令升級它:

$ brew upgrade Redis

Apacke Spark 2.3或更高版本:接下來我們從官方網(wǎng)站下載并安裝 Apache Spark,或者使用 Homebrew 安裝:

$ brew install apache-spark

Scala 2.12.8或更高版本:Scala 也是一樣的操作:

$ brew install scala

Apache Maven:我們需要用 Maven 來構(gòu)建 Spark-Redis 庫。

$ brew install maven

JDK 1.8或更高版本:我們可以使用下面的命令從甲骨文網(wǎng)站或 Homebrew 下載并安裝這個 JDK。對于最新版本的 JDK,我們需要用 java 替換 java8。

$ brew cask install java8

Spark-Redis庫:這是我們解決方案的核心部分,這里從 GitHub 下載庫并構(gòu)建軟件包,如下所示:

$ git clone https://github.com/RedisLabs/spark-redis.git
$ cd spark-redis
$ mvn clean package -DskipTests

這會在./target/ 目錄下加入 spark-redis--jar-with-dependencies.jar。在我的設(shè)置中這個文件是 spark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jar

SBT 1.2.8或更高版本:SBT 是一個 Scala 構(gòu)建工具,可簡化管理和構(gòu)建 Scala 文件的工作。

$ brew install sbt

開發(fā)環(huán)境:最后該設(shè)置文件夾結(jié)構(gòu)并構(gòu)建文件了。本示例中我們將把程序代碼放在“scala”目錄下。

$ mkdir scala
$ cd ./scala

使用以下內(nèi)容創(chuàng)建一個新文件 build.sbt:

初始化目錄。用以下命令初始化包目錄:

構(gòu)建我們的點擊計數(shù)解決方案

如架構(gòu)部分所述,我們的解決方案包含三個部分:數(shù)據(jù)提取組件、Spark 引擎內(nèi)的數(shù)據(jù)處理器和數(shù)據(jù)查詢接口。在本節(jié)中我將詳細(xì)說明這三個部分并組合出一個有效的解決方案。

提取 Redis 流

Redis 流是一種僅附加數(shù)據(jù)結(jié)構(gòu)。假設(shè) Apache Spark 的連續(xù)處理單元將消費這些數(shù)據(jù),我們可以將消息數(shù)限制為一百萬。稍微修改一下前面提到的命令:

大多數(shù)流行的 Redis 客戶端都支持 Redis 流,因此根據(jù)你的編程語言,你可以選擇適用 Python 的 redis-py、適用 Java 的 Jedis 或 Lettuce、適用 Node.js 的 node-redis 等等。

 

 

圖 3 數(shù)據(jù)提取

數(shù)據(jù)處理

這一部分分為三個小節(jié):

從 Redis 流讀取和處理數(shù)據(jù)
將結(jié)果存儲在 Redis 中
運行程序

 

 

圖 4. 數(shù)據(jù)處理

從 Redis 流讀取數(shù)據(jù)

要在 Spark 中從 Redis 流讀取數(shù)據(jù),我們需要明白怎樣連接到 Redis,以及 Redis 流中數(shù)據(jù)的 Schema 結(jié)構(gòu)。

為了連接到 Redis,我們必須為 Redis 創(chuàng)建一個帶有連接參數(shù)的新 Spark 會話(SparkSession):

設(shè)置 Schema 結(jié)構(gòu)時,我們用“clicks”命名流,并為“stream.keys”設(shè)置一個“clicks”的選項。由于每個流元素都包含一項資產(chǎn)以及與之相關(guān)的成本,因此我們將創(chuàng)建一個包含兩個 StructField 的數(shù)組的 StructType——一個用于“asset”,另一個用于“cost”,如下所示:

在第一個程序中我們對每個資產(chǎn)的點擊次數(shù)感興趣。為此創(chuàng)建一個數(shù)據(jù)幀,其中包含按資產(chǎn)計數(shù)分組的數(shù)據(jù):

val byasset = clicks.groupBy("asset").count

最后一步是啟動流框架查詢:

對于連續(xù)處理而言,我們希望在查詢中添加’trigger’命令:.trigger(Trigger.Continuous(“1 second”))。trigger 命令不適用于聚合查詢,因此我們無法把它插入這個示例。

下面是完整的程序代碼。它會從 Redis 流讀取新的點擊數(shù)據(jù)并使用 Spark 的流框架 API 處理。如果你想在自己的環(huán)境中嘗試,請將程序保存在 src/main/scala 下,命名為 ClickAnalysis.scala。(如果你的 Redis 服務(wù)器不是在端口 6379 上本地運行的,請根據(jù)具體情況設(shè)置連接參數(shù)。)

將結(jié)果存儲在 Redis 中

為了將結(jié)果寫回 Redis,我們可以開發(fā)一個名為 ClickForeachWriter 的自定義 ForeachWriter。它會擴展 ForeachWriter,并使用 Redis 的 Java 客戶端 Jedis 連接到 Redis 上。下面是完整的程序代碼,保存為 ClickForeachWriter.scala:

在這部分程序中有一點需要注意:它將結(jié)果存儲在哈希數(shù)據(jù)結(jié)構(gòu)中,其鍵遵循語法“click:”。我將在本文的最后一節(jié)中將此結(jié)構(gòu)轉(zhuǎn)換成數(shù)據(jù)幀來使用。另一點需要指出的是鍵的過期時間是完全可選的。上面展示了如何在每次點擊被記錄時讓鍵的壽命延長五分鐘(300 秒)。

運行程序

在我們運行之前首先需要編譯程序。轉(zhuǎn)到主目錄(我們存儲 build.sbt 的目錄)運行命令:

$ sbt package

我們的程序應(yīng)該能順利編譯通過,沒有錯誤。如果出現(xiàn)了錯誤,請修復(fù)它們并重新運行 sbt 包。編譯完成后,在同一目錄中運行以下命令來啟動程序:

如果你不喜歡調(diào)試消息,可以停止程序(按 ctrl 加 c)并編輯 /usr/local/Cellar/apache-spark/2.4.0/libexec/conf/(或 log4j.properties 文件存儲的目錄)下的 log4j.properties,并將 log4j.rootCategory 更改為 WARN,如下所示:

log4j.rootCategory=WARN, console

該程序?qū)⒆詣訌?Redis 流中提取消息。如果 Redis 流中沒有消息,它將異步偵聽新消息。我們可以在新的控制臺中啟動 redis-cli 并向 Redis 流添加一條消息,以測試它是否在正常消費消息:

一切順利的話,我們應(yīng)該能在哈希數(shù)據(jù)結(jié)構(gòu)中讀取結(jié)果:

查詢數(shù)據(jù):將 Redis 數(shù)據(jù)讀取為數(shù)據(jù)幀

我們解決方案的最后一個組件實際上為 Redis 數(shù)據(jù)提供了一個 SQL 接口。通過 SQL 命令讀取數(shù)據(jù)又是一個兩步過程:首先,我們?yōu)?Redis 數(shù)據(jù)定義 SQL schema;其次,我們運行 SQL 命令。

 

 

圖 5 數(shù)據(jù)查詢

但在此之前,我們需要從主目錄上在控制臺運行 spark-sql,如下所示:

然后會轉(zhuǎn)到 spark-sql 提示符下:

spark-sql>

現(xiàn)在我們要為 Redis 哈希數(shù)據(jù)結(jié)構(gòu)中存儲的數(shù)據(jù)定義 SQL schema。如前所述,我們將每個資產(chǎn)的數(shù)據(jù)存儲在由鍵:click:表示的哈希數(shù)據(jù)結(jié)構(gòu)中。哈希結(jié)構(gòu)中還有一個鍵:count。創(chuàng)建 schema 并將其映射到 Redis 哈希數(shù)據(jù)結(jié)構(gòu)的命令是:

此命令創(chuàng)建一個名為“clicks”的新表視圖。它使用 Spark-Redis 庫中指定的指令將“asset”和“count”列映射到哈希結(jié)構(gòu)中的對應(yīng)字段。現(xiàn)在我們可以運行查詢:

如果要以編程方式運行 SQL 查詢,請參閱 Apache Spark 提供的有關(guān)如何使用 ODBC/JDBC 驅(qū)動程序連接到 Spark 引擎的文檔。

我們的成果是什么?

在本文中,我演示了如何使用 Redis 流作為 Apache Spark 引擎的數(shù)據(jù)源,介紹了 Redis 流是怎樣為流框架用例提供支持的。我還展示了如何使用 Apache Spark 中的數(shù)據(jù)幀 API 讀取 Redis 數(shù)據(jù),并融合流框架和數(shù)據(jù)幀的理念說明了 Spark-Redis 庫可以實現(xiàn)的功能。

Redis 流簡化了高速收集和分發(fā)數(shù)據(jù)的任務(wù)。將其與 Apache Spark 中的流框架相結(jié)合,可以支持需要實時計算的各種解決方案,包括物聯(lián)網(wǎng)、欺詐檢測、人工智能和機器學(xué)習(xí)、實時分析等。

作者介紹

Roshan Kumar 是 Redis Labs 的高級產(chǎn)品經(jīng)理。他在軟件開發(fā)和技術(shù)領(lǐng)域的產(chǎn)品管理方面擁有豐富的經(jīng)驗。他曾在惠普公司和一些成功的硅谷創(chuàng)業(yè)公司工作。他擁有計算機科學(xué)學(xué)士學(xué)位和美國加利福尼亞州圣克拉拉大學(xué)的 MBA 學(xué)位。

查看英文原文:Real-Time Data Processing Using Redis Streams and Apache Spark Structured Streaming

標(biāo)簽: [db:TAGG]

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請與原作者聯(lián)系。

上一篇:2019年互聯(lián)網(wǎng)企業(yè)起薪最高,數(shù)據(jù)科學(xué)家崗位薪資排第一

下一篇:IDC:預(yù)計2019年亞太地區(qū)人工智能(AI)系統(tǒng)支出近55億美元