理解AQS和ReentrantLock

发布时间:2022-07-04 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了理解AQS和ReentrantLock脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

AQS( AbstractQueuedSynchronizer )是一个用来构建锁和同步器的框架,Lock 包中的各种锁( ReentrantLock, ReadWrITeLock), concurrent 包中的各种同步器(如 CountDownLatch, semaphore, CyclicBarrier)都是基于 AQS 来构建。AQS负责同步状态的管理,线程的排队,等待和唤醒这些底层操作,而Lock等同步组件主要专注于实现同步语义

AQS

AQS则实现了对同步状态的管理,以及对阻塞线程进行排队,等待通知等等一些底层的实现处理。AQS的核心也包括了这些方面:同步队列,独占式锁的获取和释放,共享锁的获取和释放以及可中断锁,超时等待锁获取这些特性的实现,而这些实际上则是AQS提供出来的模板方法

模板方法

AQS可重写的方法,主要返回true fasle,告诉AQS怎样判断当前同步状态是否成功获取或者是否成功释放

// 独占式获取同步状态 判断当前状态是否符合预期 再进行CAS设置同步状态
PRotected boolean tryAcquire(int arg){throw new UnsupportedoperationException();}
// 独占式释放同步状态
protected boolean tryRelease(int arg){throw new UnsupportedOPErationException();}
// 共享式获取同步状态 返回>0代表获取成功  否则失败
protected int tryAcquireShared(int arg){throw new UnsupportedOperationException();}
// 共享式释放同步状态
protected boolean tryReleaseShared(int arg){throw new UnsupportedOperationException();}
// 判断AQS是否被当前线程独占
protected boolean isHeldExclusively(){throw new UnsupportedOperationException();}
// 独占式获取同步状态,如果获取失败则插入同步队列进行等待,该方法将会调用重写的tryAcquire(int arg)方法    
public final void acquire(int arg)
// 与acquire方法相同,但在同步队列中进行等待的时候可以检测中断
public final void acquireinterruptibly(int arg)
// 在acquireInterruptibly基础上增加了超时等待功能,在超时时间内没有获得同步状态返回false
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
// 释放同步状态,该方法会唤醒在同步队列中的下一个节点  调用重写的tryRelease(int arg)方法
public final boolean release(int arg) 
  

// 共享式获取同步状态,与独占式的区别在于同一时刻有多个线程获取同步状态
public final void acquireShared(int arg)
// 在acquireShared方法基础上增加了能响应中断的功能
public final void acquireSharedInterruptibly(int arg)
// 在acquireSharedInterruptibly基础上增加了超时等待的功能
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
// 共享式释放同步状态
public final boolean releaseShared(int arg) 


// 返回同步队列上的线程集合
public final Collection<Thread> getQueuedThreads()

同步队列

AQS中维护了一个volatile int state(代表共享资)和一个FIFO线程同步队列

当共享资源被某个线程占有,其他请求该资源的线程将会阻塞,从而进入同步队列。AQS中的同步队列是个双向链表,节点类型Node,通过头尾指针管理队列

static final class Node {
  static final Node SHARED = new Node();
  static final Node EXCLUSIVE = null;
	// 节点从同步队列中取消
  static final int CANCELLED =  1;
  // 后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,使得后继节点的线程能够运行;
  static final int SIGNAL    = -1;
  // 当前节点进入等待队列中
  static final int CONDITION = -2;
  // 表示下一次共享式同步状态获取将会无条件传播下去
  static final int PROPAGATE = -3;
	// 节点状态
  volatile int waitStatus;
  volatile Node prev;
  volatile Node next;
  // 加入同步队列的线程引用
  volatile Thread thread;
  // 等待队列中的下一个节点
  Node nextWaiter;
}

关键方法

独占锁的获取

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

加锁成功才会返回,获取失败就将当前线程加入同步队列 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode);
  Node pred = tail;
  if (pred != null) {
    node.prev = pred;
    // 如果其他线程没有插入,尾插法插入
    if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
    }
  }
  enq(node);
  return node;
}

