
Kafka Source Code Analysis 3: Producer

2018-07-02    Source: importnew


Producer

Producer is the interface that defines a producer.
Its most commonly used methods are:

public Future<RecordMetadata> send(ProducerRecord<K, V> record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
public void flush();
public void close();
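
Given an already-configured Producer<String, String> named producer (a configuration sketch follows a few paragraphs below) and a placeholder topic "demo-topic", a minimal usage sketch of these methods looks like this:

// send() returns immediately; the Callback fires once the broker responds (or the send finally fails)
Future<RecordMetadata> future = producer.send(
        new ProducerRecord<>("demo-topic", "key", "value"),
        (metadata, exception) -> {
            if (exception != null)
                exception.printStackTrace();          // delivery failed after retries
            else
                System.out.printf("acked %s-%d@%d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
        });
producer.flush();  // block until everything buffered so far has been sent
producer.close();  // flush remaining records, then release buffers, sockets and the I/O thread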

KafkaProducer is asynchronous: calling send does not push the record to the broker immediately. The record is placed in a buffer pool and send returns right away; a background I/O thread is responsible for turning buffered records into requests and sending them to the Kafka cluster.

The buffer size is specified by batch.size; the producer maintains one buffer of unsent records per partition.
By default a buffer that is not yet full may still be sent. Setting linger.ms adds a small wait so that more records can be batched into a single request, the same idea as Nagle's algorithm in TCP.
The producer's total buffer memory is capped by buffer.memory. If records are produced faster than they can be sent and the cap is reached, send blocks; the maximum blocking time is max.block.ms, after which a TimeoutException is thrown.
key.serializer and value.serializer control how Java objects are turned into byte arrays for transmission to the Kafka cluster.
acks controls when the producer considers a write successful, i.e. how many acknowledgements the leader must collect. With acks=0 the producer treats the record as sent as soon as it reaches the socket buffer; with acks=1 the leader only needs to write the record locally before responding, without waiting for the followers; with acks=all the write succeeds only after every in-sync replica has acknowledged it, so the message survives as long as at least one in-sync replica stays alive.
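
The sketch below ties these settings together; the broker address and all numeric values are illustrative placeholders, not recommendations:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("batch.size", 16384);        // per-partition batch buffer, in bytes
props.put("linger.ms", 5);             // wait up to 5 ms to fill a batch before sending
props.put("buffer.memory", 33554432);  // total producer-side buffer: 32 MB
props.put("max.block.ms", 60000);      // longest time send() may block when the buffer is full
props.put("acks", "all");              // wait for every in-sync replica
Producer<String, String> producer = new KafkaProducer<>(props);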

The Partitioner decides which partition each record is written to. When you need specific keys to go to specific partitions you can plug in your own Partitioner implementation (a sketch of one follows the default implementation below).
The default Partitioner hashes the record key to pick a partition; records with no key are spread round-robin, which load-balances them across the available partitions.

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
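
For comparison, here is a minimal custom Partitioner sketch. The class name and the routing rule (pinning keys that start with "vip-" to partition 0) are hypothetical, made up purely for illustration:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

public class VipFirstPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // hypothetical routing rule: keys starting with "vip-" always land on partition 0
        if (key instanceof String && ((String) key).startsWith("vip-"))
            return 0;
        if (keyBytes == null)
            return ThreadLocalRandom.current().nextInt(numPartitions);  // unkeyed: spread randomly
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It is activated with props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, VipFirstPartitioner.class.getName());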

ProducerRecord

ProducerRecord carries everything that needs to be sent to the broker:

class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
}
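
Only topic and value are mandatory; the other fields may be left null, in which case the partitioner picks the partition and the producer stamps the current time (as doSend shows further down). A few illustrative constructions, with placeholder topic and key names:

new ProducerRecord<>("demo-topic", "value-only");                 // no key: partitions are picked round-robin
new ProducerRecord<>("demo-topic", "user-42", "value");           // keyed: the same key always maps to the same partition
new ProducerRecord<>("demo-topic", 3, System.currentTimeMillis(), // explicit partition (3) and timestamp
        "user-42", "value");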

KafkaProducer construction

// create the partitioner
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
// configure the serializers
if (keySerializer == null) {
    this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                             Serializer.class));
    this.keySerializer.configure(config.originals(), true);
} else {
    config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
    this.keySerializer = ensureExtended(keySerializer);
}
if (valueSerializer == null) {
    this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                               Serializer.class));
    this.valueSerializer.configure(config.originals(), false);
} else {
    config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
    this.valueSerializer = ensureExtended(valueSerializer);
}
// load interceptors and make sure they get clientId
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
        true, true, clusterResourceListeners);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.transactionManager = configureTransactionState(config);
int retries = configureRetries(config, transactionManager != null);
int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
short acks = configureAcks(config, transactionManager != null);
this.apiVersions = new ApiVersions();
// RecordAccumulator implements the batching and lingering logic
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
        this.totalMemorySize,
        this.compressionType,
        config.getLong(ProducerConfig.LINGER_MS_CONFIG),
        retryBackoffMs,
        metrics,
        time,
        apiVersions,
        transactionManager);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics);
// higher-level network handling that wraps send, poll and related operations
NetworkClient client = new NetworkClient(
        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                this.metrics, time, "producer", channelBuilder),
        this.metadata,
        clientId,
        maxInflightRequests,
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
        config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
        config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
        this.requestTimeoutMs,
        time,
        true,
        apiVersions,
        throttleTimeSensor);
// the background thread that actually sends requests to the Kafka cluster
this.sender = new Sender(client,
        this.metadata,
        this.accumulator,
        maxInflightRequests == 1,
        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
        acks,
        retries,
        this.metrics,
        Time.SYSTEM,
        this.requestTimeoutMs,
        config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
        this.transactionManager,
        apiVersions);
String ioThreadName = NETWORK_THREAD_PREFIX + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
log.debug("Kafka producer started");
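
The interceptor list built above stays empty unless interceptor.classes is configured. As a hedged illustration (the class name and the tagging behaviour are invented for this example), a ProducerInterceptor looks like this:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class TaggingInterceptor implements ProducerInterceptor<String, String> {
    private final AtomicLong failures = new AtomicLong();

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // runs on the caller's thread before serialization and partitioning
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), "traced:" + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // runs on the I/O thread when the broker acks or the send finally fails
        if (exception != null)
            failures.incrementAndGet();
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It is enabled with props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TaggingInterceptor.class.getName());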

KafkaProducer#send

The entry point is doSend(interceptedRecord, callback):

    // fetch the cluster metadata so we know which nodes host this topic's partitions
    ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
    long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
   Cluster cluster = clusterAndWaitTime.cluster;
   byte[] serializedKey;
   try {
       serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
   } catch (ClassCastException cce) {
       throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
               " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
               " specified in key.serializer");
   }
   byte[] serializedValue;
   try {
       serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
   } catch (ClassCastException cce) {
       throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
               " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
               " specified in value.serializer");
   }
    // pick the target partition
   int partition = partition(record, serializedKey, serializedValue, cluster);
   tp = new TopicPartition(record.topic(), partition);
   setReadOnly(record.headers());
   Header[] headers = record.headers().toArray();
   int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
           compressionType, serializedKey, serializedValue, headers);
   ensureValidRecordSize(serializedSize);
   long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
   log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
   // producer callback will make sure to call both 'callback' and interceptor callback
   Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
   if (transactionManager != null && transactionManager.isTransactional())
       transactionManager.maybeAddPartitionToTransaction(tp);
    // append the record to the RecordAccumulator
   RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
           serializedValue, headers, interceptCallback, remainingWaitMs);
   if (result.batchIsFull || result.newBatchCreated) {
       log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
       this.sender.wakeup();
   }
   return result.future;
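
Because doSend only enqueues the record and returns result.future, a caller that wants synchronous delivery simply blocks on that future. A small sketch, again with a placeholder topic and an arbitrary 30-second timeout:

try {
    RecordMetadata metadata = producer
            .send(new ProducerRecord<>("demo-topic", "key", "value"))
            .get(30, TimeUnit.SECONDS);   // turns the asynchronous send into a blocking call
    System.out.println("persisted at offset " + metadata.offset());
} catch (ExecutionException e) {
    e.getCause().printStackTrace();       // the record was rejected or delivery ultimately failed
} catch (TimeoutException | InterruptedException e) {
    e.printStackTrace();                  // java.util.concurrent.TimeoutException, not Kafka's
}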

RecordAccumulator

RecordAccumulator keeps a double-ended queue (Deque) of ProducerBatch per partition. The core of its append() method:

    // We keep track of the number of appending threads to make sure we do not miss batches in
    // abortIncompleteBatches().
   appendsInProgress.incrementAndGet();
   ByteBuffer buffer = null;
   if (headers == null) headers = Record.EMPTY_HEADERS;
   try {
       // check if we have an in-progress batch
        // get or create the deque for this TopicPartition
       Deque<ProducerBatch> dq = getOrCreateDeque(tp);
       synchronized (dq) {
           if (closed)
               throw new IllegalStateException("Cannot send after the producer is closed.");
            // if the record fits into the last batch of the deque, append it there and return
           RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
           if (appendResult != null)
               return appendResult;
       }
        // if it does not fit, allocate a buffer for a new batch
       // we don't have an in-progress record batch try to allocate a new batch
       byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
       int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
       log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
       buffer = free.allocate(size, maxTimeToBlock);
       synchronized (dq) {
           // Need to check if producer is closed again after grabbing the dequeue lock.
           if (closed)
               throw new IllegalStateException("Cannot send after the producer is closed.");
            // between the two synchronized blocks another thread may already have created the next batch
           RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
           if (appendResult != null) {
               // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
               return appendResult;
           }
           MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
           ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
           FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
           dq.addLast(batch);
           incomplete.add(batch);
           // Don't deallocate this buffer in the finally block as it's being used in the record batch
           buffer = null;
           return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
       }
   } finally {
       if (buffer != null)
           free.deallocate(buffer);
       appendsInProgress.decrementAndGet();
   }
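
The per-partition deques looked up by getOrCreateDeque(tp) live in a concurrent map; a simplified sketch of that lookup pattern (simplified relative to the real class, using a plain ConcurrentHashMap) is:

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches = new ConcurrentHashMap<>();

private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    Deque<ProducerBatch> d = batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<>();
    Deque<ProducerBatch> previous = batches.putIfAbsent(tp, d);
    return previous == null ? d : previous;   // another thread may have registered a deque first
}

Note the design choice in append(): tryAppend is attempted twice because free.allocate() may block waiting for memory. Holding the deque lock across that wait would stall every other thread producing to the same partition, so the lock is released and the "did another thread already create a batch?" check is repeated after allocation.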

Sender

Sender is a background thread. Setting transactions aside, each iteration boils down to sendProducerData followed by poll, where poll waits for and processes the responses.

void run(long now) {
       if (transactionManager != null) {
           if (!transactionManager.isTransactional()) {
               // this is an idempotent producer, so make sure we have a producer id
               maybeWaitForProducerId();
           } else if (transactionManager.hasInFlightRequest() || maybeSendTransactionalRequest(now)) {
               // as long as there are outstanding transactional requests, we simply wait for them to return
               client.poll(retryBackoffMs, now);
               return;
           }
           // do not continue sending if the transaction manager is in a failed state or if there
           // is no producer id (for the idempotent case).
           if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
               RuntimeException lastError = transactionManager.lastError();
               if (lastError != null)
                   maybeAbortBatches(lastError);
               client.poll(retryBackoffMs, now);
               return;
           } else if (transactionManager.hasAbortableError()) {
               accumulator.abortUndrainedBatches(transactionManager.lastError());
           }
       }
       long pollTimeout = sendProducerData(now);
       client.poll(pollTimeout, now);
   }
private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();
    // Work out which partitions have data ready to send: a partition's deque is considered ready when it
    // holds more than one batch, its first batch is full, the accumulator has run out of buffer memory,
    // the producer is closing, or flush() has been called.
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // drain the ready batches out of the per-partition deques
    // create produce requests
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
            this.maxRequestSize, now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
    boolean needsTransactionStateReset = false;
    // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
    // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
    // we need to reset the producer id here.
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException());
        if (transactionManager != null && expiredBatch.inRetry()) {
            needsTransactionStateReset = true;
        }
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
    }

    if (needsTransactionStateReset) {
        transactionManager.resetProducerId();
        return 0;
    }

    sensors.updateProduceRequestMetrics(batches);

    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        pollTimeout = 0;
    }
    sendProduceRequests(batches, now);

    return pollTimeout;
}
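
For context, run(long now) above is driven by the Sender thread's top-level loop. A simplified sketch of its shape (the real method also drains remaining requests and handles forced shutdown on close):

public void run() {
    // the Sender's main loop: keep preparing and sending produce requests until close() is called
    while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // after the loop: keep calling run() until accumulated and in-flight records drain, then close the client
}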

 
