中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

JDK源碼閱讀:InterruptibleChannel與可中斷IO

2018-08-09    來(lái)源:importnew

容器云強(qiáng)勢(shì)上線!快速搭建集群,上萬(wàn)Linux鏡像隨意使用

Java傳統(tǒng)IO是不支持中斷的,所以如果代碼在read/write等操作阻塞的話,是無(wú)法被中斷的。這就無(wú)法和Thead的interrupt模型配合使用了。JavaNIO眾多的升級(jí)點(diǎn)中就包含了IO操作對(duì)中斷的支持。InterruptiableChannel表示支持中斷的Channel。我們常用的FileChannel,SocketChannel,DatagramChannel都實(shí)現(xiàn)了這個(gè)接口。

InterruptibleChannel接口

public interface InterruptibleChannel extends Channel
{

    /**
     * 關(guān)閉當(dāng)前Channel
     *     
     * 任何當(dāng)前阻塞在當(dāng)前channel執(zhí)行的IO操作上的線程,都會(huì)收到一個(gè)AsynchronousCloseException異常
     */
    public void close() throws IOException;
}

InterruptibleChannel接口沒(méi)有定義任何方法,其中的close方法是父接口就有的,這里只是添加了額外的注釋。

AbstractInterruptibleChannel實(shí)現(xiàn)了InterruptibleChannel接口,并提供了實(shí)現(xiàn)可中斷IO機(jī)制的重要的方法,比如begin(),end()

在解讀這些方法的代碼前,先了解一下NIO中,支持中斷的Channel代碼是如何編寫的。

第一個(gè)要求是要正確使用begin()end()方法:

boolean completed = false;
try {
    begin();
    completed = ...;    // 執(zhí)行阻塞IO操作
    return ...;         // 返回結(jié)果
} finally {
    end(completed);
}

NIO規(guī)定了,在阻塞IO的語(yǔ)句前后,需要調(diào)用begin()end()方法,為了保證end()方法一定被調(diào)用,要求放在finally語(yǔ)句塊中。

第二個(gè)要求是Channel需要實(shí)現(xiàn)java.nio.channels.spi.AbstractInterruptibleChannel#implCloseChannel這個(gè)方法。AbstractInterruptibleChannel在處理中斷時(shí),會(huì)調(diào)用這個(gè)方法,使用Channel的具體實(shí)現(xiàn)來(lái)關(guān)閉Channel。

接下來(lái)我們具體看一下begin()end()方法是如何實(shí)現(xiàn)的。

begin方法

// 保存中斷處理對(duì)象實(shí)例
private Interruptible interruptor;
// 保存被中斷線程實(shí)例
private volatile Thread interrupted;

protected final void begin() {
    // 初始化中斷處理對(duì)象,中斷處理對(duì)象提供了中斷處理回調(diào)
    // 中斷處理回調(diào)登記被中斷的線程,然后調(diào)用implCloseChannel方法,關(guān)閉Channel
    if (interruptor == null) {
        interruptor = new Interruptible() {
            public void interrupt(Thread target) {
                synchronized (closeLock) {
                    // 如果當(dāng)前Channel已經(jīng)關(guān)閉,則直接返回
                    if (!open)
                        return;

                    // 設(shè)置標(biāo)志位,同時(shí)登記被中斷的線程
                    open = false;
                    interrupted = target;
                    try {
                        // 調(diào)用具體的Channel實(shí)現(xiàn)關(guān)閉Channel
                        AbstractInterruptibleChannel.this.implCloseChannel();
                    } catch (IOException x) { }
                }
            }};
    }
    // 登記中斷處理對(duì)象到當(dāng)前線程
    blockedOn(interruptor);

    // 判斷當(dāng)前線程是否已經(jīng)被中斷,如果已經(jīng)被中斷,可能登記的中斷處理對(duì)象沒(méi)有被執(zhí)行,這里手動(dòng)執(zhí)行一下
    Thread me = Thread.currentThread();
    if (me.isInterrupted())
        interruptor.interrupt(me);
}