private Node enq(final Node node) {
  for (;;) {
    Node t = tail;
    if (t == null) { // 链式队列的头结点的初始化
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {
      // 如果CAS尾插入节点失败后,负责自旋进行重试
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}

进入队列后怎么去尝试获得锁:

final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      // 返回node.prev
      final Node p = node.predecessor();
      // 如果是头结点 拿锁
      if (p == head &amp;& tryAcquire(arg)) {
        // 把当前节点设置为头结点  清除thread 和 prev属性
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
      // 如果不是头结点 或者 拿锁失败 判断是否应该park
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  // node.prev.waitStatus
  int ws = pred.waitStatus;
  // 前一个节点状态是SIGNAL则阻塞当前节点
  if (ws == Node.SIGNAL)
    return true;
  // 如果前一个节点取消了,更新node的prev的节点为 向前寻找的第一个非取消状态的节点
  if (ws > 0) {
    do {
      node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
  } else {
    // 如果前一个节点是INITIAL,设置前一个节点的状态为SIGNAL
    // 如果设置失败了,返回false,for (;;)死循环中会继续重试设置,直到前一个节点为SIGNAL
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  }
  return false;
}


private final boolean parkAndCheckInterrupt() {
  // 阻塞线程
  LockSupport.park(this);
  // 获取线程的中断状态,清空中断标志位
  return Thread.interrupted();
}

interrupted()方法的作用,该方法是获取线程的中断状态,并复位,也就是说,如果当前线程是中断状态,则第一次调用该方法获取的是true,第二次则是false。而isInterrupted()方法则只是返回线程的中断状态,不执行复位操作。

park与wait的作用类似,但是对中断状态的处理并不相同。如果当前线程不是中断的状态,park与wait的效果是一样的;如果一个线程是中断的状态,这时执行wait方法会报java.lang.IllegalMonitorStateException,而执行park时并不会报异常,而是直接返回,下一次循环调用park就能阻塞了

如果在循环的过程中出现了异常,则执行cancelAcquire方法,用于将该节点标记为取消状态

独占锁的释放

public final boolean release(int arg) {
  if (tryRelease(arg)) {
    Node h = head;
    // 如果头结点的状态不为INITIAL,为SIGNAL
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards From tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
      	// 从队列尾部开始找 不是取消状态 在node节点之后的最后一个节点 (即node右测第一个不为取消状态的节点)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
      	// 唤醒线程
        LockSupport.unpark(s.thread);
}

可中断式获取锁

public final void acquireInterruptibly(int arg)
  throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  if (!tryAcquire(arg))
    // 拿锁失败了
    doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg) throws InterruptedException {
  // 将节点插入到同步队列中
  final Node node = addWaiter(Node.EXCLUSIVE);
  boolean failed = true;
  try {
    for (;;) {
      // 获取锁出队
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return;
      }
      if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
        // 线程中断抛异常
        throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

超时等待式获取锁

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
  throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
// 实现超时等待的效果
private boolean doAcquireNanos(int arg, long nanosTimeout)
  throws InterruptedException {
  if (nanosTimeout <= 0L)
    return false;
  // 计算出deadline
  final long deadline = System.nanoTime() + nanosTimeout;
  final Node node = addWaiter(Node.EXCLUSIVE);
  boolean failed = true;
  try {
    for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        // 在超时时间内,当前线程成功获取了锁 返回true
        return true;
      }
      nanosTimeout = deadline - System.nanoTime();
      // 超时时间结束,仍未获得锁返回false
      if (nanosTimeout <= 0L)
        return false;
      if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
        LockSupport.parkNanos(this, nanosTimeout);
      if (Thread.interrupted())
        // 当前线程在超时时间内被中断
        throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

共享锁的获取

public final void acquireShared(int arg) {
  if (tryAcquireShared(arg) < 0)
    doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
  // 插入队尾
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      // node.prev
      final Node p = node.predecessor();
      if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
          // 当该节点的前驱节点是头结点 且 成功获取同步状态
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          if (interrupted)
            selfInterrupt();
          failed = false;
          return;
        }
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}


private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head;
  // 设置node为head
  setHead(node);
  if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
      doReleaseShared();
  }
}

共享锁的释放

public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) {
    // 成功释放同步状态之后
    doReleaseShared();
    return true;
  }
  return false;
}

private void doReleaseShared() {
  for (;;) {
    Node h = head;
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      if (ws == Node.SIGNAL) {
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;
        unparkSuccessor(h);
      }
      else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;
    }
    if (h == head)
      break;
  }
}

在共享式锁的释放过程中,对于能够支持多个线程同时访问的并发组件,必须保证多个线程能够安全的释放同步状态,这里采用的CAS保证,当CAS操作失败continue,在下一次循环中进行重试。

LockSupprot是线程的阻塞原语,用来阻塞线程和唤醒线程。

先调用park() 当前线程阻塞,再调用unpark(Thread thread) 唤醒阻塞的线程

先调用unpark(Thread thread) ,再调用park() 会直接返回

// 阻塞当前线程,如果调用unpark方法或者当前线程被中断,从能从park()方法中返回 blocker用于记录park信息
public static void park(Object blocker) {
  Thread t = Thread.currentThread();
  setBlocker(t, blocker);
  UNSAFE.park(false, 0L);
  setBlocker(t, null);
}
// 唤醒处于阻塞状态的指定线程
public static void unpark(Thread thread) {
  if (thread != null)
    UNSAFE.unpark(thread);
}

synchronized致使线程阻塞,线程会进入到BLOCKED状态,而调用LockSupport方法阻塞线程会致使线程进入到WAITING状态。

Lock接口

在Lock接口出现之前,java程序主要是靠synchronized关键字实现锁功能的,lock接口,它提供了与synchronized一样的锁功能。虽然它失去了像synchronize关键字隐式加锁解锁的便捷性,但是却拥有了锁获取和释放的可操作性,可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。ReentrantLock还支持公平锁和非公平锁两种方式

public interface Lock {
  void lock();
  void lockInterruptibly() throws InterruptedException;
  boolean tryLock();// 非阻塞式响应中断能立即返回,获取锁放回true反之返回fasle
  boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  void unlock();
  Condition newCondition();
}

ReentrantLock

理解AQS和ReentrantLock

ReentrantLock 里有private final Sync sync,Sync是个继承了AbstractQueuedSynchronizer的静态内部类,内部又有NonfairSync和FairSync继承了Sync,在构造器里决定sync的类型 sync = fair ? new FairSync() : new NonfairSync(),影响之后上锁的行为

要想支持重入性,就要解决两个问题:1. 在线程获取锁的时候,如果已经获取锁的线程是当前线程的话则直接再次获取成功;2. 由于锁会被获取n次,那么只有锁在被释放同样的n次之后,该锁才算是完全释放成功。

实现重入

static final class NonfairSync extends Sync {
  final void lock() {
    // 如果处于空闲 上锁
    if (compareAndSetState(0, 1))
      // 设置当前线程为独占线程
      setExclusiveOwnerThread(Thread.currentThread());
    else
      // 竞争锁
      acquire(1);
  }

  protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
  }
}

