简介

StampedLock 是JDK1.8 开始提供的一种锁, 是对之前介绍的读写锁 ReentrantReadWriteLock 的功能增强。StampedLock 有三种模式:Writing(读)、Reading(写)、Optimistic Reading(乐观度),StampedLock 的功能不是基于AQS来实现的,而是完全自己内部实现的功能,不支持重入。在加锁的时候会返回一个戳,解锁的时候需要传入,匹配完成解锁操作。

官方使用示例

class Point {
private double x, y;
private final StampedLock sl = new StampedLock(); void move(double deltaX, double deltaY) { // an exclusively locked method
// 写锁-独占资源
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
} double distanceFromOrigin() { // A read-only method
// 只读的方法,比较乐观,认为读的过程中不会有写,所以这里是乐观度
long stamp = sl.tryOptimisticRead();
double currentX = x, currentY = y;
if (!sl.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
// 获取一个普通的读锁
stamp = sl.readLock();
try {
currentX = x;
currentY = y;
} finally {
// 释放读锁
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
} void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) {
// 普通读锁转换成写锁,返回0为转换失败
long ws = sl.tryConvertToWriteLock(stamp);
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
} else {
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
}

官方demo中用到的 api 主要有获取写锁(writeLock())、释放写锁(unlockWrite(stamp))、获取普通读锁(readLock())、释放普通读锁(unlockRead(stamp))、获取乐观读锁(tryOptimisticRead())、检测乐观读版本(validate(stamp))、普通读锁转换成写锁(tryConvertToWriteLock(stamp))。下面分析源码的时候也主要根据这几个方法来分析。

源码分析

主要内部类

  1. 等待节点:WNode

    用于维护 CLH 队列的节点,源码如下:

    static final class WNode {
    volatile WNode prev; // 前驱节点
    volatile WNode next; // 后继节点
    volatile WNode cowait; // 链接的读者列表
    volatile Thread thread; // 线程
    volatile int status; // 状态 0, WAITING, or CANCELLED
    final int mode; // 两种模式:RMODE or WMODE
    WNode(int m, WNode p) { mode = m; prev = p; }
    }
  2. ReadWriteLockView:实现了ReadWriteLock接口,提供了读写锁获取接口

    final class ReadWriteLockView implements ReadWriteLock {
    public Lock readLock() { return asReadLock(); }
    public Lock writeLock() { return asWriteLock(); }
    }
  3. ReadLockView 和 WriteLockView:都实现了Lock接口,并实现了所有的方法

主要属性

  1. CLH 队列

    /** Head of CLH queue */
    private transient volatile WNode whead;
    /** Tail (last) of CLH queue */
    private transient volatile WNode wtail;
  2. 其他常量属性

    /** CPU的核心数量,用来控制自旋的次数 */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();
    /** 获取锁入队前最大重试次数:CPU核心数大于1:64,否则0 */
    private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
    /** 等待队列的头结点,获取锁最大重试次数:CPU核心数大于1:1024,否则0 */
    private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
    /** 再次阻塞前最大重试次数:CPU核心数大于1:65536,否则0 */
    private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
    /** The period for yielding when waiting for overflow spinlock */
    private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1 /** 用于读取器计数的位数 */
    private static final int LG_READERS = 7; // 用来计算state值的常量
    private static final long RUNIT = 1L; // 读单位
    // 写锁的标识位 十进制:128 二进制位标示:1000 0000
    private static final long WBIT = 1L << LG_READERS;
    // 读状态标识 admol 十进制:127 二进制: 0111 1111
    private static final long RBITS = WBIT - 1L;
    // 读锁的最大标识 十进制:126 二进制 :0111 1110
    private static final long RFULL = RBITS - 1L;
    // 用来读取读写状态 十进制:255 二进制:1111 1111
    private static final long ABITS = RBITS | WBIT;
    // ~255 == 11111111111111111111111111111111111111111111111111111111 1000 0000
    // -128
    private static final long SBITS = ~RBITS; // 同步状态state的初始值 256 二进制:0001 0000 0000
    private static final long ORIGIN = WBIT << 1; // 中断
    private static final long INTERRUPTED = 1L; // 节点的状态
    private static final int WAITING = -1;
    private static final int CANCELLED = 1; // 节点的模式
    private static final int RMODE = 0;
    private static final int WMODE = 1; /** 同步状态 初始值 256 0001 0000 0000*/
    private transient volatile long state;
    /** 读计数饱和时的额外读取器计数 */
    private transient int readerOverflow;

    StampedLock 虽然没有继承AQS,但是属性上很相似,都有一个CLH队列,和一个同步状态值state, StampedLock用8位来表示读写锁状态,前7位是用来标识读锁状态的,第8位标识写锁占用,如果读锁数量超过了126(0111 1110 ),超出的用readerOverflow来计数。

构造方法

public StampedLock() {
// 初始值 256, 二进制:0001 0000 0000
state = ORIGIN;
}

获取写锁:writeLock()

源码展示:

public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
return ((((s = state) & ABITS) == 0L && U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? next : acquireWrite(false, 0L));
}

代码分析:

  1. 首先执行的是 ((s = state) & ABITS)== 0L,用来表示读锁和写锁是否可以被获取

    解析:第一次时,state 初始值是256,ABITS是255,计算过程:0001 0000 0000 && 0000 1111 1111 ,结算结果为0。
  2. 执行CAS操作:U.compareAndSwapLong(this, STATE, s, next = s + WBIT))

