Java學習筆記 線程池使用及詳解

有點笨,參考了好幾篇大佬們寫的文章才整理出來的筆記….

字面意思上解釋,線程池就是裝有線程的池,我們可以把要執行的多線程交給線程池來處理,和連接池的概念一樣,通過維護一定數量的線程池來達到多個線程的復用。

好處

多線程產生的問題

一般我們使用到多線程的編程的時候,需要通過new Thread(xxRunnable).start()創建並開啟線程,我們可以使用多線程來達到最優效率(如多線程下載)。

但是,線程不是越多就越好,線程過多,創建和銷毀就會消耗系統的資源,也不方便管理。

除此之外,多線程還會造成併發問題,線程併發數量過多,搶佔系統資源從而導致阻塞。

線程池優點

我們將線程放入線程池,由線程池對線程進行管理,可以對線程池中緩衝的線程進行復用,這樣,就不會經常去創建和銷毀線程了,從而省下了系統的資源。

線程池能夠有效的控制線程併發的數量,能夠解決多線程造成的併發問題。

除此之外,線程池還能夠對線程進行一定的管理,如延時執行、定時循環執行的策略等

線程池實現

線程池的實現,主要是通過這個類ThreadPoolExecutor,其的構造參數非常長,我們先大概了解,之後再進行詳細的介紹。

public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,long keepAliveTime,
    TimeUnit unit,BlockingQueue workQueue,
    RejectedExecutionHandler handler)
  • corePoolSize:線程池核心線程數量
  • maximumPoolSize:線程池最大線程數量
  • keepAliverTime:當活躍線程數大於核心線程數時,空閑的多餘線程最大存活時間
  • unit:存活時間的單位
  • workQueue:存放線程的工作隊列
  • handler:超出線程範圍和隊列容量的任務的處理程序(拒絕策略)

這裏大概簡單說明一下線程池的運行流程:

當線程被添加到線程池中,如果線程池中的當前的線程數量等於線程池定義的最大核心線程數量(corePoolSize)了,此線程就會別放入線程的工作隊列(workQueue)中,等待線程池的調用。

Java提供了一個工具類Excutors,方便我們快速創建線程池,其底層也是調用了ThreadPoolExecutor

不過阿里巴巴Java規範中強制要求我們應該通過ThreadPoolExecutor來創建自己的線程池,使用Excutors容易造成OOM問題。

所以,我們先從Excutors開始學習,之後在對ThreadPoolExecutor進行詳細的講解

Excutors

由於Excutors是工具類,所以下面的介紹的都是其的靜態方法,如果是比較線程數目比較少的小項目,可以使用此工具類來創建線程池

PS:把線程提交給線程池中,有兩種方法,一種是submit,另外一種則是execute

兩者的區別:

  1. execute沒有返回值,如果不需要知道線程的結果就使用execute方法,性能會好很多。
  2. submit返回一個Future對象,如果想知道線程結果就使用submit提交,而且它能在主線程中通過Future的get方法捕獲線程中的異常

線程池可以接收兩種的參數,一個為Runnable對象,另外則是Callable對象

Callable是JDK1.5時加入的接口,作為Runnable的一種補充,允許有返回值,允許拋出異常。

主要的幾個靜態方法:

方法 說明
newFixedThreadPool(int nThreads) 創建固定大小的線程池
newSingleThreadExecutor() 創建只有一個線程的線程池
newCachedThreadPool() 創建一個不限線程數上限的線程池,任何提交的任務都將立即執行
newScheduledThreadPool(int nThreads) 創建一個支持定時、周期性或延時任務的限定線程數目的線程池
newSingleThreadScheduledExecutor() 創建一個支持定時、周期性或延時任務的單個線程的線程池

1.newSingleThreadExecutor

創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行,我們可以使用它來達到控制線程順序執行。

控制進程順序執行:

Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("這是線程1");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
Thread thread2 = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            System.out.println("這是線程2");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
Thread thread3 = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            System.out.println("這是線程3");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
//創建線程池對象
ExecutorService executorService = Executors.newSingleThreadExecutor();
//把線程添加到線程池中
executorService.submit(thread1);
executorService.submit(thread2);
executorService.submit(thread3);

之後出現的結果就是按照順序輸出

2.newFixedThreadPool

創建一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()

3.newCachedThreadPool