abstract static class Sync extends AbstractQueuedSynchronizer {
  final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
      // 如果该锁未被任何线程占有,该锁能被当前线程获取
      if (compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
      }
    }
    // 若被占有,检查占有线程是否是当前线程 实现重入
    else if (current == getExclusiveOwnerThread()) {
      // 再次获取
      int nextc = c + acquires;
      if (nextc < 0) // overflow
        throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
    }
    return false;
  }

  protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
      throw new IllegalMonitorStateException();
    boolean free = false;
    // 只有当同步状态为0时,锁成功被释放,返回true
    if (c == 0) {
      free = true;
      setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
  }
}

如何支持公平锁,在于tryAcquire方法

static final class FairSync extends Sync {

  final void lock() {
    acquire(1);
  }

  protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
      // 相比于非公平锁,增加了hasQueuedPredecessors()判断
      if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
      }
    }
    else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0)
        throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
    }
    return false;
  }
}

// 用来判断当前节点在同步队列中是否有前驱节点  如果有前驱节点说明有线程比当前线程更早的请求资源,为了公平,请求失败
public final boolean hasQueuedPredecessors() {
  Node t = tail;
  Node h = head;
  Node s;
  return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

非公平锁有可能导致其他线程永远无法获取到锁,造成“饥饿”现象

公平锁为了保证时间上的绝对顺序,需要频繁的上下文切换,而非公平锁会降低一定的上下文切换,降低性能开销。因此,ReentrantLock默认选择的是非公平锁,则是为了减少一部分上下文切换,保证了系统更大的吞吐量

ReentrantReadWriteLock

在一些业务场景中读多写少,依然使用独占锁的话,并发读取会出现性能瓶颈的地方。针对这种情况,java还提供了另外一个实现Lock接口的ReentrantReadWriteLock。读写锁允许同一时刻被多个读线程访问,但是在写线程访问时,所有的读线程和其他的写线程都会被阻塞。ReentrantReadWriteLock包括WirteLock和ReadLock的。

读写锁是怎样实现分别记录读写状态的?

AQS中只有一个记录锁状态的state变量,分成两

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 0x0000FFFF
// 高16位记录读锁状态
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
// 低16位记录写锁状态
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

写锁

protected final boolean tryAcquire(int acquires) {
  Thread current = Thread.currentThread();
  int c = getState();
  int w = exclusiveCount(c);
  if (c != 0) {
    // (Note: if c != 0 and w == 0 then shared count != 0)
    if (w == 0 || current != getExclusiveOwnerThread())
      // 读锁已被读线程获取 或 当前线程不是已经获取写锁的线程的话,获取写锁失败       
      return false;
    // 超过计数范围
    if (w + exclusiveCount(acquires) > MAX_COUNT)
      throw new Error("Maximum lock count exceeded");
    // 当前线程获取写锁,支持可重复加锁
    setState(c + acquires);
    return true;
  }
  if (writerShouldBlock() ||
      !compareAndSetState(c, c + acquires))
    return false;
  // 锁未被任何线程获取,当前线程可获取写锁
  setExclusiveOwnerThread(current);
  return true;
}

protected final boolean tryRelease(int releases) {
  if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
  int nextc = getState() - releases;
  boolean free = exclusiveCount(nextc) == 0;
  if (free)
    // 如果写状态为0则释放写锁
    setExclusiveOwnerThread(null);
  // 更新同步状态
  setState(nextc);
  return free;
}

读锁

protected final int tryAcquireShared(int unused) {
  Thread current = Thread.currentThread();
  int c = getState();
  if (exclusiveCount(c) != 0 &&
      getExclusiveOwnerThread() != current)
    // 如果有非当前线程获得写锁,则获取读锁失败
    return -1;
  int r = sharedCount(c);
  if (!readerShouldBlock() &&
      r < MAX_COUNT &&
      compareAndSetState(c, c + SHARED_UNIT)) {
    if (r == 0) {
      FirstReader = current;
      firstReaderHoldCount = 1;
    } else if (firstReader == current) {
      firstReaderHoldCount++;
    } else {
      HoldCounter rh = cachedHoldCounter;
      if (rh == null || rh.tid != getThreadId(current))
        cachedHoldCounter = rh = readHolds.get();
      else if (rh.count == 0)
        readHolds.set(rh);
      rh.count++;
    }
    return 1;
  }
  // 处理CAS操作失败的自旋
  return fullTryAcquireShared(current);
}


protected final boolean tryReleaseShared(int unused) {
  Thread current = Thread.currentThread();
  if (firstReader == current) {
    // assert firstReaderHoldCount > 0;
    if (firstReaderHoldCount == 1)
      firstReader = null;
    else
      firstReaderHoldCount--;
  } else {
    HoldCounter rh = cachedHoldCounter;
    if (rh == null || rh.tid != getThreadId(current))
      rh = readHolds.get();
    int count = rh.count;
    if (count <= 1) {
      readHolds.remove();
      if (count <= 0)
        throw unmatchedUnlockException();
    }
    --rh.count;
  }
  for (;;) {
    int c = getState();
    int nextc = c - SHARED_UNIT;
    if (compareAndSetState(c, nextc))
      // Releasing the read lock has no effect on readers,
      // but it may allow waiting writers to proceed if
      // both read and write locks are now free.
      return nextc == 0;
  }
}

写锁降级成读锁

读写锁支持锁降级,遵循按照获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁

class CachedData {
  Object data;
  volatile boolean cacheValid;
  final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

  void processCachedData() {
    rwl.readLock().lock();
    if (!cacheValid) {
      // 获得write lock前必须释放read lock
      rwl.readLock().unlock();
      rwl.writeLock().lock();
      try {
        // Recheck state because another thread might have
        // acquired write lock and changed state before we did.
        if (!cacheValid) {
          data = ...
          cacheValid = true;
        }
        // 通过在释放写锁之前获取读锁 实现降级
        rwl.readLock().lock();
      } finally {
        rwl.writeLock().unlock(); // Unlock write, still hold read
      }
    }

    try {
      use(data);
    } finally {
      rwl.readLock().unlock();
    }
  }
}

Condition

Condition 和 Object的wait和notify/notify区别

  1. Condition能够支持不响应中断,而通过使用Object方式不支持;
  2. Condition可以精准的对多个不同条件进行控制,能够支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个;
  3. Condition能够支持超时时间的设置,而Object不支持

参照Object的wait和notify/notifyAll方法,Condition也提供了同样的方法:

public interface Condition {
  // 当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常;
  void await() throws InterruptedException;
  void awaitUninterruptibly();
  // 当前线程进入等待状态直到被通知,中断或者超时
  long awaitNanos(long nanosTimeout) throws InterruptedException;
  // 自定义时间单位
  boolean await(long time, TimeUnit unit) throws InterruptedException;
  // 当前线程进入等待状态直到被通知,中断或者到了某个时间 
  boolean awaitUntil(Date deadline) throws InterruptedException;
  // 唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回
  void signal();
  // 唤醒所有等待在condition上的线程
  void signalAll();
}

要想能够深入的掌握condition还是应该知道它的实现原理,现在我们一起来看看condition的源码。创建一个condition对象是通过lock.newCondition(),而这个方法实际上是会new出一个ConditionObject对象,该类是AQS的一个内部类。

public class ConditionObject implements Condition, java.io.Serializable {
  private static final long serialVersionUID = 1173984872572414699L;
	// 指向队首节点
  private transient Node firstWaiter;
  // 指向队尾节点
  private transient Node lastWaiter;
}

这样我们就可以看出来ConditionObject通过持有等待队列的头尾指针来管理等待队列。主要注意的是Node类复用了在AQS中的Node类

// 后继节点  说明等待队列是一个单向队列  不带头结点
Node nextWaiter;

lock.newCondition()方法创建多个ConditionObject对象,ConditionObject实现了Condition接口,一个lock可以持有多个等待队列。

await 原理

释放当前线程的锁,阻塞,只有当调用了signal恢复运行,得到锁成功后才会返回

该线程能够从await()方法返回的话一定是该线程获取了与condition相关联的lock

public final void await() throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  // 将当前线程包装成Node,尾插入到等待队列中
  Node node = addConditionWaiter();
  // 释放当前线程所占用的lock,调用了release
  int savedstate = fullyRelease(node);
  int interruptMode = 0;
  // isOnSyncQueue 判断当前节点是否在同步队列中
  while (!isOnSyncQueue(node)) {
    // 当前线程进入到等待状态
    LockSupport.park(this);
    // 检测中断
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      break;
  }
  // 获取到lock返回true
  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
  if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
  if (interruptMode != 0)
    // 处理被中断的情况
    reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
  Node t = lastWaiter;
  if (t != null && t.waitStatus != Node.CONDITION) {
    // 清理队列中取消状态的节点
    unlinkCancelledWaiters();
    t = lastWaiter;
  }
  // 当前线程包装成Node
  Node node = new Node(Thread.currentThread(), Node.CONDITION);
  // 插入尾部
  if (t == null)
    firstWaiter = node;
  else
    t.nextWaiter = node;
  lastWaiter = node;
  // 返回新创建的节点
  return node;
}

