中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

Spark Streaming 調(diào)優(yōu)實(shí)踐

2019-08-21    來(lái)源:raincent

容器云強(qiáng)勢(shì)上線!快速搭建集群,上萬(wàn)Linux鏡像隨意使用

在使用 Spark 和 SparkStreaming 時(shí),當(dāng)我們將應(yīng)用部署在集群上時(shí),可能會(huì)碰到運(yùn)行慢、占用過(guò)多資源、不穩(wěn)定等問(wèn)題,這時(shí)需要做一些優(yōu)化才能達(dá)到最好的性能。有時(shí)候一個(gè)簡(jiǎn)單的優(yōu)化可以起到化腐朽為神奇的作用,使得程序能夠更加有效率,也更加節(jié)省資源。本文我們就來(lái)介紹一些能夠提高應(yīng)用性能的參數(shù)和配置。

另外需要指出的是,優(yōu)化本身是一個(gè)具體性很強(qiáng)的事情,不同的應(yīng)用及落地場(chǎng)景會(huì)有不同的優(yōu)化方式,并沒(méi)有一個(gè)統(tǒng)一的優(yōu)化標(biāo)準(zhǔn)。本文我們簡(jiǎn)單聊聊一些在項(xiàng)目中踩過(guò)的“坑”,列舉以下常見(jiàn)的優(yōu)化方式。

數(shù)據(jù)序列化

在分布式應(yīng)用中,序列化(serialization)對(duì)性能的影響是顯著的。如果使用一種對(duì)象序列化慢、占用字節(jié)多的序列化格式,就會(huì)嚴(yán)重降低計(jì)算效率。通常在 Spark 中,主要有如下 3 個(gè)方面涉及序列化:

①在算子函數(shù)中使用到外部變量時(shí),該變量會(huì)被序列化后進(jìn)行網(wǎng)絡(luò)傳輸。

②將自定義的類型作為 RDD 的泛型類型時(shí),所有自定義類型對(duì)象都會(huì)進(jìn)行序列化。因此這種情況下,也要求自定義的類必須實(shí)現(xiàn) Serializable 接口。

③使用可序列化的持久化策略時(shí)(比如 MEMORY_ONLY_SER),Spark 會(huì)將 RDD 中的每個(gè) partition 都序列化成一個(gè)大的字節(jié)數(shù)組。

而 Spark 綜合考量易用性和性能,提供了下面兩種序列化庫(kù)。

①Java 序列化:默認(rèn)情況下,Spark 使用 Java 的對(duì)象輸出流框架(ObjectOutputStreamframework)來(lái)進(jìn)行對(duì)象的序列化,并且可用在任意實(shí)現(xiàn) Java.io.Serializable 接口的自定義類上。我們可以通過(guò)擴(kuò)展 Java.io.Externalizable 來(lái)更加精細(xì)地控制序列化行為。Java 序列化方式非常靈活,但是通常序列化速度非常慢而且對(duì)于很多類會(huì)產(chǎn)生非常巨大的序列化結(jié)果。

②Kryo 序列化:Spark 在 2.0.0 以上的版本可以使用 Kryo 庫(kù)來(lái)非?焖俚剡M(jìn)行對(duì)象序列化,Kryo 要比 Java 序列化更快、更緊湊(10 倍),但是其不支持所有的 Serializable 類型,并且在使用自定義類之前必須先注冊(cè)。

我們可以在初始化 SparkConf 時(shí),調(diào)用 conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") 來(lái)使用 Kryo。一旦進(jìn)行了這個(gè)配置,Kryo 序列化不僅僅會(huì)用在 Shuffling 操作時(shí) worker 節(jié)點(diǎn)間的數(shù)據(jù)傳遞,也會(huì)用在 RDDs 序列化到硬盤的過(guò)程。

Spark 官方解釋沒(méi)有將 Kryo 作為默認(rèn)序列化方式的唯一原因是,Kryo 必須用戶自己注冊(cè)(注意如果我們不注冊(cè)自定義類,Kryo 也是可以正常運(yùn)行的,但是它必須存儲(chǔ)每個(gè)對(duì)象的完整類名,這是非常浪費(fèi)的),但是其推薦在網(wǎng)絡(luò)頻繁傳輸?shù)膽?yīng)用中使用 Kryo。

另外值得注意的是,在 Spark2.0.0 之后,Spark 已經(jīng)默認(rèn)將 Kryo 序列化作為簡(jiǎn)單類型(基本類型、基本類型的數(shù)組及 string 類型)RDD 進(jìn)行 Shuffling 操作時(shí)傳輸數(shù)據(jù)的對(duì)象序列化方式。

Spark 已經(jīng)自動(dòng)包含注冊(cè)了絕大部分 Scala 的核心類,如果需要向 Kryo 注冊(cè)自己的類別,可以使用 registerKryoClasses 方法。使用 Kryo 的代碼框架如下:

// Spark 配置項(xiàng)
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 配置序列化方式
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) // 注冊(cè)需要序列化的類
val sc = new SparkContext(conf)

如果我們的對(duì)象非常大,可能需要增加 Spark.kryoserializer.buffer 的配置。

同樣在 SparkStreaming 中,通過(guò)優(yōu)化序列化格式可以縮減數(shù)據(jù)序列化的開銷,而在 Streaming 中還會(huì)涉及以下兩類數(shù)據(jù)的序列化。

輸入數(shù)據(jù):SparkStreaming 中不同于 RDD 默認(rèn)是以非序列化的形式存于內(nèi)存當(dāng)中,Streaming 中由接收器(Receiver)接收而來(lái)的數(shù)據(jù),默認(rèn)是以序列化重復(fù)形式(StorageLevel.MEMORY_AND_DISK_SER_2)存放于 Executor 的內(nèi)存當(dāng)中。而采用這種方式的目的,一方面是由于將輸入數(shù)據(jù)序列化為字節(jié)流可以減少垃圾回收(GC)的開銷,另一方面對(duì)數(shù)據(jù)的重復(fù)可以對(duì) Executor 節(jié)點(diǎn)的失敗有更好的容錯(cuò)性。同時(shí)需要注意的是,輸入數(shù)據(jù)流一開始是保存在內(nèi)存當(dāng)中,當(dāng)內(nèi)存不足以存放流式計(jì)算依賴的輸入數(shù)據(jù)時(shí),會(huì)自動(dòng)存放于硬盤當(dāng)中。而在 Streaming 中這部分序列化是一個(gè)很大的開銷,接收器必須先反序列化(deserialize)接收到的數(shù)據(jù),然后再序列化(serialize)為 Spark 本身的序列化格式。

由 Streaming 操作產(chǎn)生 RDD 的持久化:由流式計(jì)算產(chǎn)生的 RDDs 有可能持久化在內(nèi)存當(dāng)中,例如由于基于窗口操作的數(shù)據(jù)會(huì)被反復(fù)使用,所以會(huì)持久化在內(nèi)存當(dāng)中。值得注意的是,不同于 Spark 核心默認(rèn)使用非序列化的持久化方式(StorageLevel.MEMORY_ONLY),流式計(jì)算為了減少垃圾回收(GC)的開銷,默認(rèn)使用了序列化的持久化方式(StorageLevel.MEMORY_ONLY_SER)。