創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程,線程池為無限大,當執行第二個任務時第一個任務已經完成,會復用執行第一個任務的線程,而不用每次新建線程。

代碼:

//創建了一個自定義的線程
public class MyThread extends Thread {
    private int index;

    public MyThread(int index) {
        this.index = index;
    }

    @Override
    public void run() {
        System.out.println(index+" 當前線程"+Thread.currentThread().getName());
    }
}

//創建緩存線程池
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
    executorService.execute(new MyThread(i));
    try {
        //這裏模擬等待時間,等待線程池復用回收線程
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

可以看到結果都是使用的同一個線程

4.newScheduledThreadPool

創建一個定長線程池,支持定時、周期性或延時任務執行

延遲1s后啟動線程:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
scheduledExecutorService.schedule(new MyThread(1),1, TimeUnit.SECONDS);

ThreadPoolExecutor

構造方法

上面提到的那個構造方法其實只是ThreadPoolExecutor類中的一個,ThreadPoolExecutor類中存在有四種不同的構造方法,主要區別就是參數不同。

//五個參數的構造函數
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

//六個參數的構造函數-1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

//六個參數的構造函數-2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)

//七個參數的構造函數
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

首先,有個概念需要明白,線程池的最大線程數(線程總數,maximumPoolSize)= 核心線程數(corePoolSize)+非核心線程數

  • corePoolSize:線程池核心線程數量
  • maximumPoolSize:線程池最大線程數量
  • keepAliverTime:當活躍線程數大於核心線程數時,空閑的多餘線程最大存活時間
  • unit:存活時間的單位
  • workQueue:存放線程的工作隊列
  • handler:超出線程範圍和隊列容量的任務的處理程序(拒絕策略)

核心線程和非核心線程有什麼區別呢?

核心線程是永遠不會被線程池丟棄回收(即使核心線程沒有工作),非核心線程則是超過一定時間(keepAliverTime)則就會被丟棄

workQueue

當所有的核心線程都在工作時,新添加的任務會被添加到這個隊列中等待處理,如果隊列滿了,則新建非核心線程執行任務

1.SynchronousQueue:這個隊列接收到任務的時候,會直接提交給線程處理,而不保留它,如果所有線程都在工作怎麼辦?那就新建一個線程來處理這個任務!所以為了保證不出現線程數達到了maximumPoolSize而不能新建線程的錯誤,使用這個類型隊列的時候,maximumPoolSize一般指定成Integer.MAX_VALUE,即無限大

2.LinkedBlockingQueue:這個隊列接收到任務的時候,如果當前線程數小於核心線程數,則新建線程(核心線程)處理任務;如果當前線程數等於核心線程數,則進入隊列等待。由於這個隊列沒有最大值限制,即所有超過核心線程數的任務都將被添加到隊列中,這也就導致了maximumPoolSize的設定失效,因為總線程數永遠不會超過corePoolSize

3.ArrayBlockingQueue:可以限定隊列的長度,接收到任務的時候,如果沒有達到corePoolSize的值,則新建線程(核心線程)執行任務,如果達到了,則入隊等候,如果隊列已滿,則新建線程(非核心線程)執行任務,又如果總線程數到了maximumPoolSize,並且隊列也滿了,則發生錯誤

4.DelayQueue:隊列內元素必須實現Delayed接口,這就意味着你傳進去的任務必須先實現Delayed接口。這個隊列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務

拒絕策略:

拒絕策略 拒絕行為
AbortPolicy 拋出RejectedExecutionException異常(默認)
DiscardPolicy 不處理,丟棄掉
DiscardOldestPolicy 丟棄執行隊列中等待最久的一個任務,嘗試為新來的任務騰出位置
CallerRunsPolicy 直接由提交任務者執行這個任務

兩種方法設置拒絕策略:

//ThreadPoolExecutor對象的setRejectedExecutionHandler方法設置
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//構造方法進行設置,省略

線程池默認的拒絕行為是AbortPolicy,也就是拋出RejectedExecutionHandler異常,該異常是非受檢異常,很容易忘記捕獲。

如果不關心任務被拒絕的事件,可以將拒絕策略設置成DiscardPolicy,這樣多餘的任務會悄悄的被忽略。

ThreadFactory

一個接口類,用來對線程進行設置,需要實現newThread(Runnable r)方法

官方的文檔說明:

newThread此方法一般來初始化線程的優先級(priority),名字(name),守護進程(daemon)或線程組(ThreadGroup)

簡單的例子(讓某個類實現ThreadFactory接口):

@Override
public Thread newThread(Runnable r) {
    Thread thread = new Thread(r);
    thread.setDaemon(true);
    return thread;
}

線程池獲取執行結果

PS:把線程提交給線程池中,有兩種方法,一種是submit,另外一種則是execute

兩者的區別:

  1. execute沒有返回值,如果不需要知道線程的結果就使用execute方法,性能會好很多。
  2. submit返回一個Future對象,如果想知道線程結果就使用submit提交,而且它能在主線程中通過Future的get方法捕獲線程中的異常

線程池可以接收兩種的參數,一個為Runnable對象,另外則是Callable對象

Callable是JDK1.5時加入的接口,作為Runnable的一種補充,允許有返回值,允許拋出異常。

線程池的處理結果、以及處理過程中的異常都被包裝到Future中,並在調用Future.get()方法時獲取,執行過程中的異常會被包裝成ExecutionException,submit()方法本身不會傳遞結果和任務執行過程中的異常。

獲取執行結果的代碼可以這樣寫:

ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<Object> future = executorService.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            //該異常會在調用Future.get()時傳遞給調用者
            throw new RuntimeException("exception in call~");
        }
    });
    
