线程池源码解读

发布时间:2022-06-21 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了线程池源码解读脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

参考文档: https://www.jianshu wangt.cc /p/a977ab6704d7

 

码位置: java.util.concurrent.ThreadPoolExecutor#execute

jdk 版本: 1.  1 /**  2 * The runstate PRovides the main lifecycle control, taking on values:

  3      *
  4      *   RUNNING:  Accept new tasks and process queued tasks
  5      *   SHUTDOWN: Don't accept new tasks, but process queued tasks
  6      *   STOP:     Don't accept new tasks, don't process queued tasks,
  7      *             and interrupt in-progress tasks
  8      *   TIDYING:  All tasks have terminated, workerCount is zero,
  9      *             the thread transITioning to state TIDYING
 10      *             will run the terminated() hook method
 11      *   TERMINATED: terminated() has completed
 12      *
 13      * The numerical order among these values matters, to allow
 14      * ordered comparisons. The runState monotonically increases over
 15      * time, but need not hit each state. The transitions are:
 16      *
 17      * RUNNING -> SHUTDOWN
 18      *    On invocation of shutdown(), PErhaps implicitly in finalize()
 19      * (RUNNING or SHUTDOWN) -> STOP
 20      *    On invocation of shutdownNow()
 21      * SHUTDOWN -> TIDYING
 22      *    When both queue and pool are empty
 23      * STOP -> TIDYING
 24      *    When pool is empty
 25      * TIDYING -> TERMINATED
 26      *    When the terminated() hook method has completed terminated()
 27      */
 28     
 29     // 前三位表示运行状态,后面存储当前运行 workerCount
 30     private static final int COUNT_BITS = Integer.SIZE - 3; // 32 - 3
 31     
 32     // 最大容量
 33     private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 00011111111111111111111111111111
 34     
 35     /**
 36      * Maximum pool size. Note that the actual maximum is internally
 37      * bounded by CAPACITY. 实际线程池大小还是由 CAPACITY 决定
 38      */
 39     private volatile int maximumPoolSize;
 40     
 41     // 线程池的几个状态 官方注释在最上方
 42     // 接受新的任务
 43     private static final int RUNNING    = -1 << COUNT_BITS; // 11100000000000000000000000000000
 44     
 45     // 不接受新的任务,但是已在队列中的任务,还会继续处理
 46     private static final int SHUTDOWN   =  0 << COUNT_BITS; // 00000000000000000000000000000000
 47     
 48     // 不接受,不处理新的任务,且中断正在进行中的任务
 49     private static final int STOP       =  1 << COUNT_BITS; // 00100000000000000000000000000000
 50     
 51     // 所有任务已停止,workerCount 清零,注意 workerCount 是由 workerCountOf(int c) 计算得出的
 52     private static final int TIDYING    =  2 << COUNT_BITS; // 01000000000000000000000000000000
 53     
 54     // 所有任务已完成
 55     private static final int TERMINATED =  3 << COUNT_BITS; // 01100000000000000000000000000000
 56     
 57     // 线程池运行状态和已工作的 workerCount 初始化为 RUNNING 和 0
 58     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 59     
 60     // 计算当前 state
 61     // ~CAPACITY 为 11100000000000000000000000000000 & c(假如前三位为 000)说明线程池已经 SHUTDOWN
 62     private static int runStateOf(int c)     { return c & ~CAPACITY; }
 63     
 64     // 同时拿到 state workerCount
 65     private static int ctlOf(int rs, int wc) { return rs | wc; }
 66     
 67     // & 可以计算出当前工作的 workerCount
 68     private static int workerCountOf(int c)  { return c & CAPACITY; }
 69     
 70     // 线程入列
 71     public void execute(Runnable command) {
 72             if (command == null)
 73                 throw new NullPointerException();
 74        
 75                // 获取线程池 state 和 workerCount
 76                // 判断是否满足加入核心线程
 77             int c = ctl.get();
 78             if (workerCountOf(c) < corePoolSize) {
 79                 // 以核心线程的方式加入队列
 80                 if (addWorker(command, true))
 81                     return;
 82                 // 添加失败 获取最新的线程池 state 和 workerCount
 83                 c = ctl.get();
 84             }
 85             // 在运行且成功加入队列
 86             if (isRunning(c) && workQueue.offer(command)) {
 87                 int recheck = ctl.get();
 88                 //止线程池状态发生了变化, 再检查一次,不是运行状态就拒绝任务
 89                 if (!isRunning(recheck) && remove(command))
 90                     reject(command);
 91                 else if (workerCountOf(recheck) == 0)
 92                     // 加入一个 null
 93                     addWorker(null, false);
 94             }
 95             // 加入失败就拒绝任务
 96             else if (!addWorker(command, false))
 97                 reject(command);
 98         }
 99     
