我们专注攀枝花网站设计 攀枝花网站制作 攀枝花网站建设
成都网站建设公司服务热线:400-028-6601

网站建设知识

十年网站开发经验 + 多家企业客户 + 靠谱的建站团队

量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决

Java 并发编程解析 | 基于JDK源码解析Java领域中的并发锁,我们可以从中学习到什么内容?

苍穹之边,浩瀚之挚,眰恦之美; 悟心悟性,善始善终,惟善惟道! —— 朝槿《朝槿兮年说》

成都创新互联公司自2013年创立以来,是专业互联网技术服务公司,拥有项目成都做网站、网站建设、外贸营销网站建设网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元友好做网站,已为上家服务,为友好各地企业和个人服务,联系电话:028-86922220

写在开头

在Java领域中, 尤其是在并发编程领域,对于多线程并发执行一直有两大核心问题:同步和互斥。其中:

  • 互斥(Mutual Exclusion):一个公共资源同一时刻只能被一个进程或线程使用,多个进程或线程不能同时使用公共资源。即就是同一时刻只允许一个线程访问共享资源的问题。
  • 同步(Synchronization):两个或两个以上的进程或线程在运行过程中协同步调,按预定的先后次序运行。即就是线程之间如何通信、协作的问题。

针对对于这两大核心问题,利用管程是能够解决和实现的,因此可以说,管程是并发编程的万能钥匙。

虽然,Java在基于语法层面(synchronized 关键字)实现了对管程技术,但是从使用方式和性能上来说,内置锁(synchronized 关键字)的粒度相对过大,不支持超时和中断等问题。

为了弥补这些问题,从JDK层面对其“重复造轮子”,在JDK内部对其重新设计和定义,甚至实现了新的特性。

关健术语

本文用到的一些关键词语以及常用术语,主要如下:

  • 信号量(Semaphore): 是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用,也是作系统用来解决并发中的互斥和同步问题的一种方法。
  • 信号量机制(Semaphores): 用来解决同步/互斥的问题的,它是1965年,荷兰学者 Dijkstra提出了一种卓有成效的实现进程互斥与同步的方法。
  • 管程(Monitor) : 一般是指管理共享变量以及对共享变量的操作过程,让它们支持并发的一种机制。

基本概述

在Java领域中,我们可以将锁大致分为基于Java语法层面(关键词)实现的锁和基于JDK层面实现的锁。

在Java领域中,从JDK源码分析来看,基于JDK层面实现的锁大致主要可以分为以下4种方式:

  • 基于Lock接口实现的锁
  • 基于ReadWriteLock接口实现的锁
  • 基于AQS基础同步器实现的锁
  • 基于自定义API操作实现的锁

从阅读源码不难发现,在Java SDK 并发包主要通过AbstractQueuedSynchronizer(AQS)实现多线程同步机制的封装与定义,而通过Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。

一. 基本理论

在并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。

在操作系统中,一般有如果I/O操作时,对于阻塞和非阻塞是从函数调用角度来说的,其中:

  • 阻塞:如果读写操作没有就绪或者完成,则函数一直等待。
  • 非阻塞: 函数立即调用,然后让应用程序轮询循环。

而同步和异步则是从“读写是主要是由谁完成”的角度来说的,其中:

  • 同步: 读写操作主要交给应用程序完成
  • 异步: 读写操作主要由操作系统完成,一般完成之后,回调函数和事件通知应用程序。

其中,信号量机制(Semaphores)是用来解决同步/互斥的问题的,但是信号量(Semaphore)的操作分散在各个进程或线程中,不方便进行管理,因每次需调用P/V(来自荷兰语 proberen和 verhogen)操作,还可能导致死锁或破坏互斥请求的问题。

由于PV操作对于解决进程互斥/同步编程复杂,因而在此基础上提出了与信号量等价的——“管程技术”。

其中,管程(Monitor)当中定义了共享数据结构只能被管程内部定义的函数所修改,所以如果我们想修改管程内部的共享数据结构的话,只能调用管程内部提供的函数来间接的修改这些数据结构。

一般来说,管程(Monitor)和信号量(Semaphore)是等价的,所谓等价指的是用管程能够实现信号量,也能用信号量实现管程。

在管程的发展历程上,先后出现过Hasen模型、Hoare模型和MESA模型等三种不同的管程模型,现在正在广泛使用的是MESA模型。

在MESA模型中,管程中引入了条件变量(Conditional Variable)的概念,而且每个条件变量都对应有一个等待队列(Wait Queue)。其中,条件变量和等待队列的作用是解决线程之间的同步问题。

而对于解决线程之间的互斥问题,将共享变量(Shared Variable)及其对共享变量的操作统一封装起来,一般主要是实现一个线程安全的阻塞队列(Blocking Queue),将线程不安全的队列封装起来,对外提供线程安全的操作方法,例如入队操作(Enqueue)和出队操作(Dequeue)。

在Java领域中,对于Java语法层面实现的锁(synchronized 关键字), 其实就是参考了 MESA 模型,并且对 MESA 模型进行了精简,一般在MESA 模型中,条件变量可以有多个,Java 语言内置的管程(synchronized)里只有一个条件变量。

这就意味着,被synchronized 关键字修饰的代码块或者直接标记静态方法以及实例方法,在编译期会自动生成相关加锁(lock)和解锁(unlock)的代码,即就是monitorenter和monitorexit指令。

对于synchronized 关键字来说,主要是在Java HotSpot(TM) VM 虚拟机通过Monitor(监视器)来实现monitorenter和monitorexit指令的。

同时,在Java HotSpot(TM) VM 虚拟机中,每个对象都会有一个监视器,监视器和对象一起创建、销毁。

监视器相当于一个用来监视这些线程进入的特殊房间,其义务是保证(同一时间)只有一个线程可以访问被保护的临界区代码块。