不管在 Spark 還是在 SparkStreaming 中,使用 Kryo 序列化方式,都可以減少 CPU 和內(nèi)存的開銷。而對(duì)于流式計(jì)算,如果數(shù)據(jù)量不是很大,并且不會(huì)造成過(guò)大的垃圾回收(GC)開銷,我們可以考慮利用非序列化對(duì)象進(jìn)行持久化。

例如,我們使用很小的批處理時(shí)間間隔,并且沒(méi)有基于窗口的操作,可以通過(guò)顯示設(shè)置相應(yīng)的存儲(chǔ)級(jí)別來(lái)關(guān)閉持久化數(shù)據(jù)時(shí)的序列化,這樣可以減少序列化引起的 CPU 開銷,但是潛在的增加了 GC 的開銷。

廣播大變量

我們知道,不論 Spark 還是 SparkStreaming 的應(yīng)用,在集群節(jié)點(diǎn)間進(jìn)行數(shù)據(jù)傳輸時(shí),都會(huì)有序列化和反序列化的開銷,而如果我們的應(yīng)用有非常大的對(duì)象時(shí),這部分開銷是巨大的。比如應(yīng)用中的任何子任務(wù)需要使用 Driver 節(jié)點(diǎn)的一個(gè)大型配置查詢表,這時(shí)就可以考慮將該表通過(guò)共享變量的方式,廣播到每一個(gè)子節(jié)點(diǎn),從而大大減少在傳輸和序列化上的開銷。

另外,Spark 在 Master 節(jié)點(diǎn)會(huì)打印每個(gè)任務(wù)的序列化對(duì)象大小,我們可以通過(guò)觀察任務(wù)的大小,考慮是否需要廣播某些大變量。通常一個(gè)任務(wù)的大小超過(guò) 20KB,是值得去優(yōu)化的。

當(dāng)我們將大型的配置查詢表廣播出去時(shí),每個(gè)節(jié)點(diǎn)可以讀取配置項(xiàng)進(jìn)行任務(wù)計(jì)算,那么假設(shè)配置發(fā)生了動(dòng)態(tài)改變時(shí),如何通知各個(gè)子節(jié)點(diǎn)配置表更改了呢?(尤其是對(duì)于流式計(jì)算的任務(wù),重啟服務(wù)代價(jià)還是蠻大的。)

我們知道廣播變量是只讀的,也就是說(shuō)廣播出去的變量沒(méi)法再修改,那么應(yīng)該怎么解決這個(gè)問(wèn)題呢?我們可以利用 Spark 中的 unpersist() 函數(shù),Spark 通常會(huì)按照 LRU(leastRecentlyUsed)即最近最久未使用原則對(duì)老數(shù)據(jù)進(jìn)行刪除,我們并不需要操作具體的數(shù)據(jù),但如果是手動(dòng)刪除,可以使用 unpersist() 函數(shù)。

所以這里更新廣播變量的方式是,利用 unpersist() 函數(shù)先將已經(jīng)發(fā)布的廣播變量刪除,然后修改數(shù)據(jù)后重新進(jìn)行廣播,我們通過(guò)一個(gè)廣播包裝類來(lái)實(shí)現(xiàn)這個(gè)功能,代碼如下:

import java.io.{ ObjectInputStream, ObjectOutputStream }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag
// 通過(guò)包裝器在 DStream 的 foreachRDD 中更新廣播變量
// 避免產(chǎn)生序列化問(wèn)題
case class BroadcastWrapper[T: ClassTag](
@transient private val ssc: StreamingContext,
@transient private val _v: T) {
@transient private var v = ssc.sparkContext.broadcast(_v)
def update(newValue: T, blocking: Boolean = false): Unit = {
// 刪除 RDD 是否需要鎖定
v.unpersist(blocking)
v = ssc.sparkContext.broadcast(newValue)
}
def value: T = v.value
private def writeObject(out: ObjectOutputStream): Unit = {
out.writeObject(v)
}
private def readObject(in: ObjectInputStream): Unit = {
v = in.readObject().asInstanceOf[Broadcast[T]]
}
}

利用 wrapper 更新廣播變量,可以動(dòng)態(tài)地更新大型的配置項(xiàng)變量,而不用重新啟動(dòng)計(jì)算服務(wù),大致的處理邏輯如下:

// 定義
val yourBroadcast = BroadcastWrapper yourType

yourStream.transform(rdd => {
// 定期更新廣播變量
if (System.currentTimeMillis - someTime > Conf.updateFreq) {
yourBroadcast.update(newValue, true)
}
// do something else
})

數(shù)據(jù)處理和接收時(shí)的并行度

作為分布式系統(tǒng),增加接收和處理數(shù)據(jù)的并行度是提高整個(gè)系統(tǒng)性能的關(guān)鍵,也能夠充分發(fā)揮集群機(jī)器資源。

關(guān)于 partition 和 parallelism。partition 指的就是數(shù)據(jù)分片的數(shù)量,每一次 Task 只能處理一個(gè) partition 的數(shù)據(jù),這個(gè)值太小了會(huì)導(dǎo)致每片數(shù)據(jù)量太大,導(dǎo)致內(nèi)存壓力,或者諸多 Executor 的計(jì)算能力無(wú)法充分利用;但是如果 partition 太大了則會(huì)導(dǎo)致分片太多,執(zhí)行效率降低。

在執(zhí)行 Action 類型操作的時(shí)候(比如各種 reduce 操作),partition 的數(shù)量會(huì)選擇 parentRDD 中最大的那一個(gè)。而 parallelism 則指的是在 RDD 進(jìn)行 reduce 類操作的時(shí)候,默認(rèn)返回?cái)?shù)據(jù)的 paritition 數(shù)量(而在進(jìn)行 map 類操作的時(shí)候,partition 數(shù)量通常取自 parentRDD 中較大的一個(gè),而且也不會(huì)涉及 Shuffle,因此這個(gè) parallelism 的參數(shù)沒(méi)有影響)。

由上述可得,partition 和 parallelism 這兩個(gè)概念密切相關(guān),都是涉及數(shù)據(jù)分片,作用方式其實(shí)是統(tǒng)一的。通過(guò) Spark.default.parallelism 可以設(shè)置默認(rèn)的分片數(shù)量,而很多 RDD 的操作都可以指定一個(gè) partition 參數(shù)來(lái)顯式控制具體的分片數(shù)量,如 reduceByKey 和 reduceByKeyAndWindow。

SparkStreaming 接收 Kafka 數(shù)據(jù)的方式,這個(gè)過(guò)程有一個(gè)數(shù)據(jù)反序列化并存儲(chǔ)到 Spark 的開銷,如果數(shù)據(jù)接收成為了整個(gè)系統(tǒng)的瓶頸,那么可以考慮增加數(shù)據(jù)接收的并行度。每個(gè)輸入 DStream 會(huì)創(chuàng)建一個(gè)單一的接收器(receiver 在 worker 節(jié)點(diǎn)運(yùn)行)用來(lái)接收一個(gè)單一的數(shù)據(jù)流。而對(duì)于接收多重?cái)?shù)據(jù)的情況,可以創(chuàng)建多個(gè)輸入 DStream 用來(lái)接收源數(shù)據(jù)流的不同分支(partitions)。

