ReZero's Utopia.

AQS

Word count: 2.2kReading time: 8 min
2020/09/12 Share

AQS

CLH

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package com.hand.hcf.app.base.bigdata;

import java.util.concurrent.atomic.AtomicReference;

/**
* @version: 1.0
* https://github.com/yuanmabiji/Java-SourceCode-Blogs
*/
// CLHLock.java

public class CLHLock {
/**
* CLH锁节点
*/
private static class CLHNode {
// 锁状态:默认为false,表示线程没有获取到锁;true表示线程获取到锁或正在等待
// 为了保证locked状态是线程间可见的,因此用volatile关键字修饰
volatile boolean locked = false;
}
// 尾结点,总是指向最后一个CLHNode节点
// 【注意】这里用了java的原子系列之AtomicReference,能保证原子更新
private final AtomicReference<CLHNode> tailNode;
// 当前节点的前继节点
private final ThreadLocal<CLHNode> predNode;
// 当前节点
private final ThreadLocal<CLHNode> curNode;

// CLHLock构造函数,用于新建CLH锁节点时做一些初始化逻辑
public CLHLock() {
// 初始化前继节点,注意此时前继节点没有存储CLHNode对象,存储的是null
predNode = new ThreadLocal<>();
// 初始化当前的CLH节点
curNode = ThreadLocal.withInitial(CLHNode::new);
// 初始化时尾结点指向一个空的CLH节点
// 为啥? 第一个获取锁的不就一定会把 tailNode 指向有node的cur节点,但是同时返回了null 给 preNode,导致锁判定空指针
tailNode = new AtomicReference<>(new CLHNode());
}

/**
* 获取锁
*/
public void lock() {
// 取出当前线程ThreadLocal存储的当前节点,初始化值总是一个新建的CLHNode,locked状态为false。
CLHNode currNode = curNode.get();
// 此时把lock状态置为true,表示一个有效状态,
// 即获取到了锁或正在等待锁的状态
currNode.locked = true;
// 当一个线程到来时,总是将尾结点取出来赋值给当前线程的前继节点;
// 然后再把当前线程的当前节点赋值给尾节点
// 【注意】在多线程并发情况下,这里通过AtomicReference类能防止并发问题
// 【注意】哪个线程先执行到这里就会先执行predNode.set(preNode);语句,因此构建了一条逻辑线程等待链

// ----**这条链避免了线程饥饿现象发生**----

CLHNode preNode = tailNode.getAndSet(currNode);
// 将刚获取的尾结点(前一线程的当前节点)付给当前线程的前继节点ThreadLocal
// 【思考】这句代码也可以去掉吗,如果去掉有影响吗?
// 可以,因为临时变量持有着前节点的引用,其实就相当于一个 TL 即可,predNode的意义不大
predNode.set(preNode);
// 【1】若前继节点的locked状态为false,则表示获取到了锁,不用自旋等待;
// 【2】若前继节点的locked状态为true,则表示前一线程获取到了锁或者正在等待,自旋等待
while (preNode.locked) {
System.out.println("线程" + Thread.currentThread().getName() + "没能获取到锁,进行自旋等待。。。");
}
// 能执行到这里,说明当前线程获取到了锁
System.out.println("线程" + Thread.currentThread().getName() + "获取到了锁!!!");
}

/**
* 释放锁
*/
public void unLock() {
// 获取当前线程的当前节点
CLHNode node = curNode.get();
// 进行解锁操作
// 这里将locked至为false,此时执行了lock方法正在自旋等待的后继节点将会获取到锁
// 【注意】而不是所有正在自旋等待的线程去并发竞争锁,
// 因为此时除了第一个拿到锁的线程,其他线程都还没获取锁,或者获取了锁但是却在疯狂自旋
node.locked = false;
System.out.println("线程" + Thread.currentThread().getName() + "释放了锁!!!");
// 小伙伴们可以思考下,下面两句代码的作用是什么??
// 防只有 a线程工作时, 释放了锁以后,再次获取锁异常
// A 获取锁后释放,此时tailNode 和 curNode 指向同一个,而predNode 指向了之前的 tailNode
// 此时 若A仍和tailNode 持有同一个对象
// lock 时,curNode 设置为 true,tailNode 对应也就是 true了,这样 predNode 就会自己等自己

// 但是如果说指向新节点,那么获取锁后释放,再次lock 时, curNode 设置为 true, 但 tailNode 没变,仍然是之前的false,
// 这样 predNode 就可以了
CLHNode newCurNode = new CLHNode();
curNode.set(newCurNode);

// 【优化】能提高GC效率和节省内存空间,请思考:这是为什么?
// curNode.set(predNode.get());
}
}

AQS[0] 独占

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 头结点: 当前持有锁的线程 队列不包含该节点
private transient volatile Node head;

// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
private transient volatile Node tail;

// 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
// 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
private volatile int state;

// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
// 采用自旋的方式入队
// 之前说过,到这个方法只有两种可能:等待队列为空,或者有线程竞争入队,
// 自旋在这边的语义是:CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的
private Node enq(final Node node) {

for (;;) {
Node t = tail;
// 之前说过,队列为空也会进来这里
if (t == null) { // Must initialize
// 初始化head节点
// 细心的读者会知道原来 head 和 tail 初始化的时候都是 null 的
// 还是一步CAS,你懂的,现在可能是很多线程同时进来呢
if (compareAndSetHead(new Node()))
// 给后面用:这个时候head节点的waitStatus==0, 看new Node()构造方法就知道了

// 这个时候有了head,但是tail还是null,设置一下,
// 把tail指向head,放心,马上就有线程要来了,到时候tail就要被抢了
// 注意:这里只是设置了tail=head,这里可没return哦,没有return,没有return
// 所以,设置完了以后,继续for循环,下次就到下面的else分支了
tail = head;
} else {
// 下面几行,和上一个方法 addWaiter 是一样的,
// 只是这个套在无限循环里,反正就是将当前线程排到队尾,有线程竞争的话排不上重复排
node.prev = t;
// 如果第一次set tail了后发生了线程切换,因而采用 CAS
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

acquireQueued

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 现在,又回到这段代码了
// if (!tryAcquire(arg)
// && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// selfInterrupt();

// 这里假设addWaiter 返回 tail节点后,tail节点就被其他线程cas替换掉了,其实这对下面的分析是没有影响的,上面的设置tail的最终目的是为了 使链表构成 并tail标志末尾,tail标志末尾的语义没有改变即可,

// 下面这个方法,参数node,经过addWaiter(Node.EXCLUSIVE),此时已经进入阻塞队列
// 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话,
// 意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false
// 这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head
// 注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列
// 所以当前节点可以去试抢一下锁
// 这里我们说一下,为什么可以去试试:
// 首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node,
// enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程
// 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试,
// tryAcquire已经分析过了, 忘记了请往前看一下,就是简单用CAS试操作一下state
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 到这里,说明上面的if分支没有成功,要么当前node本来就不是队头,
// 要么就是tryAcquire(arg)没有抢赢别人,继续往下看
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 什么时候 failed 会为 true???
// tryAcquire() 方法抛异常的情况
if (failed)
cancelAcquire(node);
}
}
CATALOG
  1. 1. AQS
    1. 1.1. CLH
    2. 1.2. AQS[0] 独占