本质上,监视器是一种同步工具,也可以说是JVM对管程的同步机制的封装实现,主要特点是:

  • 同步:监视器所保护的临界区代码是互斥地执行的。一个监视器是一个运行许可,任一线程进入临界区代码都需要获得这个许可,离开时把许可归还。
  • 协作:监视器提供Signal机制,允许正持有许可的线程暂时放弃许可进入阻塞等待状态,等待其他线程发送Signal去唤醒;其他拥有许可的线程可以发送Signal,唤醒正在阻塞等待的线程,让它可以重新获得许可并启动执行。

在Hotspot虚拟机中,监视器是由C++类ObjectMonitor实现的,ObjectMonitor类定义在ObjectMonitor.hpp文件中,其中:

  • Owner: 指向的线程即为获得锁的线程
  • Cxq:竞争队列(Contention Queue),所有请求锁的线程首先被放在这个竞争队列中
  • EntryList:对象实体列表,表示Cxq中那些有资格成为候选资源的线程被移动到EntryList中。
  • WaitSet:类似于等待队列,某个拥有ObjectMonitor的线程在调用Object.wait()方法之后将被阻塞,然后该线程将被放置在WaitSet链表中。

同时,管程与Java中面向对象原则(Object Oriented Principle)也是非常契合的,主要体现在 java.lang.Object类中wait()、notify()、notifyAll() 这三个方法,其中:

  • wait()方法: 阻塞线程并且进入等待队列
  • notify()方法:随机地通知等待队列中的一个线程
  • notifyAll()方法: 通知等待队列中的所有线程

不难发现,在Java中synchronized 关键字及 java.lang.Object类中wait()、notify()、notifyAll() 这三个方法都是管程的组成部分。

由此可见,我们可以得到一个比较通用的并发同步工具基础模型,大致包含如下几个内容,其中:

  • 条件变量(Conditional Variable): 利用线程间共享的变量进行同步的一种工作机制
  • 共享变量((Shared Variable)):一般指对象实体对象的成员变量和属性
  • 阻塞队列(Blocking Queue):共享变量(Shared Variable)及其对共享变量的操作统一封装
  • 等待队列(Wait Queue):每个条件变量都对应有一个等待队列(Wait Queue),内部需要实现入队操作(Enqueue)和出队操作(Dequeue)方法
  • 变量状态描述机(Synchronization Status):描述条件变量和共享变量之间状态变化,又可以称其为同步状态
  • 工作模式(Operation Mode): 线程资源具有排他性,因此定义独占模式和共享模式两种工作模式

综上所述,条件变量和等待队列的作用是解决线程之间的同步问题;共享变量与阻塞队列的作用是解决线程之间的互斥问题。

二.AQS基础同步器的设计与实现

在Java领域中,同步器是专门为多线程并发设计的同步机制,主要是多线程并发执行时线程之间通过某种共享状态来实现同步,只有当状态满足这种条件时线程才往下执行的一种同步机制。

对于多线程实现实现并发处理机制来说,一直以来,多线程都存在2个问题:

  • 线程之间内存共享,需要通过加锁进行控制,但是加锁会导致性能下降,同时复杂的加锁机制也会增加编程编码难度
  • 过多线程造成线程之间的上下文切换,导致效率低下

因此,在并发编程领域中,一直有一个很重要的设计原则: “ 不要通过内存共享来实现通信,而应该通过通信来实现内存共享。”

简单来说,就是尽可能通过消息通信,而不是内存共享来实现进程或者线程之间的同步。

其中,同步器是专门为多线程并发设计的同步机制,主要是多线程并发执行时线程之间通过某种共享状态来实现同步,只有当状态满足这种条件时线程才往下执行的一种同步机制。

由于在不同的应用场景中,对于同步器的需求也会有所不同,一般在我们自己去实现和设计一种并发工具的时候,都需会考虑以下几个问题:

  • 是否支持响应中断? 如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁。
  • 是否支持超时?如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。
  • 是否支持非阻塞地获取锁资源 ? 如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。

从阅读JDK源码不难发现,主要是采用设计模式中模板模式的原则,JDK将各种同步器中相同的部分抽象封装成了一个统一的基础同步器,然后基于基础同步器为模板通过继承的方式来实现不同的同步器。

也就是说,在实际开发过程中,除了直接使用JDK实现的同步器,还可以基于这个基础同步器我们也可以自己自定义实现符合我们业务需求的同步器。

在JDK源码中,同步器位于java.util.concurrent.locks包下,其基本定义是AbstractQueuedSynchronizer类,即就是我们常说的AQS同步器。

1. 设计思想

一个标准的AQS同步器主要有同步状态机制,等待队列,条件队列,独占模式,共享模式等五大核心要素组成。

JDK的JUC(java.util.concurrent.)包中提供了各种并发工具,但是大部分同步工具的实现基于AbstractQueuedSynchronizer类实现,其内部结构主要如下:

  • 同步状态机制(Synchronization Status):主要用于实现锁(Lock)机制,是指同步状态,其要求对于状态的更新必须原子性的
  • 等待队列(Wait Queue):主要用于存放等待线程获取到的锁资源,并且把线程维护到一个Node(节点)里面和维护一个非阻塞的CHL Node FIFO(先进先出)队列,主要是采用自旋锁+CAS操作来保证节点插入和移除的原子性操作。
  • 条件队列(Condition Queue):用于实现锁的条件机制,一般主要是指替换“等待-通知”工作机制,主要是通过ConditionObject对象实现Condition接口提供的方法实现。
  • 独占模式(Exclusive Mode):主要用于实现独占锁,主要是基于静态内部类Node的常量标志EXCLUSIVE来标识该节点是独占模式
  • 共享模式(Shared Mode):主要用于实现共享锁,主要是基于静态内部类Node的常量标志SHARED来标识该节点是共享模式