    解析:STATE 是state字段的内存地址相对于此对象的内存地址的偏移量,s 是期望值, next = s + WBIT 是更新值;s+WBIT 也就是 256 + 128,next计算结果为384,用二进制位表示就是0001 1000 0000,也就是将第8位设置为1,就是获得写锁。如果CAS 更新成功,返回next值,成功获得写锁。
  3. 如果CAS执行失败,则执行acquireWrite(false, 0L) ,进入等待队列获取锁

acquireWrite 代码分析:

// interruptible 是否要检查中断
// deadline:0 一直等待获取锁
private long acquireWrite(boolean interruptible, long deadline) {
// node:即将入队排队的节点
// p:当前排队节点入队之前的尾节点
WNode node = null, p;
// 第一次自旋:排队节点入队列自旋
for (int spins = -1;;) { // spin while enqueuing
long m, s, ns;
// 这个if 和外面一样的,从代码运行到这期间所有没有被释放
if ((m = (s = state) & ABITS) == 0L) {
// CAS 再次尝试获取下写锁
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
// 成功获取写锁
return ns;
} else if (spins < 0) // 走到这,说明上面还是没获取到写锁,写锁被占用了,m的值为128
// 1. 确定自旋的次数 spins: 64 or 0
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
else if (spins > 0) {
// 2.自旋的次数大于0,随机减一次自旋次数,直到减到spins为0(by.精灵王 这里实际是空转,没什么特点的逻辑处理)
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
} else if ((p = wtail) == null) { // initialize queue
// 3.自旋spins减到0后会立马执行到这里
// p被赋值为尾节点
// 初始化队列, WMODE:写,null:前驱节点
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
// 初始化队列时,尾节点等于头节点
wtail = hd;
} else if (node == null)
// 4.初始化队列后,下一次自旋,构建当前排队节点,并指定了其尾节点
node = new WNode(WMODE, p);
else if (node.prev != p) // 如果当前节点的前驱不是尾节点
// 5.前驱节点设置为之前队列的尾节点
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
// 6. CAS 更新尾节点为当前排队的节点
p.next = node;
// 退出自旋
break;
}
}
// 第二次自旋
for (int spins = -1;;) {
WNode h, np, pp; int ps;
if ((h = whead) == p) { // 如果头节点和之前的尾节点p是同一个, 说明马上应该轮到node节点获得锁
if (spins < 0)
// ① 设置自旋次数 1024 or 0
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
// 第一次自旋1024次没有获取到锁,这次自旋翻倍
// 自旋次数*2 2048 继续进入到下面的自旋
spins <<= 1;
for (int k = spins;;) { // spin at head
// ② 第三次自旋,不断尝试获得锁(自旋1024或者2048次),直到成功获得锁 或者 break
long s, ns;
// ((s = state) & ABITS) == 0L 表示锁没有被占用
if (((s = state) & ABITS) == 0L) {
// CAS 修改state值
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) {
// CAS 修改成功获得锁,设置新的头结点
whead = node;
node.prev = null;
return ns;
}
} else if (LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
// 随机立减自旋次数 自旋次数为0时跳出自旋循环
break;
}
} else if (h != null) { // help release stale waiters
// 头节点不为空
// 进入情景:写锁被获取,队列中很多等待获取读锁的线程,写锁释放,读锁被唤醒后可能进入到这里
WNode c; Thread w;
while ((c = h.cowait) != null) { // 自旋
// 头节点的 cowait不为空
// h.cowait 修改成节点的下一个cowait
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
// 唤醒 cowait 里面的线程
U.unpark(w);
}
}
if (whead == h) { // 如果头结点没有变化
// P 是之前的尾节点
if ((np = node.prev) != p) {
// != 之前的尾节点,也就是说当前节点的前驱节点不是尾节点时
if (np != null)
// 保存尾节点和当前节点的连接关系
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
// ③ 上面第三次自旋break后会进入到这里,修改尾节点状态
// 更新尾节点的状态为WAITING:-1, 然后继续回到第二次自旋的地方,重新开始自旋
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
// p节点状态是取消,则删除p节点
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
} else {
// 超时时间
long time; // 0 argument to park means no timeout
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
// 设置了超时时间 已经超时,取消当前node节点
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
// 为给定地址设置值,忽略修饰限定符的访问限制,与此类似操作还有: putInt,putDouble,putLong,putChar等
U.putObject(wt, PARKBLOCKER, this);
// node节点指向当前线程
node.thread = wt;
// p.status < 0 的只有-1 也就是WAITING
if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p)
// 阻塞当前线程
U.park(false, time); // emulate LockSupport.park
// 线程被唤醒后,清除节点的线程
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted()) // 要检查中断 && 线程有被中断
// 取消当前node节点
return cancelWaiter(node, node, true);
}
}
}
}

