AbstractQueuedSynchronizer理解之四(Condition)

发布时间:2019-06-03 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了AbstractQueuedSynchronizer理解之四(Condition)脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

什么是CondITion

Condition必须要和独占锁一起使用,独占锁代替了原来的synchronized,Condition代替了原来的Object中的监视器方法(wait, notify and notifyAll);一个Lock可以对应多个Condition,这样线程之间可以按照条件唤醒指定的线程,而不是简单的notifyAll多有的线程,使得我们多线程编程的时候可以灵活的控制线程。

独占锁和Condition最经典的配合使用就是ArrayBlockingQueue.java,典型的生产者消费者问题:

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textBook.
 */

/** Main lock Guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
PRivate final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

这是在许多教科书中能找到的经典的双Condition算法的并发控制,需要有一个独占锁ReentrantLock,然后再定义两个Condition,notEmpty(队列不是空的)表示可以从队列中消费元素的信号条件,notFull(队列不是满的)表示可以向队列生产元素的信号条件。这两个Condition都是调用了lock.newCondition()方法实例化的。

当消费者线程调用消费方法take时:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //当队列的元素数量为0时,调用notEmpty.await,阻塞当前的消费线程
        while (count == 0)
            notEmpty.await();
        //dequeue中调用了notFull.signal(),通知生产者队列还没满,可以生产
        return dequeue();
    } finally {
        lock.unlock();
    }
}

当生产者线程调用生产方法put时:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //当队列满时,调用notFull.await(),阻塞当前生产线程,停止生产
        while (count == items.length)
            notFull.await();
        //enqueue中调用了notEmpty.signal(),通知消费者队列里有元素,可以消费
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

Condition的await

在AQS中有一个ConditionObject内部类实现了Condition接口,其中有两个成员变量:

    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

Condition也有一个node队列,firstWaiter、lastWaiter分别表示第一个和最后一个node

先看await方法:

    public final void await() throws InterruptedException {
        //如果线程设置中断标志,抛出中断异常
        if (Thread.interrupted())
            throw new InterruptedException();
        //往队列添加node
        Node node = addConditionWaiter();
        //完全释放锁,head的后继节点将被唤醒,然后被移出sync队列
        int savedstate = fullyRelease(node);
        int interruptMode = 0;
        //判断当前节点是否在sync队列中(当condition调用signal是会将该节点放入Sync队列),如果不在就park当前线程,线程在这里开始等待被signal
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            //发送中断时(唤醒了线程)break;checkInterruptWhileWaiting中调用了transferAfterCancelledWait(贴在下面),这个方法时检测中断是发生在signal之前还是之后
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //当前线程被signal后,调用acquireQueued抢占锁,如果interruptMode不为抛出异常,设置为ReiNTERRUPT
        if (acquireQueued(node, savedstate) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            //从头到尾移除取消的节点
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            //继续中断还是抛出异常
            reportInterruptAfterWait(interruptMode);
    }
        
final boolean transferAfterCancelledWait(Node node) {
    //首先CAS设置node状态为0,如果成功说明中断发生在signal之前(因为signal会将node状态设置为0)
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        //将node入sync队列
        enq(node);
        return true;
    }
    /*
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */
    //如果node不在sync队列中,yield,让出cpu
    while (!isOnSyncQueue(node))
        Thread.yield();
    //中断发生在signal后
    return false;
}

分析一下addConditionWaiter:

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        //如果最后一个node被取消,清除node
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        //新建一个node,持有当前线程,状态为CONDITION
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            //如果尾节点为null,说明condition队列还是空的,将新建的node作为头节点
            firstWaiter = node;
        else
            //如果condition队列已经存在,将新建的node作为尾节点的next
            t.nextWaiter = node;
        //将新建node设置为尾节点
        lastWaiter = node;
        //返回新建的node
        return node;
    }

在这里我们可以看到Condition的队列是一个单链表。
看一下unlinkCancelledWaiters,Condition所有操作都是在获取锁之后执行的,所以不用考虑线程安全问题:

    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }

该方法从队列头开始往后遍历所有node,移除已经取消的node;

在新建了node后,调用了fullyRelease:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        //保存当前的state
        int savedState = getState();
        //release(savedState)尝试释放锁,这也是为什么叫fullyRelease
        if (release(savedState)) {
            failed = false;
            //返回之前保存的state
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            //如果失败,将当前node设置为取消状态
            node.waitStatus = Node.CANCELLED;
    }
}

看一下release:

public final boolean release(int arg) {
    //尝试释放锁,这里调用的是ReentrantLock实现的tryRelease,传入的arg是当前的state,所以会释放成功,即state为0
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //唤醒后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

下面的方法是判断当前节点是否在Sync队列中

final boolean isOnSyncQueue(Node node) {
    //如果当前节点状态为CONDITION或者节点前驱为null,说明该节点已经在CONDITION队列中,不在Syc队列里
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //如果节点后继不是null,那该节点一定在Syc队列中
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse From tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    //此时节点入列的CAS动作可能失败,所以要从尾部往前查找该节点再次确认
    return findNodeFromtail(node);
}

Condition的signal

    public final void signal() {
        //如果当前线程不是当前的独占线程,抛出异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            //signal Condition队列的第一个节点
            DOSignal(first);
    }
    
    private void doSignal(Node first) {
        //如果transferForSignal失败(即当前节点取消)且下一个节点存在,while继续loop
        do {
            //设置第一个节点的next为firstWaiter,此时如果firstWaiter为null,说明队列空了,将lastWaiter也设置为null
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            //设置第一个节点next为null,help GC
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    
    final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
     //如果为node设置状态失败,说明node被取消,返回false
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harMLessly wrong).
     */
    //将当前node入列sync队列,返回node的前继
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果前继的状态为取消或者设置前继状态为SIGNAL失败,当前node线程unpark
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

signal后,Condition第一个节点将入列sync的队列,等待抢占到锁继续执行。

总结

在一开是的例子中,假设有两个线程P,C分别代表生产者和消费者线程,生产消费元素E的队列Q容量为1。

C无限loop调用take,当C抢占到独占锁,发现Q时空的,调用notEmpty.await(),线程C释放锁并且入列notEmpty队列park,等待别的线程调用notEmpty.signal();

P无限loop调用put,当P抢占到独占锁生产了一个E,调用notEmpty.signal()通知C,然后释放了锁;

C收到signal信号,入列SYC队列,并且unpark,尝试抢占独占锁,成功获得独占锁后,消费了一个E,然后调用notFull.signal();

P生产E时发现Q已满(C还没来得及消费),调用notFull.await()线程P释放锁并且入列notFull队列park,等待notFull.signal()通知自己unpark并入列AQS队列去抢占独占锁进行生产;

脚本宝典总结

以上是脚本宝典为你收集整理的AbstractQueuedSynchronizer理解之四(Condition)全部内容,希望文章能够帮你解决AbstractQueuedSynchronizer理解之四(Condition)所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。