脚本宝典收集整理的这篇文章主要介绍了理解AQS和ReentrantLock,脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。
AQS( AbstractQueuedSynchronizer )是一个用来构建锁和同步器的框架,Lock 包中的各种锁( ReentrantLock, ReadWrITeLock), concurrent 包中的各种同步器(如 CountDownLatch, semaphore, CyclicBarrier)都是基于 AQS 来构建。AQS负责同步状态的管理,线程的排队,等待和唤醒这些底层操作,而Lock等同步组件主要专注于实现同步语义
AQS则实现了对同步状态的管理,以及对阻塞线程进行排队,等待通知等等一些底层的实现处理。AQS的核心也包括了这些方面:同步队列,独占式锁的获取和释放,共享锁的获取和释放以及可中断锁,超时等待锁获取这些特性的实现,而这些实际上则是AQS提供出来的模板方法
AQS可重写的方法,主要返回true fasle,告诉AQS怎样判断当前同步状态是否成功获取或者是否成功释放
// 独占式获取同步状态 判断当前状态是否符合预期 再进行CAS设置同步状态
PRotected boolean tryAcquire(int arg){throw new UnsupportedoperationException();}
// 独占式释放同步状态
protected boolean tryRelease(int arg){throw new UnsupportedOPErationException();}
// 共享式获取同步状态 返回>0代表获取成功 否则失败
protected int tryAcquireShared(int arg){throw new UnsupportedOperationException();}
// 共享式释放同步状态
protected boolean tryReleaseShared(int arg){throw new UnsupportedOperationException();}
// 判断AQS是否被当前线程独占
protected boolean isHeldExclusively(){throw new UnsupportedOperationException();}
// 独占式获取同步状态,如果获取失败则插入同步队列进行等待,该方法将会调用重写的tryAcquire(int arg)方法
public final void acquire(int arg)
// 与acquire方法相同,但在同步队列中进行等待的时候可以检测中断
public final void acquireinterruptibly(int arg)
// 在acquireInterruptibly基础上增加了超时等待功能,在超时时间内没有获得同步状态返回false
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
// 释放同步状态,该方法会唤醒在同步队列中的下一个节点 调用重写的tryRelease(int arg)方法
public final boolean release(int arg)
// 共享式获取同步状态,与独占式的区别在于同一时刻有多个线程获取同步状态
public final void acquireShared(int arg)
// 在acquireShared方法基础上增加了能响应中断的功能
public final void acquireSharedInterruptibly(int arg)
// 在acquireSharedInterruptibly基础上增加了超时等待的功能
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
// 共享式释放同步状态
public final boolean releaseShared(int arg)
// 返回同步队列上的线程集合
public final Collection<Thread> getQueuedThreads()
AQS中维护了一个volatile int state
(代表共享资源)和一个FIFO
线程同步队列
当共享资源被某个线程占有,其他请求该资源的线程将会阻塞,从而进入同步队列。AQS中的同步队列是个双向链表,节点类型Node,通过头尾指针管理队列
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
// 节点从同步队列中取消
static final int CANCELLED = 1;
// 后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,使得后继节点的线程能够运行;
static final int SIGNAL = -1;
// 当前节点进入等待队列中
static final int CONDITION = -2;
// 表示下一次共享式同步状态获取将会无条件传播下去
static final int PROPAGATE = -3;
// 节点状态
volatile int waitStatus;
volatile Node prev;
volatile Node next;
// 加入同步队列的线程引用
volatile Thread thread;
// 等待队列中的下一个节点
Node nextWaiter;
}
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
加锁成功才会返回,获取失败就将当前线程加入同步队列 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 如果其他线程没有插入,尾插法插入
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 链式队列的头结点的初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 如果CAS尾插入节点失败后,负责自旋进行重试
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
进入队列后怎么去尝试获得锁:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 返回node.prev
final Node p = node.predecessor();
// 如果是头结点 拿锁
if (p == head && tryAcquire(arg)) {
// 把当前节点设置为头结点 清除thread 和 prev属性
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果不是头结点 或者 拿锁失败 判断是否应该park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// node.prev.waitStatus
int ws = pred.waitStatus;
// 前一个节点状态是SIGNAL则阻塞当前节点
if (ws == Node.SIGNAL)
return true;
// 如果前一个节点取消了,更新node的prev的节点为 向前寻找的第一个非取消状态的节点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前一个节点是INITIAL,设置前一个节点的状态为SIGNAL
// 如果设置失败了,返回false,for (;;)死循环中会继续重试设置,直到前一个节点为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 阻塞线程
LockSupport.park(this);
// 获取线程的中断状态,清空中断标志位
return Thread.interrupted();
}
interrupted()方法的作用,该方法是获取线程的中断状态,并复位,也就是说,如果当前线程是中断状态,则第一次调用该方法获取的是
true
,第二次则是false
。而isInterrupted()方法则只是返回线程的中断状态,不执行复位操作。park与wait的作用类似,但是对中断状态的处理并不相同。如果当前线程不是中断的状态,park与wait的效果是一样的;如果一个线程是中断的状态,这时执行wait方法会报
java.lang.IllegalMonitorStateException
,而执行park时并不会报异常,而是直接返回,下一次循环调用park就能阻塞了
如果在循环的过程中出现了异常,则执行cancelAcquire方法,用于将该节点标记为取消状态
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 如果头结点的状态不为INITIAL,为SIGNAL
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards From tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从队列尾部开始找 不是取消状态 在node节点之后的最后一个节点 (即node右测第一个不为取消状态的节点)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
// 拿锁失败了
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg) throws InterruptedException {
// 将节点插入到同步队列中
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
// 获取锁出队
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 线程中断抛异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
// 实现超时等待的效果
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 计算出deadline
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 在超时时间内,当前线程成功获取了锁 返回true
return true;
}
nanosTimeout = deadline - System.nanoTime();
// 超时时间结束,仍未获得锁返回false
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
// 当前线程在超时时间内被中断
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
// 插入队尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// node.prev
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 当该节点的前驱节点是头结点 且 成功获取同步状态
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 设置node为head
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 成功释放同步状态之后
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
在共享式锁的释放过程中,对于能够支持多个线程同时访问的并发组件,必须保证多个线程能够安全的释放同步状态,这里采用的CAS保证,当CAS操作失败continue,在下一次循环中进行重试。
LockSupprot是线程的阻塞原语,用来阻塞线程和唤醒线程。
先调用park() 当前线程阻塞,再调用unpark(Thread thread) 唤醒阻塞的线程
先调用unpark(Thread thread) ,再调用park() 会直接返回
// 阻塞当前线程,如果调用unpark方法或者当前线程被中断,从能从park()方法中返回 blocker用于记录park信息 public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); } // 唤醒处于阻塞状态的指定线程 public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); }
synchronized致使线程阻塞,线程会进入到BLOCKED状态,而调用LockSupport方法阻塞线程会致使线程进入到WAITING状态。
在Lock接口出现之前,java程序主要是靠synchronized关键字实现锁功能的,lock接口,它提供了与synchronized一样的锁功能。虽然它失去了像synchronize关键字隐式加锁解锁的便捷性,但是却拥有了锁获取和释放的可操作性,可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。ReentrantLock还支持公平锁和非公平锁两种方式
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();// 非阻塞式响应中断能立即返回,获取锁放回true反之返回fasle
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
ReentrantLock 里有private final Sync sync
,Sync是个继承了AbstractQueuedSynchronizer的静态内部类,内部又有NonfairSync和FairSync继承了Sync,在构造器里决定sync的类型 sync = fair ? new FairSync() : new NonfairSync(),影响之后上锁的行为
要想支持重入性,就要解决两个问题:1. 在线程获取锁的时候,如果已经获取锁的线程是当前线程的话则直接再次获取成功;2. 由于锁会被获取n次,那么只有锁在被释放同样的n次之后,该锁才算是完全释放成功。
实现重入
static final class NonfairSync extends Sync {
final void lock() {
// 如果处于空闲 上锁
if (compareAndSetState(0, 1))
// 设置当前线程为独占线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 竞争锁
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 如果该锁未被任何线程占有,该锁能被当前线程获取
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 若被占有,检查占有线程是否是当前线程 实现重入
else if (current == getExclusiveOwnerThread()) {
// 再次获取
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 只有当同步状态为0时,锁成功被释放,返回true
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
如何支持公平锁,在于tryAcquire方法
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 相比于非公平锁,增加了hasQueuedPredecessors()判断
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
// 用来判断当前节点在同步队列中是否有前驱节点 如果有前驱节点说明有线程比当前线程更早的请求资源,为了公平,请求失败
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
非公平锁有可能导致其他线程永远无法获取到锁,造成“饥饿”现象。
公平锁为了保证时间上的绝对顺序,需要频繁的上下文切换,而非公平锁会降低一定的上下文切换,降低性能开销。因此,ReentrantLock默认选择的是非公平锁,则是为了减少一部分上下文切换,保证了系统更大的吞吐量。
在一些业务场景中读多写少,依然使用独占锁的话,并发读取会出现性能瓶颈的地方。针对这种情况,java还提供了另外一个实现Lock接口的ReentrantReadWriteLock。读写锁允许同一时刻被多个读线程访问,但是在写线程访问时,所有的读线程和其他的写线程都会被阻塞。ReentrantReadWriteLock包括WirteLock和ReadLock的。
AQS中只有一个记录锁状态的state变量,分成两半
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 0x0000FFFF
// 高16位记录读锁状态
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 低16位记录写锁状态
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
// 读锁已被读线程获取 或 当前线程不是已经获取写锁的线程的话,获取写锁失败
return false;
// 超过计数范围
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 当前线程获取写锁,支持可重复加锁
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 锁未被任何线程获取,当前线程可获取写锁
setExclusiveOwnerThread(current);
return true;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
// 如果写状态为0则释放写锁
setExclusiveOwnerThread(null);
// 更新同步状态
setState(nextc);
return free;
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
// 如果有非当前线程获得写锁,则获取读锁失败
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
FirstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 处理CAS操作失败的自旋
return fullTryAcquireShared(current);
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
读写锁支持锁降级,遵循按照获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// 获得write lock前必须释放read lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 通过在释放写锁之前获取读锁 实现降级
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
Condition 和 Object的wait和notify/notify区别
参照Object的wait和notify/notifyAll方法,Condition也提供了同样的方法:
public interface Condition {
// 当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常;
void await() throws InterruptedException;
void awaitUninterruptibly();
// 当前线程进入等待状态直到被通知,中断或者超时
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 自定义时间单位
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 当前线程进入等待状态直到被通知,中断或者到了某个时间
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回
void signal();
// 唤醒所有等待在condition上的线程
void signalAll();
}
要想能够深入的掌握condition还是应该知道它的实现原理,现在我们一起来看看condition的源码。创建一个condition对象是通过lock.newCondition()
,而这个方法实际上是会new出一个ConditionObject对象,该类是AQS的一个内部类。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 指向队首节点
private transient Node firstWaiter;
// 指向队尾节点
private transient Node lastWaiter;
}
这样我们就可以看出来ConditionObject通过持有等待队列的头尾指针来管理等待队列。主要注意的是Node类复用了在AQS中的Node类
// 后继节点 说明等待队列是一个单向队列 不带头结点
Node nextWaiter;
lock.newCondition()方法创建多个ConditionObject对象,ConditionObject实现了Condition接口,一个lock可以持有多个等待队列。
释放当前线程的锁,阻塞,只有当调用了signal恢复运行,得到锁成功后才会返回
该线程能够从await()方法返回的话一定是该线程获取了与condition相关联的lock
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程包装成Node,尾插入到等待队列中
Node node = addConditionWaiter();
// 释放当前线程所占用的lock,调用了release
int savedstate = fullyRelease(node);
int interruptMode = 0;
// isOnSyncQueue 判断当前节点是否在同步队列中
while (!isOnSyncQueue(node)) {
// 当前线程进入到等待状态
LockSupport.park(this);
// 检测中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 获取到lock返回true
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
// 处理被中断的情况
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
// 清理队列中取消状态的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 当前线程包装成Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 插入尾部
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
// 返回新创建的节点
return node;
}
将等待队列头节点移动到同步队列中
// 把当前节点移动到同步队列 ,直到获得了lock后才会从await方法返回
public final void signal() {
if (!isHeldExclusively())
// 如果没有获取lock会直接抛出异常
throw new IllegalMonitorStateException();
// 获取等待队列中第一个节点
Node first = firstWaiter;
if (first != null)
DOSignal(first);
}
// ReentrantLock#isHeldExclusively 判断当前线程是否占有锁
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 将头结点从等待队列中移除
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
// 处理头结点
final boolean transferForSignal(Node node) {
// 更新状态为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 将该节点移入到同步队列中去
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
await 从 同步队列头插入等待对列尾部
signal从 等待队列头插入同步队列尾部
public class Demo {
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
private static volatile boolean flag = false;
static volatile int num=1;
public static void main(String[] args) {
Thread thread1 = new Thread(new Print(0));
thread1.start();
Thread thread2 = new Thread(new Print(1));
thread2.start();
}
static class Print implements Runnable {
int x;
public Print(int x) {
this.x = x;
}
@override
public void run() {
lock.lock();
try {
while (num<=100){
if (num%2==x) {
System.out.println(Thread.currentThread().getName() + "当前条件不满足等待");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "num:"+num);
num++;
condition.signal();
}
} finally {
lock.unlock();
}
}
}
}
深入理解AbstractQueuedSynchronizer(一)
深入理解AbstractQueuedSynchronizer(二)
深入理解AbstractQueuedSynchronizer(三)
以上是脚本宝典为你收集整理的理解AQS和ReentrantLock全部内容,希望文章能够帮你解决理解AQS和ReentrantLock所遇到的问题。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。