Kafka Source Code Analysis 2: Networking

2018-07-02    Source: importnew

Background

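Let's go straight to the bottom layer and look at how Kafka's network layer works. Java NIO is still fairly low-level and is not convenient for direct application development, so people generally either use a framework such as Netty or wrap NIO to suit their own needs. That way the upper business layers need not care about the details of network handling: they only create a service that listens on a port, accept requests, process them, and write responses. When I read Java frameworks that involve networking, such as Netty and Thrift, I like to look at how they wrap NIO, because that is also where the author's skill shows. The basic building blocks of Java NIO are Selector, Channel, and ByteBuffer.
We analyze the server side and the client side separately.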
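To make the three building blocks concrete, here is a minimal sketch that is not taken from Kafka. It sends a message through a java.nio Pipe, waits for readiness with a Selector, and reads the bytes into a ByteBuffer. The class name NioBasicsSketch and the method roundTrip are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Illustrative only: Selector + Channel + ByteBuffer working together over an in-process pipe.
final class NioBasicsSketch {
    static String roundTrip(String message) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                // A channel must be non-blocking before it can be registered with a selector.
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                // Write side: drain the outgoing buffer into the sink channel.
                ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining())
                    sink.write(out);

                // Read side: wait until the selector reports the source as readable.
                ByteBuffer in = ByteBuffer.allocate(out.capacity());
                while (in.hasRemaining()) {
                    selector.select();
                    for (SelectionKey key : selector.selectedKeys())
                        if (key.isReadable())
                            source.read(in);
                    selector.selectedKeys().clear();
                }
                in.flip(); // switch the buffer from filling to draining
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello nio"));
    }
}
```

A real server registers many socket channels with one selector; the pipe is used here only so that the sketch runs without opening a network port.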

On the server side, Kafka wraps NIO in org.apache.kafka.common.network.
As package.html puts it:

>
The network server for kafka. No application specific code here, just general network server stuff.

The classes Receive and Send encapsulate the incoming and outgoing transmission of bytes. A Handler
is a mapping between a Receive and a Send, and represents the users hook to add logic for mapping requests
to actual processing code. Any uncaught exceptions in the reading or writing of the transmissions will result in
the server logging an error and closing the offending socket. As a result it is the duty of the Handler
implementation to catch and serialize any application-level errors that should be sent to the client.

This slightly lower-level interface that models sending and receiving rather than requests and responses
is necessary in order to allow the send or receive to be overridden with a non-user-space writing of bytes
using FileChannel.transferTo.

Startup process

The network layer is started in SocketServer, as part of the KafkaServer startup sequence.
First, look at the network-related settings in server.properties.

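  • listeners: the local hostname and port to listen on. If it is not set, the address from InetAddress and the default port (9092) are used.
  • num.network.threads: comparable to the number of worker threads in Netty. In the NIO reactor pattern, an Acceptor in front typically establishes connections, after which the Reactor dispatches read and write events to the Handlers. This setting is the number of I/O threads that dispatch and process those read and write events.
  • num.io.threads: the number of Handlers. Each Handler generally runs on its own thread (also called an IOThread).
  • queued.max.requests: the number of requests that can wait in the queue before the Handlers process them. It acts as an application-level request buffer.
  • socket.send.buffer.bytes: the send buffer in the socket options.
  • socket.receive.buffer.bytes: the receive buffer in the socket options.
  • socket.request.max.bytes: the maximum size of a request in bytes. Receiving a request requires allocating space to hold it, and an oversized request could cause an OOM, so this is a protective measure.
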
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The number of queued request allowed before blocking the network threads
#queued.max.requests
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
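
The protection that socket.request.max.bytes provides can be sketched as follows. This is a simplified illustration rather than Kafka's actual check: it assumes each request starts with a 4-byte size prefix, and the class MaxRequestSizeSketch with its methods allocateFor and prefix is hypothetical.

```java
import java.nio.ByteBuffer;

// Illustrative only: refuse to allocate a buffer for a request that announces too large a size.
final class MaxRequestSizeSketch {
    // Reads the 4-byte size prefix and allocates the payload buffer
    // only if the announced size is within the configured limit.
    static ByteBuffer allocateFor(ByteBuffer sizePrefix, int maxRequestBytes) {
        int size = sizePrefix.getInt();
        if (size < 0 || size > maxRequestBytes)
            throw new IllegalArgumentException(
                "Request of " + size + " bytes rejected, limit is " + maxRequestBytes);
        return ByteBuffer.allocate(size);
    }

    // Builds a 4-byte prefix the way a client would announce its payload size.
    static ByteBuffer prefix(int size) {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(size);
        buffer.flip();
        return buffer;
    }

    public static void main(String[] args) {
        int limit = 104857600; // the socket.request.max.bytes value shown above
        System.out.println(allocateFor(prefix(1024), limit).capacity());
        try {
            allocateFor(prefix(Integer.MAX_VALUE), limit);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of checking before allocating is that a malformed or malicious size prefix never turns into a large allocation on the broker.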

SocketServer

The comment on this class describes the I/O threading model of the Kafka server:

/**
 * An NIO socket server. The threading model is
 *   1 Acceptor thread that handles new connections
 *   Acceptor has N Processor threads that each have their own selector and read requests from sockets
 *   M Handler threads that handle requests and produce responses back to the processor threads for writing.
 */

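There are three kinds of threads. A single Acceptor thread handles new connection requests. There are N Processor threads, each with its own Selector, that read requests from sockets and write responses back. Finally, M Handler threads process the requests and return the results to the Processors.
The Acceptor and Processor threads are kept separate so that heavy read and write traffic does not interfere with accepting new connections.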

SocketServer initialization

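When a SocketServer is created, it reads its settings from server.properties and the default configuration, for example numNetworkThreads (num.network.threads, the N in the threading model above). It then creates the processors array, the acceptors map (requests may be accepted on several endpoints), the memoryPool (SimpleMemoryPool mainly tracks and monitors ByteBuffer usage), the requestChannel, and so on.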

private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap
private val numProcessorThreads = config.numNetworkThreads
private val maxQueuedRequests = config.queuedMaxRequests
private val totalProcessorThreads = numProcessorThreads * endpoints.size
private val maxConnectionsPerIp = config.maxConnectionsPerIp
private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", "socket-server-metrics")
memoryPoolSensor.add(memoryPoolDepletedPercentMetricName, new Rate(TimeUnit.MILLISECONDS))
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
private val processors = new Array[Processor](totalProcessorThreads)
private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
private var connectionQuotas: ConnectionQuotas = _

RequestChannel

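Because of the asynchronous nature of NIO, several application-level requests can be sent back to back on one connection, and each request yields a Future for its response. RequestChannel keeps requests and responses in their own BlockingQueues, requestQueue and responseQueue; "request" here means a request sent by a client. The size of requestQueue is set by queued.max.requests, which defaults to 500. Each RequestChannel also holds numProcessors responseQueues (unbounded LinkedBlockingQueues).
A Handler takes a request from requestQueue, processes it into a response, and puts the response on a responseQueue. A Processor converts the bytes it receives into a request and puts it on requestQueue, and it pulls responses from responseQueue and writes them back to the corresponding socket.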
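
The queue layout described above can be sketched in a few lines of Java. This is a simplified stand-in, not the real RequestChannel: requests and responses are plain strings, and the class RequestChannelSketch and its method bodies are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: one bounded request queue shared by all processors,
// plus one unbounded response queue per processor.
final class RequestChannelSketch {
    private final BlockingQueue<String> requestQueue;
    private final List<BlockingQueue<String>> responseQueues = new ArrayList<>();

    RequestChannelSketch(int numProcessors, int maxQueuedRequests) {
        this.requestQueue = new ArrayBlockingQueue<>(maxQueuedRequests);
        for (int i = 0; i < numProcessors; i++)
            responseQueues.add(new LinkedBlockingQueue<>());
    }

    // Called by a processor; blocks while the request queue is full.
    void sendRequest(String request) {
        try {
            requestQueue.put(request);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while queueing a request", e);
        }
    }

    // Called by a handler; returns null when idle (a real handler would more likely wait with a timeout).
    String receiveRequest() {
        return requestQueue.poll();
    }

    // Called by a handler; routes the response to the processor that owns the connection.
    void sendResponse(int processorId, String response) {
        responseQueues.get(processorId).add(response);
    }

    // Called by a processor; returns null when nothing is waiting to be written.
    String receiveResponse(int processorId) {
        return responseQueues.get(processorId).poll();
    }

    public static void main(String[] args) {
        RequestChannelSketch channel = new RequestChannelSketch(2, 500);
        channel.sendRequest("fetch");
        String request = channel.receiveRequest();
        channel.sendResponse(1, "response to " + request);
        System.out.println(channel.receiveResponse(1));
    }
}
```

Bounding only the request queue means a slow Handler pool pushes back on the Processors, which in turn stop reading from sockets, while responses are never blocked on their way out.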

startup

startup creates the Processors and the Acceptors. It also creates connectionQuotas, which limits the maximum number of connections per client IP.

  val sendBufferSize = config.socketSendBufferBytes
  val recvBufferSize = config.socketReceiveBufferBytes
  val brokerId = config.brokerId
  var processorBeginIndex = 0
  config.listeners.foreach { endpoint =>
    val listenerName = endpoint.listenerName
    val securityProtocol = endpoint.securityProtocol
    val processorEndIndex = processorBeginIndex + numProcessorThreads
    for (i <- processorBeginIndex until processorEndIndex)
      processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)
    val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
      processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
    acceptors.put(endpoint, acceptor)
    KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
    acceptor.awaitStartup()
    processorBeginIndex = processorEndIndex
  }
}

The Acceptor starts its Processor threads while it is being constructed.

/**
 * Thread that accepts and configures new connections. There is one of these per endpoint.
 */
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              processors: Array[Processor],
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  private val nioSelector = NSelector.open()
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  this.synchronized {
    processors.foreach { processor =>
      KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
        processor).start()
    }
  }

Once started, the Acceptor and the Processors each run their own loop.

The Acceptor only accepts new connections and hands them to the Processors in round-robin order.

/**
  * Accept loop that checks for new connection attempts
  */
 def run() {
   serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
   startupComplete()
   try {
     var currentProcessor = 0
     while (isRunning) {
       try {
         val ready = nioSelector.select(500)
         if (ready > 0) {
           val keys = nioSelector.selectedKeys()
           val iter = keys.iterator()
           while (iter.hasNext && isRunning) {
             try {
               val key = iter.next
               iter.remove()
               if (key.isAcceptable)
                 accept(key, processors(currentProcessor))
               else
                 throw new IllegalStateException("Unrecognized key state for acceptor thread.")
               // round robin to the next processor thread
               currentProcessor = (currentProcessor + 1) % processors.length
             } catch {
               case e: Throwable => error("Error while accepting connection", e)
             }
           }
         }
       }
       catch {
         // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
         // to a select operation on a specific channel or a bad request. We don't want
         // the broker to stop responding to requests from other clients in these scenarios.
         case e: ControlThrowable => throw e
         case e: Throwable => error("Error occurred", e)
       }
     }
   } finally {
     debug("Closing server socket and selector.")
     swallowError(serverChannel.close())
     swallowError(nioSelector.close())
     shutdownComplete()
   }
 }
The Acceptor accepts the connection, configures the socket, and passes it to a Processor:
/*
   * Accept a new connection
   */
  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socketChannel.socket().setSendBufferSize(sendBufferSize)
      debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
            .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
                  socketChannel.socket.getSendBufferSize, sendBufferSize,
                  socketChannel.socket.getReceiveBufferSize, recvBufferSize))
      processor.accept(socketChannel)
    } catch {
      case e: TooManyConnectionsException =>
        info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
        close(socketChannel)
    }
  }

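The Processor loop:

  • Set up new connections
  • If there are Responses, try to write them back
  • poll once with a timeout
  • Receive Requests
  • Handle Responses that were sent successfully
  • Handle connections that have been closed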
override def run() {
   startupComplete()
   while (isRunning) {
     try {
       // setup any new connections that have been queued up
       configureNewConnections()
       // register any new responses for writing
       processNewResponses()
       poll()
       processCompletedReceives()
       processCompletedSends()
       processDisconnected()
     } catch {
       // We catch all the throwables here to prevent the processor thread from exiting. We do this because
       // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
       // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
       // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
       case e: ControlThrowable => throw e
       case e: Throwable =>
         error("Processor got uncaught exception.", e)
     }
   }
   debug("Closing selector - processor " + id)
   swallowError(closeAll())
   shutdownComplete()
 }

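Handling new connections: configureNewConnections

New sockets handed over by the Acceptor are placed in a ConcurrentLinkedQueue.
configureNewConnections() reads the IP address, port, and related details of each one and registers it with the Processor's own selector. This selector is KSelector, Kafka's own wrapper around the NIO selector.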

/**
   * Register any new connections that have been queued up
   */
  private def configureNewConnections() {
    while (!newConnections.isEmpty) {
      val channel = newConnections.poll()
      try {
        debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
        val localHost = channel.socket().getLocalAddress.getHostAddress
        val localPort = channel.socket().getLocalPort
        val remoteHost = channel.socket().getInetAddress.getHostAddress
        val remotePort = channel.socket().getPort
        val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
        selector.register(connectionId, channel)
      } catch {
        // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
        // throwables will be caught in processor and logged as uncaught exceptions.
        case NonFatal(e) =>
          val remoteAddress = channel.getRemoteAddress
          // need to close the channel here to avoid a socket leak.
          close(channel)
          error(s"Processor $id closed connection from $remoteAddress", e)
      }
    }
  }
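
The handoff between the two threads can be sketched as below. This is an illustration under simplifying assumptions: connections are represented by strings instead of SocketChannels, "registering" is just adding to a list, and the class ConnectionHandoffSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative only: the acceptor thread enqueues, the processor thread drains.
final class ConnectionHandoffSketch {
    private final Queue<String> newConnections = new ConcurrentLinkedQueue<>();
    private final List<String> registered = new ArrayList<>(); // touched only by the processor thread

    // Called on the acceptor thread: enqueue and return immediately.
    void accept(String connectionId) {
        newConnections.add(connectionId);
    }

    // Called on the processor thread at the top of each loop iteration.
    int configureNewConnections() {
        int count = 0;
        String connectionId;
        while ((connectionId = newConnections.poll()) != null) {
            registered.add(connectionId); // stands in for selector.register(...)
            count++;
        }
        return count;
    }

    List<String> registered() {
        return registered;
    }

    public static void main(String[] args) throws InterruptedException {
        ConnectionHandoffSketch processor = new ConnectionHandoffSketch();
        Thread acceptor = new Thread(() -> {
            processor.accept("connection-1");
            processor.accept("connection-2");
        });
        acceptor.start();
        acceptor.join();
        System.out.println(processor.configureNewConnections() + " connections registered");
    }
}
```

Because only the Processor thread ever touches its selector, the lock-free queue is the single point of contact between the two threads.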

processNewResponses

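This method polls requestChannel for Responses waiting to be written back. For a Response that must be sent, it sets the Channel's send field to Response.responseSend and leaves the actual write to the Selector.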

private def processNewResponses() {
    var curr = requestChannel.receiveResponse(id)
    while (curr != null) {
      try {
        curr.responseAction match {
          case RequestChannel.NoOpAction =>
            // There is no response to send to the client, we need to read more pipelined requests
            // that are sitting in the server's socket buffer
            updateRequestMetrics(curr.request)
            trace("Socket server received empty response to send, registering for read: " + curr)
            val channelId = curr.request.connectionId
            if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
                selector.unmute(channelId)
          case RequestChannel.SendAction =>
            val responseSend = curr.responseSend.getOrElse(
              throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr"))
            sendResponse(curr, responseSend)
          case RequestChannel.CloseConnectionAction =>
            updateRequestMetrics(curr.request)
            trace("Closing socket connection actively according to the response code.")
            close(selector, curr.request.connectionId)
        }
      } finally {
        curr = requestChannel.receiveResponse(id)
      }
    }
  }

Selector.send

/**
    * Queue the given request for sending in the subsequent {@link #poll(long)} calls
    * @param send The request to send
    */
   public void send(Send send) {
       String connectionId = send.destination();
       if (closingChannels.containsKey(connectionId))
           this.failedSends.add(connectionId);
       else {
           KafkaChannel channel = channelOrFail(connectionId, false);
           try {
               channel.setSend(send);
           } catch (CancelledKeyException e) {
               this.failedSends.add(connectionId);
               close(channel, false);
           }
       }
   }

processCompletedReceives

After the Selector receives requests, it puts the data in a List. The Processor takes each item out and puts it on the requestQueue of requestChannel.

private def processCompletedReceives() {
    selector.completedReceives.asScala.foreach { receive =>
      try {
        val openChannel = selector.channel(receive.source)
        // Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'.
        val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
        val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
        val req = new RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
          startTimeNanos = time.nanoseconds, listenerName = listenerName, securityProtocol = securityProtocol,
          memoryPool, receive.payload)
        requestChannel.sendRequest(req)
        selector.mute(receive.source)
      } catch {
        case e @ (_: InvalidRequestException | _: SchemaException) =>
          // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
          error(s"Closing socket for ${receive.source} because of error", e)
          close(selector, receive.source)
      }
    }
  }

processCompletedSends

After the Selector finishes sending a Response, the connection -> response entry is removed from inflightResponses. Currently inflightResponses is only used to validate responses: anything written to a Channel must first have been recorded in inflightResponses when it was sent.

private def processCompletedSends() {
  selector.completedSends.asScala.foreach { send =>
    val resp = inflightResponses.remove(send.destination).getOrElse {
      throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
    }
    updateRequestMetrics(resp.request)
    selector.unmute(send.destination)
  }
}

processDisconnected

Connections whose writes failed, or that were closed for any other reason, need the memory they occupy cleaned up, for example their entries in inflightResponses.

private def processDisconnected() {
    selector.disconnected.keySet.asScala.foreach { connectionId =>
      val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
        throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
      }.remoteHost
      inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response.request))
      // the channel has been closed by the selector but the quotas still need to be updated
      connectionQuotas.dec(InetAddress.getByName(remoteHost))
    }
  }

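That mostly completes the analysis of the server's network layer. Anything else that needs attention will be covered separately later.
Once this startup finishes, KafkaServer continues with the rest of its startup.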

Kafka client-side network code

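The clients package is organized around four main parts: Send, Receive, KafkaChannel, and Selector.

Selectable is the interface for the network operations and Selector is the concrete implementation. It covers sending requests, receiving responses, establishing connections, closing connections, and so on.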

/**
 * An interface for asynchronous, multi-channel network I/O
 */
public interface Selectable {
    /**
     * See {@link #connect(String, InetSocketAddress, int, int) connect()}
     */
    public static final int USE_DEFAULT_BUFFER_SIZE = -1;
    /**
     * Begin establishing a socket connection to the given address identified by the given address
     * @param id The id for this connection
     * @param address The address to connect to
     * @param sendBufferSize The send buffer for the socket
     * @param receiveBufferSize The receive buffer for the socket
     * @throws IOException If we cannot begin connecting
     */
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException;
    /**
     * Wakeup this selector if it is blocked on I/O
     */
    public void wakeup();
    /**
     * Close this selector
     */
    public void close();
    /**
     * Close the connection identified by the given id
     */
    public void close(String id);
    /**
     * Queue the given request for sending in the subsequent {@link #poll(long) poll()} calls
     * @param send The request to send
     */
    public void send(Send send);
    /**
     * Do I/O. Reads, writes, connection establishment, etc.
     * @param timeout The amount of time to block if there is nothing to do
     * @throws IOException
     */
    public void poll(long timeout) throws IOException;
    /**
     * The list of sends that completed on the last {@link #poll(long) poll()} call.
     */
    public List<Send> completedSends();
    /**
     * The list of receives that completed on the last {@link #poll(long) poll()} call.
     */
    public List<NetworkReceive> completedReceives();
    /**
     * The connections that finished disconnecting on the last {@link #poll(long) poll()}
     * call. Channel state indicates the local channel state at the time of disconnection.
     */
    public Map<String, ChannelState> disconnected();
    /**
     * The list of connections that completed their connection on the last {@link #poll(long) poll()}
     * call.
     */
    public List<String> connected();
    ...
}

Send is the interface for data to be sent. Implementations provide completed() to report whether sending has finished and writeTo(GatheringByteChannel channel) to write the data to the Channel.
size() returns the size of the data to send.

/**
 * This interface models the in-progress sending of data to a destination identified by an integer id.
 */
public interface Send {
    /**
     * The numeric id for the destination of this send
     */
    String destination();
    /**
     * Is this send complete?
     */
    boolean completed();
    /**
     * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
     * to be completely written
     * @param channel The Channel to write to
     * @return The number of bytes written
     * @throws IOException If the write fails
     */
    long writeTo(GatheringByteChannel channel) throws IOException;
    /**
     * Size of the send
     */
    long size();
}

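Take ByteBufferSend as an example. It holds an array of ByteBuffers as the content to send, and size is the sum of their ByteBuffer.remaining() values. Sending simply delegates to channel.write, and after each write it updates the number of bytes still to be sent. The Send is complete once nothing is left to send and the channel has no pending writes either.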

public class ByteBufferSend implements Send {
    private final String destination;
    private final int size;
    protected final ByteBuffer[] buffers;
    private int remaining;
    private boolean pending = false;
    public ByteBufferSend(String destination, ByteBuffer... buffers) {
        this.destination = destination;
        this.buffers = buffers;
        for (ByteBuffer buffer : buffers)
            remaining += buffer.remaining();
        this.size = remaining;
    }
    @Override
    public String destination() {
        return destination;
    }
    @Override
    public boolean completed() {
        return remaining <= 0 && !pending;
    }
    @Override
    public long size() {
        return this.size;
    }
    @Override
    public long writeTo(GatheringByteChannel channel) throws IOException {
        long written = channel.write(buffers);
        if (written < 0)
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        remaining -= written;
        pending = TransportLayers.hasPendingWrites(channel);
        return written;
    }
}
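
Why a Send tracks its remaining bytes becomes clearer with a channel that accepts only part of the data per call. The sketch below is an illustration, not Kafka code: ThrottledChannel is a hypothetical GatheringByteChannel that takes at most a fixed number of bytes per write, imitating a socket whose send buffer is nearly full, and drain plays the role of repeated writeTo calls.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.charset.StandardCharsets;

// Illustrative only: a gathering write may take several calls to finish.
final class PartialWriteSketch {
    // Accepts at most `limit` bytes per write call; `limit` must be positive.
    static final class ThrottledChannel implements GatheringByteChannel {
        private final int limit;
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();

        ThrottledChannel(int limit) {
            this.limit = limit;
        }

        @Override
        public long write(ByteBuffer[] srcs, int offset, int length) {
            int budget = limit;
            long written = 0;
            for (int i = offset; i < offset + length && budget > 0; i++) {
                while (srcs[i].hasRemaining() && budget > 0) {
                    sink.write(srcs[i].get());
                    budget--;
                    written++;
                }
            }
            return written;
        }

        @Override
        public long write(ByteBuffer[] srcs) {
            return write(srcs, 0, srcs.length);
        }

        @Override
        public int write(ByteBuffer src) {
            return (int) write(new ByteBuffer[] {src}, 0, 1);
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }
    }

    // Keeps writing until every buffer is empty and returns the number of write calls used.
    static int drain(ThrottledChannel channel, ByteBuffer... buffers) {
        long remaining = 0;
        for (ByteBuffer buffer : buffers)
            remaining += buffer.remaining();
        int calls = 0;
        while (remaining > 0) {
            remaining -= channel.write(buffers);
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        ThrottledChannel channel = new ThrottledChannel(4);
        int calls = drain(channel,
            ByteBuffer.wrap("hello ".getBytes(StandardCharsets.UTF_8)),
            ByteBuffer.wrap("kafka".getBytes(StandardCharsets.UTF_8)));
        String sent = new String(channel.sink.toByteArray(), StandardCharsets.UTF_8);
        System.out.println(calls + " writes sent: " + sent);
    }
}
```

In the real Selector the loop is spread across poll() calls: each call writes what the socket accepts, and the Send is reported as completed only when nothing remains.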

NetworkSend extends ByteBufferSend and prepends 4 bytes that give the size of the content (not counting those 4 bytes).

/**
 * A size delimited Send that consists of a 4 byte network-ordered size N followed by N bytes of content
 */
public class NetworkSend extends ByteBufferSend {
    public NetworkSend(String destination, ByteBuffer buffer) {
        super(destination, sizeDelimit(buffer));
    }
    private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {
        return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};
    }
    private static ByteBuffer sizeBuffer(int size) {
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        sizeBuffer.putInt(size);
        sizeBuffer.rewind();
        return sizeBuffer;
    }
}
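
The size-delimited framing can be tried out in isolation. The following sketch is an illustration with hypothetical names (SizeDelimitedSketch, frame, unframe): it writes the 4-byte size followed by the payload into one buffer and then reads the payload back, relying on the fact that ByteBuffer uses big-endian (network) byte order by default.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: 4-byte network-ordered size N followed by N bytes of content.
final class SizeDelimitedSketch {
    // Prepends the payload length; the length does not count the prefix itself.
    static ByteBuffer frame(byte[] payload) {
        ByteBuffer framed = ByteBuffer.allocate(4 + payload.length);
        framed.putInt(payload.length);
        framed.put(payload);
        framed.flip();
        return framed;
    }

    // Reads the prefix, then exactly that many payload bytes.
    static byte[] unframe(ByteBuffer framed) {
        int size = framed.getInt();
        byte[] payload = new byte[size];
        framed.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        ByteBuffer framed = frame("hello".getBytes(StandardCharsets.UTF_8));
        int onTheWire = framed.remaining();
        String payload = new String(unframe(framed), StandardCharsets.UTF_8);
        System.out.println(onTheWire + " bytes on the wire, payload = " + payload);
    }
}
```

NetworkSend keeps the size and the payload in two separate buffers and lets the gathering write combine them, which avoids copying the payload; the single buffer here is only for brevity.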

The counterpart of Send is Receive, which represents data read from a Channel.

public interface Receive extends Closeable {
    /**
     * The numeric id of the source from which we are receiving data.
     */
    String source();
    /**
     * Are we done receiving data?
     */
    boolean complete();
    /**
     * Read bytes into this receive from the given channel
     * @param channel The channel to read from
     * @return The number of bytes read
     * @throws IOException If the reading fails
     */
    long readFrom(ScatteringByteChannel channel) throws IOException;
    /**
     * Do we know yet how much memory we require to fully read this
     */
    boolean requiredMemoryAmountKnown();
    /**
     * Has the underlying memory required to complete reading been allocated yet?
     */
    boolean memoryAllocated();
}

The org.apache.kafka.common.network.Selector class performs the actual connect, write, and read operations.
The following sections look at how these operations are implemented.

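The connect process: because connect is asynchronous, the connection is not necessarily established when the connect method returns. Only after SelectionKey.isConnectable() reports true and Channel.finishConnect has been checked is the connection considered established.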

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        if (this.channels.containsKey(id))
            throw new IllegalStateException("There is already a connection for id " + id);
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        Socket socket = socketChannel.socket();
        socket.setKeepAlive(true);
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setSendBufferSize(sendBufferSize);
        if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setReceiveBufferSize(receiveBufferSize);
        socket.setTcpNoDelay(true);
        boolean connected;
        try {
            connected = socketChannel.connect(address);
        } catch (UnresolvedAddressException e) {
            socketChannel.close();
            throw new IOException("Can't resolve address: " + address, e);
        } catch (IOException e) {
            socketChannel.close();
            throw e;
        }
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
        KafkaChannel channel;
        try {
            channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);
        } catch (Exception e) {
            try {
                socketChannel.close();
            } finally {
                key.cancel();
            }
            throw new IOException("Channel could not be created for socket " + socketChannel, e);
        }
        key.attach(channel);
        this.channels.put(id, channel);
        if (connected) {
            // OP_CONNECT won't trigger for immediately connected channels
            log.debug("Immediately connected to node {}", channel.id());
            immediatelyConnectedKeys.add(key);
            key.interestOps(0);
        }
    }

The poll method calls the Java Selector's select once and then processes the SelectionKeys, which fall into three cases: connectable, readable, and writable.

public void poll(long timeout) throws IOException {
     if (timeout < 0)
         throw new IllegalArgumentException("timeout should be >= 0");
     boolean madeReadProgressLastCall = madeReadProgressLastPoll;
     clear();
     boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
     if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
         timeout = 0;
     if (!memoryPool.isOutOfMemory() && outOfMemory) {
         //we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
         log.trace("Broker no longer low on memory - unmuting incoming sockets");
         for (KafkaChannel channel : channels.values()) {
             if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
                 channel.unmute();
             }
         }
         outOfMemory = false;
     }
     /* check ready keys */
     long startSelect = time.nanoseconds();
     int numReadyKeys = select(timeout);
     long endSelect = time.nanoseconds();
     this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
     if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
         Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
         keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
         //poll from channels that have buffered data (but nothing more from the underlying socket)
         if (!keysWithBufferedRead.isEmpty()) {
             Set<SelectionKey> toPoll = keysWithBufferedRead;
             keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
             pollSelectionKeys(toPoll, false, endSelect);
         }
         //poll from channels where the underlying socket has more data
         pollSelectionKeys(readyKeys, false, endSelect);
         pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
     } else {
         madeReadProgressLastPoll = true; //no work is also "progress"
     }
     long endIo = time.nanoseconds();
     this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
     // we use the time at the end of select to ensure that we don't close any connections that
     // have just been processed in pollSelectionKeys
     maybeCloseOldestConnection(endSelect);
     // Add to completedReceives after closing expired connections to avoid removing
     // channels with completed receives until all staged receives are completed.
     addToCompletedReceives();
 }

pollSelectionKeys

/**
     * handle any ready I/O on a set of selection keys
     * @param selectionKeys set of keys to handle
     * @param isImmediatelyConnected true if running over a set of keys for just-connected sockets
     * @param currentTimeNanos time at which set of keys was determined
     */
    private void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        Iterator<SelectionKey> iterator = determineHandlingOrder(selectionKeys).iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            KafkaChannel channel = channel(key);
            long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
            // register all per-connection metrics at once
            sensors.maybeRegisterConnectionMetrics(channel.id());
            if (idleExpiryManager != null)
                idleExpiryManager.update(channel.id(), currentTimeNanos);
            try {
                /* complete any connections that have finished their handshake (either normally or immediately) */
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                                socketChannel.socket().getReceiveBufferSize(),
                                socketChannel.socket().getSendBufferSize(),
                                socketChannel.socket().getSoTimeout(),
                                channel.id());
                    } else
                        continue;
                }
                /* if channel is not ready finish prepare */
                if (channel.isConnected() && !channel.ready()) {
                    channel.prepare();
                }
                attemptRead(key, channel);
                if (channel.hasBytesBuffered()) {
                    //this channel has bytes enqueued in intermediary buffers that we could not read
                    //(possibly because no memory). it may be the case that the underlying socket will
                    //not come up in the next poll() and so we need to remember this channel for the
                    //next poll call otherwise data may be stuck in said buffers forever.
                    keysWithBufferedRead.add(key);
                }
                /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }
                /* cancel any defunct sockets */
                if (!key.isValid())
                    close(channel, true);
            } catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException)
                    log.debug("Connection with {} disconnected", desc, e);
                else
                    log.warn("Unexpected error from {}; closing connection", desc, e);
                close(channel, true);
            } finally {
                maybeRecordTimePerConnection(channel, channelStartTimeNanos);
            }
        }
    }

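Selector.send(Send send) only has to look up the matching channel and call KafkaChannel.setSend(Send send). A KafkaChannel holds at most one Send at a time, so the next Send can be set only after the current one has been written out completely.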
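
To make the "one Send at a time" rule concrete, here is a minimal sketch. It is not Kafka's code: the `Channel` class, its `String` payload and the `completeWrite` method are hypothetical stand-ins, and rejecting the second send with an `IllegalStateException` is an assumption about how such a rule can be enforced.

```java
// Hypothetical stand-in for a channel that holds at most one pending send.
class SingleSendChannelDemo {
    static final class Channel {
        private String pendingSend; // null while the channel is idle

        // Accept a new send only if no earlier send is still pending.
        void setSend(String send) {
            if (pendingSend != null)
                throw new IllegalStateException("previous send still in progress");
            pendingSend = send;
        }

        // Simulates the write finishing: returns the completed send and frees the slot.
        String completeWrite() {
            String done = pendingSend;
            pendingSend = null;
            return done;
        }

        boolean hasSend() {
            return pendingSend != null;
        }
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        channel.setSend("request-1");
        try {
            channel.setSend("request-2"); // rejected: request-1 is not finished yet
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        channel.completeWrite();          // request-1 is done, the slot is free again
        channel.setSend("request-2");
        System.out.println("accepted after completion: " + channel.hasSend());
    }
}
```

With this rule, whoever calls the channel has to wait for the previous send to complete before handing over the next one.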

KafkaClient is the high-level interface that Kafka defines:

/**
 * Queue up the given request for sending. Requests can only be sent on ready connections.
 * @param request The request
 * @param now The current timestamp
 */
void send(ClientRequest request, long now);
/**
 * Do actual reads and writes from sockets.
 * 
 * @param timeout The maximum amount of time to wait for responses in ms, must be non-negative. The implementation
 *                is free to use a lower value if appropriate (common reasons for this are a lower request or
 *                metadata update timeout)
 * @param now The current time in ms
 * @throws IllegalStateException If a request is sent to an unready node
 */
List<ClientResponse> poll(long timeout, long now);

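The key methods are send and poll: send only stores what is to be sent, and the actual channel reads and writes happen in poll.
NetworkClient is the implementation of KafkaClient.
In ClientRequest, the requestBuilder field supplies the request content for each type of request.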

public final class ClientRequest {
    private final String destination;
    private final AbstractRequest.Builder<?> requestBuilder;
    private final int correlationId;
    private final String clientId;
    private final long createdTimeMs;
    private final boolean expectResponse;
    private final RequestCompletionHandler callback;

Likewise, ClientResponse carries a response body (responseBody) specific to each type of request.

public class ClientResponse {
    private final RequestHeader requestHeader;
    private final RequestCompletionHandler callback;
    private final String destination;
    private final long receivedTimeMs;
    private final long latencyMs;
    private final boolean disconnected;
    private final UnsupportedVersionException versionMismatch;
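    private final AbstractResponse responseBody;

NetworkClient.doSend turns the request into a Send, records it as an InFlightRequest, and hands the Send to the selector:

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {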
    String nodeId = clientRequest.destination();
    ...
    Send send = request.toSend(nodeId, header);
    InFlightRequest inFlightRequest = new InFlightRequest(
            header,
            clientRequest.createdTimeMs(),
            clientRequest.destination(),
            clientRequest.callback(),
            clientRequest.expectResponse(),
            isInternalRequest,
            request,
            send,
            now);
    this.inFlightRequests.add(inFlightRequest);
    selector.send(inFlightRequest.send);
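}

Selector.send then passes the Send to the destination channel, or records a failed send if that channel is already closing:

public void send(Send send) {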
    String connectionId = send.destination();
    if (closingChannels.containsKey(connectionId))
        this.failedSends.add(connectionId);
    else {
        KafkaChannel channel = channelOrFail(connectionId, false);
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(connectionId);
            close(channel, false);
        }
    }
}

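poll proceeds as follows:

  1. Call selector.poll
  2. Handle the Sends that have completed; some requests do not expect a response
  3. Handle the responses that were received
  4. Handle disconnections
  5. Handle new connections
  6. Handle the API version requests issued after a connection is established
  7. Handle timed-out requests
  8. Call onComplete on each response; onComplete actually invokes the callback that was set on the ClientRequest

public List<ClientResponse> poll(long timeout, long now) {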
    if (!abortedSends.isEmpty()) {
        // If there are aborted sends because of unsupported version exceptions or disconnects,
        // handle them immediately without waiting for Selector#poll.
        List<ClientResponse> responses = new ArrayList<>();
        handleAbortedSends(responses);
        completeResponses(responses);
        return responses;
    }
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }
    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    completeResponses(responses);
    return responses;
}
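
The split between send (queue only) and poll (do the I/O, then fire callbacks) can be sketched in a few lines. This is a simplified illustration, not NetworkClient: `MiniClient`, its `Request` class and the "echo:" response are hypothetical, and the network round trip is simulated in memory.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class DeferredSendDemo {
    // Hypothetical miniature of the send/poll split; the network is simulated.
    static final class MiniClient {
        static final class Request {
            final String payload;
            final Consumer<String> callback;

            Request(String payload, Consumer<String> callback) {
                this.payload = payload;
                this.callback = callback;
            }
        }

        private final Queue<Request> queued = new ArrayDeque<>();

        // send() only records the request; no I/O happens here.
        void send(String payload, Consumer<String> callback) {
            queued.add(new Request(payload, callback));
        }

        // poll() does the (simulated) I/O, collects the responses,
        // and only then invokes the callbacks, as the last step.
        List<String> poll() {
            List<Request> completed = new ArrayList<>();
            List<String> responses = new ArrayList<>();
            Request r;
            while ((r = queued.poll()) != null) {
                responses.add("echo:" + r.payload); // stand-in for a real round trip
                completed.add(r);
            }
            for (int i = 0; i < completed.size(); i++)
                completed.get(i).callback.accept(responses.get(i));
            return responses;
        }
    }

    public static void main(String[] args) {
        MiniClient client = new MiniClient();
        List<String> seen = new ArrayList<>();
        client.send("metadata", seen::add);
        System.out.println("callbacks fired before poll: " + seen.size());
        System.out.println("poll returned: " + client.poll());
        System.out.println("callbacks fired after poll: " + seen.size());
    }
}
```

Deferring the callbacks to the end of poll mirrors the order in the listing above, where completeResponses runs after all the handle* steps have collected their responses.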