其中,对于AbstractQueuedSynchronizer类的实现原理,我们可以从如下几个方面来看:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    private static final long serialVersionUID =  L;


    protected AbstractQueuedSynchronizer() {}

    /**
     * 等待队列: head-头节点
     */
    private transient volatile Node head;

    /**
     * 等待队列: tail-尾节点
     */
    private transient volatile Node tail;

    /**
     * 同步状态:32位整数类型,更新同步状态(state)时必须保证其是原子性的
     */
    private volatile int state;

    /**
     * 自旋锁消耗超时时间阀值(threshold): threshold < 1000ns时,表示竞争时选择自旋;threshold > 1000ns时,表示竞争时选择系统阻塞
     */
    static final long spinForTimeoutThreshold = 1000 L;

    /**
     * CAS原子性操作
     */
    private static final Unsafe unsafe = Unsafe.getUnsafe();

    /**
     * stateOffset
     */
    private static final long stateOffset;

    /**
     * headOffset
     */
    private static final long headOffset;

    /**
     * tailOffset
     */
    private static final long tailOffset;

    /**
     * waitStatusOffset
     */
    private static final long waitStatusOffset;

    /**
     * nextOffset
     */
    private static final long nextOffset;


    static {
        try {
            stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));

        } catch (Exception ex) {
            throw new Error(ex);
        }
    }
		
		    private final boolean compareAndSetHead(Node update)  {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
		
		    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
		
		    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }
		
		    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }
		
		    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

}

[1]. AbstractQueuedSynchronizer类的实现原理是继承了基于AbstractOwnableSynchronizer类的抽象类,其中主要对AQS同步器的通用特性和方法进行抽象封装定义,主要包括如下方法:

public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {


    private static final long serialVersionUID =  L;


    protected AbstractOwnableSynchronizer() {}

    /**
     *  同步器拥有者
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * 设置同步器拥有者:把线程当作参数传入,指定某个线程为独享
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    /**
     * 获取同步器拥有者:获取指定的某个线程
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}
  • setExclusiveOwnerThread(Thread thread)方法: 把某个线程作为参数传入,从而设置AQS同步器的所有者,即就是我们设置的某个线程
  • getExclusiveOwnerThread()方法: 获取当前AQS同步器的所有者,即就是我们指定的某个线程

[2]. 对于同步状态(state),其类型是32位整数类型,并且是被volatile修饰的,表示在更新同步状态(state)时必须保证其是原子性的。

[3]. 对于等待队列的结构,主要是在Node定义了head和tail变量,其中head表示头部节点,tail表示尾部节点

[4].对于等待队列的结构提到的Node类来说,主要内容如下:

  static final class Node {
      /** Marker to indicate a node is waiting in shared mode */
      static final Node SHARED = new Node();
			
      /** Marker to indicate a node is waiting in exclusive mode */
      static final Node EXCLUSIVE = null;

      /** waitStatus value to indicate thread has cancelled */
      static final int CANCELLED = 1;
			
      /** waitStatus value to indicate successor's thread needs unparking */
      static final int SIGNAL = -1;
			
      /** waitStatus value to indicate thread is waiting on condition */
      static final int CONDITION = -2;
			
      /**
       * waitStatus value to indicate the next acquireShared should
       * unconditionally propagate
       */
      static final int PROPAGATE = -3;

      /**
       * Status field, taking on only the values:
       *   SIGNAL:     The successor of this node is (or will soon be)
       *               blocked (via park), so the current node must
       *               unpark its successor when it releases or
       *               cancels. To avoid races, acquire methods must
       *               first indicate they need a signal,
       *               then retry the atomic acquire, and then,
       *               on failure, block.
       *   CANCELLED:  This node is cancelled due to timeout or interrupt.
       *               Nodes never leave this state. In particular,
       *               a thread with cancelled node never again blocks.
       *   CONDITION:  This node is currently on a condition queue.
       *               It will not be used as a sync queue node
       *               until transferred, at which time the status
       *               will be set to 0. (Use of this value here has
       *               nothing to do with the other uses of the
       *               field, but simplifies mechanics.)
       *   PROPAGATE:  A releaseShared should be propagated to other
       *               nodes. This is set (for head node only) in
       *               doReleaseShared to ensure propagation
       *               continues, even if other operations have
       *               since intervened.
       *   0:          None of the above
       *
       *
       * The field is initialized to 0 for normal sync nodes, and
       * CONDITION for condition nodes.  It is modified using CAS
       * (or when possible, unconditional volatile writes).
       */
      volatile int waitStatus;

      /**
       * Link to predecessor node that current node/thread relies on
       */
      volatile Node prev;

      /**
       * Link to the successor node that the current node/thread
       */
      volatile Node next;

      /**
       * The thread that enqueued this node.  Initialized on
       * construction and nulled out after use.
       */
      volatile Thread thread;

      /**
       * Link to next node waiting on condition, or the special
       */
      Node nextWaiter;

      /**
       * Returns true if node is waiting in shared mode.
       */
      final boolean isShared() {
          return nextWaiter == SHARED;
      }


      final Node predecessor() throws NullPointerException {
          Node p = prev;
          if (p == null)
              throw new NullPointerException();
          else
              return p;
      }

      Node() { // Used to establish initial head or SHARED marker
      }

      Node(Thread thread, Node mode) { // Used by addWaiter
          this.nextWaiter = mode;
          this.thread = thread;
      }

      Node(Thread thread, int waitStatus) { // Used by Condition
          this.waitStatus = waitStatus;
          this.thread = thread;
      }
  }

  • 标记Node的工作模式常量标记:主要维护了SHARED和EXCLUSIVE等2个静态字面常量,其中 SHARED 用于标记Node中是共享模式,EXCLUSIVE:用于标记Node中是独享模式
  • 标记等待状态的静态字面常量标记: 主要维护了0(表示无状态),SIGNAL(-1,表示后续节点中的线程通过park进入等待,当前节点在释放和取消时,需要通过unpark解除后后续节点的等待),CANCELLED(1,表示当前节点中的线程因为超时和中断被取消),CONDITION(-2,表示当前节点在条件队列中),PROPAGATE(-3,SHARED共享模式的头节点描述状态,表示无条件往下传播)等5个静态字面常量
  • 维护了一个等待状态(waitStatus): 主要用于描述等待队列中节点的状态,其取值范围为0(waitStatus=0,表示无状态),SIGNAL(waitStatus=-1,表示等待信号状态),CANCELLED(waitStatus=1,表示取消状态),CONDITION(waitStatus=-2,表示条件状态),PROPAGATE(waitStatus=-3,表示SHARED共享模式状态)等5个静态字面常量,CAS操作时写入,默认值为0。
  • 维护了Node的2个结构节点变量: 主要是prev和next,其中,prev表示前驱节点,next表示后续节点,表示构成双向向链表,构成了等待队列的数据结构
  • 维护了一个状态工作模式标记: 主要是维护了一个nextWaiter,用于表示在等待队列中当前节点在是共享模式还是独享模式,而对于条件队列来说,用于组成单向链表结构
  • 维护了一个线程对象变量: 主要用于记录当前节点中的线程thread