获取写锁过程总结:

  1. 首先检查锁有没有被占用

    1. 没有被占用,尝试 CAS 修改state值,CAS 修改成功则获得锁,返回新的 state 值。
    2. CAS 修改失败的话进入下面自旋的逻辑
  2. 第一层自旋(节点入队):
    1. 先检查下锁有没有被占用((m = (s = state) & ABITS) == 0L), CAS 尝试一下获取锁,获取失败再继续自旋
    2. 入队之前会自旋64次(CPU核心数大于1),期间不做任何处理
    3. 初始化排队队列的队头队尾节点,当前节点加入到队尾,CAS 更新尾节点,更新成功则退出第一次自旋
  3. 开始第二层自旋(尝试获取锁,阻塞线程),第二层自旋和第三层自旋嵌套执行的:
    1. 如果头节点和之前的尾节点p还是是同一个(没有其他获取锁的节点排队已经入队), 说明马上应该轮到node节点获得锁(排队的只有node节点)。

      1. 初始化第三层自旋次数(第一次1024,第二次2048),开启第三层自旋

        1. 位运算检查锁是否有被释放((s = state) & ABITS) == 0L),CAS 修改 state 值,修改成功,退出,返回新的state值
      • 这里其实就是自旋和park线程之间性能的一个权衡,马上就要获得锁了,是自旋还是阻塞线程继续等,这里选择了先自旋1024次,如果没有获得锁,继续自旋2048次,如果还是没获得锁,则退出第三层自旋,回到第二层自旋,准备阻塞当前线程。
    2. 如果排队的头结点不为空,检查头结点的cowait 链表,如果不为空,自旋 CAS 修改头节点的cowait, 尝试唤醒整个链的节点线程
    3. 第三层自旋完成后还是没有获取到锁,阻塞当前线程,等待被唤醒,被唤醒后继续第二层自旋获取锁,重复这个过程,直到获取锁成功推出。

释放写锁:unlockWrite(stamp)

public void unlockWrite(long stamp) {
WNode h;
// state != stamp 检查解锁与加锁的版本是否匹配
// (stamp & WBIT) == 0L 为true的话说明锁没有被占用
if (state != stamp || (stamp & WBIT) == 0L)
// 抛出异常
throw new IllegalMonitorStateException();
// 释放写锁,会增加state的版本
// stamp += WBIT 等于二进制(第一次加写锁和解锁) 0001 1000 0000 + 0000 1000 0000 == 0010 0000 0000
// 解锁会把stamp 的二进制第8位设置为0
// 相当于重新赋值state值
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
if ((h = whead) != null && h.status != 0) // 头结点不为 && 状态不为初始状态0(一般是WAITING -1),说明队列中有排队获取锁的线程
// 唤醒头节点的后继节点
release(h);
}
private void release(WNode h) {
if (h != null) {
// q节点:头节点的有效后继节点
// w: 需要唤醒的线程
WNode q; Thread w;
// 将头节点的状态设置成0
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
if ((q = h.next) == null || q.status == CANCELLED) { // 如果头节点的后继为空 或者 是取消状态
// 就从排队的队尾找一个有效的节点
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
// 找到了有效的节点,唤醒其线程
if (q != null && (w = q.thread) != null)
U.unpark(w);
}
}

释放写锁过程总结:

  1. 检查锁印章戳是否匹配,锁是否有被占用,检查不通过抛出异常
  2. 通过位运算stamp += WBIT计算新的state值,state 二进制位的第8位会被设置成0就是写锁解锁
  3. 检测队列中是否有排队获取锁的线程
    1. 唤醒下一个等待获取锁的线程(unpark(thread)

获取普通读锁:readLock()

public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
// 在没有线程获得锁的情况下,s的初始值是256
// whead == wtail 为true:表示队列为空
// (s & ABITS) < RFULL: 已获取读锁的数小于最大值126
return ((whead == wtail && (s & ABITS) < RFULL && U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))
? next : acquireRead(false, 0L));
}