如果我們利用 Receiver 的形式接收 Kafka,一個(gè)單一的 Kafka 輸入 DStream 接收了兩個(gè)不同 topic 的數(shù)據(jù)流,我們?yōu)榱颂岣卟⑿卸瓤梢詣?chuàng)建兩個(gè)輸入流,分別接收其中一個(gè) topic 上的數(shù)據(jù)。這樣就可以創(chuàng)建兩個(gè)接收器來(lái)并行地接收數(shù)據(jù),從而提高整體的吞吐量。而之后對(duì)于多個(gè) DStreams,可以通過(guò) union 操作并為一個(gè) DStream,之后便可以在這個(gè)統(tǒng)一的輸入 DStream 上進(jìn)行操作,代碼示例如下:

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...)}
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

如果采用 Direct 連接方式,前面講過(guò) Spark 中的 partition 和 Kafka 中的 partition 是一一對(duì)應(yīng)的,但一般默認(rèn)設(shè)置為 Kafka 中 partition 的數(shù)量,這樣來(lái)達(dá)到足夠并行度以接收 Kafka 數(shù)據(jù)。

設(shè)置合理的批處理間隔

對(duì)于一個(gè) SparkStreaming 應(yīng)用,只有系統(tǒng)處理數(shù)據(jù)的速度能夠趕上數(shù)據(jù)接收的速度,整個(gè)系統(tǒng)才能保持穩(wěn)定,否則就會(huì)造成數(shù)據(jù)積壓。換句話說(shuō),即每個(gè) batch 的數(shù)據(jù)一旦生成就需要被盡快處理完畢。這一點(diǎn)我們可以通過(guò) Spark 監(jiān)控界面進(jìn)行查看(在 2.3.4 節(jié)我們介紹過(guò)),比較批處理時(shí)間必須小于批處理間隔。

通過(guò)設(shè)置合理的批處理大小(batchsize),使得每批數(shù)據(jù)能夠在接收后被盡快地處理完成(即數(shù)據(jù)處理的速度趕上數(shù)據(jù)生成的速度)。

如何選取合適的批處理時(shí)間呢?一個(gè)好的方法是:先保守地設(shè)置一個(gè)較大的批處理間隔(如 5~10s),以及一個(gè)很低的數(shù)據(jù)速率,來(lái)觀測(cè)系統(tǒng)是否能夠趕上數(shù)據(jù)傳輸速率。我們可以通過(guò)查看每個(gè)處理好的 batch 的端到端延遲來(lái)觀察,也可以看全局延遲來(lái)觀察(可以在 Sparklog4j 的日志里或者使用 StreamingListener 接口,也可以直接在 UI 界面查看)。

如果延遲保持在一個(gè)相對(duì)穩(wěn)定的狀態(tài),則整個(gè)系統(tǒng)是穩(wěn)定的,否則延遲不斷上升,那說(shuō)明整個(gè)系統(tǒng)是不穩(wěn)定的。在實(shí)際場(chǎng)景中,也可以直接觀察系統(tǒng)正在運(yùn)行的 Spark 監(jiān)控界面來(lái)判斷系統(tǒng)的穩(wěn)定性。

內(nèi)存優(yōu)化

內(nèi)存優(yōu)化是在所有應(yīng)用落地中必須經(jīng)歷的話題,雖然 Spark 在內(nèi)存方面已經(jīng)為開發(fā)者做了很多優(yōu)化和默認(rèn)設(shè)置,但是我們還是需要針對(duì)具體的情況進(jìn)行調(diào)試。

在優(yōu)化內(nèi)存的過(guò)程中需要從 3 個(gè)方面考慮這個(gè)問(wèn)題:對(duì)象本身需要的內(nèi)存;訪問(wèn)這些對(duì)象的內(nèi)存開銷;垃圾回收(GCgarbagecollection)導(dǎo)致的開銷。

通常來(lái)說(shuō),對(duì)于 Java 對(duì)象而言,有很快的訪問(wèn)速度,但是很容易消耗原始數(shù)據(jù) 2~5 倍以上的內(nèi)存空間,可以歸結(jié)為以下幾點(diǎn)原因:

① 每個(gè)獨(dú)立的 Java 對(duì)象,都會(huì)有一個(gè)“對(duì)象頭”,大約 16 個(gè)字節(jié)用來(lái)保存一些基本信息,如指向類的指針,對(duì)于一個(gè)只包含很少數(shù)據(jù)量在內(nèi)的對(duì)象(如一個(gè) Int 類型數(shù)據(jù)),這個(gè)開銷是相對(duì)巨大的。

② Java 的 String 對(duì)象會(huì)在原始數(shù)據(jù)的基礎(chǔ)上額外開銷 40 個(gè)字節(jié),因?yàn)槌俗址麛?shù)組(Charsarray)本身之外,還需要保存如字符串長(zhǎng)度等額外信息,而且由于 String 內(nèi)部存儲(chǔ)字符時(shí)是按照 UTF-16 格式編碼的,所以一個(gè) 10 字符的字符串開銷很容易超過(guò) 60 個(gè)字符。

③ 對(duì)于集合類(collectionclasses),如 HashMap、LinkedList,通常使用鏈表的形式將數(shù)據(jù)結(jié)構(gòu)鏈在一起,那么對(duì)于每一個(gè)節(jié)點(diǎn)(entry,如 Map.Entry)都會(huì)有一個(gè)包裝器(wrapper),而這個(gè)包裝器對(duì)象不僅包含對(duì)象頭,還會(huì)保存指向下一個(gè)節(jié)點(diǎn)的指針(每個(gè) 8 字節(jié))。

④熟悉 Java 的開發(fā)者應(yīng)該知道,Java 數(shù)據(jù)類型分為基本類型和包裝類型,對(duì)于 int、long 等基本類型是直接在棧中分配空間,如果我們想將這些類型用在集合類中(如 Map),需要使用對(duì)基本數(shù)據(jù)類型打包(當(dāng)然這是 Java 的一個(gè)自動(dòng)過(guò)程),而打包后的基本數(shù)據(jù)類型就會(huì)產(chǎn)生額外的開銷。

針對(duì)以上內(nèi)存優(yōu)化的基本問(wèn)題,接下來(lái)首先介紹 Spark 中如何管理內(nèi)存,之后介紹一些能夠在具體應(yīng)用中更加有效地使用內(nèi)存的具體策略,例如,如何確定合適的內(nèi)存級(jí)別,如何改變數(shù)據(jù)結(jié)構(gòu)或?qū)?shù)據(jù)存儲(chǔ)為序列化格式來(lái)節(jié)省內(nèi)存等,也會(huì)從 Spark 的緩存及 Java 的垃圾回收方面進(jìn)行分析,另外,也會(huì)對(duì) SparkStreaming 進(jìn)行分析。

1. 內(nèi)存管理

