今天是2016年最后一天,对于做技术的来讲,又是一年积累的结束。这一年里,学到的,错失的,都值得好好总结一下。在2016的最后几个小时里,还是决定静下心来,写一篇技术博文,把数日死磕源码的一些心得记录下来,也算对即将到来的新年表个态:做技术的,就是要有点死磕自己的工匠精神,谈不上愉悦他人,至少给自己的职业生涯定下一个基调。既然选择了,就坚持下去,不是因为无奈,而是因为热爱。

本周在编写一个定时调度任务时,遇到一个需要控制并发量的场景:应用有六台机器,每台机器配置的定时任务执行并发量是10,这个配置对所有任务都生效,但眼下要编写的这个任务需要调用外部系统一个很重的接口,如果并发量过高,存在将外部系统拖垮的风险。这个没有冒险上线验证评估,因为待编写的定时任务每月只执行一次,因此只需要单独对该任务做一下并发数限制,初步定为单机并发数限制为5,则六台机器并发总数为30。

尝试过自己写一些控制流程,但限于个人能力,控制逻辑也是漏洞百出:stuck_out_tongue_winking_eye:。然后很识趣地去调研成熟的控制方案,发现其实Doug Lea大叔编写的concurrent工具包里就有现成的解决方案–Semaphore。这是一个通过信号量控制同一时间资源访问并发数的工具,其本质是共享锁,可以对比常见的synchronized,Reentrantlock等排它锁。排它锁限制同一时刻资源访问并发数只能为1,为共享锁则可以自定义并发数,可以理解为广义的锁。

我们先来看下Semaphore的使用方法,十分简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static final Semaphore SEMAPHORE = new Semaphore(5);
public static void main(String[] args) {
for(int i = 0; i < 1000; i++) {
Thread th = new Thread(new Runnable() {
@Override
public void run() {
try {
SEMAPHORE.acquire();
Thread.sleep(5); // 执行业务逻辑
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
SEMAPHORE.release();
}
});
th.start();
}
}

声明一个Semaphore对象,将并发数限制作为参数传递给构造器,默认为非公平锁。通过acquire方法进行锁的获取,抢占资源访问权限,当抢占数满5后,后续的线程将阻塞,直到前5个线程有处理完业务后调用release方法唤醒后续的线程进行资源抢占。这块的逻辑非常好理解,经测试后也发现能按照预定的意图完美运行。

这么神奇的几行代码,就解决了问题,不得不佩服Doug Lea大叔的精湛功力,也不得不自我汗颜。那么,我们就进入Semaphore,看看这几行代码里到底发生了什么。

Semaphore的背后其实就是AQS–AbstractQueuedSynchronizer,整个concurrrent工具包的运作基石。AQS实质是实现了一个双向链表,将暂时抢占资源失败的线程虚化为Node节点在该链表进行排队,当前线程处理完业务后,依次唤醒排队的节点线程。这个队列的基本原理是十分简单的,也是CLH锁的基本原理,但AQS对CLH进行了改进,最大的区别在于排队的线程并没有不断进行自旋,而是阻塞,需等待其他线程的唤醒。

CLH节点不断自旋其实就是要不断检查前续节点的状态,如果允许则跳出自旋进行锁抢占。这种设计比较适用于占用资源耗时极短的业务场景,但因为短时间的自旋不会有线程上下文切换的开销,而且很快就能获取到锁,对CPU的消耗可以容忍。但在实际业务中,往往线程处理任务都是比较耗时的,若是让排队线程进行自旋,在不控制好排队数和超时机制的情况下,会对CPU资源造成极大的消耗,甚至出现宕机的危险。因此,对于这类处理场景,将等待线程阻塞是比较合理的处理方式,

AQS将排队线程的自旋等待改为阻塞等待,当阻塞线程被唤醒后,即可进行资源的抢占,并在释放时,唤醒后续等待的线程。这块逻辑最简单的实现是ReentrantLock可重入锁,大家可以参阅下源码。本文重点要写的是Semaphore,其控制逻辑更为复杂,下面我们慢慢道来。

首先从获取锁下手,进入acquire,AQS的acquireSharedInterruptibly方法:

1
2
3
4
5
6
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // arg为1
doAcquireSharedInterruptibly(arg);
}