begin()方法中,我們可以看出NIO實(shí)現(xiàn)可中斷IO操作的思路,是在Thread的中斷邏輯中,掛載自定義的中斷處理對(duì)象,這樣Thread對(duì)象在被中斷時(shí),會(huì)執(zhí)行中斷處理對(duì)象中的回調(diào),這個(gè)回調(diào)中,執(zhí)行關(guān)閉Channel的操作。這樣就實(shí)現(xiàn)了Channel對(duì)線程中斷的響應(yīng)了。

接下來(lái)重點(diǎn)就是研究“Thread添加中斷處理邏輯”這個(gè)機(jī)制是如何實(shí)現(xiàn)的了,是通過(guò)blockedOn方法實(shí)現(xiàn)的:

static void blockedOn(Interruptible intr) {         // package-private
    sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(),intr);
}

blockedOn方法使用的是JavaLangAccessblockedOn方法。

SharedSecrets是一個(gè)神奇而糟糕的類,為啥說(shuō)是糟糕呢,因?yàn)檫@個(gè)方法的存在,就是為了訪問(wèn)JDK類庫(kù)中一些因?yàn)轭愖饔糜蛳拗贫獠繜o(wú)法訪問(wèn)的類或者方法。JDK很多類與方法是私有或者包級(jí)別私有的,外部是無(wú)法訪問(wèn)的,但是JDK在本身實(shí)現(xiàn)的時(shí)候又存在互相依賴的情況,所以為了外部可以不依賴反射訪問(wèn)這些類或者方法,在sun包下,存在這么一個(gè)類,提供了各種超越限制的方法。

SharedSecrets.getJavaLangAccess()方法返回JavaLangAccess對(duì)象。JavaLangAccess對(duì)象就和名稱所說(shuō)的一樣,提供了java.lang包下一些非公開(kāi)的方法的訪問(wèn)。這個(gè)類在System初始化時(shí)被構(gòu)造:

// java.lang.System#setJavaLangAccess
private static void setJavaLangAccess() {
    sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){
        public void blockedOn(Thread t, Interruptible b) {
            t.blockedOn(b);
        }
        //...
    });
}

可以看出,sun.misc.JavaLangAccess#blockedOn保證的就是java.lang.Thread#blockedOn這個(gè)包級(jí)別私有的方法:

/* The object in which this thread is blocked in an interruptible I/O
 * operation, if any.  The blocker's interrupt method should be invoked
 * after setting this thread's interrupt status.
 */
private volatile Interruptible blocker;
private final Object blockerLock = new Object();

/* Set the blocker field; invoked via sun.misc.SharedSecrets from java.nio code
 */
void blockedOn(Interruptible b) {
    // 串行化blocker相關(guān)操作
    synchronized (blockerLock) {
        blocker = b;
    }
}

而這個(gè)方法也非常簡(jiǎn)單,就是設(shè)置java.lang.Thread#blocker變量為之前提到的中斷處理對(duì)象。而且從注釋中可以看出,這個(gè)方法就是專門為NIO設(shè)計(jì)的,注釋都非常直白的提到了,NIO的代碼會(huì)通過(guò)sun.misc.SharedSecrets調(diào)用到這個(gè)方法。。

接下來(lái)就是重頭戲了,看一下Thread在中斷時(shí),如何調(diào)用NIO注冊(cè)的中斷處理器:

public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;

        // 如果NIO設(shè)置了中斷處理器,則只需Thread本身的中斷邏輯后,調(diào)用中斷處理器的回調(diào)函數(shù)
        if (b != null) {
            interrupt0();           // 這一步會(huì)設(shè)置interrupt標(biāo)志位
            b.interrupt(this);
            return;
        }
    }

    // 如果沒(méi)有的話,就走普通流程
    interrupt0();
}

end方法

