中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

RocketMQ 源碼學(xué)習(xí) 4 : 消息發(fā)送

2018-07-02    來源:importnew

容器云強勢上線!快速搭建集群,上萬Linux鏡像隨意使用

1. Client端,三種發(fā)送方式

RocketMQ 支持常見的三種發(fā)送方式,

  • SYNC
producer.send(msg)

同步的發(fā)送方式,會等待發(fā)送結(jié)果后才返回?梢杂 send(msg, timeout) 的方式指定等待時間,如果不指定,就是默認(rèn)的 3000ms. 這個timeout 最終會被設(shè)置到 ResponseFuture 里,再發(fā)送完消息后,用 countDownLatch 去 await timeout的時間,如果過期,就會拋出異常。

  • ASYNC
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
    }
    @Override
    public void onException(Throwable e) {
        System.out.printf("%-10d Exception %s %n", index, e);
        e.printStackTrace();
    }
});

異步的發(fā)送方式,發(fā)送完后,立刻返回。Client 在拿到 Broker 的響應(yīng)結(jié)果后,會回調(diào)指定的 callback. 這個 API 也可以指定 Timeout,不指定也是默認(rèn)的 3000ms.

  • ONEWAY
producer.sendOneway(msg);

比較簡單,發(fā)出去后,什么都不管直接返回。

對于每種方式,Producer 還提供了可以指定 MessageQueue, MessageQueueSelector的API,這屬于稍微高端一點的玩法,一般用它提供的默認(rèn)的策略選擇 MessageQueue 就可以了。

2. Client端發(fā)送過程

下面以 SYNC 方式為例,看下整個消息的發(fā)送過程,其他方式略有差異,總體流程類似。

1. 根據(jù) Topic 找到指定的 TopicPublishInfo

先去本地 map 找,如果沒有,就去 Namesrv fetch, 如果 Namesrv 里也沒有,則用默認(rèn)的 Topic 再去 fetch TopicRouteData. 對用用默認(rèn) Topic 的這種情況,Client 拿到數(shù)據(jù)后,會去構(gòu)建 TopicPublishInfo, 然后用當(dāng)前的 Topic 作為 key 放到本地 map 里。Broker 在接收到消息的時候,會去更新它本地的配置,然后在 registerBroker 的時候會去更新 namesrv 中的 TopicRouteData 信息,這樣 Namesrv 中就會有這樣一份配置了。當(dāng)然,也可以事先在 Namesrv 增加該配置,很多公司內(nèi)部都有這樣定制的平臺來管理MQ的接入配置。

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}

public class TopicRouteData {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

QueueData 定義了這個 read 和 write 的 queue的數(shù)量,Client 在拿到 TopicRouteData 后,會根據(jù)這里配的數(shù)量去構(gòu)建響應(yīng)數(shù)目的messageQueue,即 messageQueueList. brokerDatas 保存了各個 broker 的相關(guān)信息。

2. 從 messageQueueList 中選擇一個 MessageQueue

如果沒有 enable latencyFaultTolerance,就用遞增取模的方式選擇。如果 enable 了,在遞增取模的基礎(chǔ)上,再過濾掉 not available 的。這里所謂的 latencyFaultTolerance, 是指對之前失敗的,按一定的時間做退避:

long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

舉個例子,如果上次請求的 latency 超過 550L ms, 就退避 3000L ms;超過 1000L,就退避 60000L.

以上就是 Producer 到 Broker 的簡單的負(fù)載均衡。

3. 發(fā)送消息

到這一步,我們已經(jīng)拿到了這些關(guān)鍵數(shù)據(jù):

  • Message, 要發(fā)送的消息
  • MessageQueue,這里面包括 topic/brokerName/queueId
  • CommunicationMode, 發(fā)送方式, SYNC/ASYNC/ONEWAY
  • TopicPublishInfo

有了這些數(shù)據(jù),就可以構(gòu)建 RequestHeader 了,大部分字段意思都很明顯(當(dāng)然,前提是對RocketMQ的源碼有所熟悉),個別字段見注釋。

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
//系統(tǒng)Flag, 用于判斷走什么邏輯。標(biāo)識是否壓縮,事務(wù)的不同TYPE(prepare/rollback/commit/not transaction) 等
requestHeader.setSysFlag(sysFlag); 
requestHeader.setBornTimestamp(System.currentTimeMillis());
//消息Flag, 最終會落地
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
//TODO,暫不知道這個字段是干嘛用的
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);

最后用這些 header 字段,以及 message body 構(gòu)建 RemotingCommand,通過 remoting 模塊發(fā)給 broker.

4. 處理結(jié)果

  • 發(fā)送成功:直接返回發(fā)送結(jié)果
  • 發(fā)送失。喝绻 enable retryAnotherBrokerWhenNotStoreOK,就會重試,默認(rèn)重試兩次(retryTimesWhenSendFailed)。否則直接返回結(jié)果
  • 發(fā)送異常:Producer 對異常做了很好的區(qū)分,如果是 Remoting 和 Client 模塊的異常,就重試,如果是 Broker 模塊的異常,根據(jù)不同的 response code 做不同的處理,有的重試,有的拋出異常,有的返回結(jié)果。

3. Broker端,消息的處理和落地


如圖,Broker 有很多 Processor 用來處理不同類型的請求,有些 Processor 會共用一個 Processor 線程池。對于消息發(fā)送,Broker 的 remoting 模塊在接收到請求后,根據(jù)request code,最終會交給 SendMessageProcessor 來處理。SendMessageProcessor 會依次做以下處理:

  • 做一些校驗,包括但不限于
    1. broker 是否可寫
    2. topic 配置是否存在,如果不存在就新建一個(createTopicInSendMessageMethod)
    3. 校驗 queueId 是否超過指定大小
  • 構(gòu)建 MessageExtBrokerInner
  • 將 MessageExtBrokerInner 交給 Store 處理
  • 處理 Store 返回的結(jié)果,BrokerStatsManager 做一些統(tǒng)計更新,設(shè)置 Response 中的一些字段并返回。

Store 收到消息后,會先做一些校驗,然后交給 commitLog 去 put,然后做些統(tǒng)計并返回。Store 存儲消息的過程比較復(fù)雜,后面會單獨分析。

4. 其他

1. 順序消息
很多應(yīng)用并不關(guān)注消息順序,而且消息沒有順序并不代表消息內(nèi)容沒有順序,合理的系統(tǒng)設(shè)計可以避免順序問題。MQ 要保證消息順序必然會損失性能、增加系統(tǒng)實現(xiàn)復(fù)雜度。具體的分析可以看?分布式開放消息系統(tǒng)(RocketMQ)的原理與實踐。

在 RocketMQ 里, 在發(fā)送消息的時候可以自己定義 MessageQueueSelector,對于同一個訂單ID(或其他ID)的不同消息,可以讓它走同一個 MessageQueue,這樣就可以按順序發(fā)給同一個 Broker 了。

2. Batch Message
Producer 的 API 還支持一次發(fā)多個消息。?

List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));

producer.send(messages);

Client 模塊會將 Message List 封裝成 MessageBatch,且會標(biāo)記 requestHeader 的 batch 標(biāo)志位為 true. Broker 在接收到消息后就可以根據(jù)這個標(biāo)志位去做不同的處理。

5. Reference

  • RocketMQ 原理簡介
  • 分布式開放消息系統(tǒng)(RocketMQ)的原理與實踐

標(biāo)簽:

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請與原作者聯(lián)系。

上一篇:深入Spring Boot :怎樣排查 java.lang.ArrayStoreException

下一篇:linux 重要日志說明