从名称可以看出,可重入锁使用的是acquireInterruptibly,而这里是获取共享锁。首先判断线程是否中断,再尝试一次获取锁。由于默认是非公平锁,所以会调用Semaphore实现的nonfairTryAcquireShared方法。

1
2
3
4
5
6
7
8
9
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState(); // 获取当前信号量
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

getState()用于获取AQS的state参数值,该参数表征了当前可用的信号量,即剩余的可用并发数。当计算的remaining小于零,则直接返回remaining,表示当前无资源可抢占,上层逻辑将该线程放入等待队列。若remaining>=0,则表示资源抢占成功,同时运用CAS方法原子地将state值更新。

tryAcquireShared体现了非公平锁的设计思路,即线程到来后直接尝试抢占,而公平锁会首先检查等待队列是否存在以确定是否直接入队。

好,现在我们进入doAcquireSharedInterruptibly方法,看下当信号量满员时,后续线程进来后的处理逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 往等待队列中新增共享节点
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
break;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
// Arrive here only if interrupted
cancelAcquire(node);
throw new InterruptedException();
}

首先向等待队列添加节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); // 自旋入队
return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

我们看到,入队的操作还是很简单的,只是设置tail节点采用CAS操作避免并发问题。这里有个问题,enq方法提供了完整的入队操作,并通过自旋保证一定能够入栈成功,但为何在调用enq操作前要执行一个快速入栈的操作呢?即当队列存在,则向节点添加到尾部,否则才执行enq操作。不难发现,快速入队的操作和enq方法中尾节点不为空时的操作实际是一样的。我们假设取消掉快速入队的操作,而是直接调用enq,逻辑并不会出现问题。那Doug Lea为何要在这里加一个快速入队操作呢?翻看了很多网络文章,基本都是人云亦云,或者直接翻译作者注释,对于为什么要进行快速入队操作并没有有说服力的解释。当然,这个问题可能本身并没有合理解释,或者作者只是想减少一次栈帧的深度,或者作者当时喝了点小酒:blush:?总之,目前我也没想明白为啥有这部操作,大家如果有见解的望不吝在评论中赐教。

经过入队操作,当前线程节点已经添加到等待队列的队尾。随后,判断当前节点的前置节点是否为头结点,若是则再次调用tryAcquireShared尝试抢占资源。这里若依旧抢占失败,则进入shouldParkAfterFailedAcquire:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

shouldParkAfterFailedAcquire的入参是当前节点及其前置节点,这里主要是对前置节点的状态进行判断,若为SIGNAL(即-1),则直接返回true;若大于零(即被取消),那么不断寻找最近的状态不大于零的节点作为前置节点,返回false;其他情况下,将前置节点状态设为SIGNAL。SIGNAL状态表明后继节点线程可直接进入阻塞状态,等待唤醒。

当前置节点状态已经为SIGNAL,则进入parkAndCheckInterrupt方法阻塞当前线程,否则将前置节点状态置为SIGNAL,再次循环,尝试一次抢占资源,若再次失败,则进入shouldParkAfterFailedAcquire判断前置节点已为SIGNAL,阻塞当前线程。

可能很多同学都注意到了,当信号量满额后,后续到达的线程可能会经历三次资源抢占的尝试,然后才阻塞。第一次是入队前,第二次是将前置节点状态设为SIGNAL后,第三次抢占尝试后才阻塞。这个问题暂时留在这里,标记为问题一,下文介绍完资源释放的操作后我们再研究下为什么。

接下来看下doAcquireSharedInterruptibly方法中尝试资源抢占成功后的setHeadAndPropagate操作:

1
2
3
4
5
6
7
8
9
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

排它锁的setHead操作仅仅是将当前节点设置为head,实现队列头的切换,而在共享锁的处理中,还要添加一个传播的操作,即如果有其他的若干资源被释放,那当前线程在抢占到一个资源后,有必要具备唤醒后续线程进行剩余资源抢占的能力。这里记录下老的头结点,若其waitStatus小于0或者propagate(剩余资源数)大于0,且后续节点不为空(该队列中的节点均为共享模式),则进行后续节点的唤醒处理。作者在这里提到,在高并发下,可能出现不必要的唤醒操作,我们将在下文介绍完释放资源操作后解释这个问题,记为问题二。

