Java四種執行緒池和引數詳解

一、四種執行緒池

Java透過Executors提供四種靜態方法來建立執行緒池

例如:

//建立一個可快取執行緒池ExecutorService cachedThreadPool = Executors。newCachedThreadPool();//執行任務cachedThreadPool。execute(Runnable command);

如下:

1。newSingleThreadPool 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO、LIFO、優先順序)執行。

2。newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。

3。newScheduledThreadPool 建立一個可定期或者延時執行任務的定長執行緒池,支援定時及週期性任務執行。

4。newCachedThreadPool 建立一個可快取執行緒池,如果執行緒池長度超過處理需求,可靈活回收空閒執行緒,若無可回收執行緒,則新建執行緒。

二、核心引數

四種執行緒池本質都是建立ThreadPoolExecutor類,ThreadPoolExecutor構造引數如下

1。int corePoolSize 核心執行緒數量

2。int maximumPoolSize 最大執行緒數量

3。long keepAliveTime 超過corePoolSize的執行緒多久不活動別銷燬時間

4。TimeUnit unit 時間單位

5。BlockingQueue workQueue 任務佇列

6。ThreadFactory threadFactory 執行緒池工廠

7。RejectedExecutionHandler handler 拒絕策略

三、執行緒池工廠 ThreadFactory

所在包位置:java。util。concurrent。ThreadFactory

ThreadFactory介面。這個介面是Java自身提供的,使用者可以實現它自定義自己的執行緒啟動方式,可以設定執行緒名稱、型別以及優先順序等屬性。

ThreadFactory vs Default ThreadFactory:

在一個典型的Java ExecutorService程式中,其執行緒都需要被指定以何種形式執行,如果程式初始化ExecutorService時沒有指定ThreadFactory,程式會採用一個預設的ThreadFactory來生成提交執行緒,但是對於一個嚴謹對程式來說,定義自己的ThreadFactory永遠是個最佳選擇。Why??

1。設定更有描述意義的執行緒名稱。如果使用預設的ThreadFactory,它給執行緒起名字大概規律就是pool-m-thread-n這個樣子,如pool-1-thread-1。但是當你分析一個thread dump時,看著這樣的名字就很難知道執行緒的目的。所以使用一個有描述意義的執行緒名稱是分析追蹤問題的clue NO。1。

2。設定執行緒是否是守護執行緒,預設的ThreadFactory總是提交非守護執行緒

3。設定執行緒優先順序,預設ThreadFactory總是提交的一般優先順序執行緒

例子:

CustomThreadFactoryBuilder類實現了一種優雅的Builder Mechanism方式去得到一個自定義ThreadFactory例項。ThreadFactory介面中有一個接受Runnable型別引數的方法newThread(Runnable r),你自己的factory邏輯就應該寫在這個方法中,去配置執行緒名稱、優先順序、守護執行緒狀態等屬性。