signal方法

将等待队列头节点移动到同步队列中

// 把当前节点移动到同步队列 ,直到获得了lock后才会从await方法返回
public final void signal() {
  if (!isHeldExclusively())
    // 如果没有获取lock会直接抛出异常
    throw new IllegalMonitorStateException();
  // 获取等待队列中第一个节点
  Node first = firstWaiter;
  if (first != null)
    DOSignal(first);
}
// ReentrantLock#isHeldExclusively 判断当前线程是否占有锁
protected final boolean isHeldExclusively() {
  return getExclusiveOwnerThread() == Thread.currentThread();
}

private void doSignal(Node first) {
  do {
    if ( (firstWaiter = first.nextWaiter) == null)
      lastWaiter = null;
    // 将头结点从等待队列中移除
    first.nextWaiter = null;
  } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

// 处理头结点
final boolean transferForSignal(Node node) {
  // 更新状态为0
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
  // 将该节点移入到同步队列中去
  Node p = enq(node);
  int ws = p.waitStatus;
  if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread);
  return true;
}

await 从 同步队列头插入等待对列尾部

signal从 等待队列头插入同步队列尾部

实现交替打印

public class Demo {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    private static volatile boolean flag = false;

    static volatile int num=1;

    public static void main(String[] args) {
        Thread thread1 = new Thread(new Print(0));
        thread1.start();
        Thread thread2 = new Thread(new Print(1));
        thread2.start();
    }

    static class Print implements Runnable {

        int x;

        public Print(int x) {
            this.x = x;
        }

        @override
        public void run() {
            lock.lock();
            try {
                while (num<=100){
                    if (num%2==x) {
                        System.out.println(Thread.currentThread().getName() + "当前条件不满足等待");
                        try {
                            condition.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(Thread.currentThread().getName() + "num:"+num);
                    num++;
                    condition.signal();
                }
            } finally {
                lock.unlock();
            }
        }
    }
}

参考资料

深入理解AbstractQueuedSynchronizer(一

深入理解AbstractQueuedSynchronizer(二)

深入理解AbstractQueuedSynchronizer(三

脚本宝典总结

以上是脚本宝典为你收集整理的理解AQS和ReentrantLock全部内容,希望文章能够帮你解决理解AQS和ReentrantLock所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。