Spark 對(duì)于內(nèi)存的使用主要有兩類用途:執(zhí)行(execution)和存儲(chǔ)(storage)。執(zhí)行類內(nèi)存主要被用于 Shuffle 類操作、join 操作及排序(sort)和聚合(aggregation)類操作,而存儲(chǔ)類內(nèi)存主要用于緩存數(shù)據(jù)(caching)和集群間內(nèi)部數(shù)據(jù)的傳送。

在 Spark 內(nèi)部執(zhí)行和存儲(chǔ)分享同一片內(nèi)存空間(M),當(dāng)沒(méi)有執(zhí)行類內(nèi)存被使用時(shí),存儲(chǔ)類內(nèi)存可以使用全部的內(nèi)存空間,反之亦然。執(zhí)行類內(nèi)存可以剝奪存儲(chǔ)類內(nèi)存的空間,但是有一個(gè)前提是,存儲(chǔ)類內(nèi)存所占空間不得低于某一個(gè)閾值 R,也就是說(shuō) R 指定了 M 中的一塊子空間塊是永遠(yuǎn)不會(huì)被剝奪的。而另一方面由于實(shí)現(xiàn)上的復(fù)雜性,存儲(chǔ)類內(nèi)存是不可以剝奪執(zhí)行類內(nèi)存的。

Spark 的這種設(shè)計(jì)方式確保了系統(tǒng)一些很好的特性:首先,如果應(yīng)用不需要緩存數(shù)據(jù),那么所有的空間都可以用作執(zhí)行類內(nèi)存,可以一定程度上避免不必要的內(nèi)存不夠用時(shí)溢出到硬盤的情況;其次,如果應(yīng)用需要使用緩存數(shù)據(jù),會(huì)有最小的內(nèi)存空間 R 能夠保證這部分?jǐn)?shù)據(jù)塊免于被剝奪;最后,這種方式對(duì)于使用者而言是完全黑盒的,使用者不需要了解內(nèi)部如何根據(jù)不同的任務(wù)負(fù)載來(lái)進(jìn)行內(nèi)存劃分。

Spark 提供了兩個(gè)相關(guān)的配置,但是大多數(shù)情況下直接使用默認(rèn)值就能滿足大部分負(fù)載情況:

SparkMemory.Fraction 表示 M 的大小占整個(gè) JVM(JavaVirtueMachine)堆空間的比例(默認(rèn)是 0.6),剩余的空間(40%)被用來(lái)保存用戶的數(shù)據(jù)結(jié)構(gòu)及 Spark 內(nèi)部的元數(shù)據(jù)(metadata),另一方面預(yù)防某些異常數(shù)據(jù)記錄造成的 OOM(OutofMemory)錯(cuò)誤。

Spark.Memory.StorageFraction 表示 R 的大小占整個(gè) M 的比例(默認(rèn)是 0.5),R 是存儲(chǔ)類內(nèi)存在 M 中占用的空間,其中緩存的數(shù)據(jù)塊不會(huì)被執(zhí)行類內(nèi)存剝奪。

2. 優(yōu)化策略

當(dāng)我們需要初步判斷內(nèi)存的占用情況時(shí),可以創(chuàng)建一個(gè) RDD,然后將其緩存(cache)起來(lái),然后觀察網(wǎng)頁(yè)監(jiān)控頁(yè)面的存儲(chǔ)頁(yè)部分,就可以看出 RDD 占用了多少內(nèi)存。而對(duì)于特殊的對(duì)象,我們可以調(diào)用 SizeEstimator 的 estimate() 方法來(lái)評(píng)估內(nèi)存消耗,這對(duì)于實(shí)驗(yàn)不同數(shù)據(jù)層的內(nèi)存消耗,以及判斷廣播變量在每個(gè) Executor 堆上所占用的內(nèi)存是非常有效的。

當(dāng)我們了解了內(nèi)存的消耗情況后,發(fā)現(xiàn)占用內(nèi)存過(guò)大,可以著手做一些優(yōu)化,一方面可以在數(shù)據(jù)結(jié)構(gòu)方面進(jìn)行優(yōu)化。首先需要注意的是,我們要避免本章開頭提到的 Java 本身數(shù)據(jù)結(jié)構(gòu)的頭部開銷,比如基于指針的數(shù)據(jù)結(jié)構(gòu)或者包裝器類型,有以下方式可以進(jìn)行優(yōu)化:

在設(shè)計(jì)數(shù)據(jù)結(jié)構(gòu)時(shí),優(yōu)先使用基本數(shù)據(jù)類型及對(duì)象數(shù)組等,避免使用 Java 或者 Scala 標(biāo)準(zhǔn)庫(kù)當(dāng)中的集合類(如 HashMap),在 fastutil 庫(kù)中,為基本數(shù)據(jù)類型提供了方便的集合類接口,這些接口也兼容 Java 標(biāo)準(zhǔn)庫(kù)。

盡可能避免在數(shù)據(jù)結(jié)構(gòu)中嵌套大量的小對(duì)象和指針。

考慮使用數(shù)值類 ID 或者枚舉對(duì)象來(lái)代替字符串類型作為主鍵(Key)。

如果我們的運(yùn)行時(shí)內(nèi)存小于 32GB,可以加上 JVM 配置 -XX:+UseCompressedOops 將指針的占用空間由 8 個(gè)字節(jié)壓縮到 4 個(gè)字節(jié),我們也可以在 Spark-env.sh 中進(jìn)行配置。

假設(shè)我們通過(guò)以上策略還是發(fā)現(xiàn)對(duì)象占用了過(guò)大的內(nèi)存,可以用一個(gè)非常簡(jiǎn)單的方式來(lái)降低內(nèi)存使用,就是將對(duì)象以序列化的形式(serializedform)存儲(chǔ),在 RDD 的持久化接口中使用序列化的存儲(chǔ)級(jí)別,如 MEMORY_ONLY_SER,Spark 便會(huì)將每個(gè) RDD 分區(qū)存儲(chǔ)為一個(gè)很大的字節(jié)數(shù)組。而這種方式會(huì)使得訪問(wèn)數(shù)據(jù)的速度有所下降,因?yàn)槊總(gè)對(duì)象訪問(wèn)時(shí)都需要有一個(gè)反序列化的過(guò)程。在 7.1 節(jié)中我們已經(jīng)介紹過(guò),優(yōu)先使用 Kryo 序列化方式,其占用大小遠(yuǎn)低于 Java 本身的序列化方式。

3. 垃圾回收(GC)優(yōu)化

如果我們?cè)趹?yīng)用中進(jìn)行了頻繁的 RDD 變動(dòng),那么 JVM 的垃圾回收會(huì)成為一個(gè)問(wèn)題(也就是說(shuō),假設(shè)在程序中只創(chuàng)建了一個(gè) RDD,后續(xù)所有操作都圍繞這個(gè) RDD,那么垃圾回收就不存在問(wèn)題)。當(dāng) Java 需要通過(guò)刪除舊對(duì)象來(lái)為新對(duì)象開辟空間時(shí),它便會(huì)掃描我們?cè)鴦?chuàng)建的所有對(duì)象并找到不再使用的對(duì)象。

