中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

FutureTask在線程池中應用和源碼解析

2018-11-20    來源:importnew

容器云強勢上線!快速搭建集群,上萬Linux鏡像隨意使用

FutureTask 是一個支持取消的異步處理器,一般在線程池中用于異步接受callable返回值。

主要實現(xiàn)分三部分:

  1. 封裝 Callable,然后放到線程池中去異步執(zhí)行->run。
  2. 獲取結果-> get。
  3. 取消任務-> cancel。

接下來主要學習下該模型如何實現(xiàn)。

舉例說明FutureTask在線程池中的應用

// 第一步,定義線程池,
ExecutorService executor = new ThreadPoolExecutor(
        minPoolSize,
        maxPollSize,
        keepAliveTime,
        TimeUnit.SECONDS,
        new SynchronousQueue<>());

// 第二步,放到線程池中執(zhí)行,返回FutureTask
FutureTask  task = executor.submit(callable);

// 第三步,獲取返回值
T data = task.get();

學習FutureTask實現(xiàn)

類屬性

//以下是FutureTask的各種狀態(tài)
private volatile int state; 
private static final int NEW          = 0; 
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

private Callable<V> callable; //執(zhí)行的任務
private Object outcome; //存儲結果或者異常
private volatile Thread runner;//執(zhí)行callable的線程
private volatile WaitNode waiters; //調用get方法等待獲取結果的線程棧

其中各種狀態(tài)存在 最終狀態(tài) status>COMPLETING
1)NEW -> COMPLETING -> NORMAL(有正常結果)
2) NEW -> COMPLETING -> EXCEPTIONAL(結果為異常) 
3) NEW -> CANCELLED(無結果) 
4) NEW -> INTERRUPTING -> INTERRUPTED(無結果)

類方法

從上面舉例說明開始分析。

run()方法

FutureTask 繼承 Runnable,ExecutorService submit 把提交的任務封裝成 FutureTask 然后放到線程池 ThreadPoolExecutor 的 execute 執(zhí)行。

public void run() {
    //如果不是初始狀態(tài)或者cas設置運行線程是當前線程不成功,直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
              // 執(zhí)行callable任務 這里對異常進行了catch
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex); // 封裝異常到outcome
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        // 這里如果是中斷中,設置成最終狀態(tài)
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

以上是 run 方法源碼實現(xiàn)很簡單,解析如下:

  1. 如果不是始狀態(tài)或者 cas 設置運行線程是當前線程不成功,直接返回,防止多個線程重復執(zhí)行。
  2. 執(zhí)行 Callable 的 call(),即提交執(zhí)行任務(這里做了catch,會捕獲執(zhí)行任務的異常封裝到 outcome 中)。
  3. 如果成功執(zhí)行 set 方法,封裝結果。

set 方法

protected void set(V v) {
    //cas方式設置成completing狀態(tài),防止多個線程同時處理
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v; // 封裝結果
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終設置成normal狀態(tài)

        finishCompletion();
    }
}

解析如下:

  1. cas方式設置成completing狀態(tài),防止多個線程同時處理
  2. 封裝結果到outcome,然后設置到最終狀態(tài)normal
  3. 執(zhí)行finishCompletion方法。

finishCompletion方法

// state > COMPLETING; 不管異常,中斷,還是執(zhí)行完成,都需要執(zhí)行該方法來喚醒調用get方法阻塞的線程
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        // cas 設置waiters為null,防止多個線程執(zhí)行。
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // 循環(huán)喚醒所有等待結果的線程
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    //喚醒線程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
   //該方法為空,可以被重寫
    done();
    callable = null;        // to reduce footprint
}

解析如下:

遍歷waiters中的等待節(jié)點,并通過 LockSupport 喚醒每一個節(jié)點,通知每個線程,該任務執(zhí)行完成(可能是執(zhí)行完成,也可能 cancel,異常等)。

以上就是執(zhí)行的過程,接下來分析獲取結果的過程->get。

get 方法

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

解析如下:

以上兩個方法,原理一樣,其中一個設置超時時間,支持最多阻塞多長時間。
狀態(tài)如果小于 COMPLETING,說明還沒到最終狀態(tài),(不管是否是成功、異常、取消)。
調用 awaitDone 方法阻塞線程,最終調用 report 方法返回結果。

awaitDone 方法

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            //線程可中斷,如果當前阻塞獲取結果線程執(zhí)行interrupt()方法,則從隊列中移除該節(jié)點,并拋出中斷異常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            int s = state;
            // 如果已經是最終狀態(tài),退出返回
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            //這里做了個優(yōu)化,competiting到最終狀態(tài)時間很短,通過yield比掛起響應更快。
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // 初始化該阻塞節(jié)點
            else if (q == null)
                q = new WaitNode();
            // cas方式寫到阻塞waiters棧中
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 這里做阻塞時間處理。
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                // 阻塞線程,有超時時間
                LockSupport.parkNanos(this, nanos);
            }
            else
                // 阻塞線程
                LockSupport.park(this);
        }
    }

解析如下:

整體流程已寫到注解中,整體實現(xiàn)是放在一個死循環(huán)中,唯一出口,是達到最終狀態(tài)。
然后是構建節(jié)點元素,并將該節(jié)點入棧,同時阻塞當前線程等待運行主任務的線程喚醒該節(jié)點。

report 方法

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

然后是report方法,如果是正常結束,返回結果,如果不是正常結束(取消,中斷)拋出異常。

最后分析下取消流程。

cancel 方法

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

解析如下:

mayInterruptIfRunning參數是是否允許運行中被中斷取消。

  1. 根據入參是否為true,CAS設置狀態(tài)為INTERRUPTING或CANCELLED,設置成功,繼續(xù)第二步,否則直接返回false。
  2. 如果允許運行中被中斷取消,調用runner.interupt()進行中斷取消,設置狀態(tài)為INTERRUPTED。
  3. 喚醒所有在get()方法等待的線程。

此處有兩種狀態(tài)轉換:

  1. 如果mayInterruptIfRunning為true:status狀態(tài)轉換為 new -> INTERRUPTING->INTERRUPTED。主動去中斷執(zhí)行線程,然后喚醒所有等待結果的線程。
  2. 如果mayInterruptIfRunning為false:status狀態(tài)轉換為 new -> CANCELLED。

不會去中斷執(zhí)行線程,直接喚醒所有等待結果的線程,從 awaitDone 方法中可以看到,喚醒等待線程后,直接從跳轉回 get 方法,然后把結果返回給獲取結果的線程,當然此時的結果是 null。

總結

以上就是 FutureTask 的源碼簡單解析,實現(xiàn)比較簡單,F(xiàn)utureTask 就是一個實現(xiàn) Future 模式,支持取消的異步處理器。

標簽: swap

版權申明:本站文章部分自網絡,如有侵權,請聯(lián)系:west999com@outlook.com
特別注意:本站所有轉載文章言論不代表本站觀點!
本站所提供的圖片等素材,版權歸原作者所有,如需使用,請與原作者聯(lián)系。

上一篇:Java字節(jié)碼結構剖析一:常量池

下一篇:Java字節(jié)碼結構剖析三:方法表