100     // 实际的操作
101     private boolean addWorker(Runnable FirstTask, boolean core) {
102             retry:
103             for (;;) {
104                 // 获得当前 state 和 workerCount
105                 int c = ctl.get();
106                 int rs = runStateOf(c);
107     
108                 // 大于等于 SHUTDOWN 即 SHUTDOWN, STOP TIDYING TERMINATED (SHUTDOWN需要排除 队列中不为空的情况, 这个状态不接受新任务, 需要把当前任务处理完)
109                 // Check if queue empty only if necessary.
110                 if (rs >= SHUTDOWN &&
111                     ! (rs == SHUTDOWN &&
112                        firstTask == null &&
113                        ! workQueue.iSEMpty()))
114                     return false;
115     
116                 for (;;) {
117                     // 计算 workerCount
118                     int wc = workerCountOf(c);
119                     if (wc >= CAPACITY ||
120                         wc >= (core ? corePoolSize : maximumPoolSize))
121                         return false;
122                     // 成功了就退出
123                     if (compareAndIncrementWorkerCount(c))
124                         break retry;
125                     c = ctl.get();  // Re-read ctl
126                     if (runStateOf(c) != rs)
127                         // 状态发了变化, 进行重试
128                         continue retry;
129                     // else CAS failed due to workerCount change; retry inner loop
130                 }
131             }                // 上面的逻辑, 主要是对线程(worker)数量进行加一操作(原子性),下面
132     
133             boolean workerStarted = false;
134             boolean workerAdded = false;
135             Worker w = null;
136             try {
137                 // 统一线程的名字
138                 // 设置 daemon 和 priority
139                 w = new Worker(firstTask);
140                 final Thread t = w.thread;
141                 if (t != null) {                        // 加入可重入锁
142                     final ReentrantLock mainLock = this.mainLock;
143                     mainLock.lock();
144                     try {
145                         // Recheck while holding lock.
146                         // Back out on ThreaDFactory failure or if
147                         // shut down before lock acquired.
148                         int rs = runStateOf(ctl.get());
149 
150                         // 异常检查 (rs== SHUTDOWN, firstTask必然是null, 原因是这个状态拒绝接受新任务, 这么写更严谨一些 )
151                         if (rs < SHUTDOWN ||
152                             (rs == SHUTDOWN && firstTask == null)) {                    // 检查线程状态, 165行才启动线程, 按道理线程线程状态不可能是alive 状态
153                             if (t.isAlive()) // precheck that t is startable
154                                 throw new IllegalThreadstateException();
155                             workers.add(w);
156                             int s = workers.size();
157                             if (s > largestPoolSize)
158                                 largestPoolSize = s;
159                             workerAdded = true;
160                         }
161                     } finally {
162                         mainLock.unlock();
163                     }
164                     // 添加成功 启动线程
165                     if (workerAdded) {
166                         t.start();
167                         workerStarted = true;
168                     }
169                 }
170             } finally {
171                 // 加入失败 
172                 if (! workerStarted)
173                     addWorkerFailed(w);
174             }
175             return workerStarted;
176         }
177         
178     // 加入失败 做一些扫尾清理(回滚操作)     //  1. 工作线程数量减1  2. 工作线程队列移除当前worker
179     private void addWorkerFailed(Worker w) {
180             final ReentrantLock mainLock = this.mainLock;
181             mainLock.lock();
182             try {
183                 if (w != null)
184                     workers.remove(w);
185                 // workerCount-1
186                 decrementWorkerCount();
187                 // 尝试更新状态 何为尝试,即需要满足一定条件,而不是冒然去做某事
188                 tryTerminate();
189             } finally {
190                 mainLock.unlock();
191             }
192         }

 

脚本宝典总结

以上是脚本宝典为你收集整理的线程池源码解读全部内容,希望文章能够帮你解决线程池源码解读所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。