
說(shuō)說(shuō) MQ 之 Kafka(三)

2018-10-31    Source: importnew

容器云強(qiáng)勢(shì)上線!快速搭建集群,上萬(wàn)Linux鏡像隨意使用

Kafka Replicas and Clusters

在生產(chǎn)環(huán)境中,Kafka 總是以“集群+分區(qū)”方式運(yùn)行的,以保證可靠性和性能。下面是一個(gè)3副本的 Kafka 集群實(shí)例。

First, start three Kafka brokers, each with its own configuration file:

broker.id=0
listeners=PLAINTEXT://192.168.232.23:9092
log.dirs=/tmp/kafka-logs

broker.id=1
listeners=PLAINTEXT://192.168.232.23:9093
log.dirs=/tmp/kafka-logs-1

broker.id=2
listeners=PLAINTEXT://192.168.232.23:9094
log.dirs=/tmp/kafka-logs-2

雖然每個(gè) Broker 只配置了一個(gè)端口,實(shí)際上,Kafka 會(huì)多占用一個(gè),可能是用來(lái) Broker 之間的復(fù)制的。另外,3個(gè) Broker 都配置了,

zookeeper.connect=localhost:2181
delete.topic.enable=true

在同一個(gè) Zookeeper 上的 Broker 會(huì)被歸類到一個(gè)集群中。注意,這些配置中并沒(méi)有指定哪一個(gè) Broker 是主節(jié)點(diǎn),哪些 Broker 是從節(jié)點(diǎn),Kafka 采用的辦法是從可選的 Broker 中,選出每個(gè)分區(qū)的 Leader。也就是說(shuō),對(duì)某個(gè) Topic 來(lái)說(shuō),可能0節(jié)點(diǎn)是 Leader,另外一些 Topic,可能1節(jié)點(diǎn)是 Leader;甚至,如果 topic1 有2個(gè)分區(qū)的話,分區(qū)1的 Leader 是0節(jié)點(diǎn),分區(qū)2的 Leader 是1節(jié)點(diǎn)。

這種對(duì)等的設(shè)計(jì),對(duì)于故障恢復(fù)是十分有用的,在節(jié)點(diǎn)崩潰的時(shí)候,Kafka 會(huì)自動(dòng)選舉出可用的從節(jié)點(diǎn),將其升級(jí)為主節(jié)點(diǎn)。在崩潰的節(jié)點(diǎn)恢復(fù),加入集群之后,Kafka 又會(huì)將這個(gè)節(jié)點(diǎn)加入到可用節(jié)點(diǎn),并自動(dòng)選舉出新的主節(jié)點(diǎn)。

實(shí)驗(yàn)如下,先新建一個(gè)3副本,2分區(qū)的 Topic,

bin/kafka-topics.sh --create --zookeeper 192.168.232.23:2181 --replication-factor 3 --partitions 2 --topic topic1

Initially, the state of topic1 is as follows:

$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
        Topic: topic1   Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0

對(duì)于上面的輸出,即使沒(méi)有文檔,也可以看懂大概:topic1 有2個(gè)分區(qū),Partition 0 和 Partition 1,Leader 分別在 Broker 0 和 1。Replicas 表示副本在哪些 Broker 上,Isr(In-Sync Replicas)表示處于同步狀態(tài)中的 Broker,如果有 Broker 宕機(jī)了,那么 Replicas 不會(huì)變,但是 Isr 會(huì)僅顯示沒(méi)有宕機(jī)的 Broker,詳見(jiàn)下面的實(shí)驗(yàn)。

然后分2個(gè)線程,運(yùn)行之前寫的 Producer 和 Consumer 的示例代碼,Producer 采用異步發(fā)送,消息采用同步復(fù)制。在有消息傳送的情況下,kill -9?停掉其中2個(gè) Broker(Broker 0 和 Broker 1),模擬突然宕機(jī)。此時(shí),topic1 狀態(tài)如下,

$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 1,2,0 Isr: 2

可見(jiàn),Kafka 已經(jīng)選出了新的 Leader,消息傳送沒(méi)有中斷。接著再啟動(dòng)被停掉的那兩個(gè) Broker,并查看 topic1 的狀態(tài),如下,

$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,1,0
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 1,2,0 Isr: 2,1,0
$ bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1
Topic:topic1    PartitionCount:2        ReplicationFactor:3     Configs:
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,1,0
        Topic: topic1   Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 2,1,0

可以發(fā)現(xiàn), 有一個(gè)短暫的時(shí)間,topic1 的兩個(gè)分區(qū)的 Leader 都是 Broker 2,但是在 Kafka 重新選舉之后,分區(qū)1的 Leader 變?yōu)?Broker 1。說(shuō)明 Kafka 傾向于用不同的 Broker 做分區(qū)的 Leader,這樣更能達(dá)到負(fù)載均衡的效果。

再來(lái)看看 Producer 和 Consumer 的日志,下面這個(gè)片段是2個(gè) Broker 宕機(jī)前后的日志,

......
Send     message: (00439, Message_00439) at offset 217 to partition(0) in 3 ms
Received message: (00438, Message_00438) at offset 216
Send     message: (00440, Message_00440) at offset 218 to partition(0) in 5 ms
Send     message: (00441, Message_00441) at offset 221 to partition(1) in 5 ms
Received message: (00441, Message_00441) at offset 221
Received message: (00439, Message_00439) at offset 217
Send     message: (00442, Message_00442) at offset 222 to partition(1) in 5 ms
Send     message: (00443, Message_00443) at offset 219 to partition(0) in 3 ms
Received message: (00440, Message_00440) at offset 218
Received message: (00443, Message_00443) at offset 219
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
Received message: (00442, Message_00442) at offset 222
Send     message: (00452, Message_00452) at offset 223 to partition(1) in 7492 ms
Send     message: (00454, Message_00454) at offset 224 to partition(1) in 7485 ms
Send     message: (00455, Message_00455) at offset 225 to partition(1) in 7482 ms
Send     message: (00458, Message_00458) at offset 226 to partition(1) in 7473 ms
Send     message: (00460, Message_00460) at offset 227 to partition(1) in 7467 ms
Send     message: (00461, Message_00461) at offset 228 to partition(1) in 7465 ms
Send     message: (00462, Message_00462) at offset 229 to partition(1) in 7462 ms
Send     message: (00463, Message_00463) at offset 230 to partition(1) in 7459 ms
Send     message: (00464, Message_00464) at offset 231 to partition(1) in 7456 ms
Send     message: (00465, Message_00465) at offset 232 to partition(1) in 7453 ms
......
Send     message: (01103, Message_01103) at offset 543 to partition(1) in 5478 ms
Received message: (00631, Message_00631) at offset 310
Received message: (00633, Message_00633) at offset 311
Send     message: (00451, Message_00451) at offset 220 to partition(0) in 7525 ms
Received message: (00634, Message_00634) at offset 312
Send     message: (00453, Message_00453) at offset 221 to partition(0) in 7518 ms
Received message: (00639, Message_00639) at offset 313
Send     message: (00456, Message_00456) at offset 222 to partition(0) in 7509 ms
Received message: (00641, Message_00641) at offset 314
Send     message: (00457, Message_00457) at offset 223 to partition(0) in 7506 ms
Received message: (00643, Message_00643) at offset 315
......

出現(xiàn)錯(cuò)誤的時(shí)候,Producer 拋出了?NetworkException?異常。其中有3589條 Received 日志,3583條 Send 日志,7條?NetworkException?異常日志,發(fā)送消息的最大序號(hào)是3590,接收消息的最大序號(hào)是3589,有以下幾個(gè)值得注意的地方,

  1. 宕機(jī)之前,消息的接收并不是順序的,這是因?yàn)?topic1 有2個(gè)分區(qū),Kafka 只保證分區(qū)上的有序;
  2. 宕機(jī)之后,出現(xiàn)了長(zhǎng)段的發(fā)送日志而沒(méi)有接收日志,說(shuō)明 Kafka 此時(shí)正在選舉,選舉的過(guò)程會(huì)阻塞消費(fèi)者;
  3. 從接收消息的條數(shù)和序號(hào)來(lái)看,所有的消息都收到了,沒(méi)有丟(沒(méi)有收到3590的消息可能是因?yàn)閺?qiáng)制退出 client 進(jìn)程的原因),發(fā)送的過(guò)程的7個(gè)異常應(yīng)該只是虛警,7條異常對(duì)應(yīng)序號(hào)444~450,3583條 Send 消息再加上這7條,與總消息3590條一致;
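The bookkeeping in point 3 can be verified with a few lines of arithmetic (all numbers taken directly from the log statistics above):

```python
received_lines = 3589       # "Received message" log lines
send_lines = 3583           # "Send message" log lines
network_exceptions = 7      # NetworkException log lines
max_sent_seq = 3590         # highest sequence number sent

# Sequence numbers 444..450 inclusive match the 7 exceptions
assert 450 - 444 + 1 == network_exceptions

# Successful sends plus errored sends account for every message
assert send_lines + network_exceptions == max_sent_seq

# Only the very last message (3590) was never received
assert max_sent_seq - received_lines == 1

print("counts are consistent")
```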

從這個(gè)實(shí)驗(yàn)中,可以看到,雖然 Kafka 不保證消息重復(fù)發(fā)送,但是卻在盡量保證沒(méi)有消息被重復(fù)發(fā)送,可能我的實(shí)驗(yàn)場(chǎng)景還不夠極端,沒(méi)有做出消息重復(fù)的情況。

As discussed earlier, full ordering requires a single partition, and avoiding NetworkException requires synchronous sends from the producer. So let's redo the experiment, this time with a single partition and synchronous sends. A log fragment from the moment a broker crashes looks like this:

......
Sent message: (118, Message_00118)
Received message: (00118, Message_00118) at offset 117
Received message: (00119, Message_00119) at offset 118
Sent message: (119, Message_00119)
Sent message: (120, Message_00120)
Received message: (00120, Message_00120) at offset 119
Sent message: (121, Message_00121)
Received message: (00121, Message_00121) at offset 120
Sent message: (122, Message_00122)
Sent message: (123, Message_00123)
Sent message: (124, Message_00124)
Sent message: (125, Message_00125)
Sent message: (126, Message_00126)
Sent message: (127, Message_00127)
......

可見(jiàn),由于采用同步發(fā)送,Broker 宕機(jī)并沒(méi)有造成拋出異常,另外,由于使用單分區(qū),順序性也得到了保證,全局沒(méi)有出現(xiàn)亂序的情況。

In summary, whether to use multiple partitions is mostly a question of ordering requirements, while choosing between synchronous and asynchronous sends is mostly about duplicate messages: if an asynchronous send throws an exception, then to avoid losing the message you must resend it, which can produce duplicates. Multiple partitions and asynchronous sends improve performance, but they introduce out-of-order delivery, duplicate messages, and other issues; the right trade-off depends on the application's requirements.
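Why do retries after a send error lead to duplicates? A toy simulation (pure Python, no Kafka involved, names entirely hypothetical) of a broker whose acknowledgement is lost in transit illustrates the mechanism:

```python
class ToyBroker:
    """Toy model of a broker: the write succeeds, but the ack may be lost."""
    def __init__(self):
        self.log = []

    def append(self, msg, ack_lost=False):
        self.log.append(msg)                   # write lands on the broker...
        if ack_lost:
            raise ConnectionError("ack lost")  # ...but the client never learns it

def send_with_retry(broker, msg, ack_lost_once):
    """At-least-once producer: on error, resend rather than risk losing data."""
    try:
        broker.append(msg, ack_lost=ack_lost_once)
    except ConnectionError:
        broker.append(msg)  # retry -> duplicate, since the first write landed

broker = ToyBroker()
send_with_retry(broker, "m1", ack_lost_once=False)
send_with_retry(broker, "m2", ack_lost_once=True)
print(broker.log)  # ['m1', 'm2', 'm2'] - m2 is duplicated
```

The producer cannot distinguish "write failed" from "write succeeded but the ack was lost", so guaranteeing no loss forces it to resend, and the broker may end up with two copies. This is exactly the at-least-once behavior observed in the experiment above.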

Kafka 最佳實(shí)踐

Kafka 在一些應(yīng)用場(chǎng)景中,有一些前人總結(jié)的最佳實(shí)踐?8?9。對(duì)最佳實(shí)踐,我的看法是,對(duì)于自己比較熟悉,有把握的部分,可以按自己的步驟進(jìn)行;對(duì)一些自己不清楚的領(lǐng)域,可以借鑒其中的一些內(nèi)容,至少不會(huì)錯(cuò)的特別厲害。有文章10說(shuō),Kafka 在分區(qū)比較多的時(shí)候,相應(yīng)時(shí)間會(huì)變長(zhǎng),這個(gè)現(xiàn)象值得在實(shí)踐中注意。

后記

Compared with Kafka, one of RocketMQ's core features is support for synchronous disk flushing, which guarantees that messages survive even a sudden power loss. Kafka flushes asynchronously: a successful write response only means the message reached the buffer, not that it has been persisted. So on power failure or kill -9, messages still in Kafka's memory may be lost. On the other hand, synchronous flushing is quite inefficient and is probably rarely used in production anyway; a graceful shutdown can be used to stop the process instead. Setting aside these extreme cases, Kafka is fundamentally a very reliable message middleware.
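For completeness: if one does want Kafka to flush more aggressively (at a significant throughput cost), the broker exposes flush settings. The fragment below forces an fsync after every message; by default this limit is effectively unbounded, so Kafka relies on OS page-cache flushing plus replication for durability.

```
# Force a flush after every message (very slow; not the default behavior)
log.flush.interval.messages=1
```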

References

  1. http://kafka.apache.org/documentation.html
  2. http://www.jianshu.com/p/453c6e7ff81c
  3. http://www.infoq.com/cn/author/%E9%83%AD%E4%BF%8A#文章
  4. http://developer.51cto.com/art/201501/464491.htm
  5. https://segmentfault.com/q/1010000004292925
  6. http://www.cnblogs.com/gnivor/p/5318319.html
  7. http://www.cnblogs.com/davidwang456/p/4313784.html
  8. http://www.jianshu.com/p/8689901720fd
  9. http://zqhxuyuan.github.io/2016/05/26/2016-05-13-Kafka-Book-Sample/
  10. http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/

標(biāo)簽: 代碼

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請(qǐng)聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點(diǎn)!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請(qǐng)與原作者聯(lián)系。

上一篇:Git內(nèi)部原理之Git引用

下一篇:說(shuō)說(shuō) MQ 之 Kafka(二)