public class CustomThreadFactoryBuilder { private String namePrefix = null; private boolean daemon = false; private int priority = Thread。NORM_PRIORITY; public CustomThreadFactoryBuilder setNamePrefix(String namePrefix) { if (namePrefix == null) { throw new NullPointerException(); } this。namePrefix = namePrefix; return this; } public CustomThreadFactoryBuilder setDaemon(boolean daemon) { this。daemon = daemon; return this; } public CustomThreadFactoryBuilder setPriority(int priority) { if (priority < Thread。MIN_PRIORITY){ throw new IllegalArgumentException(String。format( “Thread priority (%s) must be >= %s”, priority, Thread。MIN_PRIORITY)); } if (priority > Thread。MAX_PRIORITY) { throw new IllegalArgumentException(String。format( “Thread priority (%s) must be <= %s”, priority, Thread。MAX_PRIORITY)); } this。priority = priority; return this; } public ThreadFactory build() { return build(this); } private static ThreadFactory build(CustomThreadFactoryBuilder builder) { final String namePrefix = builder。namePrefix; final Boolean daemon = builder。daemon; final Integer priority = builder。priority; final AtomicLong count = new AtomicLong(0); /* return new ThreadFactory() { @Override public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable); if (namePrefix != null) { thread。setName(namePrefix + “-” + count。getAndIncrement()); } if (daemon != null) { thread。setDaemon(daemon); } if (priority != null) { thread。setPriority(priority); } return thread; } };*/ //jdk8中還是優先使用lamb表示式 return (Runnable runnable) -> { Thread thread = new Thread(runnable); if (namePrefix != null) { thread。setName(namePrefix + “-” + count。getAndIncrement()); } if (daemon != null) { thread。setDaemon(daemon); } /* thread。setPriority(priority); */ return thread; }; }}

SimpleTask類實現類Runnable介面,打印出了執行緒的執行屬性(名稱,優先順序等)。

public class SimpleTask implements Runnable { private long sleepTime; public SimpleTask(long sleepTime) { super(); this。sleepTime = sleepTime; } @Override public void run() { while (true) { try { System。out。println(“Simple task is running on ” + Thread。currentThread()。getName() + “ with priority ” + Thread。currentThread()。getPriority()); Thread。sleep(sleepTime); } catch (InterruptedException e) { e。printStackTrace(); } } }}

CustomThreadFactoryDemo類使用我們上面的CustomThreadFactoryBuilder類建立類一個ThreadFactory例項,又使用這個例項獲得類一個ExecutoryService,這樣所有這個執行緒池中的執行緒都會按我們定義好的屬性被生成,下面程式碼中執行類三個SimpleTask。

public class CustomThreadFactoryDemo { public static void main(String[] args) { ThreadFactory customThreadfactory = new CustomThreadFactoryBuilder() 。setNamePrefix(“DemoPool-Thread”)。setDaemon(false) 。setPriority(Thread。MAX_PRIORITY)。build(); ExecutorService executorService = Executors。newFixedThreadPool(3, customThreadfactory); // Create three simple tasks with 1000 ms sleep time SimpleTask simpleTask1 = new SimpleTask(1000); SimpleTask simpleTask2 = new SimpleTask(1000); SimpleTask simpleTask3 = new SimpleTask(1000); // Execute three simple tasks with 1000 ms sleep time executorService。execute(simpleTask1); executorService。execute(simpleTask2); executorService。execute(simpleTask3); }}

輸出結果:

Simple task is running on DemoPool-Thread-0 with priority 10Simple task is running on DemoPool-Thread-1 with priority 10Simple task is running on DemoPool-Thread-2 with priority 10Simple task is running on DemoPool-Thread-0 with priority 10Simple task is running on DemoPool-Thread-1 with priority 10Simple task is running on DemoPool-Thread-2 with priority 10Simple task is running on DemoPool-Thread-0 with priority 10Simple task is running on DemoPool-Thread-1 with priority 10Simple task is running on DemoPool-Thread-2 with priority 10Simple task is running on DemoPool-Thread-0 with priority 10Simple task is running on DemoPool-Thread-1 with priority 10Simple task is running on DemoPool-Thread-2 with priority 10

四、任務佇列 BlockingQueue

Tops:BlockingQueue不能夠新增null物件,否則會丟擲空指標異常。

介面抽象方法

boolean add(E e);

新增元素,新增成功返回true ,新增失敗丟擲異常 IllegalStateException。

boolean offer(E e);

true:新增元素成功 ; false : 新增元素失敗。

void put(E e) throws InterruptedException;

新增元素,直到有空間新增成功才會返回,阻塞方法。

boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

true:新增資料成功,false:超時時間到。

E take() throws InterruptedException;

獲取隊頭的元素,阻塞方法,會一直等到有元素獲取到才會返回,獲取到元素時並將佇列中的該元素刪除。

E poll(long timeout, TimeUnit unit) throws InterruptedException;

獲取隊頭的元素,阻塞方法,超時時間到則返回null,獲取到元素時並將佇列中的該元素刪除。

int remainingCapacity();

返回理想情況下此佇列可以新增的其他元素的數量。

boolean remove(Object o);

移除指定的元素。

boolean contains(Object o);

檢查是否包含該元素

int drainTo(Collection<? super E> c);

移除佇列中的所有元素並新增到集合c,返回被移除元素的數量。

int drainTo(Collection<? super E> c, int maxElements);

移除佇列中maxElements個元素並新增到集合c,返回被移除元素的數量。

實現類

Tops: 所有的實現類都是併發安全的。

ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 介面的有界佇列實現類,底層採用陣列來實現。其併發控制採用可重入鎖來控制,不管是插入操作還是讀取操作,都需要獲取到鎖才能進行操作。

SynchronousQueue

它是一個特殊的佇列,它的名字其實就蘊含了它的特徵 – - 同步的佇列。為什麼說是同步的呢?這裡說的並不是多執行緒的併發問題,而是因為當一個執行緒往佇列中寫入一個元素時,寫入操作不會立即返回,需要等待另一個執行緒來將這個元素拿走;同理,當一個讀執行緒做讀操作的時候,同樣需要一個相匹配的寫執行緒的寫操作。這裡的 Synchronous 指的就是讀執行緒和寫執行緒需要同步,一個讀執行緒匹配一個寫執行緒。

LinkedBlockingDeque

LinkedBlockingDeque就是一個雙向佇列,任何一端都可以進行元素的出入。底層基於單向連結串列實現的阻塞佇列,可以當做無界佇列也可以當做有界佇列來使用。

LinkedBlockingQueue

LinkedBlockingQueue是一個單向佇列,只能一端出一端入的單向佇列結構,是有FIFO特性的,並且是透過兩個ReentrantLock和兩個Condition來實現的。底層基於單向連結串列實現的阻塞佇列,可以當做無界佇列也可以當做有界佇列來使用。

DelayQueue

是一個支援延時獲取元素的無界阻塞佇列。內部用 PriorityQueue 實現。

LinkedTransferQueue

PriorityBlockingQueue

PriorityBlockingQueue是帶排序的 BlockingQueue 實現,其併發控制採用的是 ReentrantLock,佇列為無界佇列(ArrayBlockingQueue 是有界佇列,LinkedBlockingQueue 也可以透過在建構函式中傳入 capacity 指定佇列最大的容量,但是 PriorityBlockingQueue 只能指定初始的佇列大小,後面插入元素的時候,如果空間不夠的話會自動擴容)。

簡單地說,它就是 PriorityQueue 的執行緒安全版本。不可以插入 null 值,同時,插入佇列的物件必須是可比較大小的(comparable),否則報 ClassCastException 異常。它的插入操作 put 方法不會 block,因為它是無界佇列(take 方法在佇列為空的時候會阻塞)。

五、拒絕策略 RejectedExecutionHandle

在使用執行緒池並且使用有界佇列的時候,如果佇列滿了,任務新增到執行緒池的時候就會有問題,針對這些問題java執行緒池提供了以下幾種策略:

1。 AbortPolicy

2。 DiscardPolicy

3。 DiscardOldestPolicy

4。 CallerRunsPolicy

5。 自定義策略

AbortPolicy

該策略是執行緒池的預設策略。使用該策略時,如果執行緒池佇列滿了丟掉這個任務並且丟擲RejectedExecutionException異常。

原始碼如下:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { //不做任何處理,直接丟擲異常 throw new RejectedExecutionException(“Task ” + r。toString() + “ rejected from ” + e。toString()); }

DiscardPolicy

這個策略和AbortPolicy的slient版本,如果執行緒池佇列滿了,會直接丟掉這個任務並且不會有任何異常。

原始碼如下:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { //就是一個空的方法 }

DiscardOldestPolicy

這個策略從字面上也很好理解,丟棄最老的。也就是說如果佇列滿了,會將最早進入佇列的任務刪掉騰出空間,再嘗試加入佇列。

因為佇列是隊尾進,隊頭出,所以隊頭元素是最老的,因此每次都是移除對頭元素後再嘗試入隊。

原始碼如下:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e。isShutdown()) { //移除隊頭元素 e。getQueue()。poll(); //再嘗試入隊 e。execute(r); } }

CallerRunsPolicy

使用此策略,如果新增到執行緒池失敗,那麼主執行緒會自己去執行該任務,不會等待執行緒池中的執行緒去執行。就像是個急脾氣的人,我等不到別人來做這件事就乾脆自己幹。

原始碼如下:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e。isShutdown()) { //直接執行run方法 r。run(); } }

自定義策略

如果以上策略都不符合業務場景,那麼可以自己定義一個拒絕策略,只要實現RejectedExecutionHandler介面,並且實現rejectedExecution方法就可以了。具體的邏輯就在rejectedExecution方法裡去定義就OK了。

例如:我定義了我的一個拒絕策略,叫做MyRejectPolicy,裡面的邏輯就是列印處理被拒絕的任務內容

public class MyRejectPolicy implements RejectedExecutionHandler{ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //Sender是我的Runnable類,裡面有message欄位 if (r instanceof Sender) { Sender sender = (Sender) r; //直接列印 System。out。println(sender。getMessage()); } }}

Tops:

這幾種策略沒有好壞之分,只是適用不同場景,具體哪種合適根據具體場景和業務需要選擇,如果需要特殊處理就自己定義好了。