[5].对于自旋锁消耗超时时间阀值(spinForTimeoutThreshold),主要表示系统依据这个阀值来选择自旋方式还是系统阻塞。一般假设这个threshold,当 threshold < 1000ns时,表示竞争时选择自旋;否则,当threshold > 1000ns时,表示竞争时选择系统阻塞

[6].对于带有Offset 等变量对应各自的句柄,主要用于执行CAS操作。在JDK1.8版本之前,CAS操作主要通过Unsafe类来说实现;在JDK1.8版本之后,已经开始利用VarHandle来替代Unsafe类操作实现。

[7].对于CAS操作来说,主要提供了如下几个方法:

  • compareAndSetState(int expect, int update)方法:CAS操作原子更新状态
  • compareAndSetHead(Node update)方法:CAS操作原子更新头部节点
  • compareAndSetTail(Node expect, Node update)方法:CAS操作原子更新尾部节点
  • compareAndSetWaitStatus(Node node, int expect,int update)方法:CAS操作原子更新等待状态
  • compareAndSetNext(Node node,Node expect,Node update)方法:CAS操作原子更新后续节点

[8].对于条件队列(ConditionObject)来说,主要内容如下:

  public class ConditionObject implements Condition, java.io.Serializable {

      private static final long serialVersionUID =  L;

      /** First node of condition queue. */
      private transient Node firstWaiter;

      /** Last node of condition queue. */
      private transient Node lastWaiter;

      /** Mode meaning to reinterrupt on exit from wait */
      private static final int REINTERRUPT = 1;

      /** Mode meaning to throw InterruptedException on exit from wait */
      private static final int THROW_IE = -1;

      /**
       * Creates a new {@code ConditionObject} instance.
       */
      public ConditionObject() {}
			
  }

  • 基于Condition的接口实现条件队列,其核心主要是实现阻塞和唤醒的工作机制
  • 基于Node定义了firstWaiter和lastWaiter变量,其中,firstWaiter表示的是头节点,lastWaiter是尾节点
  • 还定义了2个字面常量REINTERRUPT和THROW_IE,其中REINTERRUPT=1,描述的是当中断是退出条件队列,THROW_IE=-1表示的是发生异常时退出

[8].除此之外,在AQS基础同步器中,一般可以通过构造方法直接将参数值赋给对应变量,也可以通过变量句柄进行赋值操作:

  • isShared()方法: 用于判断等待队列是否为共享模式
  • predecessor()方法: 用于获取当前节点对应的前驱节点,如果为空,则 throw new NullPointerException();

2. 基本实现

一个标准的AQS同步器最核心底层设计实现是一个非阻塞的CHL Node FIFO(先进先出)队列数据结构,通过采用自旋锁+CAS操作的方法来保证原子性操作。

总的来说,一个AQS基础同步器,底层的数据结构采用的是一个非阻塞的CHL Node FIFO(先进先出)队列数据结构,而实现的核心算法则是采用自旋锁+CAS操作的方法。

首先,对于非阻塞的CHL Node FIFO(先进先出)队列数据结构,一般来说,FIFO(First In First Out,先进先出)队列是一个有序列表,属于抽象型数据类型(Abstract Data Type,ADT),所有的插入和删除操作都发生在队首(Front)和队尾(Rear)两端,具有先进先出的特性。


    /**
     * 等待队列: head-头节点
     */
    private transient volatile Node head;

    /**
     * 等待队列: tail-尾节点
     */
    private transient volatile Node tail;
		
    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

在AQS同步器的源码中,主要是通过静态内部类Node来实现的这个非阻塞的CHL Node FIFO(先进先出)队列数据结构, 维护了两个变量head和tail,其中head对应队首(Front),tail对应队尾(Rear)。同时,还定义了addWaiter(Node mode)方法来表示入队操作,其中有个enq(final Node node)方法,主要用于初始化队列中head和tail的设置。

其次,AQS同步器以CLH锁为基础,其中CLH锁是一种自旋锁,对于自旋锁的实现方式来看,主要可以分为普通自旋锁和自适应自旋锁,CLH锁和MCS锁等4种,其中:

  • 普通自旋锁:多个线程不断自旋,不断尝试获取锁,其不具备公平性和由于要保证CPU和缓存以及主存之间的数据一致性,其开销较大。
  • 自适应自旋锁:主要是为解决普通自旋锁的公平性问题,引入了一个排队机制,一般称为排他自旋锁,其具备公平性,但是没有解决保证CPU和缓存以及主存之间的数据一致性问题,其开销较大。
  • CLH锁:通过一定手段将线程对于某一个共享变量的轮询竞争转化为一个线程队列,且队列中的线程各自轮询自己本地变量。
  • MCS锁:主旨在于解决 CLH锁的问题,也是基于FIFO队列,与CLH锁不同是,只对本地变量自旋,前驱节点负责通知MCS锁中线程自适结束。