所以垃圾回收的開銷是和 Java 對(duì)象的個(gè)數(shù)成比例的,我們要盡可能地使用包含較少對(duì)象的數(shù)據(jù)結(jié)構(gòu)(如使用 Int 數(shù)組代替 LinkedList)來(lái)降低這部分開銷。另外前面提到的用序列化形式存儲(chǔ)也是一個(gè)很好的方法,序列化后每個(gè)對(duì)象在每個(gè) RDD 分區(qū)下僅有一個(gè)對(duì)象(一個(gè)字節(jié)數(shù)組)。注意當(dāng) GC 開銷成為瓶頸時(shí),首先要嘗試的便是序列化緩存(serializedcaching)。

在做 GC 優(yōu)化時(shí),我們首先需要了解 GC 發(fā)生的頻率以及其所消耗的時(shí)間。這可以通過(guò)在 Java 選項(xiàng)中加入 -verbose:gc-XX:+PrintGCDetails-XX:+PrintGCTimeStamps 來(lái)實(shí)現(xiàn);之后當(dāng) Spark 任務(wù)運(yùn)行后,便可以在 Worker 日志中看到 GC 發(fā)生時(shí)打印的信息。注意這些日志是打印在集群中的 Worker 節(jié)點(diǎn)上的(在工作目錄的 stdout 文件中),而非 Driver 程序。

為了進(jìn)一步優(yōu)化 GC,首先簡(jiǎn)單介紹下 Java 虛擬機(jī)內(nèi)部是如何進(jìn)行內(nèi)存管理的。

①Java 對(duì)象是存儲(chǔ)在堆空間內(nèi)的,堆空間被分為兩部分,即年輕區(qū)域(Youngregion)和老年區(qū)域(Oldregion),其中年輕代(Younggeneration)會(huì)用來(lái)存儲(chǔ)短生命周期的對(duì)象,而老年代(Oldgeneration)會(huì)用來(lái)存儲(chǔ)較長(zhǎng)生命周期的對(duì)象。

②年輕代的區(qū)域又被分為 3 個(gè)部分 [Eden,Survivor1,Survivor2]。

③一個(gè)簡(jiǎn)單的 GC 流程大致是:當(dāng) Eden 區(qū)域滿了,一次小型 GC 過(guò)程會(huì)將 Eden 和 Survivor1 中還存活的對(duì)象復(fù)制到 Survivor2 區(qū)域上,Survivor 區(qū)域是可交換的(即來(lái)回復(fù)制),當(dāng)一個(gè)對(duì)象存活周期已足夠長(zhǎng)或者 Survivor2 區(qū)域已經(jīng)滿時(shí),那么它們會(huì)被移動(dòng)到老年代上,而當(dāng)老年代的區(qū)域也滿了時(shí),就會(huì)觸發(fā)一次完整的 GC 過(guò)程。

Java 的這種 GC 機(jī)制主要是基于程序中創(chuàng)建的大多數(shù)對(duì)象,都會(huì)在創(chuàng)建后被很快銷毀,只有極少數(shù)對(duì)象會(huì)存活下來(lái),所以其分為年輕代和老年代兩部分,而這兩部分 GC 的方式也是不同的,其時(shí)間復(fù)雜度也是不同的,年輕代會(huì)更加快一些,感興趣的讀者可以進(jìn)一步查閱相關(guān)資料。

基于以上原因,Spark 在 GC 方面優(yōu)化的主要目標(biāo)是:只有長(zhǎng)生命周期的 RDD 會(huì)被存儲(chǔ)在老年代上,而年輕代上有足夠的空間來(lái)存儲(chǔ)短生命周期的對(duì)象,從而盡可能避免任務(wù)執(zhí)行時(shí)創(chuàng)建的臨時(shí)對(duì)象觸發(fā)完整 GC 流程。我們可以通過(guò)以下步驟來(lái)一步步優(yōu)化:

①通過(guò) GC 統(tǒng)計(jì)信息觀察是否存在過(guò)于頻繁的 GC 操作,如果在任務(wù)完成前,完整的 GC 操作被調(diào)用了多次,那么說(shuō)明可執(zhí)行任務(wù)并沒(méi)有獲得足夠的內(nèi)存空間。

②如果觸發(fā)了過(guò)多的小型 GC,而完整的 GC 操作并沒(méi)有調(diào)用很多次,那么給 Eden 區(qū)域多分配一些內(nèi)存空間會(huì)有所幫助。我們可以根據(jù)每個(gè)任務(wù)所需內(nèi)存大小來(lái)預(yù)估 Eden 的大小,如果 Eden 設(shè)置大小為 E,可以利用配置項(xiàng) -Xmn=4/3*E 來(lái)對(duì)年輕代的區(qū)域大小進(jìn)行設(shè)置(其中 4/3 的比例是考慮到 survivor 區(qū)域所需空間)。

③如果我們觀察 GC 打印的統(tǒng)計(jì)信息,發(fā)現(xiàn)老年代接近存滿,那么就需要改變 spark.memory.fraction 來(lái)減少存儲(chǔ)類內(nèi)存(用于 caching)的占用,因?yàn)榕c其降低任務(wù)的執(zhí)行速度,不如減少對(duì)象的緩存大小。另一個(gè)可選方案是減少年輕代的大小,即通過(guò) -Xmn 來(lái)進(jìn)行配置,也可以通過(guò) JVM 的 NewRatio 參數(shù)進(jìn)行調(diào)整,大多數(shù) JVM 的該參數(shù)的默認(rèn)值是 2,意思是老年代占整個(gè)堆內(nèi)存的 2/3,這個(gè)比例需要大于 Spark.Memory.Fraction。

④通過(guò)加入 -XX:+UserG1GC 來(lái)使用 G1GC 垃圾回收器,這可以一定程度提高 GC 的性能。另外注意對(duì)于 executor 堆內(nèi)存非常大的情況,一定通過(guò) -XX:G1HeapRegionSize 來(lái)增加 G1 區(qū)域的大小。

針對(duì)以上步驟我們舉一個(gè)例子,如果我們的任務(wù)是從 HDFS 當(dāng)中讀取數(shù)據(jù),任務(wù)需要的內(nèi)存空間可以通過(guò)從 HDFS 當(dāng)中讀取的數(shù)據(jù)塊大小來(lái)進(jìn)行預(yù)估,一般解壓后的數(shù)據(jù)塊大小會(huì)是原數(shù)據(jù)塊的 2~3 倍,所以如果我們希望 3、4 個(gè)任務(wù)同時(shí)運(yùn)行在工作空間中,假設(shè)每個(gè) HDFS 塊大小是 128MB,那么需要將 Eden 大小設(shè)置為 4×3×128MB。改動(dòng)之后,我們可以監(jiān)控 GC 的頻率和時(shí)間消耗,看看有沒(méi)有達(dá)到優(yōu)化的效果。

對(duì)于優(yōu)化 GC,主要還是從降低全局 GC 的頻率出發(fā),executor 中對(duì)于 GC 優(yōu)化的配置可以通過(guò) spark.executor.extraJavaOptions 來(lái)配置。

4.SparkStreaming 內(nèi)存優(yōu)化

