我们知道,在java中提供了两类锁的实现,一种是在jvm层级上实现的synchrinized隐式锁,另一类是jdk在代码层级实现的,juc包下的Lock显示锁,而提到Lock就不得不提一下它的核心队列同步器(AQS)了,它的全称是AbstractQueuedSynchronizer,是用来构建锁或者其他一些同步组件的基础,除了ReentrantLock、ReentrantReadWriteLock外,它还在CountDownLatch、Semaphore以及ThreadPoolExecutor中被使用,通过理解队列同步器的工作原理,对我们了解和使用这些工具类会有很大的帮助。
创新互联公司专业为企业提供嘉定网站建设、嘉定做网站、嘉定网站设计、嘉定网站制作等企业网站建设、网页设计与制作、嘉定企业网站模板建站服务,十余年嘉定做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。
为了便于理解AQS的概念,这里首先摘录部分AbstractQueuedSynchronizer的注释进行了简要翻译:
阅读这些注释,可以知道AbstractQueuedSynchronizer是一个抽象类,它基于内部先进先出(FIFO)的双向队列、以及内置的一些protected方法来实现同步器,完成同步状态的管理,并且我们可以通过子类继承AQS抽象类的方式,在共享模式或独占模式下,实现自定义的同步组件。
通过上面的描述,可以看出AQS中的两大核心是同步状态和双向的同步队列,来看一下源码中是如何对它们进行定义的:
- public abstract class AbstractQueuedSynchronizer
- extends AbstractOwnableSynchronizer implements java.io.Serializable {
- static final class Node {
- volatile int waitStatus;
- volatile Node prev;
- volatile Node next;
- volatile Thread thread;
- //...
- }
- private transient volatile Node head;
- private transient volatile Node tail;
- private volatile int state;
- //...
- }
下面针对这两个核心内容分别进行研究。
AQS内部静态类Node用于表示同步队列中的节点,变量表示的意义如下:
每个节点的prev和next指针在加入队列的时候进行赋值,通过这些指针就形成了一个双向列表,另外AQS还保存了同步队列的头节点head和尾节点tail,通过这样的结构,就能够通过头节点或尾节点,找到队列中的任何一个节点。使用图来表示同步队列的结构如下:
另外可以看到,在源码中为了保证可见性,同步器中的head、tail、state,以及节点中的prev,next属性都加了关键字volatile修饰。
AQS的另一核心同步状态,在代码中是使用int类型的变量state来表示的,通过原子操作修改同步状态的值,来实现对同步组件的状态进行修改。在子类中,主要通过AQS提供的下面3个方法对同步状态的访问和转换进行操作:
线程会试图修改state的值,如果修改成功那么表示线程得到或释放了同步状态,如果失败就会将当前线程封装成一个Node节点,然后将其加入到同步队列中,并阻塞当前线程。
AQS的设计使用了模板方法的设计模式,模板方法一般在父类中封装不变的部分(如算法骨架),把扩展的可变部分交给子类进行扩展,子类的执行结果会影响父类的结果,是一种反向的控制结构。AQS中应用了这种设计模式,将一部分方法交给子类进行重写,而自定义的同步组件在调用同步器提供的模板方法(父类中的方法)时,又会调用子类重写的方法。
以AQS类中常用于获取锁的acquire方法为例,它的代码如下:
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
acquire方法被final修饰,不可以在子类中重写,因为它是对外提供的模板方法,有相对具体和固定的执行逻辑。在acquire方法中调用了tryAcquire方法:
- protected boolean tryAcquire(int arg) {
- throw new UnsupportedOperationException();
- }
可以看到带有protected修饰的tryAcquire方法是一个空壳方法,并没有定义实际获取同步状态的逻辑,这就需要我们在继承AQS的子类中对齐进行重写,从而达到扩展目的。在重写过程中,就会用到上面提到的获取和修改同步状态的3个方法getState、setState和compareAndSetState。
以ReentrantLock中的方法调用为例,当调用ReentrantLock中的lock方法时,会调用继承了AQS的内部类Sync的父类中的acquire方法,acquire方法再调用子类Sync的tryAcquire方法并返回boolean类型结果。
除了tryAcquire方法外,子类中还提供了其他可以重写的方法,列出如下:
而我们在实现自定义的同步组件时,可以直接调用AQS提供的下面这些模板方法:
从模板方法中可以看出,大多方法都是独占模式和共享模式对称出现的,除去查询等待线程方法外,可以将他们分为两类:独占式获取或释放同步状态、共享式获取或释放同步状态,并且它们的核心都是acquire与release方法,其他方法只是在它们实现的基础上做了部分的逻辑改动,增加了中断和超时功能的支持。下面对主要的4个方法进行分析。
分析上面acquire方法中源码的执行流程:
1.首先调用tryAcquire尝试获取同步状态,如果获取成功,那么直接返回
2.如果获取同步状态失败,调用addWaiter方法生成新Node节点并加入同步队列:
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
方法中使用当前线程和等待状态构造了一个新的Node节点,在同步队列的队尾节点不为空的情况下(说明同步队列非空),调用compareAndSetTail方法以CAS的方式把新节点设置为同步队列的队尾节点。如果队尾节点为空或添加新节点失败,则调用enq方法:
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
在同步队列为空的情况下,会先创建一个新的空节点作为头节点,然后通过CAS的方式将当前线程创建的Node设为尾节点。在for循环中,只有通过CAS将节点插入到队尾后才会返回,否则就会重复循环,通过这样的方式,能够将并发添加节点的操作变为串行添加,保证了线程的安全性。这一过程可以使用下图表示:
3.添加新节点完成后,调用acquireQueued方法,尝试以自旋的方式获取同步状态:
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
当新添加的Node的前驱节点是同步队列的头节点并且尝试获取同步状态成功时,线程将Node设为头节点并从自旋中退出,否则调用shouldParkAfterFailedAcquire方法判断是否需要挂起:
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL)
- return true;
- if (ws > 0) {
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
在该方法中,传入的第一个Node类型的参数是当前节点的前驱节点,对其等待状态进行判断:
当返回为true时,调用parkAndCheckInterrupt方法:
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);
- return Thread.interrupted();
- }
在方法内部调用了LockSupport的park方法,阻塞当前线程,并返回当前线程是否被中断的状态。
在上面的代码中,各节点通过自旋的方式检测自己的前驱节点是否头节点的过程,可用下图表示:
4.当满足条件,返回acquire方法后,调用selfInterrupt方法。方法内部使用interrupt方法,唤醒被阻塞的线程,继续向下执行:
- static void selfInterrupt() {
- Thread.currentThread().interrupt();
- }
最后,使用流程图的方式总结acquire方法独占式获取锁的整体流程:
与acquire方法对应,release方法负责独占式释放同步状态,流程也相对简单。在ReentrantLock中,unlock方法就是直接调用的AQS的release方法。先来直接看一下它的源码:
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
1.方法中首先调用子类重写的tryRelease方法,尝试释放当前线程持有的同步状态,如果成功则向下执行,失败返回false
2.如果同步队列的头节点不为空且等待状态不为初始状态,那么将调用unparkSuccessor方法唤醒它的后继节点:
- private void unparkSuccessor(Node node) {
- int ws = node.waitStatus;
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
- Node s = node.next;
- if (s == null || s.waitStatus > 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
- }
方法主要实现的功能有:
同步队列新头节点的设置过程如下图所示:
在上面的过程中,采用的是从后向前遍历寻找未取消节点的方式,这是因为AQS的同步队列是一个弱一致性的双向列表,在下面的情况中,存在next指针为null的情况:
在了解了独占式获取同步状态后,再来看一下共享式获取同步状态。在共享模式下,允许多个线程同时获取到同步状态,来看一下它的源码:
- public final void acquireShared(int arg) {
- if (tryAcquireShared(arg) < 0)
- doAcquireShared(arg);
- }
首先调用子类重写的tryAcquireShared方法,返回值为int类型,如果值大于等于0表示获取同步状态成功,那么直接返回。如果小于0表示获取失败,执行下面的doAcquireShared方法,将线程放入等待队列使用自旋尝试获取,直到获取同步状态成功:
- private void doAcquireShared(int arg) {
- final Node node = addWaiter(Node.SHARED);
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
- int r = tryAcquireShared(arg);
- if (r >= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- if (interrupted)
- selfInterrupt();
- failed = false;
- return;
- }
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
对上面的代码进行简要的解释:
1.调用addWaiter方法,封装新节点,并以共享模式(Node.SHARED)将节点放入同步队列中队尾
2.在for循环中,获取当前节点的前驱节点,如果前驱节点是同步队列的头节点,那么就以共享模式去尝试获取同步状态,判断tryAcquireShared的返回值,如果返回值大于等于0,表示获取同步状态成功,修改新的头节点,并将信息传播给同步队列中的后继节点,然后检查中断标志位,如果线程被阻塞,那么进行唤醒
3.如果前驱节点不是头节点、或获取同步状态失败时,调用shouldParkAfterFailedAcquire判断是否需要阻塞,如果需要则调用parkAndCheckInterrupt,在前驱节点的等待状态为SIGNAL时,将节点对应的线程阻塞
可以看到,共享式的获取同步状态的调用过程和acquire方法非常相似,但不同的是在获取同步状态成功后,会调用setHeadAndPropagate方法进行共享式同步状态的传播:
- private void setHeadAndPropagate(Node node, int propagate) {
- Node h = head; // Record old head for check below
- setHead(node);
- if (propagate > 0 || h == null || h.waitStatus < 0 ||
- (h = head) == null || h.waitStatus < 0) {
- Node s = node.next;
- if (s == null || s.isShared())
- doReleaseShared();
- }
- }
因为共享式同步状态是允许多个线程共享的,所以在一个线程获取到同步状态后,需要在第一时间通知后继节点的线程可以尝试获取同步资源,这样就可以避免其他线程阻塞时间过长。在方法中,把当前节点设置为头节点后,需要根据情况判断后继节点是否需要释放:
如果满足上面的任何一种状态,并且它的后继节点是SHARED状态的,则执行doReleaseShared方法释放后继节点:
- private void doReleaseShared() {
- for (;;) {
- Node h = head;
- if (h != null && h != tail) {
- int ws = h.waitStatus;
- if (ws == Node.SIGNAL) {
- if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- continue; // loop to recheck cases
- unparkSuccessor(h);
- }
- else if (ws == 0 &&
- !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- continue; // loop on failed CAS
- }
- if (h == head) // loop if head changed
- break;
- }
- }
doReleaseShared方法不仅在这里的共享状态传播的情况下被调用,还会在后面介绍的共享式释放同步状态中被调用。在方法中,当头节点不为空且不等于尾节点(意味着没有后继节点需要等待唤醒)时:
通过上面的流程,就实现了从头节点尝试向后唤醒节点,实现了共享状态的向后传播。
最后,再来看一下对应的共享式释放同步状态方法:
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
releaseShared方法会释放指定量的资源,如果调用子类重写的tryReleaseShared方法返回值为true,表示释放成功,那么还是执行上面介绍过的doReleaseShared方法唤醒同步队列中的等待线程。
在前面的介绍中说过,在使用AQS时,需要定义一个子类继承AbstractQueuedSynchronizer抽象类,并实现它的抽象方法来管理同步状态。接下来我们就来手写一个独占式的锁,按照文档中的推荐,我们将子类定义为自定义同步工具类的静态内部类:
- public class MyLock {
- private static class AqsHelper extends AbstractQueuedSynchronizer {
- @Override
- protected boolean tryAcquire(int arg) {
- int state = getState();
- if (state == 0) {
- if (compareAndSetState(0, arg)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- } else if (getExclusiveOwnerThread() == Thread.currentThread()) {
- setState(getState() + arg);
- return true;
- }
- return false;
- }
- @Override
- protected boolean tryRelease(int arg) {
- int state = getState() - arg;
- if (state == 0) {
- setExclusiveOwnerThread(null);
- setState(state);
- return true;
- }
- setState(state);
- return false;
- }
- @Override
- protected boolean isHeldExclusively() {
- return getState() == 1;
- }
- }
- private final AqsHelper aqsHelper = new AqsHelper();
- public void lock() {
- aqsHelper.acquire(1);
- }
- public boolean tryLock() {
- return aqsHelper.tryAcquire(1);
- }
- public void unlock() {
- aqsHelper.release(1);
- }
- public boolean isLocked() {
- return aqsHelper.isHeldExclusively();
- }
- }
在AQS的子类中,首先重写了tryAcquire方法,在方法中利用CAS来修改state的状态值,并在修改成功时设置当前线程独占资源。并且通过比较尝试获取锁的线程与持有锁的线程是否相同的方式,来实现了锁的可重入性。在重写的tryRelease方法中,进行资源的释放,如果存在重入的情况,会一直到所有重入锁释放完才会真正的释放锁,并放弃占有状态。
可以注意到在自定义的锁工具类中,我们定义了lock和tryLock两个方法,分别调用了acquire和tryAcquire方法,它们的区别是lock会等待锁资源,直到成功时才会返回,而tryLock尝试获取锁时,会立即返回成功或失败的状态。
接下来,我们通过下面的测试代码,验证自定义的锁的有效性:
- public class Test {
- private MyLock lock=new MyLock();
- private int i=0;
- public void sayHi(){
- try {
- lock.lock();
- System.out.println("i am "+i++);
- }finally {
- lock.unlock();
- }
- }
- public static void main(String[] args) {
- Test test=new Test();
- Thread[] th=new Thread[20];
- for (int i = 0; i < 20; i++) {
- new Thread(()->{
- test.sayHi();
- }).start();
- }
- }
- }
运行上面的测试代码,结果如下,可以看见通过加锁保证了对变量i的同步访问控制:
接下来通过下面的例子测试锁的可重入性:
- public class Test2 {
- private MyLock lock=new MyLock();
- public void function1(){
- lock.lock();
- System.out.println("execute function1");
- function2();
- lock.unlock();
- }
- public void function2(){
- lock.lock();
- System.out.println("execute function2");
- lock.unlock();
- }
- public static void main(String[] args) {
- Test2 test2=new Test2();
- new Thread(()->{
- test2.function1();
- }).start();
- }
- }
执行上面的代码,可以看到在function1未释放锁的情况下,function2对锁进行了重入并执行了后续的代码:
通过上面的学习,我们了解了AQS的两大核心同步队列和同步状态,并对AQS对资源的管理以及队列状态的变化有了一定的研究。其实归根结底,AQS只是提供给我们来开发同步组件的一个底层框架,在它的层面上,并不关心子类在继承它时要实现什么功能,AQS只是提供了一套维护同步状态的功能,至于要完成什么样的一个工具类,这完全是由我们自己去定义的。
网页名称:源码级深挖AQS队列同步器
文章链接:http://www.mswzjz.cn/qtweb/news8/290508.html
攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能