下面看下共享资源释放的操作doReleaseShared,其实质是修改头节点的相关状态,并唤醒后继节点。该方法在释放共享资源操作中也会调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

这里检测头节点存在且有后续节点,若状态为SIGNAL,则转为0,并唤醒后续线程;若为0,则标记为PROPAGATE,当释放操作短时间内出现第二次时,将执行置为PROPAGATE的操作。这里的意思是当前头节点需要将唤醒操作传递下去,在setHeadAndPropagate中将判断到waitStatus小于0,执行后续唤醒操作,实现“传播”。另一方面,这里有个判断头节点是否切换的操作,当执行释放资源操作,唤醒后续线程后,该线程可能已经抢占到资源并执行了切换头节点的操作,这时检测到头节点已发生变化,于是继续释放新头节点的后续节点。这里不断自旋的过程,保证了在高并发情况下对头节点切换的感知,将唤醒操作传递下去。

再看下unparkSuccessor方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

这里首先将头节点的状态置为0,然后判断后继节点,若为空或已被取消,则从尾节点开始倒序寻找,直到找到离头节点最近的未取消的节点,然后将其休眠。

这里为何从尾节点开始寻找呢?因为若正向寻找,当前头节点的后继节点可能出现变为空的情况,如在setHeadAndPropagate方法执行后会将老的头节点的后继节点置为空,以告诉GC实现回收,这样的话后继节点找不到,则将无法找到状态不大于0的后续节点,即使该节点是存在的。而从尾节点倒序寻找将确保能够找到该目标节点(前提是存在),因为尾节点的设置是线程安全的。

以上是获取资源操作所涉及到的主要流程方法,接下来我们看下释放资源的操作。

1
2
3
4
5
6
7
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int p = getState();
if (compareAndSetState(p, p + releases))
return true;
}
}

将释放的资源数叠加到可用信号量上,操作很简单。接下来的doReleaseShared上文已经涉及到。

我们最后来看下前文遗留下来的两个问题。

  • 问题一:线程在阻塞前为何进行了三次资源抢占操作。第一次抢占体现了非公平锁的实现逻辑,这个自不必多说。关键在于为何不在第一次判断是否阻塞(shouldParkAfterFailedAcquire)时就进行阻塞,这样的话就只进行两次资源抢占。这个我们结合释放资源的doReleaseShared操作来看下。我们知道,在doReleaseShared操作中,若检查到头节点的状态为SIGNAL,则置为0,并唤醒后续线程。假设我们在第一次shouldParkAfterFailedAcquire操作时就直接阻塞了,那么这时doReleaseShared操作中unparkSuccessor就是一次真实的线程唤醒操作,必然带来线程上下文切换的开销,而实际代码中,第一次判断阻塞只是将头节点状态置为-1,第二次才真的进行阻塞,这种分步操作其实就节省下了一次可能的线程唤醒操作(虽然unparkSuccessor逻辑依旧会执行,但由于线程并没有阻塞,因此相当于一次无效操作,但带来的开销很小)。
  • 问题二:在高并发情况下,setHeadAndPropagate中可能存在不必要的唤醒后继线程的操作。如当判断propagate大于0进入后续流程后,可能新来的未入队的线程抢占资源成功,导致可用资源数为0,这时后续逻辑依旧会唤醒后继线程,然后进行抢占尝试,必然会失败,也就是说是不必要的唤醒。可以看到,在并发流程控制中,很多时候只能通过一些冗余操作来弥补并发造成的一致性问题,即使逻辑变得冗余,所以我想这也是并发编程中的一块难点,怎样找到保证功能的前提下性能的最大优化。

后记

对AQS的了解本人目前还只是停留到代码表象,对于更多深层的动机其实还需要更多去理解揣摩。通过阅读这部分代码,我真切感受到短短数行代码所能产生的巨大能量,几乎每一行代码的编写动机都值得推敲。这里附上当年Doug Lea大叔发表的AQS论文,讲述了AQS的设计原理,抽时间还会去细度一下。但关于代码的设计细节,还是只能再慢慢推敲源码才能发现。