前面介紹了 Spark 中的優(yōu)化策略和關(guān)于 GC 方面的調(diào)優(yōu),對(duì)于 SparkStreaming 的應(yīng)用程序,這些策略也都是適用的,除此之外還會(huì)有一些其他方面的優(yōu)化點(diǎn)。

對(duì)于 SparkStreaming 應(yīng)用所需要的集群內(nèi)存,很大程度上取決于要使用哪種類型的 transformation 操作。比如,假設(shè)我們想使用 10 分鐘數(shù)據(jù)的窗口操作,那么我們的集群必須有足夠的空間能夠保存 10 分鐘的全部數(shù)據(jù);亦或,我們?cè)诖罅康逆I值上使用了 updateStateByKey 操作,那么所需要的內(nèi)存空間會(huì)較大。而如果我們僅僅使用簡(jiǎn)單的 Map、Filter、Store 操作,那么所需空間會(huì)較小。

默認(rèn)情況下,接收器接收來(lái)的數(shù)據(jù)會(huì)以 StorageLevel.MEMORY_AND_DISK_SER_2 的格式存儲(chǔ),那么如果內(nèi)存不足時(shí),數(shù)據(jù)就會(huì)序列化到硬盤上,這樣會(huì)損失 SparkStreaming 應(yīng)用的性能。所以通常建議為 SparkStreaming 應(yīng)用分配充足的內(nèi)存,可以在小規(guī)模數(shù)據(jù)集上進(jìn)行測(cè)試和判斷。

另一方面與 Spark 程序有顯著區(qū)別的是,SparkStreaming 程序?qū)?shí)時(shí)性要求會(huì)較高,所以我們需要盡可能降低 JVM 垃圾回收所導(dǎo)致的延遲。

基于此,我們可以通過(guò)以下幾個(gè)參數(shù)對(duì)內(nèi)存使用和 GC 開銷進(jìn)行優(yōu)化調(diào)整。

DStream 的持久化級(jí)別:在前文中講過(guò),輸入數(shù)據(jù)默認(rèn)是持久化為字節(jié)流的,因?yàn)橄噍^于反序列化的開銷,其更會(huì)降低內(nèi)存的使用并且減少 GC 的開銷。所以優(yōu)先使用 Kryo 序列化方式,可以大大降低序列化后的尺寸和內(nèi)存開銷。另外,如果需要更進(jìn)一步減少內(nèi)存開銷,可以通過(guò)配置 spark.rdd.compress 進(jìn)行更進(jìn)一步的壓縮(當(dāng)然對(duì)于目前的集群機(jī)器,大多數(shù)內(nèi)存都足夠了)。

及時(shí)清理老數(shù)據(jù):默認(rèn)情況下所有的輸入數(shù)據(jù)和由 DStream 的 Transormation 操作產(chǎn)生的持久 RDD 會(huì)被自動(dòng)清理,即 SparkStreaming 會(huì)決定何時(shí)對(duì)數(shù)據(jù)進(jìn)行清理。例如,假設(shè)我們使用 10 分鐘的窗口操作,SparkStreaming 會(huì)保存之前 10 分鐘的所有數(shù)據(jù),并及時(shí)清理過(guò)時(shí)的老數(shù)據(jù)。數(shù)據(jù)保存的時(shí)間可以通過(guò) stremingContext.remember 進(jìn)行設(shè)置。

CMS 垃圾回收器:不同于之前我們?cè)?Spark 中的建議,由于需要減少 GC 間的停頓,所以這里建議使用并發(fā)標(biāo)記清除類的 GC 方式。即使并發(fā) GC 會(huì)降低全局系統(tǒng)的生產(chǎn)吞吐量,但是使用這種 GC 可以使得每個(gè) Batch 的處理時(shí)間更加一致(不會(huì)因?yàn)槟硞(gè) Batch 處理時(shí)發(fā)生了 GC,而導(dǎo)致處理時(shí)間劇增)。我們需要確保在 Driver 節(jié)點(diǎn)(在 spark-submit 中使用—driver-java-options)和 Executor 節(jié)點(diǎn)(在 Spark 配置中使用 spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC)都設(shè)置了 CMSGC 方式。

其他減少 GC 開銷的方式有:可以通過(guò) OFF_HEAP 存儲(chǔ)級(jí)別的 RDD 持久化方式,以及可以在 Executor 上使用更小的堆內(nèi)存,從而降低每個(gè) JVM 堆垃圾回收的壓力。

實(shí)例——項(xiàng)目實(shí)戰(zhàn)中的調(diào)優(yōu)示例

在某輿情監(jiān)控系統(tǒng)中,對(duì)于每天爬取的千萬(wàn)級(jí)游戲玩家評(píng)論信息都要實(shí)時(shí)地進(jìn)行詞頻統(tǒng)計(jì),對(duì)于爬取到的游戲玩家評(píng)論數(shù)據(jù),我們會(huì)生產(chǎn)輸入到 Kafka 中,而另一端的消費(fèi)者,我們采用了 SparkStreaming 來(lái)進(jìn)行流式處理,首先利用 Direct 方式從 Kafka 拉取 batch,之后經(jīng)過(guò)分詞、統(tǒng)計(jì)等相關(guān)處理,回寫到數(shù)據(jù)庫(kù)(DataBase,DB)上,由此高效實(shí)時(shí)的完成每天大量數(shù)據(jù)的詞頻統(tǒng)計(jì)任務(wù)。

對(duì)于數(shù)據(jù)量較小的情況,一般是不會(huì)暴露問(wèn)題的,但是數(shù)據(jù)量增大后,就會(huì)暴露各種問(wèn)題,這就需要進(jìn)行一些調(diào)優(yōu)和參數(shù)配置?梢酝ㄟ^(guò)以下幾方面進(jìn)行調(diào)優(yōu)嘗試。

1. 合理的批處理時(shí)間(batchDuration)

關(guān)于 SparkStreaming 的批處理時(shí)間設(shè)置是非常重要的,SparkStreaming 在不斷接收數(shù)據(jù)的同時(shí),需要處理數(shù)據(jù)的時(shí)間,所以如果設(shè)置過(guò)段的批處理時(shí)間,會(huì)造成數(shù)據(jù)堆積,即未完成的 batch 數(shù)據(jù)越來(lái)越多,從而發(fā)生阻塞。

另外值得注意的是,batchDuration 本身也不能設(shè)置為小于 500ms,這會(huì)導(dǎo)致 SparkStreaming 進(jìn)行頻繁地提交作業(yè),造成額外的開銷,減少整個(gè)系統(tǒng)的吞吐量;相反如果將 batchDuration 時(shí)間設(shè)置得過(guò)長(zhǎng),又會(huì)影響整個(gè)系統(tǒng)的吞吐量。

如何設(shè)置一個(gè)合理的批處理時(shí)間,需要根據(jù)應(yīng)用本身、集群資源情況,以及關(guān)注和監(jiān)控 SparkStreaming 系統(tǒng)的運(yùn)行情況來(lái)調(diào)整,重點(diǎn)關(guān)注監(jiān)控界面中的 TotalDelay,如圖 1 所示。

 

 

圖 1 SparkUI 中全局延遲

2. 合理的 Kafka 拉取量(maxRatePerPartition 參數(shù)設(shè)置)

