中文字幕在线观看,亚洲а∨天堂久久精品9966,亚洲成a人片在线观看你懂的,亚洲av成人片无码网站,亚洲国产精品无码久久久五月天

Java 線(xiàn)程池詳解

2018-09-12    來(lái)源:importnew

容器云強(qiáng)勢(shì)上線(xiàn)!快速搭建集群,上萬(wàn)Linux鏡像隨意使用

構(gòu)造一個(gè)線(xiàn)程池為什么需要幾個(gè)參數(shù)?如果避免線(xiàn)程池出現(xiàn)OOM?RunnableCallable的區(qū)別是什么?本文將對(duì)這些問(wèn)題一一解答,同時(shí)還將給出使用線(xiàn)程池的常見(jiàn)場(chǎng)景和代碼片段。

基礎(chǔ)知識(shí)

Executors創(chuàng)建線(xiàn)程池

Java中創(chuàng)建線(xiàn)程池很簡(jiǎn)單,只需要調(diào)用Executors中相應(yīng)的便捷方法即可,比如Executors.newFixedThreadPool(int nThreads),但是便捷不僅隱藏了復(fù)雜性,也為我們埋下了潛在的隱患(OOM,線(xiàn)程耗盡)。

Executors創(chuàng)建線(xiàn)程池便捷方法列表:

方法名 功能
newFixedThreadPool(int nThreads) 創(chuàng)建固定大小的線(xiàn)程池
newSingleThreadExecutor() 創(chuàng)建只有一個(gè)線(xiàn)程的線(xiàn)程池
newCachedThreadPool() 創(chuàng)建一個(gè)不限線(xiàn)程數(shù)上限的線(xiàn)程池,任何提交的任務(wù)都將立即執(zhí)行

小程序使用這些快捷方法沒(méi)什么問(wèn)題,對(duì)于服務(wù)端需要長(zhǎng)期運(yùn)行的程序,創(chuàng)建線(xiàn)程池應(yīng)該直接使用ThreadPoolExecutor的構(gòu)造方法。沒(méi)錯(cuò),上述Executors方法創(chuàng)建的線(xiàn)程池就是ThreadPoolExecutor。

ThreadPoolExecutor構(gòu)造方法

Executors中創(chuàng)建線(xiàn)程池的快捷方法,實(shí)際上是調(diào)用了ThreadPoolExecutor的構(gòu)造方法(定時(shí)任務(wù)使用的是ScheduledThreadPoolExecutor),該類(lèi)構(gòu)造方法參數(shù)列表如下:

// Java線(xiàn)程池的完整構(gòu)造函數(shù)
public ThreadPoolExecutor(
  int corePoolSize, // 線(xiàn)程池長(zhǎng)期維持的線(xiàn)程數(shù),即使線(xiàn)程處于Idle狀態(tài),也不會(huì)回收。
  int maximumPoolSize, // 線(xiàn)程數(shù)的上限
  long keepAliveTime, TimeUnit unit, // 超過(guò)corePoolSize的線(xiàn)程的idle時(shí)長(zhǎng),
                                     // 超過(guò)這個(gè)時(shí)間,多余的線(xiàn)程會(huì)被回收。
  BlockingQueue<Runnable> workQueue, // 任務(wù)的排隊(duì)隊(duì)列
  ThreadFactory threadFactory, // 新線(xiàn)程的產(chǎn)生方式
  RejectedExecutionHandler handler) // 拒絕策略

竟然有7個(gè)參數(shù),很無(wú)奈,構(gòu)造一個(gè)線(xiàn)程池確實(shí)需要這么多參數(shù)。這些參數(shù)中,比較容易引起問(wèn)題的有corePoolSize,?maximumPoolSize,?workQueue以及handler

  • corePoolSizemaximumPoolSize設(shè)置不當(dāng)會(huì)影響效率,甚至耗盡線(xiàn)程;
  • workQueue設(shè)置不當(dāng)容易導(dǎo)致OOM;
  • handler設(shè)置不當(dāng)會(huì)導(dǎo)致提交任務(wù)時(shí)拋出異常。

正確的參數(shù)設(shè)置方式會(huì)在下文給出。

線(xiàn)程池的工作順序

If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

corePoolSize -> 任務(wù)隊(duì)列 -> maximumPoolSize -> 拒絕策略

Runnable和Callable

可以向線(xiàn)程池提交的任務(wù)有兩種:RunnableCallable,二者的區(qū)別如下:

  1. 方法簽名不同,void Runnable.run(),?V Callable.call() throws Exception
  2. 是否允許有返回值,Callable允許有返回值
  3. 是否允許拋出異常,Callable允許拋出異常。

Callable是JDK1.5時(shí)加入的接口,作為Runnable的一種補(bǔ)充,允許有返回值,允許拋出異常。