try {
    //獲得返回結果
    Object result = future.get();
    
    
} catch (InterruptedException e) {
  // interrupt
} catch (ExecutionException e) {
  // exception in Callable.call()
  e.printStackTrace();
}

線程池運行流程

一個形象的比喻說明線程池的流程:

規定:

  1. 線程池比作成一家公司
  2. 公司的最大員工數為maximumPoolSize
  3. 最大正式員工數為coolPoolSize(核心線程的總數)
  4. 最大員工數(maximumPoolSize) = 最大正式員工(coolPoolSize)和臨時工(非核心線程)
  5. 單子(任務)可看做為線程
  6. 隊列使用的是ArrayBlockingQueue
  7. 一個員工只能幹一個任務

最開始的時候,公司是沒有一名員工。之後,公司接到了單子(任務),這個時候,公司才去找員工(創建核心線程並讓線程開始執行),這個時候找到的員工就是正式員工了。

公司的聲譽越來越好,於是來了更多的單子,公司繼續招人,直到正式員工數量達到最大的正式員工的數量(核心線程數量已達到最大)

於是,多出來的單子就暫時地存放在了隊列中,都在排隊,等待正式員工們把手頭的工作做完之後,就從隊列中依次取出單子繼續工作。

某天,來了一個新單子,但是這個時候隊列已經滿了,公司為了自己的信譽和聲譽着想,不得已只能去找臨時工(創建非核心線程)來幫忙開始進行工作(負責新單子)

在此之後,又來了新單子,公司繼續去招臨時工為新來的單子工作,直到正式工和臨時工的數量已經達到了公司最大員工數。

這個時候,公司沒有辦法了,只能拒絕新來的單子了(拒絕策略)

此時,正式工和臨時工都是在加班加點去從隊列中取出任務來工作,終於某一天,隊列的已經沒有單子了,市場發展不好,單子越來越少,臨時工很久都不工作了(非核心線程超過了最大存活時間keepAliveTime),公司就把這些臨時工解僱了,直到剩下只有正式員工。

PS:如果也想要解僱正式員工(銷毀核心線程),可以設置ThreadPoolExecutor對象的的allowCoreThreadTimeOut這個屬性為true

個人理解,可能不是很正確,僅供參考!

線程池關閉

方法 說明
shutdown() 不再接受新的任務,之前提交的任務等執行結束再關閉線程池
shutdownNow() 不再接受新的任務,試圖停止池中的任務再關閉線程池,返回所有未處理的線程list列表。

總結

如果是小的Java程序,可以使用Excutors,如果是服務器程序,則使用ThreadPoolExecutor進行自定義線程池的創建

參考鏈接:

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

USB CONNECTOR 掌控什麼技術要點? 帶您認識其相關發展及效能

※高價3c回收,收購空拍機,收購鏡頭,收購 MACBOOK-更多收購平台討論專區

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

收購3c瘋!各款手機、筆電、相機、平板,歡迎來詢價!

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選