Java并发工具AQS条件队列Condition实现详解

技术分享  / 只看大图  / 倒序浏览   ©

#楼主# 2019-11-6

跳转到指定楼层

马上注册,分享更多源码,享用更多功能,让你轻松玩转云大陆。

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
你是否使用过java并发工具包中的Lock和Condition。本文分析一下JDK是如何实现Condition条件队列的,对你今后的使用或许有帮助。如果你觉得分析源码太累,看不懂,可以通过阅读本文以大致了解java显式锁的实现,保证你面试够用了。
约定:

  • 本文AQS均指java.util.concurrent.locks.AbstractQueuedSynchronizer类。
  • 本文CAS操作指的是compareAndSet 或 compareAndSwap原子操作,需要您自行学习相关基础。
AQS如何实现条件队列
AQS提供一个内部类ConditionObject,由这个ConditionObject实现条件队列。
例如,经典代码:
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
...
lock.lock();
try {
while ( 某条件A ) {
condition.await();
}
// 条件A满足时应执行的代码
} finally {
lock.unlock();
}

以上代码中的condition实际就是ConditionObject类。
首先,分析其声明:
public class ConditionObject implements Condition, java.io.Serializable {
1. 首先,它实现了java.util.concurrent.locks.Condition接口,表示自己是一个条件队列。作为内部类,它没有被static关键字修饰,表示它不能脱离外部的AQS类独立存在,必需与外部类AQS实例建立关联。
2. 然后,它设置了一个单链表,firstWaiter字段指向链表的头结点,lastWaiter指向尾结点。最先开始等待的线程位于头结点,最后等待的线程位于尾结点。
154009dflhl7yfv9flozhj.jpg
锁队列与条件队列

3. 接下来我们分析条件队列中的两个最核心的两个方法 signal 和 await()。至于signalAll,它与signalAll()几乎一致,区别只在于signal()只处理一个线程,而signalAll()处理条件队列中的所有线程。而awaitUninterruptibly(), awaitNanos(), awaitUntil()等awaitX()方法都与await()类似。
假设有以下代码:
Lock lock1 = new ReentrantLock();
Condition condtion1 = lock1.newCondition();
// 线程1调用下面这段代码:
lock1.lock();
while(某个条件) {
condition1.await();
}
lock1.unlock();
参照await的代码进行分析。首先,await()可以响应线程中断,所以一开始先判断这个时候有没有被中断(即await还没开始任何操作就遇到了中断,也许是不耐烦的用户操作引起的)。然后,把当前线程放入条件队列,释放之前已持有的锁(锁中的状态保存到线程的本地变量)。再然后,不断的查询代表当前线程的结点是否已经在锁队列上,如果不在锁队列上则阻塞。由于,把当前线程结点移到锁队列上的一定是另外一个线程(通过调用signal())。当阻塞唤醒(即从LockSupport.park()放回)之后,且查询到当前线程结点已在锁队列上时,说明此时已有其他线程成功调用了signal()或signalAll(),然后执行争夺锁的操作并把之前的锁状态还原回去,即通过acquireQueued()完成。acquireQueued()只有在成功获得锁以后才会返回,最后检查一下在前面的时间里有没有中断,如果有中断则抛出异常。Javadoc中说中断与方法的成功返回之前,中断必需优先响应,这就是由于前面的中断信息只是被记录了下来,没有及时处理,很可能是在很早之前就收到了,直到最后才有机会处理。所以,中断比上述的阻塞循环结果优先。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
其他的awaitX()系列操作,只是在阻塞循环体中增加了等待时间判断、检查有没有中断以尽早跳出循环的逻辑而已。
在await()进行等待的操作中,第一个操作是把线程放入等待队列,这是addConditionWaiter()完成的。addConditionWaiter()负责把线程放入条件队列。它先检查是否持有锁(未持有锁则抛出IllegalMonitorStateException)。然后,在进入条件队列之前,检查最后一个结点是否已经取消等待(cancelled),如果是,则触发一次对从条件队列开头直到末尾的遍历,把队列中所有已经取消等待的线程摘除。其实,这个遍历操作是一个优化措施,可以避免在遇到大量线程取消等待后(取消风暴 cancel storm)的无效锁竞争。最后,把当前线程放入条件队列末尾。
最后,我们分析signal(),如代码所示。signal() 通常是另外一个线程调用的,内部主要工作是调用doSignal()。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
在doSignal()的实现代码中,从条件队列头部开始,找到第一个未取消等待的线程,并把线程由条件等待状态改为就绪状态(waitStatus=0),并且从条件队列取下来放入锁队列。进入锁队列以后,把这个线程用CAS操作设为等待锁状态(waitStatus=-1),如果设置失败(此期间有取消等待操作发生),则将这个线程唤醒(通过LockSupport.unpark())。把结点放入锁队列,并设置等待状态的相关操作在transferForSignal()方法中完成。
顺便说一下,doSignalAll()把条件队列中的所有线程取出,然后对于这些线程,逐一设为就绪状态。然后也按照doSignal的做法,放入锁队列,设置为等待锁的状态,CAS设置失败则唤醒这个线程。
4. Lock与Condition的不同点
Lock有自旋而Condition没有自旋。由于lock的加锁操作是无条件的,自旋可以减少线程上下文切换;而Condition是在检测到一个条件不满足时,主动进入阻塞。自旋对于Condition来说意义不大,由于条件不满足时,很大概率在接下来的一小段时间内依然不满足,所以它如果自旋只会浪费cpu计算资源。
Lock中的队列操作需要大量谨慎的CAS操作,由于没有锁的保护;而Condition由于有锁的保护,所以在把结点在条件队列和锁队列之间移动时,无需太多谨慎的CAS操作,只是在修改等待状态时,需要用到CAS操作,以应对突然发生的取消等待操作。
5. 总结
看完Condition的实现之后你是不是对Condition的运行和使用又有了更深入的体会呢。假如你需要面试,可以重点看这里。总结一下,你要知道Condtion与一个AQS对象建立了关联,并且内部有一个单链表,所有等待线程(调用await)都在这个单链表里面等待。条件的等待和唤醒是通过把代表线程的结点在条件队列和锁队列之间移动来实现的。当有线程在这个Condition上调用await时,它就会把自身放入条件队列并且阻塞,而阻塞是通过一个循环来完成的,在这个循环中交替执行阻塞(LockSupport.park)、判断是否在锁队列上。当有线程在这个Condition上调用了signal时,就会把线程从条件队列转移到它锁关联的那个AQS的锁队列上,并且将它唤醒(此时,阻塞循环中的判断是否在锁队列的条件就会满足,导致跳出循环)。
分享淘帖
回复

使用道具

您的回复是对作者最大的奖励

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关于作者

海上灵光Y8

新手猿

  • 主题

    5

  • 帖子

    5

  • 关注者

    0

Archiver|手机版|小黑屋|云大陆 | 赣ICP备18008958号-4|网站地图
Powered by vrarz.com!  © 2019-2020版权所有云大陆