代码解析:

  1. 队列为空 && 已获取读锁次数小于126 && CAS 修改 state 值

    1. 条件完全成立,成功获得锁,返回新的state值
  2. 没有成功,进入到acquireRead(false, 0L)方法排队获取锁

acquireRead源码展示(代码超100行,需耐心观看):

private long acquireRead(boolean interruptible, long deadline) {
// p节点为尾节点 node为入队节点
WNode node = null, p;
// 第一层大循环 第一次自旋,是不是和获取写锁的很像?
for (int spins = -1;;) {
WNode h;
if ((h = whead) == (p = wtail)) { // 首尾节点相等,说明队列为空,有线程在排队不会进入if
// 前面没获取到锁,队列又为空,是不是应该马上就是当前线程获取锁了?
// 第二次自旋 自旋64次 目的是为了看马上能不能获取锁(排队队列为空,没线程排队时,会在这里自旋获取锁)
for (long m, s, ns;;) {
// 这里是个三目运算,代码太长,拆开来看
// (m = (s = state) & ABITS) < RFULL;和进入readLock()方法时的条件一样,判断读锁的数是否达到最大值,只有写锁被获取,这里就是false
// 我们假设前面写锁被获取了,现在获取读锁,m 就是128,大于RFULL 126
// 没有达到最大值, CAS 修改状态值 U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT)
// 超过最大值了,记得前面那个readerOverflow属性不?在tryIncReaderOverflow这累加
if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
return ns;
else if (m >= WBIT) {// if条件成立,说明说明被占用
//
if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins; // 随机减自旋次数
} else {
if (spins == 0) { // 自旋次数减到0了,还没获取到读锁
WNode nh = whead, np = wtail;
if ((nh == h && np == p) || (h = nh) != (p = np))
break; // 退出自旋
}
spins = SPINS; // 初始自旋次数 64次
}
}
}
// 上面到这里都是在处理队列为空,马上要获取到锁的情况
}
if (p == null) { // 尾节点为空,初始化排队队列,有线程在排队时不会进入到这里
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd)) // CAS 设置头节点
// 运行到这里后,会回到第一次的自旋,再次进入到第二次自旋,这次 spins 为0,只会自旋一次
wtail = hd;
} else if (node == null)
// 初始化当前入队排队节点,有线程在排队时,直接进入到这里,然后继续第一次自旋
node = new WNode(RMODE, p);
else if (h == p || p.mode != RMODE) { // 队列为空 或者 尾节点不是读模式
// 排队节点入队
if (node.prev != p)
node.prev = p; // 设置排队节点的前驱节点
// 继续第一次自旋
else if (U.compareAndSwapObject(this, WTAIL, p, node)) { // CAS 修改尾节点
p.next = node; // 老的尾节点的后继节点为当前节点
// 进入到这里会退出第一层自旋, 直接进入到下面第二层的大的自旋
break;
}
} else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node))
// 上面那个if分支进不去,只有条件:队列不为空 and 尾节点是读模式 为真
// 进入到了这里,说明CAS失败
// 这里的CAS 就是把当前节点加入到尾节点的cowait栈里面
// 从这里可以看出加入的顺序是个栈结构,先把旧的尾节点的cowait赋值给node节点的cowait,然后再把node节点赋值给尾节点
node.cowait = null;
else {
// 进入到这,说明上面的if分支都没有进去,尾节点不为空,当前节点不为空,队列不为空,尾节点是读模式,上面CAS修改成功
// 总结一下进入到这里的条件就是,有个线程获得了写锁还没释放,队列中有读线程在排队
// 第三次自旋,有线程在排队获取锁时,会进入到这里自旋
for (;;) {
WNode pp, c; Thread w;
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null) // help release
// 头节点不为空,且其cowait节点不为空,唤醒整个cowait栈的线程
U.unpark(w);
if (h == (pp = p.prev) || h == p || pp == null) {
// 头节点等于尾节点的前驱节点 或者头节点等于尾节点 或者 尾节点的前驱节点为空
// 说明还是马上轮到自己获得锁
long m, s, ns;
do {
// 判断是否可以使用CAS获取读锁
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
return ns;
} while (m < WBIT); // m < WBIT时表示写锁没有被占用,一直尝试获取锁
}
if (whead == h && p.prev == pp) { // 对头没有发生变化 ,队尾也没发生变化
long time;
if (pp == null || h == p || p.status > 0) { // 队尾的前驱节点为空或者 头节点等于尾节点 或者 老的尾节点被取消(>0的状态只有1,取消)
node = null; // 抛弃当前节点,退出当前循环,回到第一层的自旋,重新构建节点
break;
}
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L) // 超时
return cancelWaiter(node, p, false); // 取消节点
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
U.park(false, time); // 阻塞当前线程
// 线程被唤醒,开始继续自旋获取锁
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
}
}
}
// 第二层大的循环
for (int spins = -1;;) {
WNode h, np, pp; int ps;
if ((h = whead) == p) { // 如果队列为空,说明马上轮到当前线程获得锁了
// 这个大的if 里面做的就是获取锁
if (spins < 0)
// 初始化本次自旋获取锁的次数:1024次
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
// 上面1024次自旋没有获取到锁,就自旋翻倍:2048次,继续下面的自旋
spins <<= 1;
// 开始自旋获取锁
for (int k = spins;;) { // spin at head
long m, s, ns;
// 这个if条件 检查了是否可以获取锁,如果可以就CAS获取锁
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
// 进入到这个if里面,说明就获得了锁
WNode c; Thread w;
// 这里的node节点是还没有绑定thread的
whead = node;
node.prev = null;
// 要唤醒当前node节点中的所有cowait节点线程
// 当前节点是在上面的第一层大的自旋入队的,其他获取读锁的节点都是挂在这个节点的cowait下的
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w); // 唤醒线程
}
return ns;
} else if (m >= WBIT && LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
// 上面没有获取到锁,自旋减次数,直到为0,退出自旋
break;
}
} else if (h != null) { // 队列不为空,头节点不为空
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
// 运行到这,说明还没获取到锁
if (whead == h) { // 头节点没变过
if ((np = node.prev) != p) { // 检查节点的链接关系
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
// 检查尾节点的状态,为0则更新成-1,回到第二层大的循环开始处
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) { // p节点被取消,删除这个节点
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
} else { // 上面2048次自旋后还是没获取到锁,进入到最终阻塞线程的环节
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false); // 超时了就取消当前节点
Thread wt = Thread.currentThread(); // 当前线程
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p)
U.park(false, time); // 阻塞线程
// 线程被唤醒了,继续执行 第二层大的自旋获取锁
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted()) // 被唤醒了,发现要求中断线程 并且线程被中断了
return cancelWaiter(node, node, true); // 取消当前节点
}
}
}
}