自旋锁是一种实现同步的方案,属于一种非阻塞锁,与常规锁主要的区别就在于获取锁失败之后的处理方式不同,主要体现在:

  • 一般情况下,常规锁在获取锁失败之后,会将线程阻塞并适当时重新唤醒
  • 而自旋锁则是使用自旋来替换阻塞操作,主要是线程会不断循环检查该锁是否被释放,一旦释放线程便会获取锁资源。

从本质上讲,自旋是一钟忙等待状态,会一直消耗CPU的执行时间。一般情况下,常规互斥锁适用于持有锁长时间的情况,自旋锁适合持有时间短的情况。

其中,对于CLH锁来说,其核心是为解决同步带来的花销问题,Craig,Landim,Hagersten三人发明了CLH锁,其中主要是:

  • 构建一个FIFO(先进先出)队列,构建时主要通过移动尾部节点tail来实现队列的排队,每个想获得锁的线程都会创建一个新节点(next)并通过CAS操作原子操作将新节点赋予给tail,当前线程轮询前一个节点的状态。
  • 执行完线程后,只需将当前线程对应节点状态设置为解锁即可,主要是判断当前节点是否为尾部节点,如果是直接设置尾部节点设置为空。由于下一个节点一直在轮询,所以可以获得锁。

CLH锁将众多线程长时间对资源的竞争,通过有序化这些线程将其转化为只需要对本地变量检测。唯一存在竞争的地方就是入队之前对尾部节点tail 的竞争,相对来说,当前线程对资源的竞争次数减少,这节省了CPU缓存同步的消耗,从而提升了系统性能。

但是同时也有一个问题,CLH锁虽然解决了大量线程同时操作同一个变量时带来的开销问题,如果前驱节点和当前节点在本地主存中不存在,则访问时间过长,也会引起性能问题。

为了让CLH锁更容易实现取消和超时的功能,AQS同步器在设计时进行了改造,主要体现在:节点的结构和节点等待机制。其中:

  • 节点的结构: 主要引入了头节点和尾节点,分别指向队列头部和尾部,对于锁的相关操作都与其息息相关,并且每个节点都引入了前驱节点和后继节点。
  • 节点等待机制: 主要在原来的自旋基础上增加了系统阻塞唤醒,主要体现在 自旋锁消耗超时时间阀值(threshold): threshold < 1000ns时,表示竞争时选择自旋;threshold > 1000ns时,表示竞争时选择系统阻塞。

由此可见,主要是通过前驱节点和后继节点的引用连接起来形成一个链表队列,其中对于入队,检测节点,出队,判断超时,取消节点等操作主要如下:

  • 入队(enqueue): 主要采用一个无限循环进行CAS操作,即就是使用自旋方式竞争直到成功。
  • 检测节点(checkedPrev): 一般在入队完成后,主要是检测判断当前节点的前驱节点是否为头节点, 一般自旋方式是直接进入循环检测,而系统阻塞方式是当前线程先检测,其中如果是头节点并成功获取锁,则直接返回,当前线程不阻塞,否则对当前线程进行阻塞。
  • 出队(dequeue):主要负责唤醒等待队列中的后继节点,并且按照条件往下传播有序执行
  • 判断超时(checkedTimeout): 队列中等待锁的线程可能因为中断或者超时的情况,当总耗时大于等于自定义耗时就直接返回,即就是
  • 取消节点(cancel): 主要是对于中断和超时而涉及到取消操作,而且这样的情况不再参与锁竞争,即就是一般通过调用compareAndSetNext(Node node, Node expect,Node update)来进行CAS操作。

特别值得注意的是,AQS基础同步器中主要有等待队列和条件队列两种对列结构,对比便不难发现:等待队列采用的底层数据结构是双向链表结构,而对于条件队列则是单向链表结构。

最后,AQS同步器中使用了CAS操作,其中CAS(Compare And Swap,比较并交换)操作时一种乐观锁策略,主要涉及三个操作数据:内存值,预期值,新值,主要是指当且仅当预期值和内存值相等时才去修改内存值为新值。

一般来说,CAS操作的具体逻辑,主要可以分为三个步骤:

  • 首先,检查某个内存值是否与该线程之前取到值一样。
  • 其次,如果不一样,表示此内存值已经被别的线程修改,需要舍弃本次操作。
  • 最后,如果时一样,表示期间没有线程更改过,则需要用新值执行更新内存值。

除此之外,需要注意的是CAS操作具有原子性,主要是由CPU硬件指令来保证,并且通过Java本地接口(Java Native Interface,JNI)调用本地硬件指令实现。

当然,CAS操作避免了悲观策略独占对象的 问题,同时提高了并发性能,但是也有以下三个问题:

  • 乐观策略只能保证一个共享变量的原子操作,如果是多个变量,CAS便不如互斥锁,主要是CAS操作的局限所致。
  • 长时间循环操作可能导致开销过大。
  • 经典的ABA问题: 主要是检查某个内存值是否与该线程之前取到值一样,这个判断逻辑不严谨。解决ABA问题的核心在于,引入版本号,每次更新变量值更新版本号。

而在AQS同步器中,为了保证并发实现保证原子性,而且是硬件级别的原子性,一般是通过JNI(Java native interface,Java 本地接口)方式让Java代码调用C/C++本地代码。

通过分析源码可知,一般使用Unsafe类需要只用关注如下方法即可:


public final class Unsafe {

    private static final Unsafe theUnsafe;

    private static native void registerNatives();

    private Unsafe() {}

