Java并发编程——从ReentrantLock来看AQS的应用

上篇文章简单梳理了一下AQS原理,这篇文章通过ReentrantLock来看JUC包是如何应用AQS的。

ReentrantLock概述

ReentrantLock意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁,它通过AQS框架来提供公平锁与非公平锁的两种实现。

可以将其与Synchronized对比,区别如下:

ReentrantLock Synchronized
锁实现机制 依赖AQS 依赖Monitor
灵活性 支持响应中断、超时、尝试获取锁 不灵活
释放形式 必须显示调用unlock()释放锁 由编译器来保证monitorexit
锁类型 公平锁与非公平锁 非公平锁
条件队列 可关联多个条件队列 关联一个条件队列
可重入性 可重入 可重入

ReentrantLock本身没有实现AQS,而是通过内部类FairSyncNonfairSync来实现AQS,这两个类对应公平锁和非公平锁的实现,下面分别来看具体实现。

非公平锁实现

ReentrantLock 要实现非公平锁只需在创建时传入 false 即可:

1
2
3
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

非公平锁的实现在NonfairSync类中,下面以代码为例,分析ReentrantLock在非公平锁的实现下的加锁与解锁过程。

1
2
3
4
5
6
7
8
9
10
11
private val reentrantLock = ReentrantLock(false)

fun tryGetResourceLocked() {
reentrantLock.lock()

try {
···
} finally {
reentrantLock.unlock()
}
}

获取锁

当前线程执行到reentrantLock.lock方法时,会尝试获取锁,来到NonfairSync的lock方法中:

1
2
3
4
5
6
7
8
9
final void lock() {
// 通过CAS操作来修改state,即插队
if (compareAndSetState(0, 1))
// 修改成功,设置当前持有锁的线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 修改失败,进入正常流
acquire(1);
}

对于非公平锁,它允许线程插队争夺锁,因此代码中首先利用了CAS机制来争夺锁,即插队,成功则更新当前持有锁的线程,失败调用acquire方法。

acquire方法在上篇文章中已经详细介绍了,我们再来看一下它的方法签名:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

之前我们说到tryAcquire方法是由子类实现,来看NonfairSync是如何实现的:

1
2
3
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

仅仅是调用了nonfairTryAcquire方法,这个方法定义在NonfairSync的父类Sync里,来看看它是如何实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 如果c=0,再次尝试获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// c不为0,但当前线程为独占锁持有者
else if (current == getExclusiveOwnerThread()) {
// 直接增加重入次数
int nextc = c + acquires;

// 可能溢出
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

方法中重新获取了当前同步状态state,并且根据state的状态有:

  • 如果state为0,说明有线程释放了锁,可以尝试再次获取锁,成功则返回true
  • 如果state不为0,并且当前线程为锁持有者,则直接增加重入次数,这说明ReentrantLock是可重入的。另外还增加了对于可能存在的Int值的溢出判断,不过这里实际上目前是不太可能执行到的,因为在这之前可能已经出现StackOverflow了。
  • 其他情况返回false,表示获取锁失败

假如方法返回了false,则表示当前线程未能成功抢到锁,则会加入到CLH同步队列中并阻塞,这个在上篇文章已经详细介绍过了,这里不再赘述。

释放锁

当代码执行到reentrant.unlock方法时,会执行锁的释放操作,来看看unlock的源码:

1
2
3
public void unlock() {
sync.release(1);
}

方法直接调用了AQS中的release方法,同样在上篇文章详细介绍过,再来看下方法签名:

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease方法在Sync类中实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected final boolean tryRelease(int releases) {
// state的预期值
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;

// c为0,表示所有线程都释放了锁
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}

// 独占模式不需要CAS
setState(c);
return free;
}

对于独占锁来说,只有可能是锁的持有线程才可能修改State的值,因此整个释放过程不需要CAS来参与。

由于可能存在多次重入,本次释放后可能当前state不为0,那么当前线程依然是锁的持有者。

如果释放后state为0,则表示当前线程已经完全释放了锁,则进入后续操作,唤醒后继节点。

到这里NonfairSync类就解析完成了,可以看到在AQS框架的基础上要实现一个锁还是非常简单的。

公平锁实现

下面来看看公平锁实现,在FairSync类中。

FairSync与NonFairSync仅仅在获取锁的过程有差别,因此我们只需要关注获取锁即可。

