通過Thread Pool Executor類解析線程池執行任務的核心流程( 三 )

(3)CAS操作成功后 , 表示向線程池中成功添加了工作線程,此時,還沒有線程去執行任務 。使用全局的獨占鎖mainLock來將新增的工作線程Worker對象安全的添加到workers中 。
總體邏輯就是:創建新的Worker對象 , 并獲取Worker對象中的執行線程,如果線程不為空,則獲取獨占鎖,獲取鎖成功后,再次檢查線線程的狀態,這是避免在獲取獨占鎖之前其他線程修改了線程池的狀態,或者關閉了線程池 。如果線程池關閉,則需要釋放鎖 。否則將新增加的線程添加到工作集合中,釋放鎖并啟動線程執行任務 。將是否啟動線程的標識設置為true 。最后,判斷線程是否啟動,如果沒有啟動,則調用addWorkerFailed(Worker)方法 。最終返回線程是否起送的標識 。
//跳出最外層for循環 , 說明通過CAS新增線程數量成功//此時創建新的工作線程boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//將執行的任務封裝成workerw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//獨占鎖,保證操作workers時的同步final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//此處需要重新檢查線程池狀態//原因是在獲得鎖之前可能其他的線程改變了線程池的狀態int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive())throw new IllegalThreadStateException();//向worker中添加新任務workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;//將是否添加了新任務的標識設置為trueworkerAdded = true;}} finally {//釋放獨占鎖mainLock.unlock();}//添加新任成功,則啟動線程執行任務if (workerAdded) {t.start();//將任務是否已經啟動的標識設置為trueworkerStarted = true;}}} finally {//如果任務未啟動或啟動失敗 , 則調用addWorkerFailed(Worker)方法if (! workerStarted)addWorkerFailed(w);}//返回是否啟動任務的標識return workerStarted;addWorkerFailed(Worker)方法在addWorker(Runnable, boolean)方法中,如果添加工作線程失敗或者工作線程啟動失敗時 , 則會調用addWorkerFailed(Worker)方法,下面我們就來看看addWorkerFailed(Worker)方法的實現,如下所示 。
private void addWorkerFailed(Worker w) {//獲取獨占鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//如果Worker任務不為空if (w != null)//將任務從workers集合中移除workers.remove(w);//通過CAS將任務數量減1decrementWorkerCount();tryTerminate();} finally {//釋放鎖mainLock.unlock();}}addWorkerFailed(Worker)方法的邏輯就比較簡單了 , 獲取獨占鎖,將任務從workers中移除 , 并且通過CAS將任務的數量減1,最后釋放鎖 。
拒絕策略我們在分析execute(Runnable)方法時,線程池會在適當的時候調用reject(Runnable)方法來執行相應的拒絕策略,我們看下reject(Runnable)方法的實現 , 如下所示 。
final void reject(Runnable command) {handler.rejectedExecution(command, this);}通過代碼,我們發現調用的是handler的rejectedExecution方法 , handler又是個什么鬼,我們繼續跟進代碼,如下所示 。
private volatile RejectedExecutionHandler handler;再看看RejectedExecutionHandler是個啥類型,如下所示 。
package java.util.concurrent;public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor);}可以發現RejectedExecutionHandler是個接口,定義了一個rejectedExecution(Runnable, ThreadPoolExecutor)方法 。既然RejectedExecutionHandler是個接口,那我們就看看有哪些類實現了RejectedExecutionHandler接口 。

通過Thread Pool Executor類解析線程池執行任務的核心流程

文章插圖
看到這里,我們發現RejectedExecutionHandler接口的實現類正是線程池默認提供的四種拒絕策略的實現類 。
至于reject(Runnable)方法中具體會執行哪個類的拒絕策略,是根據創建線程池時傳遞的參數決定的 。如果沒有傳遞拒絕策略,則默認會執行AbortPolicy類的拒絕策略 。否則會執行傳遞的類的拒絕策略 。
在創建線程池時,除了能夠傳遞JDK默認提供的拒絕策略外,還可以傳遞自定義的拒絕策略 。如果想使用自定義的拒絕策略,則只需要實現RejectedExecutionHandler接口,并重寫rejectedExecution(Runnable, ThreadPoolExecutor)方法即可 。例如 , 下面的代碼 。
public class CustomPolicy implements RejectedExecutionHandler {public CustomPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {System.out.println("使用調用者所在的線程來執行任務")r.run();}}}使用如下方式創建線程池 。
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new CustomPolicy());

推薦閱讀