    @CallerSensitive
    public static Unsafe getUnsafe() {
        Class var0 = Reflection.getCallerClass();
        if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
            throw new SecurityException("Unsafe");
        } else {
            return theUnsafe;
        }
    }

    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

    public native void unpark(Object var1);

    public native void park(boolean var1, long var2);

}
  • rgisterNatives()方法:是一个静态 方法,主要用于注册本地方法
  • 构造函数是private私有化的,一般无法通过构造函数来实例化Unsafe对象
  • getUnsafe()方法:是用来获取Unsafe对象的,虽然是公有化的,但是如果Java语言开发层面的对进行安全检查

一般地,由于Unsafe类的操作涉及到硬件底层的操作,JDK对其实例化做了安全校验,只有受系统信任的代码才对其实例化,主要是通过类加载器来解析,其实例化方式主要有如下方式:

  • 第一种:直接调用该方法,主要新式是Unsafe.getUnsafe()。但是,对于我们实际开发来说,这种方式无法通过安全校验行不通,系统会抛出 throw new SecurityException("Unsafe")信息
  • 第二种:通过反射机制绕过安全检查,主要是修改Unsafe类中theUnsafe字段的访问权限,让其能被访问从而达到获取Unsafe对象的目的

需要注意的是,在Java领域中,对于CAS操作实现,主要有两点问题:

  • JDK1.8版本之前,CAS操作主要使用Unsafe类来执行底层操作,一般并发和线程操作时,主要用compareAndSwapObject,compareAndSwapInt,compareAndSwapLong等来实现CAS,而对于线程调度主要是park和unpark方法,其主要在sun.misc包下面。
  • JDK1.8版本之后,JDK1.9的CAS操作主要使用VarHandle类,只是用VarHandle替代了一部分Unsafe类的操作,但是对于新版本中Unsafe,本质上Unsafe类会间接调用jdk.internal.misc包下面Unsafe类来实现。

3. 具体实现

在Java领域中,AQS同步器利用独享模式和共享模式来实现同步机制,主要为解决多线并发执行中数据竞争和竞争条件问题。

为解决多线并发执行中数据竞争和竞争条件问题,引入了同步机制,主要是通过控制共享数据和临界区的访问,一般比较通用的方式是通过锁机制来实现。

在Java领域中,JDK对于AQS基础同步器抽象封装了锁的获取和释放操作,主要提供了独享和共享两种工作模式:

  • 独享模式(Exclusive Mode) :对应着独享锁(Exclusive Lock),表示着对于锁的获取和释放,一次只能至多一个或者只有一个线程把持,其他线程无法获得并获得持有,必须等待持有线程释放锁。
  • 共享模式(Shared Mode) :对应着共享锁(Shared Lock),表示着对于锁的获取和释放,一次可以至少一个或者允许多个线程把持,其他线程可以获得并获得持有,不用等待持有线程释放锁。

其中,AQS基础同步器对于独享模式和共享模式的工作模式的基本流程,主要如下:

  • 获取锁流程: 先尝试获取锁,如果获取成功则往下继续进行,否则把线程维护到等待队列中,线程可能会挂起。
  • 释放锁流程:唤醒等待队列中的一个或者多个线程去尝试获取需要释放的锁。

一般地,AQS基础同步器对于独享模式和共享模式的封装和实现,其中:

3.1. 独享模式的技术实现

[1].获取锁操作相关的核心逻辑,主要如下:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    /**
     * 独占模式:[1].通过acquire获取锁操作
     */

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }


    /**
     * 独占模式:[2].通过tryAcquire尝试获取锁操作
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }



    /**
     * 独占模式:[3].通过addWaiter入队操作
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /**
     * 独占模式:[4].通过acquireQueued检测节点
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * 独占模式:[5].通过cancelAcquire取消锁获取
     */
    private void cancelAcquire(Node node) {

        if (node == null)
            return;

        node.thread = null;


        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;


        Node predNext = pred.next;


        node.waitStatus = Node.CANCELLED;


        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {

            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                    (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; 
        }
    }


}

[2]. 释放锁操作相关的核心逻辑,主要如下:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {


    /**
     * 独占模式:[1].通过release释放锁操作
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * 独占模式:[2].通过tryRelease尝试释放锁操作
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 独占模式:[3].通过unparkSuccessor唤醒后继节点
     */
    private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);


        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

}

由此可见,对于独占模式的锁获取和释放,主要是依据acquire和release等方法来实现。

3.1. 共享模式的技术实现

[1].获取锁操作相关的核心逻辑,主要如下:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    /**
     * 共享模式:[1].通过acquireShared获取锁操作
     */

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    /**
     * 共享模式:[2].通过tryAcquireShared尝试获取锁操作
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 共享模式:[3].通过doAcquireShared入队操作
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

}

[2].释放锁操作相关的核心逻辑,主要如下:


public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

    /**
     * 共享模式:[1].通过releaseShared释放锁操作
     */

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    /**
     * 共享模式:[2].通过tryReleaseShared尝试释放锁操作
     */


    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 共享模式:[3].通过doReleaseShared释放锁就绪操作
     */
    private void doReleaseShared() {

        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    unparkSuccessor(h);
                } else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            if (h == head)
                break;
        }
    }
		
}

由此可见,对于共享模式的锁获取和释放,主要是依据acquireShared和releaseShared等方法来实现。

综上所述,不难看出,AQS同步器设计思想是通过继承的方式提供一个模板,其核心原理是管理一个共享状态,通过对状态的控制来实现不同的控制。

二. LockSupport的设计与实现

在Java领域中,LockSupport主要从线程资源角度为同步器和锁提供基本线程阻塞和唤醒原语,是“等待-通知”工作机制的实现。

一般来说,当一个线程(Thread)只要参与锁竞争时,其经历的主要流程有:

  • 一旦当前线程进行锁竞争时,线程都会尝试获取锁,根据获取锁的情况进行后续处理。
  • 如果获取锁失败,则会创建节点插入到队列的尾部,会二次尝试重新获取锁,并不会阻塞当前线程。
  • 如果获取锁成功,则直接返回,否则会将节点设置为待运行状态(SIGNAL)。
  • 最后对当前线程进行阻塞,当前驱节点运行完成后会唤醒后继节点。