获取普通读锁总结:

  • 位运算检查写锁是否被占用,读锁是否超限制

    1. 满足条件,直接CAS 修改state值,并返回新的state值
  • 开始第一层大的自旋,第一层大的自旋里面包含了两种情况不同的自旋:
    1. 第一种自旋情况:排队的队列为空,没有其他线程在排队等待锁时

      • 这种情况说明,锁虽然已经被占用,但是马上就应该是我得到锁了,所以我先在这儿自旋(64次)等等你释放锁,免得阻塞我自己,之后唤醒还需要成本
      • 自旋没有获取到锁,会退出第一层大的自旋,进入到第二层大的自旋
    2. 第二种自旋情况:写锁被占用,排队的队列不为空,队尾是读模式时
      • 这种情况,会不断尝试获取锁,阻塞线程,等待被唤醒,一直在这个自旋里面,直到获得锁,或者超时中断被取消,不会进入到第二层大的自旋
  • 第二层大的自旋
    • 这一层的自旋是对第一层里面第一种自旋情况(马上轮到我获得锁,但是前面持有锁的线程就是不释放)的补充,因为没有线程在排队,只要前面的线程释放了锁,马上就可以获得锁了,所以这一层还是在自旋获得锁,只不过自旋次数有增加

      • 首先会尝试自旋1024次获得锁,如果前面还没释放锁,再自旋2048次
    • 如果2048次之后还是没有等到前面的锁释放,就阻塞当前线程,等待被唤醒,直到获得锁,或者超时中断被取消