三種提交任務(wù)的方式:

提交方式 是否關(guān)心返回結(jié)果
Future<T> submit(Callable<T> task)
void execute(Runnable command)
Future<?> submit(Runnable task) 否,雖然返回Future,但是其get()方法總是返回null

如何正確使用線(xiàn)程池

避免使用無(wú)界隊(duì)列

不要使用Executors.newXXXThreadPool()快捷方法創(chuàng)建線(xiàn)程池,因?yàn)檫@種方式會(huì)使用無(wú)界的任務(wù)隊(duì)列,為避免OOM,我們應(yīng)該使用ThreadPoolExecutor的構(gòu)造方法手動(dòng)指定隊(duì)列的最大長(zhǎng)度:

ExecutorService executorService = new ThreadPoolExecutor(2, 2, 
                0, TimeUnit.SECONDS, 
                new ArrayBlockingQueue<>(512), // 使用有界隊(duì)列,避免OOM
                new ThreadPoolExecutor.DiscardPolicy());

明確拒絕任務(wù)時(shí)的行為

任務(wù)隊(duì)列總有占滿(mǎn)的時(shí)候,這是再submit()提交新的任務(wù)會(huì)怎么樣呢?RejectedExecutionHandler接口為我們提供了控制方式,接口定義如下:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

線(xiàn)程池給我們提供了幾種常見(jiàn)的拒絕策略:
undefined

拒絕策略 拒絕行為
AbortPolicy 拋出RejectedExecutionException
DiscardPolicy 什么也不做,直接忽略
DiscardOldestPolicy 丟棄執(zhí)行隊(duì)列中最老的任務(wù),嘗試為當(dāng)前提交的任務(wù)騰出位置
CallerRunsPolicy 直接由提交任務(wù)者執(zhí)行這個(gè)任務(wù)

線(xiàn)程池默認(rèn)的拒絕行為是AbortPolicy,也就是拋出RejectedExecutionHandler異常,該異常是非受檢異常,很容易忘記捕獲。如果不關(guān)心任務(wù)被拒絕的事件,可以將拒絕策略設(shè)置成DiscardPolicy,這樣多余的任務(wù)會(huì)悄悄的被忽略。

ExecutorService executorService = new ThreadPoolExecutor(2, 2, 
                0, TimeUnit.SECONDS, 
                new ArrayBlockingQueue<>(512), 
                new ThreadPoolExecutor.DiscardPolicy());// 指定拒絕策略

獲取處理結(jié)果和異常

線(xiàn)程池的處理結(jié)果、以及處理過(guò)程中的異常都被包裝到Future中,并在調(diào)用Future.get()方法時(shí)獲取,執(zhí)行過(guò)程中的異常會(huì)被包裝成ExecutionException,submit()方法本身不會(huì)傳遞結(jié)果和任務(wù)執(zhí)行過(guò)程中的異常。獲取執(zhí)行結(jié)果的代碼可以這樣寫(xiě):

ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<Object> future = executorService.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            throw new RuntimeException("exception in call~");// 該異常會(huì)在調(diào)用Future.get()時(shí)傳遞給調(diào)用者
        }
    });
    
try {
  Object result = future.get();
} catch (InterruptedException e) {
  // interrupt
} catch (ExecutionException e) {
  // exception in Callable.call()
  e.printStackTrace();
}

上述代碼輸出類(lèi)似如下:
undefined

線(xiàn)程池的常用場(chǎng)景

正確構(gòu)造線(xiàn)程池

int poolSize = Runtime.getRuntime().availableProcessors() * 2;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy();
executorService = new ThreadPoolExecutor(poolSize, poolSize,
    0, TimeUnit.SECONDS,
            queue,
            policy);

獲取單個(gè)結(jié)果

過(guò)submit()向線(xiàn)程池提交任務(wù)后會(huì)返回一個(gè)Future,調(diào)用V Future.get()方法能夠阻塞等待執(zhí)行結(jié)果,V get(long timeout, TimeUnit unit)方法可以指定等待的超時(shí)時(shí)間。

獲取多個(gè)結(jié)果

如果向線(xiàn)程池提交了多個(gè)任務(wù),要獲取這些任務(wù)的執(zhí)行結(jié)果,可以依次調(diào)用Future.get()獲得。但對(duì)于這種場(chǎng)景,我們更應(yīng)該使用ExecutorCompletionService,該類(lèi)的take()方法總是阻塞等待某一個(gè)任務(wù)完成,然后返回該任務(wù)的Future對(duì)象。向CompletionService批量提交任務(wù)后,只需調(diào)用相同次數(shù)的CompletionService.take()方法,就能獲取所有任務(wù)的執(zhí)行結(jié)果,獲取順序是任意的,取決于任務(wù)的完成順序:

void solve(Executor executor, Collection<Callable<Result>> solvers)
   throws InterruptedException, ExecutionException {
   
   CompletionService<Result> ecs = new ExecutorCompletionService<Result>(executor);// 構(gòu)造器
   
   for (Callable<Result> s : solvers)// 提交所有任務(wù)
       ecs.submit(s);
       
   int n = solvers.size();
   for (int i = 0; i < n; ++i) {// 獲取每一個(gè)完成的任務(wù)
       Result r = ecs.take().get();
       if (r != null)
           use(r);
   }
}

單個(gè)任務(wù)的超時(shí)時(shí)間

V Future.get(long timeout, TimeUnit unit)方法可以指定等待的超時(shí)時(shí)間,超時(shí)未完成會(huì)拋出TimeoutException。

多個(gè)任務(wù)的超時(shí)時(shí)間

等待多個(gè)任務(wù)完成,并設(shè)置最大等待時(shí)間,可以通過(guò)CountDownLatch完成:

public void testLatch(ExecutorService executorService, List<Runnable> tasks) 
    throws InterruptedException{
      
    CountDownLatch latch = new CountDownLatch(tasks.size());
      for(Runnable r : tasks){
          executorService.submit(new Runnable() {
              @Override
              public void run() {
                  try{
                      r.run();
                  }finally {
                      latch.countDown();// countDown
                  }
              }
          });
      }
      latch.await(10, TimeUnit.SECONDS); // 指定超時(shí)時(shí)間
  }

線(xiàn)程池和裝修公司

以運(yùn)營(yíng)一家裝修公司做個(gè)比喻。公司在辦公地點(diǎn)等待客戶(hù)來(lái)提交裝修請(qǐng)求;公司有固定數(shù)量的正式工以維持運(yùn)轉(zhuǎn);旺季業(yè)務(wù)較多時(shí),新來(lái)的客戶(hù)請(qǐng)求會(huì)被排期,比如接單后告訴用戶(hù)一個(gè)月后才能開(kāi)始裝修;當(dāng)排期太多時(shí),為避免用戶(hù)等太久,公司會(huì)通過(guò)某些渠道(比如人才市場(chǎng)、熟人介紹等)雇傭一些臨時(shí)工(注意,招聘臨時(shí)工是在排期排滿(mǎn)之后);如果臨時(shí)工也忙不過(guò)來(lái),公司將決定不再接收新的客戶(hù),直接拒單。

線(xiàn)程池就是程序中的“裝修公司”,代勞各種臟活累活。上面的過(guò)程對(duì)應(yīng)到線(xiàn)程池上:

// Java線(xiàn)程池的完整構(gòu)造函數(shù)
public ThreadPoolExecutor(
  int corePoolSize, // 正式工數(shù)量
  int maximumPoolSize, // 工人數(shù)量上限,包括正式工和臨時(shí)工
  long keepAliveTime, TimeUnit unit, // 臨時(shí)工游手好閑的最長(zhǎng)時(shí)間,超過(guò)這個(gè)時(shí)間將被解雇
  BlockingQueue<Runnable> workQueue, // 排期隊(duì)列
  ThreadFactory threadFactory, // 招人渠道
  RejectedExecutionHandler handler) // 拒單方式

總結(jié)

Executors為我們提供了構(gòu)造線(xiàn)程池的便捷方法,對(duì)于服務(wù)器程序我們應(yīng)該杜絕使用這些便捷方法,而是直接使用線(xiàn)程池ThreadPoolExecutor的構(gòu)造方法,避免無(wú)界隊(duì)列可能導(dǎo)致的OOM以及線(xiàn)程個(gè)數(shù)限制不當(dāng)導(dǎo)致的線(xiàn)程數(shù)耗盡等問(wèn)題。ExecutorCompletionService提供了等待所有任務(wù)執(zhí)行結(jié)束的有效方式,如果要設(shè)置等待的超時(shí)時(shí)間,則可以通過(guò)CountDownLatch完成。

參考

  • ThreadPoolExecutor API Doc

標(biāo)簽: 代碼 服務(wù)器

版權(quán)申明:本站文章部分自網(wǎng)絡(luò),如有侵權(quán),請(qǐng)聯(lián)系:west999com@outlook.com
特別注意:本站所有轉(zhuǎn)載文章言論不代表本站觀點(diǎn)!
本站所提供的圖片等素材,版權(quán)歸原作者所有,如需使用,請(qǐng)與原作者聯(lián)系。

上一篇:JDK 源碼閱讀 : DirectByteBuffer

下一篇:SpringBoot | 第十八章:web 應(yīng)用開(kāi)發(fā)之WebJars 使用