在Java领域中,对于线程的阻塞和唤醒,也许我们最早在学习面向对象原则的时候,一般都使用java.lang.Object类中wait()、notify()、notifyAll() 这三个方法,可以用它们帮助我们实现等待-通知”工作机制。

同时,在讲解AQS基础同步器的实现时,提到说CAS操作的核心是使用Unsafe类来执行底层操作,对于线程调度主要是park和unpark方法,但是一般的Java语言层 main开发对其调用又有安全检查的限制。

但是,在AQS基础同步的的阻塞和唤醒操作咋在获取锁饿的锁操作中需要使用,一般地:

  • 如果获取不到锁的当前线程在进入排到队列之后需要阻塞当前线程。
  • 并且,排到队列中前驱节点运行完成后,需要负责唤醒后继节点。

于是,在AQS基础同步器的设计与实现中,封装一个专门用于实现“等待-通知”工作机制的LockSupport类。

对于LockSupport类,主要是为同步器和锁提供基本线程阻塞和唤醒原语,AQS同步器和锁都是使用它来阻塞和唤醒线程。主要源码如下:


public class LockSupport {

    /**
     *  阻塞操作:利用park阻塞某个线程(指定参数)
     */
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0 L);
        setBlocker(t, null);
    }

    /**
     *  阻塞操作:利用park阻塞某个线程(无指定参数)
     */
    public static void park() {
        UNSAFE.park(false, 0 L);
    }


    /**
     *  阻塞操作:根据nanos许可,利用parkNanos阻塞某个线程
     */
    public static void parkNanos(long nanos) {
        if (nanos > 0)
            UNSAFE.park(false, nanos);
    }

    /**
     *  阻塞操作:根据nanos许可,利用parkNanos阻塞某个线程,但是指定阻塞对象
     */
    public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, nanos);
            setBlocker(t, null);
        }
    }

    /**
     *  阻塞操作:根据deadline最大等待时间,利用parkUntil阻塞某个线程
     */
    public static void parkUntil(long deadline) {
        UNSAFE.park(true, deadline);
    }

    /**
     *  阻塞操作:根据deadline最大等待时间,利用parkUntil阻塞某个线程,需要指定阻塞对象
     */
    public static void parkUntil(Object blocker, long deadline) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(true, deadline);
        setBlocker(t, null);
    }


    /**
     *  唤醒操作:利用unpark唤醒某个线程
     */
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

    /**
     *  设置阻塞器: 指定线程和对象
     */
    private static void setBlocker(Thread t, Object arg) {
        // Even though volatile, hotspot doesn't need a write barrier here.
        UNSAFE.putObject(t, parkBlockerOffset, arg);
    }

    /**
     *  获取阻塞器中线程对象
     */
    public static Object getBlocker(Thread t) {
        if (t == null)
            throw new NullPointerException();
        return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
    }

    /**
     *  获取阻塞器中线程对象
     */
    static final int nextSecondarySeed() {
        int r;
        Thread t = Thread.currentThread();
        if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
            r ^= r << 13; // xorshift
            r ^= r >>> 17;
            r ^= r << 5;
        } else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
            r = 1; // avoid zero
        UNSAFE.putInt(t, SECONDARY, r);
        return r;
    }

    // Hotspot implementation via intrinsics API
    private static final sun.misc.Unsafe UNSAFE;

    private static final long parkBlockerOffset;

    private static final long SEED;

    private static final long PROBE;

    private static final long SECONDARY;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class  tk = Thread.class;
            parkBlockerOffset = UNSAFE.objectFieldOffset(tk.getDeclaredField("parkBlocker"));
            SEED = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomSeed"));
            PROBE = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomProbe"));
            SECONDARY = UNSAFE.objectFieldOffset(tk.getDeclaredField("threadLocalRandomSecondarySeed"));
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

}

  • 设计思想: 相比用java.lang.Object类中的wait/notify方式,其LockSupport类更关注线程本身,解耦了线程之间的同步。
  • 实现原理: 主要还是使用Unsafe类来执行底层操作,主要是间接调用是park和unpark本地方法
  • 阻塞操作涉及方法: 一般以park开头的方法来阻塞线程操作,大致可以分为自定义阻塞对象参数和非自定义阻塞对象参数等阻塞方法
  • 唤醒线程操作: 主要通过unpark方法来对当前线程设置可用,相对于唤醒操作

三. Condition接口的设计与实现

在Java领域中,Condition接口是用来实现管程技术,其中 Condition用于解决同步问题。

相对于LockSupport的设计与实现来说,Condition接口只是在JDK层面对于阻塞和唤醒提供了一个模板的定义,是AQS基础同步器中条件队列的定义,而ConditionObject是在AQS基础同步器具体实现。

对于Condition接口而言,是提供了可替代wait/notify机制的条件队列模式,其中:


public interface Condition {

    /**
     * 条件队列模式:等待await操作
     */
    void await() throws InterruptedException;

    /**
     * 条件队列模式:等待awaitUninterruptibly操作,可中断模式
     */
    void awaitUninterruptibly();

    /**
     * 条件队列模式:等待awaitNanos操作,可超时模式
     */
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    /**
     * 条件队列模式:等待await操作,可超时模式
     */
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    /**
     *条件队列模式:等待awaitUntil操作,可超时模式
     */
    boolean awaitUntil(Date deadline) throws InterruptedException;

    /**
     * 条件队列模式:通知signal操作
     */
    void signal();