cowait栈分析

下面代码是main线程先获取写锁不释放,之后T0,T1,T2,T3线程先后去获取读锁,最后断点观察整个排队队列的情况

StampedLock sl = new StampedLock();
long stamp = sl.writeLock();
// 先让T0线程去排队到尾节点
TimeUnit.SECONDS.sleep(1);
new Thread(new Runnable(){
@SneakyThrows
@Override
public void run(){
long stamp = sl.readLock();
System.out.println(stamp + " x");
}
},"T0").start(); // 之后T1线程来获取读
TimeUnit.SECONDS.sleep(3);
new Thread(new Runnable(){
@SneakyThrows
@Override
public void run(){
long stamp = sl.readLock();
System.out.println(stamp + " x");
}
},"T1").start();
TimeUnit.SECONDS.sleep(3);
new Thread(new Runnable(){
@SneakyThrows
@Override
public void run(){
long stamp = sl.readLock();
System.out.println(stamp + " x");
}
},"T2").start();
TimeUnit.SECONDS.sleep(3); // 在这里先断点,进入到这里后,再到源码位置去断点,就可以看到如下图的情况了
new Thread(new Runnable(){
@SneakyThrows
@Override
public void run(){
long stamp = sl.readLock();
System.out.println(stamp + " x");
}
},"T3").start();

运行代码后,断点截图:

他们的关系可以用如下表示,横向是链表,纵向是cowait栈。

释放读锁:unlockWrite(stamp)

public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) { // 自旋
if (((s = state) & SBITS) != (stamp & SBITS) || (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
// 检查版本
throw new IllegalMonitorStateException();
if (m < RFULL) { // 锁标识小于读锁的最大标识
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { // CAS 更新state值
if (m == RUNIT && (h = whead) != null && h.status != 0) // 头结点不为空
release(h); // 唤醒下一个节点
break;
}
} else if (tryDecReaderOverflow(s) != 0L)
// 读锁个数饱和溢出,尝试减少readerOverflow
break;
}
}
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
if ((q = h.next) == null || q.status == CANCELLED) {
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
if (q != null && (w = q.thread) != null)
U.unpark(w);
}
}

释放读锁的逻辑也比较简单,和释放写锁的逻辑很相识,唤醒下一个节点的release方法也完全一致

获取乐观读锁:tryOptimisticRead()

public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

乐观读锁的逻辑也比较简单,就一个三目运算,((s = state) & WBIT) == 0L 就是看写锁是否有被占用,写锁被占用返回0,否则返回写锁没被占用的包含高位版本有效戳(也就是写锁的版本)。

检测乐观读版本:validate(stamp)

public boolean validate(long stamp) {
// 插入内存屏障,禁止load操作重排序。
// 由于StampedLock提供的乐观读锁不阻塞写线程获取读锁,当线程共享变量从主内存load到线程工作内存时,会存在数据不一致问题
// 解决锁状态校验运算发生重排序导致锁状态校验不准确的问题
U.loadFence();
return (stamp & SBITS) == (state & SBITS);
}

返回true:表示期间没有写锁发生,读锁为所谓

返回false:表示期间有写锁发生

那这里是怎么计算的呢?

SBITS为-128,用二进制表示是:1111 1111 1111 1000 0000

两种情况:

  1. 假如先获取乐观锁,再获取读锁;

    乐观锁返回的stamp为256,二进制位是 0001 0000 0000;

    获取读锁之后state值是257,二进制位是 0001 0000 0001;

    它们分别于与-128 进行与运算后都是0001 0000 0000,也就是十进制256,返回true;
  2. 假如先获取乐观锁,再获取写锁;

    乐观锁返回的stamp为256,二进制位是 0001 0000 0000;

    获取写锁之后state值是384,二进制位是 0001 1000 0000;

    它们分别与-128 进行与运算后,相当与 256 == 384,结果肯定返回false;

普通读锁转换成写锁:tryConvertToWriteLock(stamp)