對(duì)于數(shù)據(jù)源是 Kafka 的 SparkStreaming 應(yīng)用,在 Kafka 數(shù)據(jù)頻率過(guò)高的情況下,調(diào)整這個(gè)參數(shù)是非常必要的。我們可以改變 spark.streaming.kafka.maxRatePerPartition 參數(shù)的值來(lái)進(jìn)行上限調(diào)整,默認(rèn)是無(wú)上限的,即 Kafka 有多少數(shù)據(jù),SparkStreaming 就會(huì)一次性全拉出,但是上節(jié)提到的批處理時(shí)間是一定的,不可能動(dòng)態(tài)變化,如果持續(xù)數(shù)據(jù)頻率過(guò)高,同樣會(huì)造成數(shù)據(jù)堆積、阻塞的現(xiàn)象。

所以需要結(jié)合 batchDuration 設(shè)置的值,調(diào)整 spark.streaming.kafka.maxRatePerPatition 參數(shù),注意該參數(shù)配置的是 Kafka 每個(gè) partition 拉取的上限,數(shù)據(jù)總量還需乘以所有的 partition 數(shù)量,調(diào)整兩個(gè)參數(shù) maxRatePerPartition 和 batchDuration 使得數(shù)據(jù)的拉取和處理能夠平衡,盡可能地增加整個(gè)系統(tǒng)的吞吐量,可以觀察監(jiān)控界面中的 InputRate 和 ProcessingTime,如圖 2 所示。

 

 

圖 2 SparkUI 中輸入速率和平均處理時(shí)間

3. 緩存反復(fù)使用的 Dstream(RDD)

Spark 中的 RDD 和 SparkStreaming 中的 Dstream 如果被反復(fù)使用,最好利用 cache() 函數(shù)將該數(shù)據(jù)流緩存起來(lái),防止過(guò)度地調(diào)度資源造成的網(wǎng)絡(luò)開銷?梢詤⒖疾⒂^察 SchedulingDelay 參數(shù),如圖 3 所示。

 

 

圖 3 SparkUI 中調(diào)度延遲

4. 其他一些優(yōu)化策略

除了以上針對(duì) SparkStreaming 和 Kafka 這個(gè)特殊場(chǎng)景方面的優(yōu)化外,對(duì)于前面提到的一些常規(guī)優(yōu)化,也可以通過(guò)下面幾點(diǎn)來(lái)完成。

設(shè)置合理的 GC 方式:使用–conf"spark.executor.extraJavaOptions=-XX:+UseConc MarkSweepGC" 來(lái)配置垃圾回收機(jī)制。

設(shè)置合理的 parallelism:在 SparkStreaming+kafka 的使用中,我們采用了 Direct 連接方式,前面講過(guò) Spark 中的 partition 和 Kafka 中的 Partition 是一一對(duì)應(yīng)的,一般默認(rèn)設(shè)置為 Kafka 中 Partition 的數(shù)量。

設(shè)置合理的 CPU 資源數(shù):CPU 的 core 數(shù)量,每個(gè) Executor 可以占用一個(gè)或多個(gè) core,觀察 CPU 使用率(Linux 命令 top)來(lái)了解計(jì)算資源的使用情況。例如,很常見(jiàn)的一種浪費(fèi)是一個(gè) Executor 占用了多個(gè) core,但是總的 CPU 使用率卻不高(因?yàn)橐粋(gè) Executor 并不會(huì)一直充分利用多核的能力),這個(gè)時(shí)候可以考慮讓單個(gè) Executor 占用更少的 core,同時(shí) Worker 下面增加更多的 Executor;或者從另一個(gè)角度,增加單個(gè)節(jié)點(diǎn)的 worker 數(shù)量,當(dāng)然這需要修改 Spark 集群的配置,從而增加 CPU 利用率。值得注意是,這里的優(yōu)化有一個(gè)平衡,Executor 的數(shù)量需要考慮其他計(jì)算資源的配置,Executor 的數(shù)量和每個(gè) Executor 分到的內(nèi)存大小成反比,如果每個(gè) Executor 的內(nèi)存過(guò)小,容易產(chǎn)生內(nèi)存溢出(outofmemory)的問(wèn)題。

高性能的算子:所謂高性能算子也要看具體的場(chǎng)景,通常建議使用 reduceByKey/aggregateByKey 來(lái)代替 groupByKey。而存在數(shù)據(jù)庫(kù)連接、資源加載創(chuàng)建等需求時(shí),我們可以使用帶 partition 的操作,這樣在每一個(gè)分區(qū)進(jìn)行一次操作即可,因?yàn)榉謪^(qū)是物理同機(jī)器的,并不存在這些資源序列化的問(wèn)題,從而大大減少了這部分操作的開銷。例如,可以用 mapPartitions、foreachPartitions 操作來(lái)代替 map、foreach 操作。另外在進(jìn)行 coalesce 操作時(shí),因?yàn)闀?huì)進(jìn)行重組分區(qū)操作,所以最好進(jìn)行必要的數(shù)據(jù)過(guò)濾 filter 操作。

Kryo 優(yōu)化序列化性能:我們只要設(shè)置序列化類,再注冊(cè)要序列化的自定義類型即可(比如算子函數(shù)中使用到的外部變量類型、作為 RDD 泛型類型的自定義類型等)。

5. 結(jié)果

通過(guò)以上種種調(diào)整和優(yōu)化,最終我們想要達(dá)到的目的便是,整個(gè)流式處理系統(tǒng)保持穩(wěn)定,即 SparkStreaming 消費(fèi) Kafka 數(shù)據(jù)的速率趕上爬蟲向 Kafka 生產(chǎn)數(shù)據(jù)的速率,使得 Kafka 中的數(shù)據(jù)盡可能快地被處理掉,減少積壓,才能保證實(shí)時(shí)性,如圖 4 所示。

 

 

圖 4 SparkStreaming 和 Kafka 穩(wěn)定運(yùn)行監(jiān)控圖

當(dāng)然不同的應(yīng)用場(chǎng)景會(huì)有不同的圖形,這是本文詞頻統(tǒng)計(jì)優(yōu)化穩(wěn)定后的監(jiān)控圖,我們可以看到在 ProcessingTime 柱形圖中有一條 Stable 的虛線,而大多數(shù) Batch 都能夠在這一虛線下處理完畢,說(shuō)明整體 SparkStreaming 是運(yùn)行穩(wěn)定的。

對(duì)于項(xiàng)目中具體的性能調(diào)優(yōu),有以下幾個(gè)點(diǎn)需要注意:

一個(gè) DStream 流只關(guān)聯(lián)單一接收器,如果需要并行多個(gè)接收器來(lái)讀取數(shù)據(jù),那么需要?jiǎng)?chuàng)建多個(gè) DStream 流。一個(gè)接收器至少需要運(yùn)行在一個(gè) Executor 上,甚至更多,我們需要保證在接收器槽占用了部分核后,還能有足夠的核來(lái)處理接收到的數(shù)據(jù)。例如在設(shè)置 spark.cores.max 時(shí)需要將接收器的占用考慮進(jìn)來(lái),同時(shí)注意在分配 Executor 給接收器時(shí),采用的是輪循的方式(roundrobinfashion)。