    /**
     * 条件队列模式:通知signalAll操作
     */
    void signalAll();
}
  • 定义了关于“等待(wait)”机制的相关实现:而以await开头的所有方法都是关于等待机制的定义。
  • 定义了关于“通知(signal)”机制的相关实现: signal()方法和signalAll()方法,其中 signal()方法是随机地通知等待队列中的一个线程,而signalAll()方法是通知等待队列中的所有线程。

四. Lock接口的设计与实现

在Java领域中,Lock接口是用来实现管程技术,其中 Lock 用于解决互斥问题。

Lock接口位于java.util.concurrent.locks包中,是JUC显式锁的一个抽象,Lock接口的主要抽象方法:

public interface Lock {

    /**
     * Lock接口-获取锁
     */
    void lock();

    /**
     * Lock接口-获取锁(可中断)
     */
    void lockInterruptibly() throws InterruptedException;

    /**
     * Lock接口-尝试获取锁
     *
     */
    boolean tryLock();

    /**
     *Lock接口-尝试获取锁(支持超时)
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     *Lock接口-释放锁
     *
     */
    void unlock();

    /**
     *  Lock接口-设置条件变量
     */
    Condition newCondition();
}

  • 获取锁: lock()
  • 释放锁:unlock()
  • 条件变量: Condition

在JDK中,对于Lock接口的具体实现主要是ReentrantLock类,其中:


public class ReentrantLock implements Lock, java.io.Serializable {

    private static final long serialVersionUID =  L;

    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;


    /**
     * 构造锁的非公平模式(默认模式)
     */
    public ReentrantLock() {
            sync = new NonfairSync();
        }
				
    /**
     * 构造锁的公平和非公平模式(可选公平或者非公平)
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    /**
     * Lock接口-实现尝试获取锁
     */
    public void lock() {
        sync.lock();
    }

    /**
     * Lock接口-实现尝试获取锁
     */
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    /**
     * Lock接口-实现尝试获取锁
     */
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

    /**
     * Lock接口-实现尝试获取锁
     */
    public boolean tryLock(long timeout, TimeUnit unit)
    throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    /**
     * Lock接口-释放锁
     */
    public void unlock() {
        sync.release(1);
    }

    /**
     * Lock接口-创建条件变量
     */
    public Condition newCondition() {
        return sync.newCondition();
    }


}

  • 包含了一个同步器Sync,主要是基于AbstractQueuedSynchronizer实现,同时基于Sync类还实现FairSync类和NonfairSync类,其中FairSync类对应着公平模式,NonfairSync类对应非公平模式。
  • 实现Lock接口设计和定义的相关方法,可以设定其锁是公平和非公的,默认是非公平模式的。

相对于Java内置锁,Java SDK 并发包里的 Lock接口主要区别有能够响应中断、支持超时和非阻塞地获取锁等三个特性。

五. ReadWriteLock接口的设计与实现

在Java领域中,ReadWriteLock接口主要是基于Lock接口来封装了ReadLock锁和WriteLock锁等2种锁的实现方法,其中ReadLock锁是读锁的接口定义,WriteLock锁是写锁的接口定义。

ReadWriteLock接口的内部实现,主要是基于Lock接口来获取ReadLock锁和WriteLock锁的,其具体代码如下:


public interface ReadWriteLock {
    /**
     * ReadWriteLock接口-基于Lock接口实现ReadLock
     */
    Lock readLock();

    /**
     * ReadWriteLock接口-基于Lock接口实现WriteLock
     */
    Lock writeLock();
}

  • readLock()方法: 获取读锁,主要是基于基于Lock接口来定义,表示着其具体实现类都会基于AQS基础同步器实现
  • writeLock()方法:获取写锁,主要是基于基于Lock接口来定义,表示着其具体实现类都会基于AQS基础同步器实现

在JDK中,对于ReadWriteLock接口的具体实现主要是ReentrantReadWriteLock类,其中:


public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {

    private static final long serialVersionUID = - L;

    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;

    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;

    /** Performs all synchronization mechanics */
    final Sync sync;


    /**
     * 同步器-基于AbstractQueuedSynchronizer实现
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        //.....

        abstract boolean readerShouldBlock();

        abstract boolean writerShouldBlock();
    }

    /**
     * 同步器-非公平模式
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = - L;

        final boolean writerShouldBlock() {
            return false;
        }

        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }

    /**
     * 同步器-公平模式
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = - L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }


    /**
     * 构造锁的非公平模式(默认预设)
     */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /**
     * 构造锁的公平和非公平模式(可选公平或者非公平)
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    /**
     * ReadWriteLock接口-基于Lock具体实现的writeLock()
     */
    public ReentrantReadWriteLock.WriteLock writeLock() {
        return writerLock;
    }

    /**
     * ReadWriteLock接口-基于Lock具体实现的readLock()
     */
    public ReentrantReadWriteLock.ReadLock readLock() {
        return readerLock;
    }

    /**
     * ReadWriteLock接口-ReadLock内置类
     */
    public static class ReadLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = - L;

        private final Sync sync;

        //.....
    }

    /**
     * ReadWriteLock接口-WriteLock内置类
     */
    public static class WriteLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = - L;

        private final Sync sync;

        //.....
    }

}

  • 包含了一个同步器Sync,主要是基于AbstractQueuedSynchronizer实现,同时基于Sync类还实现FairSync类和NonfairSync类,其中FairSync类对应着公平模式,NonfairSync类对应非公平模式。
  • 主要是内置定义了ReadLock类和WriteLock类等两个内部类,其中ReadLock类为读锁,而而WriteLock类为写锁,可以设定其锁是公平和非公的,默认是非公平模式的。
  • 实现了ReadWriteLock接口,通过内部类的定义来具体实现对应的锁,其中ReadLock类对应readLock()方法,而WriteLock类对应writeLock()方法。

当前名称:Java 并发编程解析 | 基于JDK源码解析Java领域中的并发锁,我们可以从中学习到什么内容?
文章来源:http://mswzjz.cn/article/dscgege.html

其他资讯