public long tryConvertToWriteLock(long stamp) {
// m标识最新的锁标识
// a标识被转换的锁的锁标识
long a = stamp & ABITS, m, s, next;
while (((s = state) & SBITS) == (stamp & SBITS)) { // 检查锁持有状态
if ((m = s & ABITS) == 0L) {
if (a != 0L)
break;
if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
return next;
} else if (m == WBIT) { // 写锁已经被占用
if (a != m)
break;
return stamp; // 说明被转换前就是写锁
} else if (m == RUNIT && a != 0L) { // 被转换前的是普通读锁,写锁没被占用
if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT + WBIT))
// s:是之前的锁状态
// s - RUNIT:就是释放读锁
// + WBIT :就是加写锁(进入之前写锁没被占用)
return next; // 返回最新的锁状态
} else
break; // 其他情况,全部返回0,转换失败
}
return 0L; // 返回0,标识转换写锁失败
}

普通读锁转换成写锁过程总结:

  1. 如果转换前是写锁,直接返回写锁
  2. 如果转换前是读锁,转换期间,写锁被占用,返回0,转换失败
  3. 如果转换前是读锁,写锁没有被占用,释放读锁,加写锁,返回写锁,转换成功
  4. 其他情况,全部返回0,转换失败

StampedLock 总结

  1. StampedLock 是一种支持乐观读锁的高级版读写锁
  2. StampedLock 没有使用AQS 同步框架,而是完全自己实现的同步状态state 和 CLH队列维护算法
  3. 同步状态state的低7位标识读锁的数量,第8位标识写锁是否被占用,高24位记录写锁的版本,每次释放写锁会版本位置会加1
  4. 写锁每次获取state会加128,释放也会加128,读锁是加减1
  5. StampedLock 的连续多个读锁线程,只有第一个是在队列上,后面的读线程都存在第一个线程的cowait栈结构上
  6. StampedLock 唤醒一个读锁线程后,读锁线程会唤醒所有在它cowait栈上的等待读锁线程
  7. StampedLock 用到了大量的自旋操作,适合持有锁时间比较短的任务,持有锁时间长的话等待的线程自旋后还是会阻塞自己。
  8. StampedLock 同一个线程先获取读锁,再获取写锁也会死锁
  9. StampedLock 写锁不支持重入,读锁支持重入
  10. StampedLock 不支持条件锁
  11. StampedLock 不支持公平锁,上来有条件就 CAS 尝试获得锁

StampedLock 与 ReentrantReadWriteLock的区别总结

使用功能上的区别:

  1. StampedLock 支持乐观读锁,RRWL 没有
  2. StampedLock 支持锁转换,tryConvertToXXXXX(stamp)
  3. StampedLock 写锁不支持重入,RRWL 支持重入
  4. StampedLock 不支持条件锁,RRWL 支持条件锁
  5. StampedLock 不支持公平锁,RRWL 支持公平锁

底层实现的区别:

  1. StampedLock 没有使用同步框架AQS,RRWL 是基于AQS 来实现排队、阻塞、唤醒等功能的
  2. StampedLock 获取锁时,会直接使用CAS尝试获得锁(不公平,不看排队),会根据CPU核心数来决定自旋次数等待获取锁
  3. StampedLock 的 CLH 队列中连续的读线程只有首个节点存储在队列中,后面的节点都存储的首个节点的cowait栈中,即 1→5→4→3→2→1 这种顺序。
  4. StampedLock 中同步状态 state 被分成了三部分,第8位记录的是写锁的状态,低7位记录读锁的次数,其他位记录的是写锁的版本
  5. RRWL 中同步状态 state 被分成两部分,高16位记录读锁次数,低16位记录写锁次数
  6. StampedLock 唤醒一个读锁线程后,读线程会唤醒所有在它cowait栈上的等待读锁线程