begin()方法負(fù)責(zé)添加Channel的中斷處理器到當(dāng)前線程。end()是在IO操作執(zhí)行完/中斷完后的操作,負(fù)責(zé)判斷中斷是否發(fā)生,如果發(fā)生判斷是當(dāng)前線程發(fā)生還是別的線程中斷把當(dāng)前操作的Channel給關(guān)閉了,對(duì)于不同的情況,拋出不同的異常。

protected final void end(boolean completed) throws AsynchronousCloseException
{
	// 清空線程的中斷處理器引用,避免線程一直存活導(dǎo)致中斷處理器無(wú)法被回收
    blockedOn(null);
    Thread interrupted = this.interrupted;

    if (interrupted != null && interrupted == Thread.currentThread()) {
        interrupted = null;
        throw new ClosedByInterruptException();
    }
    // 如果這次沒(méi)有讀取到數(shù)據(jù),并且Channel被另外一個(gè)線程關(guān)閉了,則排除Channel被異步關(guān)閉的異常
    // 但是如果這次讀取到了數(shù)據(jù),就不能拋出異常,因?yàn)檫@次讀取的數(shù)據(jù)是有效的,需要返回給用戶的(重要邏輯)
    if (!completed && !open)
        throw new AsynchronousCloseException();
}

通過(guò)代碼可以看出,如果是當(dāng)前線程被中斷,則拋出ClosedByInterruptException異常,表示Channel因?yàn)榫程中斷而被關(guān)閉了,IO操作也隨之中斷了。

如果是當(dāng)前線程發(fā)現(xiàn)Channel被關(guān)閉了,并且是讀取還未執(zhí)行完畢的情況,則拋出AsynchronousCloseException異常,表示Channel被異步關(guān)閉了。

end()邏輯的活動(dòng)圖如下:

場(chǎng)景分析

并發(fā)的場(chǎng)景分析起來(lái)就是復(fù)雜,上面的代碼不多,但是場(chǎng)景很多,我們以sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)為例分析一下可能的場(chǎng)景:

  1. A線程read,B線程中斷A線程:A線程拋出ClosedByInterruptException異常
  2. A,B線程read,C線程中斷A線程
    1. A被中斷時(shí),B剛剛進(jìn)入read方法:A線程拋出ClosedByInterruptException異常,B線程ensureOpen方法拋出ClosedChannelException異常
      1. A被中斷時(shí),B阻塞在底層read方法中:A線程拋出ClosedByInterruptException異常,B線程底層方法拋出異常返回,end方法中拋出AsynchronousCloseException異常
      2. A被中斷時(shí),B已經(jīng)讀取到數(shù)據(jù):A線程拋出ClosedByInterruptException異常,B線程正常返回

sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)代碼如下:

public int read(ByteBuffer dst) throws IOException {
    ensureOpen();  // 1
    if (!readable) // 2
        throw new NonReadableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {            
            begin();
            ti = threads.add();
            if (!isOpen())
                return 0; // 3
            do {
                n = IOUtil.read(fd, dst, -1, nd); // 4
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}

總結(jié)

在JavaIO時(shí)期,人們?yōu)榱酥袛郔O操作想了不少方法,核心操作就是關(guān)閉流,促使IO操作拋出異常,達(dá)到中斷IO的效果。NIO中,將這個(gè)操作植入了java.lang.Thread#interrupt方法,免去用戶自己編碼特定代碼的麻煩。使IO操作可以像其他可中斷方法一樣,在中斷時(shí)拋出ClosedByInterruptException異常,業(yè)務(wù)程序捕獲該異常即可對(duì)IO中斷做出響應(yīng)。

參考資料

  • java – What does JavaLangAccess.blockedOn(Thread t, Interruptible b) do? – Stack Overflow
  • Java NIO 那些躲在角落的細(xì)節(jié)

標(biāo)簽: 代碼

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請(qǐng)聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點(diǎn)!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請(qǐng)與原作者聯(lián)系。

上一篇:Java結(jié)合keytool實(shí)現(xiàn)非對(duì)稱加密和解密

下一篇:CPU核心架構(gòu)及對(duì)應(yīng)型號(hào)、芯片組一覽表