脚本宝典收集整理的这篇文章主要介绍了Jboss EnhancedQueueExecutor源码解读,脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。
在JDK线程池中自带的Executor遵循一种典型的生产者,消费者队列模型,即一个统一的阻塞队列,然后一个线程数组不停地消费其中的数据。其本身的处理逻辑为 coreSize->queueSize->maxSize 的增长方式,即先尝试增加 coreSize, 然后再不断地将任务放进队列中,如果队列满了,则再尝试增加 maxSize, 直至拒绝任务。
通过一些手法可以调整策略为 coreSize->maxSize->queueSize。
本文则描述一个由 jboss-threads 中提到的 EnhancedQueueExecutor,中文为增加型队列执行器。其除支持典型的executor模型外,也同样保留如 coreSize,maxSize, queueSize 这些模型。与jdk中实现相区别的是,其本身采用单个链表来完成任务的提交和线程的执行,同时采用额外的数据来存储计数类数据. 更重要的是,其默认线程策略即 coreSize->maxSize->queueSize, 同时可以根据参数调整此策略.
创建对象与ThreadPoolExecutor类似,指定相应的参数即可,如下所示:
EnhancedQueueExecutor executor = new EnhancedQueueExecutor.Builder() .setCorePoolSize(corePoolSize) .setMaximumPoolSize(maxpoolSize) .setKeepAliveTime(Duration.ofMinutes(5)) .setMaximumQueueSize(1024) .setThreaDFactory(threadFactory) .setExceptionHandler(uncaughtExceptionHandler) .setRegisterMBean(false) .setGrowthResistance(growthResistance) //增长因子,控制新线程创建逻辑(if >= coreSize时) .build();
链表结构
暂不考虑其它特殊节点
null->TaskNode->TaskNode...TaskNode->PoolThreadNode->PoolThreadNode...PoolThreadNode->null ^ ^ | | head tail
TaskNode 表示此为1个任务节点,内部封装具体要执行的runnable任务 PoolThreadNode 表示此为1个线程节点,内部封装着具体的执行线程以及相应的状态. head 和 tail 为特殊标记节点。head 表示任务节点的头部,tail 则表示线程节点的头部(也可理解为任务节点的尾部)
整个链表可以理解为2个部分,前半部分全为任务节点,后半部分全为线程节点。仅当线程节点为等待状态(waITing)状态时,其才会被加入到链表中(以方便获取). 即可以理解为链表中后部分为等待线程列表。
在类上,相应的类定义如下
static final class TaskNode extends QNode { volatile Runnable task; //具体要执行的任务 } static abstract class PoolThreadNodeBase extends QNode{} static final class PoolThreadNode extends PoolThreadNodeBase { PRivate final Thread thread; //执行线程本身 @SupPressWarnings("unused") private volatile Runnable task; //当前正在处理的任务 }
场景一:新增任务(无等待线程)
入口均为 execute(Runnable),这里会通过函数 tryExecute 来决定相应的结果值。这里有多种情况如下所示:
在上面的第4种情况时,会返回状态码 EXE_REJECT_QUEUE_FULL, 这里会触发 handoff, 类似于jdk中的拒绝策略.
而第1种和第3种情况,则会根据 growthResistance 判断是否可以新创建线程。这里默认值为0,则表示当 < maxSize 时均可以创建,因此返回状态码 EXE_CREATE_THREAD.
这里触发操作 DOStartThread(Runnable), 其基本实现即创建新线程,然后执行之. 如下参考所示
boolean doStartThread(Runnable runnable) throws RejectedExecutionException { thread = threadFactory.newThread(new ThreadBody(runnable)); thread.start(); }
ThreadBody 用于封装执行线程的具体逻辑,而任务runnable则被认为是初始任务对象. 因此,执行线程的初始逻辑即是执行初始任务本身,如下参考所示
public void run() { final Thread currentThread = Thread.currentThread(); runningThreads.add(currentThread); doRunTask(getAndClearInitialTask()); }
以上,即会直接执行初始任务,并清除对象(为后续轮循和等待作处理)
场景二:新增任务(已存在等待的线程)
当场景一中的执行线程在执行完初始任务之后,并不会直接退出,而是进入一个任务循环。可以理解为不断地拿任务并执行。其处理逻辑如下流程所示
通过park将当前线程挂起,则对应则有unpark以恢复。
以上的流程代码如下所示(即执行线程的处理逻辑)
processingQueue: for (;;) { node = getOrAddNode(nextPoolThreadNode); if (node instanceof TaskNode) { ...//拿到任务 } else if (node == nextPoolThreadNode) { final PoolThreadNode newNode = nextPoolThreadNode; nextPoolThreadNode = new PoolThreadNode(currentThread); waitingForTask: for (;;) { Runnable task = newNode.getTask(); if (task != WAITING &amp;& task != EXIT) { //这里表示有其它线程给自己塞了任务(非主动调用) if (newNode.COMpareAndSetTask(task, ACCEPTED)) { doRunTask(task); continue processingQueue; } continue waitingForTask; } else { final long timeoutNanos = EnhancedQueueExecutor.this.timeoutNanos; long oldVal = threadstatus; //这里处理几种情况 //1 等待超时 //2 被无意间唤醒(参考 LockSupport.park) //3 继续等待 //这里即调用park实现 线程挂起 newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed); } Thread.interrupted(); continue waitingForTask; } // :waitingForTask }
在上面轮循中,有一个场景即是 task 字段被其它线程更新,以触发被动拿到任务的情况.这里的 其它线程即是提交任务的线程,即是在场景一中提交任务时,通过 线程节点 更新任务的处理逻辑.
在新增任务步骤中的第2个步骤即是这种情况。在提交任务中,如果发现链表中存在 线程节点 PoolThreadNode,即表示有等待线程,这里即是更新其 task 字段,并唤醒线程。相应的代码仍在 tryExecute 中,如下所示
private int tryExecute(final Runnable runnable) { QNode tailNext; TaskNode tail = this.tail; for (;;) { tailNext = tail.getNext(); ...//这里跳过所有已有的TaskNode //这里找到执行节点,则将其从链表中删除(不再等待) if (tailNext instanceof PoolThreadNode) { final QNode tailNextNext = tailNext.getNext(); if (tail.compareAndSetNext(tailNext, tailNextNext)) { PoolThreadNode consumerNode = (PoolThreadNode) tailNext; //替换相应的 task 对象为待执行任务 if (consumerNode.compareAndSetTask(WAITING, runnable)) { //挂起线程解锁 consumerNode.unpark(); return EXE_OK; } }
从上面的逻辑可以看出,执行线程在被放入链表并挂起时,其task为WAITING。而恢复时,先将节点从链表中移除(避免其它线程再重入),再将WAITING更换为新任务,再解锁。而执行线程执行完此任务后,又将重新进入轮循.
场景三: 线程轮循任务
在场景二中,已初步描述了执行线程如何处理任务,其重点就在于 getOrAddNode(PoolThreadNode), 如果链表中存在任务数据,则一定会返回 TaskNode(并移除). 逻辑可以理解为一个简单的 continue 循环,如下所示
processingQueue: for (;;) { node = getOrAddNode(nextPoolThreadNode); if (node instanceof TaskNode) { // task node was removed doRunTask(((TaskNode) node).getAndClearTask()); continue; } //其它逻辑 }
而 getOrAddNode 方法,则是不断地从head节点获取相应的数据. 因为head即表示 任务节点的起始点,这里也就表示 任务的执行是 FIFO,新的任务节点会放在原链表中TaskNode结尾(见tryExecute 跳过taskNode逻辑)
相应的代码如下所示:
private QNode getOrAddNode(PoolThreadNode nextPoolThreadNode) { TaskNode head; QNode headNext; for (;;) { head = EnhancedQueueExecutor.this.head; headNext = head.getNext(); if (headNext instanceof TaskNode) { TaskNode taskNode = (TaskNode) headNext; //这里即找到了TaskNode,调整head指针, 并返回 if (compareAndSetHead(head, taskNode)) { return taskNode; } } } }
场景四: 线程超时退出
在执行节点的处理流程中,一部分即是处理超时退出的情况。这里相应的逻辑,即是先尝试通过 park 挂起线程,但当挂起的时候超过预定时间后,则会触发超时流程. 其整个流程仍在整个轮循之内,其大概流程如下所示
退出一个线程的最正常方法,则是直接return,即当1个线程的 run 方法,return时,即表示这个线程被成功执行结束,即退出了
上面流程的代码如下所示
final long timeoutNanos = EnhancedQueueExecutor.this.timeoutNanos; long oldVal = threadStatus; if (elapsed >= timeoutNanos || task == EXIT || currentsizeof(oldVal) > maxSizeOf(oldVal)) { //进入超时流程 if (task == EXIT || isShutdownRequested(oldVal) || isAllowCoreTimeout(oldVal) || currentSizeOf(oldVal) > coreSizeOf(oldVal)) { //以下即置任务为GIVE_UP,并最终return 退出线程 if (newNode.compareAndSetTask(task, GAVE_UP)) { for (;;) { if (tryDeallocateThread(oldVal)) { runningThreads.remove(currentThread); return; } oldVal = threadStatus; } } continue waitingForTask; } else { //这里可能为core线程,不允许退出的情况。因此为 永久挂起 if (elapsed >= timeoutNanos) { newNode.park(EnhancedQueueExecutor.this); } else { newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed); } } } else { //Timed挂起 newNode.park(EnhancedQueueExecutor.this, timeoutNanos - elapsed); continue waitingForTask; } }
总结
以上涉及到的代码均在类 EnhancedQueueExecutor 中, 其代码可以与 AQS(AbstractQueuedSynchronizer) 相对照,对了解多线程间协作和运行有相关的帮助。在具体使用中,也可以作为Jdk Executor的一种替代. 特别是在 需要使用 优先动态调节线程大小的场景中,其是一个优先的考虑.
转载请标明出处:i flym本文地址:https://www.iflym.com/index.php/code/202012010001.htML
I am flym,the master of the site:) 查看flym的所有文章
以上是脚本宝典为你收集整理的Jboss EnhancedQueueExecutor源码解读全部内容,希望文章能够帮你解决Jboss EnhancedQueueExecutor源码解读所遇到的问题。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。