當(dāng)接收器從數(shù)據(jù)源接收到數(shù)據(jù)時(shí),會(huì)創(chuàng)建數(shù)據(jù)塊,在每個(gè)微秒級(jí)的數(shù)據(jù)塊間隔(blockIntervalmilliseconds)中都會(huì)有一個(gè)新的數(shù)據(jù)塊生成。在每個(gè)批處理間隔內(nèi)(batchInterval)數(shù)據(jù)塊的數(shù)量 N=batchInterval/blockInterval。這些數(shù)據(jù)塊會(huì)由當(dāng)前執(zhí)行器(Executor)的數(shù)據(jù)塊管理器(BlockManager)分發(fā)到其他執(zhí)行器的數(shù)據(jù)塊管理器。之后在 Driver 節(jié)點(diǎn)上運(yùn)行的輸入網(wǎng)絡(luò)追蹤器(NetworkInputTracker)會(huì)通知數(shù)據(jù)塊所在位置,以期進(jìn)一步處理。

RDD 是基于 Driver 節(jié)點(diǎn)上每個(gè)批處理間隔產(chǎn)生的數(shù)據(jù)塊(blocks)而創(chuàng)建的,這些數(shù)據(jù)塊是 RDD 的分支(partitions),每個(gè)分支是 Spark 中的一個(gè)任務(wù)(task)。如果 blockInterval==batchInterval,那么意味著創(chuàng)建了單一分支,并且可能直接在本地處理。

數(shù)據(jù)塊上的映射(map)任務(wù)在執(zhí)行器(一個(gè)接收塊,另一個(gè)復(fù)制塊)中處理,該執(zhí)行器不考慮塊間隔,除非出現(xiàn)非本地調(diào)度。擁有更大的塊間隔(blockInterval)意味著更大的數(shù)據(jù)塊,如果將 spark.locality.wait 設(shè)置一個(gè)更大的值,那么更有可能在本地節(jié)點(diǎn)處理數(shù)據(jù)塊。我們需要在兩個(gè)參數(shù)間(blockInterval 和 spark.locality.wait)做一個(gè)折中,確保越大的數(shù)據(jù)塊更可能在本地被處理。

除了依賴于 batchInterval 和 blockInterval,我們可以直接通過(guò) inputDstream.repartition(n) 來(lái)確定分支的數(shù)量。這個(gè)操作會(huì)重新打亂(reshuffles)RDD 中的數(shù)據(jù),隨機(jī)的分配給 n 個(gè)分支。當(dāng)然打亂(shuffle)過(guò)程會(huì)造成一定的開銷,但是會(huì)有更高的并行度。RDD 的處理是由驅(qū)動(dòng)程序的 jobscheduler 作為作業(yè)安排的。在給定的時(shí)間點(diǎn)上,只有一個(gè)作業(yè)是活動(dòng)的。因此,如果一個(gè)作業(yè)正在執(zhí)行,那么其他作業(yè)將排隊(duì)。

如果我們有兩個(gè) Dstreams,那么將形成兩個(gè) RDDs,并將創(chuàng)建兩個(gè)作業(yè),每個(gè)作業(yè)(job)都被安排為一個(gè)接著一個(gè)地執(zhí)行。為了避免這種情況,可以聯(lián)合兩個(gè) Dstreams(union)。這將確保為 Dstreams 的兩個(gè) RDD 形成單一的 unionRDD。而這個(gè) unionRDD 會(huì)被視為一個(gè)作業(yè),但是 RDDs 的分區(qū)不會(huì)受到影響。

如果批處理時(shí)間大于 batchinterval,那么很明顯,接收方的內(nèi)存將逐漸被填滿,并最終拋出異常(很可能是 BlockNotFoundException)。目前沒(méi)有辦法暫停接收,那么可以利用 SparkConf 配置項(xiàng)中的 spark.streaming.receiver.maxRate 來(lái)控制接收器的速率。

小結(jié)

①SparkStreaming 中需要大量的序列化和反序列化操作,在 2.0.0 以上的 Spark 版本中,我們應(yīng)當(dāng)優(yōu)先考慮使用 Kryo 序列化方式。

②對(duì)于非常大的變量,如配置信息,可以提前利用廣播變量的方式傳送給每一個(gè)節(jié)點(diǎn)。

③在流式處理系統(tǒng)中,我們需要兼顧數(shù)據(jù)的接收和數(shù)據(jù)處理,即消費(fèi)數(shù)據(jù)的速率要趕上生產(chǎn)數(shù)據(jù)的速率。當(dāng)發(fā)現(xiàn)生產(chǎn)數(shù)據(jù)速率過(guò)慢時(shí),可以考慮增加并行度,使用更多的接收器(Receiver);如果處理速度過(guò)慢,可以考慮加機(jī)器、優(yōu)化程序邏輯及 GC 優(yōu)化等方式。

④Spark 內(nèi)存分為執(zhí)行類內(nèi)存和存儲(chǔ)類內(nèi)存,執(zhí)行類內(nèi)存可以剝奪存儲(chǔ)類內(nèi)存空間,但是存儲(chǔ)類內(nèi)存空間有一個(gè)最低閾值會(huì)保證保留。

⑤內(nèi)存優(yōu)化最簡(jiǎn)單的方式是使用序列化格式進(jìn)行對(duì)象存儲(chǔ),另外一方面考慮到 Java/Scala 對(duì)象本身會(huì)有所開銷,應(yīng)盡可能減少對(duì)象的數(shù)量。

⑥對(duì)于 Spark 而言,垃圾回收采用 G1GC,而 SparkStreaming 采用 CMS。

⑦調(diào)優(yōu)過(guò)程是一個(gè)觀察,調(diào)整,再觀察,再調(diào)整的過(guò)程,針對(duì)具體問(wèn)題需要進(jìn)行不同策略上的調(diào)整,希望大家多多實(shí)踐。

作者介紹:

肖力濤,浙江大學(xué)計(jì)算機(jī)碩士,前騰訊優(yōu)圖實(shí)驗(yàn)室及 WeTest 研究員,現(xiàn)拼多多資深算法工程師,長(zhǎng)期進(jìn)行大數(shù)據(jù)、自然語(yǔ)言處理、深度學(xué)習(xí)、推薦相關(guān)算法的研究實(shí)踐,有豐富的經(jīng)驗(yàn)。善于總結(jié)和歸納知識(shí)體系,整理的個(gè)人博客,收到了廣泛的閱讀和好評(píng)。擅長(zhǎng)數(shù)據(jù)分析處理、算法實(shí)踐落地、挖掘用戶行為數(shù)據(jù)、大規(guī)模數(shù)據(jù)處理。

標(biāo)簽: Spark  數(shù)據(jù)處理

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請(qǐng)聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點(diǎn)!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請(qǐng)與原作者聯(lián)系。

上一篇:貝殼:流式數(shù)據(jù)的平臺(tái)化實(shí)踐與挑戰(zhàn)

下一篇:Hadoop衰落,數(shù)據(jù)湖項(xiàng)目開始失敗,我們?cè)撊绾螒?yīng)對(duì)?