中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

Kafka 源碼分析 5 :KafkaConsumer 消費(fèi)處理

2018-07-02    來源:importnew

容器云強(qiáng)勢上線!快速搭建集群,上萬Linux鏡像隨意使用

Kafka消費(fèi)者客戶端從Kafka cluster中讀取消息并處理。

Kafka消費(fèi)者可以手動綁定自己到某個topic的某些partition上或者通過subscribe方法監(jiān)聽某個topic自動綁定。Kafka消費(fèi)者綁定到某個parition后就和這個partition的leader連接,然后發(fā)出fetch request, 獲取消息后進(jìn)行處理。

offset管理

kafka的消費(fèi)模型是一個partition最多被一個consumer消費(fèi),而offset可以有consumer控制,例如通過seek前進(jìn)或后退到某個offset位置。

首次連接時,可以通過KafkaConsumer配置參數(shù)里的auto.offset.reset參數(shù)決定是從最新的位置(默認(rèn))還是從就早的位置開始消費(fèi)。

默認(rèn)情況下, enable.auto.commit參數(shù)是true,即KafkaConsumer客戶端會定時commit offset,所有要注意的一點(diǎn)是如果poll函數(shù)得到ConsumerRecords后如果處理是異步的,則可能出現(xiàn)消費(fèi)處理還沒有完成但是卻commit offset了,這時如果進(jìn)程掛掉則重啟后則會發(fā)生丟消息的情況。這里有兩種解決方案,1是poll后的處理是同步的,這樣下一次poll會嘗試commit offset,則能保證at least one語義。2是關(guān)閉enable.auto.commit, 然后通過KafkaConsumer.commitSync方法來手動commit offset。

max.poll.interval.ms參數(shù)用于設(shè)置kafka消費(fèi)者處理一次poll的消費(fèi)結(jié)果的最大時間(默認(rèn)300s),如果超過了這個時間則consumer被認(rèn)為掛了會重新rebalance。

Consumer線程相關(guān)

消費(fèi)者多線程處理有幾種方式

  1. 每個consumer只由一個線程處理,優(yōu)點(diǎn)是能保證partition內(nèi)有序和實(shí)現(xiàn)簡單,缺點(diǎn)是并發(fā)能力受限于partition的數(shù)量
  2. 將consumption和process過程分離,即consumer拉到一個消息后傳遞給另一個線程或線程池處理,這里提高了并發(fā)能力但是需要注意多線程處理中的順序問題不再保證以及可能出現(xiàn)consumer提交了offset而線程池沒處理完的情況,另外線程池要注意處理慢導(dǎo)致的內(nèi)存隊列積壓問題。

KafkaConsumer.subscribe

監(jiān)聽某個topic

subscribe(Collection topics, ConsumerRebalanceListener listener)
當(dāng)消費(fèi)者使用kafka cluster來管理group membership時,ConsumerRebalanceListener會在consumer rebalance時調(diào)用,consumer rebalance發(fā)生在消費(fèi)者或消費(fèi)關(guān)系變化的時候

  • 某個消費(fèi)進(jìn)程掛掉
  • 新消費(fèi)進(jìn)程加入
  • partition數(shù)量發(fā)生變化時

這個Listener的常見用途是保存這個partition的最新消費(fèi)offset,在void onPartitionsRevoked(java.util.Collection<TopicPartition> partitions)里保存當(dāng)前的partition和offset到數(shù)據(jù)庫中。然后reassign完成后,void onPartitionsAssigned(java.util.Collection partitions)中從數(shù)據(jù)庫讀取之前的消費(fèi)位置,通過seek方法設(shè)置消費(fèi)位置繼續(xù)消費(fèi)。

Kafka.poll

public ConsumerRecords<K, V> poll(long timeout) {
		// KafkaConsumer不是線程安全的
       acquireAndEnsureOpen();
       try {
           if (timeout < 0)
               throw new IllegalArgumentException("Timeout must not be negative");
           if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
               throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
           // poll for new data until the timeout expires
           long start = time.milliseconds();
           long remaining = timeout;
           do {
               Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
               if (!records.isEmpty()) {
                   // before returning the fetched records, we can send off the next round of fetches
                   // and avoid block waiting for their responses to enable pipelining while the user
                   // is handling the fetched records.
                   //
                   // NOTE: since the consumed position has already been updated, we must not allow
                   // wakeups or any other errors to be triggered prior to returning the fetched records.
                   if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
                       client.pollNoWakeup();
                   if (this.interceptors == null)
                       return new ConsumerRecords<>(records);
                   else
                       return this.interceptors.onConsume(new ConsumerRecords<>(records));
               }
               long elapsed = time.milliseconds() - start;
               remaining = timeout - elapsed;
           } while (remaining > 0);
           return ConsumerRecords.empty();
       } finally {
           release();
       }
   }

pollOnce處理

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        client.maybeTriggerWakeup();
        // 協(xié)調(diào)者進(jìn)行一次poll,里面會根據(jù)auto.commit.interval.ms決定是否自動提交offset
        coordinator.poll(time.milliseconds(), timeout);
        // fetch positions if we have partitions we're subscribed to that we
        // don't know the offset for
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());
        // 如果已經(jīng)有record數(shù)據(jù)了直接返回
        // if data is available already, return it immediately
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;
        // 發(fā)送一次fetch請求
        // send any new fetches (won't resend pending fetches)
        fetcher.sendFetches();
        long now = time.milliseconds();
        long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
        // 等待fetch請求結(jié)果
        client.poll(pollTimeout, now, new PollCondition() {
            @Override
            public boolean shouldBlock() {
                // since a fetch might be completed by the background thread, we need this poll condition
                // to ensure that we do not block unnecessarily in poll()
                return !fetcher.hasCompletedFetches();
            }
        });
        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.needRejoin())
            return Collections.emptyMap();
        // 返回fetch結(jié)果
        return fetcher.fetchedRecords();
    }

標(biāo)簽: 安全 數(shù)據(jù)庫

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點(diǎn)!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請與原作者聯(lián)系。

上一篇:kafka源碼分析4 : broker處理生產(chǎn)請求

下一篇:MySQL的變量分類總結(jié)