同样的在lock方法和tryAcquire中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final void lock() {
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

可以看到这里的实现与非公平锁非常类似,仅有少许差别,分别是:

  • lock方法中,不同于非公平锁会先采用CAS尝试插队,公平锁直接调用了acquire方法来尝试获取锁
  • tryAcquire方法中,如果此时已经同步资源已经处于无锁状态(state为0),当前线程将不会立马去尝试获取锁,而是先判断队列中有没有正在等待的线程,而且这个线程不是当前线程,如果有的话将返回false,没有才会尝试获取锁。

到这里ReentrantLock的非公平锁与公平锁的实现分析就完成了,可以看到有了AQS框架之后整个实现是很简单的,不需要特别多的代码。

ReentrantReadWriteLock

上节中的ReentrantLock实现仅仅利用了AQS的独占同步机制,共享同步则没有使用到,因为ReentrantLock本身就是独占锁实现。

ReentrantReadWriteLock中的读锁是典型的共享锁,下面就从ReentrantReadWriteLock源码来看它是如何利用AQS同时实现共享锁与排他锁的。

Sync

与ReentrantLock一样,ReentrantReadWriteLock中本身不继承AQS,而是通过内部类Sync来实现。

ReentrantReadWriteLock中的Sync需要同时提供独占与共享的两种方式获取锁,读写锁获取与释放的核心逻辑都由它完成,因此我们只需关注它的实现即可。

同步状态的定义

上面说的Sync需要同时提供独占和共享的两种同步资源的获取与释放,根据AQS原理一文,AQS中的同步状态由一个state变量来保存。

那么如何使用一个变量来保存两种不同同步资源的持有状态呢?

如果要在一个整型变量上保存多种状态,那么我们自然想到要对其进行“按位切割”

如果每种状态只有有(1)没有(0)两种状态,那么这个整形变量可以保存最多32种状态,即每一个bit位都可以保存一个状态。这种保存多状态的方式其实在Android源码里面非常常见。

回到本文中,我们要在一个AQS的state变量中保存读和写两种同步资源状态,只需将state拆分为高16位和低16位,分别用来存储读状态和写状态,并且最多可以分别维护65535个读或写状态(包括重入)。

如下图:

读写锁状态划分

上图表示当前线程获取了写锁,并且重入了两次,同时也连续获取了两次读锁。

为什么一定是当前线程呢,这是因为读写锁的互斥性,如果读写锁的状态同时存在,则一定由同一线程持有

读锁(共享锁)

ReentrantReadWriteLock中ReadLock是共享锁实现,其通过Sync中的tryAcquireShared方法来进行加锁,通过Sync中的tryReleaseShared方法来进行解锁,这个在之前AQS的文章中就已经说明过。

下面通过源码来解读读锁的加锁与解锁过程。

Sync#tryAcquireShared

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();

// 通过exclusiveCount方法来获取当前持有独占同步状态的线程数
if (exclusiveCount(c) != 0 &&
// 当前线程不持有写锁
getExclusiveOwnerThread() != current)
return -1;

// 通过sharedCount方法来获取当前持有共享同步状态的线程数
int r = sharedCount(c);

// readerShouldBlock方法判断获取读锁的线程是否需要阻塞
if (!readerShouldBlock() &&
r < MAX_COUNT &&
// 尝试对state加65536, 实际就是对高16位加一
compareAndSetState(c, c + SHARED_UNIT)) {

// 如果r为0,说明当前线程为第一个持有读锁的线程
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// firstReader为第一次获取读锁的线程
// 即最近将state从0设置为1的线程

// 增加重入次数
firstReaderHoldCount++;
} else {
// HoldCounter记录每个线程的重入次数
// 通过ThreadLocal来访问
// cachedHoldCounter为最近获取共享锁的线程的重入计数器
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 本次没能加锁
return fullTryAcquireShared(current);
}

加锁流程分析:

  1. 如果其他线程持有写锁,则加锁失败,直接返回,这说明读写锁之间是互斥的
  2. 否则,调用readerShouldBlock方法判断是否需要阻塞当前获取读锁的线程,它是个抽象方法,有两种实现:
    • 对于公平锁:如果在同步队列中有节点正在等待并且该节点对应线程不是当前线程,这表明有线程正在等待获取读锁,那么当前线程需要排队,则返回true表示需要阻塞
    • 对于非公平锁:如果同步队列中的首个节点(即头节点的后继节点)正处于独占状态,表示有线程将在当前线程获取读锁之前尝试获取写锁,而写锁与读锁是互斥的,因此没办法进行插队,返回true表示需要阻塞
  3. 如果无需阻塞,并且当先持有共享锁的线程没有超过规定的最大值,则使用CAS更新state表示获取锁,成功则进行后续的重入次数更新操作
  4. 如果线程需要阻塞、持有共享锁线程达到最大值、加锁失败,将执行fullTryAcquireShared方法。

Sync#fullTryAcquireShared

fullTryAcquireShared方法中的代码与tryAcquireShared中高度类似,其作用是开启一个死循环,即自旋获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
final int fullTryAcquireShared(Thread current) {
/**
* 这段代码与tryAcquireShared中的代码有部分重复,但整体更简单。
*/
HoldCounter rh = null;
// 死循环
for (;;) {
int c = getState();
// 如果存在写锁
if (exclusiveCount(c) != 0) {
// 并且不是当前线程,获取锁失败
// 这个判断是必须的,如果持有写锁的线程在获取读锁时进入阻塞了会导致死锁
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
// 获取读锁的线程需要阻塞
if (firstReader == current) {
// 表示重入 这里不需要做任何事
} else {
if (rh == null) {
// 上一个获取读锁的线程重入次数
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
// 当前线程持有的读锁数,即重入次数
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
// 当前线程不持有读锁,同时需要阻塞,则返回-1
if (rh.count == 0)
return -1;
}
}
// 读锁获取次数不能超过65535
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 尝试加锁
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 注意此时的c是旧值,当旧值为0,则当前线程为首个获取读锁的线程
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 重入
firstReaderHoldCount++;
} else {
// 增加重入次数
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
// 保存为最近获取读锁的线程的持有次数
cachedHoldCounter = rh;
}
return 1;
}
}
}

