StampedLock详解
# StampedLock详解
# 1、StampedLock简介
StampedLock
是JUC并发包里面 JDK8 版本新增的一个锁,是读写锁的一种具体实现,和ReentrantReadWriteLock
不同的是其不提供可重入性,不基于某个类似Lock或者ReadWriteLock接口实现,而是基于CLH锁思想实现这点这AQS有些类似,并且StampedLock不支持条件变量 Condition
。 关于ReentrantReadWriteLock
可以参考上篇文章 ReentrantReadWriteLock
详解 (opens new window)。
那么JDK既然实现了这个锁,就说明这个锁一定有优势,那就是性能优势,下面会具体分析。
# StampedLock 三个主要的锁模式
写锁模式(writeLock()): 用于排他性地写操作。在写锁模式下,其他线程既无法获取读锁,也无法获取写锁。
乐观读锁模式(tryOptimisticRead()): 允许线程进行读操作而不获取读锁,这种模式假设在读操作过程中数据不会被其他线程修改。如果发现数据被修改,可以重新获取悲观读锁以保证数据一致性。
悲观读锁模式(readLock()): 类似于 ReadWriteLock 的读锁,允许多个线程同时获取读锁,但无法与写锁共存。
# 戳记
戳记(stamp) 是 StampedLock 的关键,表示当前锁的状态。获取锁时返回的戳记值在后续的锁操作中用于验证锁的有效性,确保在锁的释放或转换操作中锁的状态是正确的。使用戳记有助于减少锁的争用和开销, StampedLock 通过提供乐观读锁在多线程多读的情况下提供了更好的性能,这是因为获取乐观读锁时不需要进行 CAS 操作设置锁的状态,而只是简单地测试状态。
# 获取锁和释放锁的方法
获取锁:
tryOptimisticRead(): 获取乐观读锁,返回戳记。
readLock(): 获取悲观读锁,返回戳记。
writeLock(): 获取写锁,返回戳记。
释放锁:
unlockRead(stamp): 释放读锁(悲观读锁或乐观读锁)。
unlockWrite(stamp): 释放写锁。
# 转换锁
// 尝试将当前持有的锁转换为写锁
long tryConvertToWriteLock(long stamp){}
// 尝试将当前持有的写锁转换为读锁
long tryConvertToReadLock(long stamp){}
// 尝试将当前持有的悲观读锁转换为乐观读锁
long tryConvertToOptimisticRead(long stamp){}
可以看出tryConvertToWriteLock
方法表明StampedLock支持锁升级,这也是和ReentrantReadWriteLock
不同的点。
# 并发度比较
锁 | 并发度 |
---|---|
ReentrantLock | 读读互斥,读写互斥,写写互斥 |
ReentrantReadWriteLock | 读读不互斥、读写互斥、写写互斥 |
StampedLock | 读读不互斥、读写不互斥、写写互斥 |
上面对于StampedLock 的读写不互斥是指 乐观读和写,而不是悲观读和写。
乐观读的思想和数据库中MVCC(Multi-Version Concurrency Control,多版本并发控制)有点类似。
# 2、StampedLock简单使用示例
还是拿狗吃骨头举例:
import java.util.concurrent.locks.StampedLock;
public class TestA {
private final StampedLock lock = new StampedLock();
private volatile int bones = 10; // 初始骨头数量
public static void main(String[] args) {
TestA example = new TestA();
// 创建多个线程模拟狗的操作 可以吃骨头 也可以添加骨头
Runnable dogEat = example::eatBone;
Runnable dogAdd = () -> example.addBones(5);
Thread dog1 = new Thread(dogEat);
Thread dog3 = new Thread(dogAdd);
Thread dog2 = new Thread(dogEat);
dog1.start();
dog2.start();
dog3.start();
try {
dog1.join();
dog2.join();
dog3.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 狗获取骨头(读操作)
public void eatBone() {
long stamp = lock.tryOptimisticRead();
try {
// 尝试进行乐观读操作
int currentBones = bones;
// 这里模拟一些延迟
Thread.sleep(100); // 假设读操作需要一点时间
// 检查在读操作过程中骨头是否被修改
if (!lock.validate(stamp)) {
// 骨头在读操作过程中被修改,获取悲观读锁以确保一致性
stamp = lock.readLock();
try {
// 持有悲观读锁 bones 就不能被修改了
currentBones = bones;
} finally {
lock.unlockRead(stamp);
}
}
// 输出读取到的骨头数量
System.out.println("狗看到的骨头数量 : " + currentBones + " 个");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 狗添加新的骨头(写操作)
public void addBones(int number) {
long stamp = lock.writeLock();
try {
// 执行写操作
bones += number;
System.out.println("添加 " + number + " 个骨头. 一共: " + bones);
} finally {
lock.unlockWrite(stamp);
}
}
}
总结:
需要注意的点就是,lock.tryOptimisticRead();
获取悲观锁方法并不真正获取锁,而是假设在读操作期间数据不会被修改。使用戳记来验证数据是否在读操作期间被修改,必要时需要升级为悲观读锁来保证数据一致性。
# 使用乐观读锁需要遵循一定的规则:
比如吃骨头的方法中获取狗头数量的时候需要遵循下面几个步骤: 并且一定要按照下面几个步骤顺序处理。
①、乐观读操作:
先尝试使用 tryOptimisticRead()
获取乐观读锁。此时不持有实际的读锁,仅仅假设数据在读取期间不会被修改。
进行乐观读操作,相当于把共享变量读取到线程的栈内存,这一步很重要需要在验证之前执行。
②、验证和转换:
使用 validate(stamp)
检查在乐观读期间数据是否被修改。如果数据未被修改,读取的结果就是可靠的。
如果数据被修改,需要使用 readLock()
获取悲观读锁,再进行一遍悲观读取操作来确保数据的一致性。
③、释放锁: 如果获取了悲观读锁,必须在读操作完成后使用 unlockRead(stamp) 释放锁。
// 狗获取骨头(读操作)
public void eatBone() {
long stamp = lock.tryOptimisticRead();
try {
// 尝试进行乐观读操作
int currentBones = bones;
// 这里模拟一些延迟
Thread.sleep(100); // 假设读操作需要一点时间
// 检查在读操作过程中骨头是否被修改
if (!lock.validate(stamp)) {
// 骨头在读操作过程中被修改,获取悲观读锁以确保一致性
stamp = lock.readLock();
try {
// 持有悲观读锁 bones 就不能被修改了
currentBones = bones;
} finally {
lock.unlockRead(stamp);
}
}
// 输出读取到的骨头数量
System.out.println("狗看到的骨头数量 : " + currentBones + " 个");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
使用乐观锁、悲观锁思想也是StampedLock提升读取性能的一个方式。因为tryOptimisticRead
和 validate
这两个方法都比悲观读锁的CAS操作要快。
# validate
方法的注意点
由于乐观读操作对顺序要求很严格,并且乐观读返回的是普通long类型变量,所以为了防止重排序,在validate
方法中使用了读屏障( U.loadFence();
) 确保在屏障之前的所有读操作在屏障之后的读操作之前完成。这意味着,U.loadFence()
保证了在它之前的内存读取操作不会被重新排序到它之后。
StampedLock
中的state
变量是volatile修饰的,但是validate方法的入参stamp
并不能保证是volatile变量,所以需要加个读屏障,确保 stamp
和 state
的读取操作不会被重排序,从而保证 stamp
的有效性检查是准确的。如果不使用读屏障,可能会出现 stamp
和 state
的读取操作被重排序的情况,这可能导致 validate 方法返回不正确的结果。
public boolean validate(long stamp) {
U.loadFence();
return (stamp & SBITS) == (state & SBITS);
}
# 3、StampedLock获取释放锁详解
# 类继承结构
public class StampedLock implements java.io.Serializable
# 类属性
/** CPU 核心数,用于自旋控制 */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/** 尝试获取锁时的最大自旋次数。自旋是指在短时间内反复检查锁状态,而不是立即阻塞线程。 */
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
/** 尝试获取锁时,最大自旋次数,超出此值后会尝试阻塞在队列的头部。 */
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
/** 自旋时的最大重试次数,超出此值后会重新尝试阻塞。 */
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
/** 等待溢出自旋锁时的放弃 CPU 使用的周期。这是一个幂次 2 - 1 的值。 */
private static final int OVERFLOW_YIELD_RATE = 7; // 必须是 2 的幂次 - 1
/** 用于表示读锁计数的位数,超出此范围会发生溢出。 范围是1~126 */
private static final int LG_READERS = 7;
/** 锁状态和印章操作的相关常量值 */
// 单位读操作的位掩码
private static final long RUNIT = 1L;
// 写操作的位掩码(位移了 LG_READERS 位)
private static final long WBIT = 1L << LG_READERS;
// 读操作的位掩码范围
private static final long RBITS = WBIT - 1L;
// 读操作的满位掩码
private static final long RFULL = RBITS - 1L;
// 所有锁的位掩码(包括读和写)
private static final long ABITS = RBITS | WBIT;
// 只有写锁的位掩码 ( ~ 是按位取反操作符)
private static final long SBITS = ~RBITS; // 注意与 ABITS 的重叠
// 锁状态的初始值; 避免零值作为失败值
private static final long ORIGIN = WBIT << 1;
/** 从被取消的获取方法返回的特殊值,用于抛出中断异常。 */
private static final long INTERRUPTED = 1L;
/** 节点状态的相关常量值,顺序很重要 */
// 等待状态
private static final int WAITING = -1;
// 取消状态
private static final int CANCELLED = 1;
/** 节点模式; 用整数而不是布尔值,以允许进行算术操作 */
// 读模式
private static final int RMODE = 0;
// 写模式
private static final int WMODE = 1;
/** 等待节点的内部类,用于管理队列中的节点 */
static final class WNode {
volatile WNode prev; // 上一个节点
volatile WNode next; // 下一个节点
volatile WNode cowait; // 链接的读线程列表
volatile Thread thread; // 线程对象; 如果非空,则线程可能被挂起
volatile int status; // 节点状态; 0, WAITING 或 CANCELLED
final int mode; // 节点模式; RMODE 或 WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}
/** CLH 队列的头部节点 */
private transient volatile WNode whead;
/** CLH 队列的尾部节点 */
private transient volatile WNode wtail;
/** 锁视图,用于提供不同类型的锁视图 */
transient ReadLockView readLockView;
transient WriteLockView writeLockView;
transient ReadWriteLockView readWriteLockView;
/** 锁的状态*/
private transient volatile long state;
/** 当状态的读计数饱和时,所使用的额外读锁计数 */
private transient int readerOverflow;
可以看出StampedLock
是通过内部类 WNode
来管理队列中的节点(队列属于双向链表结构,利用了CLH锁思想)。并且用了大量的标志位和位运算来处理锁的逻辑。 CLH锁的详解可以参考 AQS详解 (opens new window)这篇文章。
# StampedLock对于state变量的设计
/** 用于表示读锁计数的位数,超出此范围会发生溢出。 范围是1~126 */
private static final int LG_READERS = 7;
// 写操作的位掩码(位移了 LG_READERS 位)
private static final long WBIT = 1L << LG_READERS;
// 锁状态的初始值; 避免零值作为失败值
private static final long ORIGIN = WBIT << 1;
/** 锁的状态*/
private transient volatile long state;
由于只有一个state
变量,又需要表示读写锁,所以StampedLock
也把state变量拆成了读和写的部分,但是不同于 ReentrantReadWriteLock
的int类型的state变量把高16位表示读锁计数,低16位表示写锁计数。
StampedLock
中 锁状态的初始值是 ORIGIN
也就是 1<<7,也就是 1000 0000
(前面的0省略)
用最低的8位表示读和写的状态,其中最低的7位表示读锁的状态(版本),第8位表示写锁的状态。
因为写锁是互斥的且不可重入,用一位就够了。
# StampedLock乐观锁实现原理
结合tryOptimisticRead
和validate
方法分析:
public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
public boolean validate(long stamp) {
U.loadFence();
return (stamp & SBITS) == (state & SBITS);
}
tryOptimisticRead
方法中 state&WBIT!=0
,说明state
变量表示写锁的第八位为1,也就是有线程持有写锁,那么tryOptimisticRead
方法就会返回0。表示获取乐观读锁失败。然后我们再调用validate(0)
一定会得到false,也就是校验失败。这个符合当有线程持有写锁时与其他锁互斥的逻辑。
为什么validate
方法,比较的是(stamp & SBITS) == (state & SBITS);
?
因为需要支持读读不互斥,即使修改了state的低7位也就是读锁的部分,(stamp & SBITS) == (state & SBITS);
依然会返回true。
# StampedLock的构造方法
/** 用于表示读锁状态的位数,超出此范围会发生溢出。 范围是1~126 */
private static final int LG_READERS = 7;
/** 写操作的位掩码(位移了 LG_READERS 位) */
private static final long WBIT = 1L << LG_READERS;
/** 锁状态的初始值; 避免零值作为失败值 */
private static final long ORIGIN = WBIT << 1;
/** 锁的状态*/
private transient volatile long state;
/**
* 默认构造方法。
* 初始化 `StampedLock` 对象,将锁状态设置为初始值。
*/
public StampedLock() {
// 初始化锁状态为初始值 ORIGIN,避免状态为零作为失败值。
state = ORIGIN;
}
# tryOptimisticRead
方法
/** 用于表示读锁状态的位数 */
private static final int LG_READERS = 7;
/** 写操作的位掩码(位移了 LG_READERS 位) */
private static final long WBIT = 1L << LG_READERS;
/** 只有写锁的位掩码 */
private static final long SBITS = ~RBITS;
/** 读操作的位掩码范围 */
private static final long RBITS = WBIT - 1L;
/**
* 尝试以乐观读模式获取锁。
*
* 这个方法检查当前锁状态以确定是否可以进行乐观读操作。
* 如果没有写锁持有者(即锁的写位没有被设置),方法返回当前的读者计数(即锁的状态)。
* 否则,返回 0 表示无法进行乐观读操作。
*
* @return 如果可以进行乐观读,则返回当前的读者计数;否则返回 0。
*/
public long tryOptimisticRead() {
long s;
// 读取当前锁状态
s = state;
// 检查写锁位是否被设置。如果没有写锁持有者(写位为 0),则返回当前读者计数(即去除写锁位后的状态值)。
return (((s & WBIT) == 0L) ? (s & SBITS) : 0L);
}
这个方法允许线程在没有写锁持有者的情况下进行乐观读操作,这可以提高并发性能,因为乐观读操作不需要获取实际的读锁。
# readLock
方法
readLock方法
/**
* 尝试获取读锁。
*
* 在常见的无竞争情况下,这个方法会直接返回一个读锁的印章。
* 如果当前队列为空(即 `whead` 等于 `wtail`),并且没有写锁持有者(即当前状态的读位小于 RFULL),
* 则通过 CAS 操作将状态值加上 `RUNIT` 以尝试获取读锁。
* 否则,调用 `acquireRead` 方法来实际获取读锁。
*
* @return 成功获取读锁时的印章值。如果无法获取,则调用 `acquireRead` 方法来处理。
*/
public long readLock() {
long s = state, next; // 读取当前锁状态,并为下一个状态准备变量
// 如果当前队列为空且状态值允许新的读操作,则尝试通过 CAS 操作增加读锁计数
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L)); // 否则调用 acquireRead 方法来获取读锁
}
readLock
方法总结: 目标: 尝试在无竞争情况下快速获取读锁。如果直接获取失败,则调用 acquireRead
处理复杂情况。
具体步骤:
检查队列状态:
如果队列为空(whead == wtail)且状态允许增加读锁计数((s & ABITS) < RFULL),即没有达到读锁计数上限:
尝试通过 CAS 操作将 state 增加 RUNIT,即增加读锁计数。
如果 CAS 操作成功,返回新的状态 next。
调用 acquireRead:
如果队列不为空,或者 CAS 操作失败,则调用 acquireRead 方法来处理更复杂的情况,实际获取读锁。
acquireRead方法
这个方法就开始上强度了,不太好理解
/**
* 实际获取读锁的方法。
*
* 这个方法会尝试获取读锁。如果直接获取失败,它会通过自旋、队列管理和线程等待来确保获取读锁。
*
* @param interruptible 是否可中断。
* @param deadline 线程等待的截止时间(纳秒)。
* @return 成功获取读锁时的印章值。如果超时或中断,则可能会取消等待。
*/
private long acquireRead(boolean interruptible, long deadline) {
WNode node = null, p;
for (int spins = -1;;) {
WNode h;
if ((h = whead) == (p = wtail)) { // 检查队列是否为空
for (long m, s, ns;;) {
// 检查当前状态是否允许增加读锁计数
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
return ns; // 成功获取读锁
else if (m >= WBIT) {
if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
else {
if (spins == 0) {
WNode nh = whead, np = wtail;
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
}
spins = SPINS; // 重置自旋次数
}
}
}
}
if (p == null) { // 如果队列为空,初始化队列
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null)
node = new WNode(RMODE, p); // 创建新的读节点
else if (h == p || p.mode != RMODE) { // 如果队列中的前驱节点不是读节点,尝试加入队列
if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
else if (!U.compareAndSwapObject(p, WCOWAIT,
node.cowait = p.cowait, node))
node.cowait = null; // 更新前驱节点的等待列表
else {
for (;;) {
WNode pp, c; Thread w;
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null) // 唤醒等待线程
U.unpark(w);
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
do {
// 检查状态是否允许增加读锁计数
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) :
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L))
return ns; // 成功获取读锁
} while (m < WBIT);
}
if (whead == h && p.prev == pp) {
long time;
if (pp == null || h == p || p.status > 0) {
node = null; // 丢弃节点
break;
}
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, p, false); // 超时取消等待
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
U.park(false, time); // 线程等待
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true); // 处理中断
}
}
}
}
// 在队列头部自旋等待
for (int spins = -1;;) {
WNode h, np, pp; int ps;
if ((h = whead) == p) {
if (spins < 0)
spins = HEAD_SPINS; // 初始化自旋次数
else if (spins < MAX_HEAD_SPINS)
spins <<= 1; // 增加自旋次数
for (int k = spins;;) { // 在队列头部自旋
long m, s, ns;
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
WNode c; Thread w;
whead = node; // 更新队列头部
node.prev = null;
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w); // 唤醒等待线程
}
return ns; // 成功获取读锁
}
else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
break; // 超过自旋次数,退出自旋
}
}
else if (h != null) {
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w); // 唤醒等待线程
}
}
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // 更新前驱节点的 next 指针
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING); // 设置节点状态为等待
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node; // 更新前驱节点的 next 指针
}
}
else {
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false); // 超时取消等待
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 &&
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
U.park(false, time); // 线程等待
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible &&
acquireRead 方法总结:
目标: 在有竞争的情况下获取读锁,包括处理线程等待、队列管理、自旋等。
acquireRead 的详细步骤:
检查队列状态:
如果队列为空(whead == wtail),尝试直接增加读锁计数(state)。
如果成功,返回新的状态 ns。否则,进行自旋尝试。
创建或管理节点:
如果队列为空,初始化一个新的头节点。
如果节点 node 为空,则创建一个新的读节点。
尝试将新的读节点添加到队列中,处理节点前驱和队列管理。
处理自旋等待:
如果直接获取读锁失败,通过自旋等待的方式尝试获取读锁。
处理线程等待:
如果自旋仍然失败,将线程放入等待队列中,处理中断和超时。
唤醒等待线程:
当读锁被成功获取后,唤醒等待的线程。
# unlockRead方法
unlockRead方法
/**
* 解锁读锁。
*
* 这个方法会验证提供的印章(`stamp`)是否与当前锁状态一致。如果印章无效或锁状态不匹配,会抛出 `IllegalMonitorStateException`。
* 如果锁的读计数低于 `RFULL`,尝试通过 CAS 操作减少读计数。如果读计数减少到 0,则释放队列中可能被阻塞的线程。
* 如果读计数达到了 `RFULL`,则调用 `tryDecReaderOverflow` 方法来处理溢出读线程计数。
*
* @param stamp 读锁的印章值。
* @throws IllegalMonitorStateException 如果印章无效或状态不匹配。
*/
public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) { // 自旋
// 获取当前锁状态
s = state;
// 检查印章是否与当前状态匹配,或者印章是否有效
if (((s & SBITS) != (stamp & SBITS)) ||
(stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
throw new IllegalMonitorStateException(); // 印章无效,抛出异常
if (m < RFULL) {
// 如果读计数小于 RFULL,尝试减少读计数
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
// 如果减少后读计数为 RUNIT 且队列头部节点状态不为 0,释放队列中的线程
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
break;
}
}
else if (tryDecReaderOverflow(s) != 0L)
break; // 处理溢出的读者计数
}
}
unlockRead
方法总结:
目标:
释放读锁,验证印章的有效性,并根据读锁计数的状态更新锁状态。
步骤:
自旋检查印章有效性:
进入自旋循环,获取当前锁状态 s。
检查传入的印章 stamp 是否与当前状态一致(通过比较 SBITS),以及印章是否有效((stamp & ABITS) == 0L)
,或者当前状态是否无效(m == WBIT)
。
如果印章无效,抛出 IllegalMonitorStateException。
处理读计数:
如果读计数小于 RFULL:
尝试通过 CAS 操作将状态 s 减少 RUNIT,即减少读锁计数。
如果读计数减少后为 RUNIT,并且队列头部节点状态不为 0,调用 release(h) 释放队列中的线程。
跳出循环,完成解锁。
如果读计数达到 RFULL:
调用 tryDecReaderOverflow 方法处理读计数溢出情况。
处理读计数溢出:
tryDecReaderOverflow 方法用于减少在 RFULL 状态下的溢出读线程计数。
tryDecReaderOverflow方法
/**
* 尝试减少溢出的读者计数。
*
* 这个方法会处理在读锁计数达到 `RFULL` 后的溢出情况。如果当前状态的读计数已经达到 `RFULL`,
* 通过 CAS 操作将状态更新为包含读位的值,并减少 `readerOverflow` 计数器。
* 如果 `readerOverflow` 计数器为 0,则直接减少读计数。
*
* @param s 当前锁状态。
* @return 更新后的状态值。如果无法减少读者计数,则返回 0L。
*/
private long tryDecReaderOverflow(long s) {
// 确保当前状态的读位计数达到了 RFULL
if ((s & ABITS) == RFULL) {
// 通过 CAS 操作将状态值更新为包含读位的状态
if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
int r; long next;
if ((r = readerOverflow) > 0) {
// 如果溢出计数器大于 0,减少计数器并保持状态
readerOverflow = r - 1;
next = s;
}
else
// 否则,减少读锁计数
next = s - RUNIT;
state = next; // 更新锁状态
return next; // 返回更新后的状态值
}
}
else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0)
Thread.yield(); // 在不能处理溢出时,让线程让步
return 0L; // 无法减少读者计数时返回 0L
}
tryDecReaderOverflow方法总结:
目标: 处理在读锁计数达到上限时的溢出情况,确保锁状态的正确性。
步骤: 检查读计数是否达到 RFULL:
如果当前状态的读位计数等于 RFULL,则尝试更新状态以处理溢出情况。
更新状态:
通过 CAS 操作将状态更新为包含读位的状态(s | RBITS)
。
如果 readerOverflow 计数器大于 0,减少计数器并保持状态。
如果 readerOverflow 为 0,减少读锁计数。
更新 state 变量,并返回更新后的状态值。
处理无法减少计数的情况:
如果无法处理溢出情况,让线程让步(Thread.yield()),并返回 0L。
# writeLock
方法
writeLock方法
/**
* 尝试获取写锁。
*
* 这个方法首先检查当前锁状态,如果当前没有读锁或写锁(`ABITS` 中没有任何标志位),
* 直接通过 CAS 操作将状态更新为加上写锁位 `WBIT` 的新状态。如果更新成功,返回新的状态值。
* 如果锁已被其他线程持有,或者队列中存在等待写锁的线程,则调用 `acquireWrite` 方法,
* 通过排队的方式来获取写锁。
*
* @return 成功获取的写锁印章。
*/
public long writeLock() {
long s, next; // 当前锁状态和新的状态
// 检查当前锁状态是否为完全解锁状态
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
writeLock 方法总结: 目标: writeLock 方法尝试直接获取写锁,如果失败则调用 acquireWrite 方法通过排队的方式获取写锁。
步骤:
检查当前锁状态:
获取当前锁状态 s。
如果当前状态没有任何读锁或写锁(即 ABITS 中没有任何标志位),尝试通过 CAS 操作将状态更新为加上写锁位 WBIT 的新状态 next。
更新锁状态:
如果 CAS 操作成功,则返回新的状态值 next。
调用 acquireWrite:
如果状态更新失败(锁已被其他线程持有或队列中存在等待写锁的线程),调用 acquireWrite 方法来通过排队的方式获取写锁。
acquireWrite方法
/**
* 尝试获取写锁。
*
* 这个方法通过自旋和排队的方式获取写锁。如果直接获取写锁失败,则将当前线程排入等待队列,
* 并在队列中等待直到能够获取写锁。
*
* @param interruptible 是否可中断。
* @param deadline 等待的最大时间(以纳秒为单位),0 表示不超时。
* @return 成功获取的写锁印章。
*/
private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p;
// 自旋尝试将当前线程排入等待队列
for (int spins = -1;;) { // 自旋,直到将节点加入队列
long m, s, ns;
// 获取当前锁状态
if ((m = (s = state) & ABITS) == 0L) {
// 如果当前没有读锁或写锁,通过 CAS 操作将状态更新为加上写锁位 WBIT
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
return ns; // 返回更新后的状态值
} else if (spins < 0)
// 如果当前状态为写锁且队列为空,设置自旋次数
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
else if (spins > 0) {
// 自旋,减少自旋次数
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
} else if ((p = wtail) == null) { // 初始化队列
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
} else if (node == null)
// 创建新的等待节点
node = new WNode(WMODE, p);
else if (node.prev != p)
// 确保节点的前驱节点正确
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
// 将当前节点加入队列
p.next = node;
break; // 成功将节点加入队列,退出自旋循环
}
}
// 自旋等待,直到获取写锁
for (int spins = -1;;) {
WNode h, np, pp;
int ps;
// 检查当前队列头节点
if ((h = whead) == p) {
// 如果当前节点是队列头节点,自旋等待写锁
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) {
long s, ns;
// 获取当前锁状态
if (((s = state) & ABITS) == 0L) {
// 如果当前没有读锁或写锁,通过 CAS 操作更新状态
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) {
// 更新队列头部为当前节点
whead = node;
node.prev = null;
return ns; // 返回新的状态值
}
} else if (LockSupport.nextSecondarySeed() >= 0 &&
--k <= 0)
break; // 超过自旋次数,退出循环
}
} else if (h != null) { // 帮助释放过时的等待节点
WNode c;
Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
if ((np = node.prev) != p) {
// 更新链表,将当前节点插入到正确的位置
if (np != null)
(p = np).next = node; // 处理过时的节点
} else if ((ps = p.status) == 0)
// 如果节点状态为 0,设置为 WAITING
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
// 如果节点状态为 CANCELLED,处理取消的节点
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
} else {
// 等待获取写锁
long time; // 0 参数表示无超时
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
// 超过等待时间,取消等待
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
whead == h && node.prev == p)
// 如果条件满足,则使当前线程进入等待状态
U.park(false, time); // 模拟 LockSupport.park
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
// 如果线程被中断,取消等待
return cancelWaiter(node, node, true);
}
}
}
}
acquireWrite 方法总结: 目标: 负责处理排队、等待和自旋等复杂逻辑,确保在竞争条件下正确地获取写锁。
步骤:
自旋排队:
方法开始时,尝试通过自旋的方式将当前线程排入等待队列。如果当前没有读锁或写锁,尝试直接获取写锁。如果无法直接获取写锁,初始化队列或创建新的等待节点,将其添加到队列中。
等待队列初始化:
如果队列为空,创建队列头节点,并将尾节点指向该头节点。若节点已经存在,则创建新的节点并将其加入队列。
自旋等待写锁:
在自旋阶段,尝试检查队列头节点并自旋等待写锁。更新队列头节点为当前节点,确保其他线程能够正确地获取写锁。
处理过时的等待节点:
在等待期间,帮助释放已经过时的等待节点,并唤醒那些被阻塞的线程。
等待获取写锁:
如果条件允许,将当前线程进入等待状态,直到写锁可用。支持中断处理,如果线程被中断则取消等待。
# unlockWrite方法
unlockWrite方法
/**
* 解锁写锁。
*
* 这个方法会验证提供的印章(`stamp`)是否为当前锁的写锁印章。如果印章无效或不是写锁印章,会抛出 `IllegalMonitorStateException`。
* 成功解锁后,将状态重置为原始值 `ORIGIN`(如果 `stamp` 变为 0L),或者恢复为提供的印章加上写锁位 `WBIT`。
* 如果队列头部节点(`whead`)不为空且状态不为 0,则调用 `release` 方法释放队列中可能被阻塞的线程。
*
* @param stamp 写锁的印章值。
* @throws IllegalMonitorStateException 如果印章无效或不是写锁印章。
*/
public void unlockWrite(long stamp) {
WNode h;
// 验证提供的印章是否与当前锁状态匹配,并且是写锁印章
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException(); // 印章无效,抛出异常
// 更新锁状态。如果解锁后状态为 0L,重置为原始值 ORIGIN
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
// 如果队列头部节点不为空且状态不为 0,释放队列中的线程
if ((h = whead) != null && h.status != 0)
release(h);
}
unlockWrite方法总结: 目标: 安全地释放持有的写锁,并确保相关线程被正确唤醒。
步骤:
验证印章:
首先,方法会检查提供的印章是否与当前锁状态匹配,且印章是否包含写锁位 (WBIT)。如果不匹配或不包含写锁位,抛出 IllegalMonitorStateException。
更新锁状态: 如果印章有效,解锁后将状态重置为原始值 ORIGIN(如果 stamp 加上 WBIT 结果为 0L),否则将状态更新为印章加上 WBIT。这个操作确保锁的状态正确反映当前的锁持有情况。
释放阻塞线程: 检查队列头部节点 (whead) 是否存在且状态不为 0。如果存在,则调用 release 方法尝试释放队列中的线程。
release方法
/**
* 释放队列中阻塞的线程。
*
* 这个方法会尝试从队列中找到下一个需要被唤醒的节点(`WNode`)。
* 将队列头部节点的状态从 `WAITING` 更新为 0(表示节点不再等待)。
* 如果队列中的下一个节点(`q`)存在且状态不为 `CANCELLED`,唤醒线程。
*
* @param h 队列中的头部节点。
*/
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
// 更新队列头部节点的状态为 0
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
// 查找下一个需要被唤醒的节点(如果下一个节点为空或者状态为 CANCELLED)
if ((q = h.next) == null || q.status == CANCELLED) {
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
// 如果找到的节点存在且线程不为空,唤醒线程
if (q != null && (w = q.thread) != null)
U.unpark(w);
}
}
release方法总结:
目标: 从队列中释放阻塞的线程,并确保下一个等待的线程被正确唤醒。
步骤:
更新节点状态:
将队列头部节点的状态从 WAITING 更新为 0,表示该节点不再需要等待。此操作标志着头部节点已处理完毕,准备释放或处理下一个节点。
查找并设置下一个待唤醒的节点:
如果头部节点的下一个节点为空或状态为 CANCELLED,从队列尾部向前查找有效的待唤醒节点。这样可以确保唤醒操作的正确性,并避免唤醒已取消或不再需要的线程。
唤醒线程:
如果找到有效的待唤醒节点,并且该节点关联的线程不为空,则唤醒该线程。通过调用 U.unpark(w),可以确保线程能够继续执行,减少系统中的线程阻塞时间。
# StampedLock管理线程的队列
StampedLock内部基于WNode
实现的阻塞队列和AQS实现的阻塞队列类似。
初始化时,新建个空节点,whead=wtail=NULL
。
之后再往里面加入一个个读线程或写线程节点。
但是上面acquireRead
和acquireWrite
方法对于自旋的操作就和AQS有很大不同了。
在AQS
里面,当一个线程CAS state
失败之后,会立即加入阻塞队列,并且进入阻塞状态。但在StampedLock
中,CAS state
失败之后,会不断自旋,自旋足够多的次数之后,如果还拿不到锁,才进入阻塞状态。为此,根据CPU的核数,定义了自旋次数的常量值。如果是单核的CPU,肯定不能自旋,在多核情况下,才采用自旋策略。
还有个比较特殊的地方在于,每个WNode里面有一个cowait指针,用于串联起所有的读线程。
例如,队列尾部阻塞的是一个读线程 1,现在又来了读线程 2、3,那么会通过cowait指针,把1、2、3串联起来。1被唤醒之后,2、3也随之一起被唤醒,因为读和读之间不互斥。
也就是 当入队一个线程时,如果队尾是写结点,则直接链接到队尾。 当入队一个读线程时,如果队尾是读节点,则直接链接到该读结点的cowait链中。
# 4、StampedLock
和ReentrantReadWriteLock
对比
特性 | StampedLock | ReentrantReadWriteLock |
---|---|---|
引入版本 | JDK 8 | JDK 5 |
锁类型 | 提供乐观读锁、悲观读锁和写锁 | 提供悲观读锁和写锁 |
乐观读锁 | 支持(tryOptimisticRead() ) | 不支持 |
悲观读锁 | 支持(readLock() ) | 支持(readLock() ) |
写锁 | 支持(writeLock() ) | 支持(writeLock() ) |
锁升级 | 支持从乐观读锁升级到悲观读锁或写锁(tryConvertToWriteLock() 等) | 不支持 |
锁降级 | 支持从写锁降级为读锁(tryConvertToReadLock() ) | 支持从写锁降级为读锁 |
锁管理复杂度 | 复杂,需要管理戳记,确保在转换锁时一致性 | 相对简单,直接使用 readLock() 和 writeLock() |
性能优化 | 乐观读锁减少了锁竞争,提高了读取性能 | 读写锁性能依赖于锁的竞争情况(大量并发读可能会导致写线程饥饿) |
公平性 | 不提供公平性(锁的获取是非公平的) | 可以选择公平性(通过构造函数 ReentrantReadWriteLock(true) ) |
条件变量Condition | 不支持 | 支持 |
# 5、 总结
StampedLock
适用于需要高效读取性能的场景,通过乐观读锁减少锁竞争,同时支持锁的升级和降级,但锁的管理相对复杂,不支持重入,并且使用乐观读锁时需要遵循一定的顺序,所以使用时一定要谨慎。
参考资料:
https://www.skjava.com/series/article/1446338301
《Java并发编程之美》
《Java并发实现原理:JDK源码剖析》