源码分析:升级版的读写锁 StampedLock的更多相关文章

  1. [源码分析]读写锁ReentrantReadWriteLock

    一.简介 读写锁. 读锁之间是共享的. 写锁是独占的. 首先声明一点: 我在分析源码的时候, 把jdk源码复制出来进行中文的注释, 有时还进行编译调试什么的, 为了避免和jdk原生的类混淆, 我在类前 ...

  2. 锁对象-Lock: 同步问题更完美的处理方式 (ReentrantReadWriteLock读写锁的使用/源码分析)

    Lock是java.util.concurrent.locks包下的接口,Lock 实现提供了比使用synchronized 方法和语句可获得的更广泛的锁定操作,它能以更优雅的方式处理线程同步问题,我 ...

  3. 深入理解读写锁—ReadWriteLock源码分析

    转载:https://blog.csdn.net/qq_19431333/article/details/70568478 ReadWriteLock管理一组锁,一个是只读的锁,一个是写锁.读锁可以在 ...

  4. Java并发指南10:Java 读写锁 ReentrantReadWriteLock 源码分析

    Java 读写锁 ReentrantReadWriteLock 源码分析 转自:https://www.javadoop.com/post/reentrant-read-write-lock#toc5 ...

  5. java并发锁ReentrantReadWriteLock读写锁源码分析

    1.ReentrantReadWriterLock 基础 所谓读写锁,是对访问资源共享锁和排斥锁,一般的重入性语义为如果对资源加了写锁,其他线程无法再获得写锁与读锁,但是持有写锁的线程,可以对资源加读 ...

  6. java读写锁源码分析(ReentrantReadWriteLock)

    读锁的调用,最终委派给其内部类 Sync extends AbstractQueuedSynchronizer /** * 获取读锁,如果写锁不是由其他线程持有,则获取并立即返回: * 如果写锁被其他 ...

  7. 【源码分析】HashMap源码再读-基于Java8

    最近工作不是太忙,准备再读读一些源码,想来想去,还是先从JDK的源码读起吧,毕竟很久不去读了,很多东西都生疏了.当然,还是先从炙手可热的HashMap,每次读都会有一些收获.当然,JDK8对HashM ...

  8. Netty源码分析第3章(客户端接入流程)----&gt;第5节: 监听读事件

    Netty源码分析第三章: 客户端接入流程 第五节: 监听读事件 我们回到AbstractUnsafe的register0()方法: private void register0(ChannelPro ...

  9. 6. SOFAJRaft源码分析— 透过RheaKV看线性一致性读

    开篇 其实这篇文章我本来想在讲完选举的时候就开始讲线性一致性读的,但是感觉直接讲没头没尾的看起来比比较困难,所以就有了RheaKV的系列,这是RheaKV,终于可以讲一下SOFAJRaft的线性一致性 ...

  10. gRPC源码分析0-导读

    gRPC是Google开源的新一代RPC框架,官网是http://www.grpc.io.正式发布于2016年8月,技术栈非常的新,基于HTTP/2,netty4.1,proto3.虽然目前在工程化方 ...

随机推荐

  1. webstorm+react+webpack-demo

    序言:通过这个小例子你也许.大概.可能会掌握以下几点 1.webstorm如何使用命令行 2.如何使用webpack的loaders把json格式的文件转化为javascript文件 3.如何使用不同 ...

  2. Objective-C中将结构体与联合体封装为NSValue对象

    在Clang 3.7之前,Objective-C已经可以使用类似@100.@YES.@10.5f等字面量表示一个NSNumber对象:用类似@"xxx"的字面量表示一个NSStri ...

  3. 【转】非常适用的Sourceinsight插件,提高效率事半功倍

    原文网址:http://www.cnblogs.com/wangqiguo/p/3713211.html 一直使用sourceinsight编辑C/C++代码,sourceinsight是一个非常好用 ...

  4. hibernate 建表一对一 就是一对多,多的一方外键唯一unique

    Person.java package cn.itcast.hiberate.sh.domain.onetoone; import java.io.Serializable; import java. ...

  5. Background Worker Component

    http://www.delphiarea.com/products/delphi-components/backgroundworker/ Background Worker Component ( ...

  6. android模块化app开发-4为APP减负

    现在android应用中一个趋势是应用越来越大,免去游戏不谈普通APP也是一个个的体积直线增长.这里面除了业务增长外各种接口jar包的对接也占了不少比重.像广告SDK,统计SDK,支付SDK等这些我们 ...

  7. poj3642 Charm Bracelet(0-1背包)

    题目意思: 给出N,M,N表示有N个物品,M表示背包的容量.接着给出每一个物品的体积和价值,求背包可以装在的最大价值. http://poj.org/problem? id=3624 题目分析: o- ...

  8. 通过appium-desktop定位元素

    https://www.cnblogs.com/feng0815/p/8481679.html http://www.cnblogs.com/feng0815/p/8481495.html appiu ...

  9. vue之$forceUpdate

    由于一些嵌套特别深的数据,导致数据更新了.UI没有更新(连深度监听都没有监听到) this.$forceUpdate();

  10. NAT ------ 为什么手动设置NAT端口映射(转发)不成功,导致访问不了局域网服务器

    手动设置端口映射成功的条件是路由器WAN口接的是外网IP,而不是网络提供商的路由器NAT之后的IP.假如有个外网的客户端,连的服务器IP一定要是外网IP(假设IP_A),如果自己的路由器WAN口接的是 ...