执行fullTryAcquireShared时线程可能会进入自旋状态,流程如下:

  1. 首先根据读写锁互斥性,如果当前有线程持有写锁并且不是当前线程,那么会直接返回-1,根据上篇文章的AQS获取共享锁原理,将会执行doAcquireShared方法,可能会阻塞当前线程
  2. 写锁处于空闲状态或者当前线程持有写锁,当前线程才有可能获取到读锁。接下来会调用readerShouldBlock方法来判断线程是否应该阻塞,这时会排除两个特例:
    • 当前线程就是第一个获取读锁的线程,即当前线程此时是重入状态,此时是允许重新获取锁的
    • 否则,如果当前线程原先并不持有读锁,那么就会返回-1表示获取失败
  3. 经过上面的判断后,当前线程执行到这里就可以去获取锁了,通过cas操作更新state:
    • 成功,判断是否首个读线程,增加重入次数,返回1表示获取成功
    • 失败则自旋重复走fullTryAcquireShared方法的流程

到这里整个读锁的获取过程就分析完了,接下来来看如何释放读锁。

Sync#tryReleaseShared

搞懂了上面的获取共享锁的过程后,释放共享锁就很容理解了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 先处理缓存的锁重入次数
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 自旋
for (;;) {
// 对state执行减一操作
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 注意这个返回值
return nextc == 0;
}
}

代码很容易理解,这里有一处需要注意的,就是返回值:

1
return nextc == 0;

nextc表示释放后的state值,如果为0,则表示读锁和写锁都处于空闲状态,那么就会返回true,根据这个返回值我们可以去判断是否需要唤醒正在等待的获取写锁的线程。

写锁(排他锁)

相对于读锁的共享式获取和释放,写锁的独占式就显得尤为简单了,通过Sync中的tryAcquire方法来获取写锁,tryRelease方法来释放写锁。

Sync#tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// c为读写总状态
int c = getState();
// w为写锁状态状态
int w = exclusiveCount(c);
if (c != 0) {
// c不为0
// w为0,则当前读状态,加锁失败
// w不为0,但是当前线程不持有写锁,加锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;

// 限制最大重入次数
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 获取写锁
setState(c + acquires);
return true;
}
// c为0
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 获取锁成功,更新写锁持有线程
setExclusiveOwnerThread(current);
return true;
}

加锁流程:

  1. 同步状态state不为0,此时分多种情况:

    • 写锁空闲,根据读写互斥,加锁失败
    • 写锁不空闲,但当前线程不是持有写锁的线程,根据读写互斥,加锁失败
    • 除了上面的情况之外,只需不超出重入限制即可成功加锁,无需cas
  2. 同步状态state为0,可以直接进行加锁,但在这之前同样需要通过writerShouldBlock来判断是否需要阻塞获取写锁的线程:

    • 对于公平锁,如果队列中有线程在排队,那么当先线程必须先入队进行排队
    • 对于非公平锁,固定返回false,表示始终允许获取写锁时进行插队

    如果writerShouldBlock方法返回false,则会通过CAS来修改state的值,修改成功则加锁成功,更新当前线程为持有写锁的线程

这里的writerShouldBlock和获取读锁时的readerShouldBlock方法的作用是一致的,都是在获取锁之前判断当前时刻当前线程是否有资格获取锁,都依赖于子类来完成,说白了就是适配公平锁与非公锁的不同需求。

Sync#tryRelease

1
2
3
4
5
6
7
8
9
10
11
protected final boolean tryRelease(int releases) {
// 如果当前线程不持有写锁,直接抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

代码非常简单,同样也是不需要CAS操作的,不再赘述。

ReentrantReadWriteLock的锁降级策略

总结

本篇文章基于ReentrantLock的源码了解了AQS框架在公平锁与非公平锁上的应用,然后重点分析了ReentrantReadWriteLock中Sync类的源码,理解了共享同步状态获取的过程,加深了对AQS的理解。

参考

  1. openJdk 11
  2. 《Java并发编程艺术》

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!