当前位置:网站首页 > 技术博客 > 正文

java线程池是什么



好,终于到 Java 的线程池了,这是 Java 并发编程中非常重要的一块内容,今天我们就通过图文的方式来彻底弄懂线程池的工作原理,以及在实际项目中该如何自定义适合业务的线程池。

线程池其实是一种池化的技术实现,池化技术的核心思想就是实现资源的复用,避免资源的重复创建和销毁带来的性能开销。线程池可以管理一堆线程,让线程执行完任务之后不进行销毁,而是继续去处理其它线程已经提交的任务。

使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

Java 主要是通过构建 ThreadPoolExecutor 来创建线程池的。接下来我们看一下线程池是如何构造出来的

ThreadPoolExecutor 的构造方法:

  • corePoolSize:线程池中用来工作的核心线程数量。
  • maximumPoolSize:最大线程数,线程池允许创建的最大线程数。
  • keepAliveTime:超出 corePoolSize 后创建的线程存活时间或者是所有线程最大存活时间,取决于配置。
  • unit:keepAliveTime 的时间单位。
  • workQueue:任务队列,是一个阻塞队列,当线程数达到核心线程数后,会将任务存储在阻塞队列中。
  • threadFactory :线程池内部创建线程所用的工厂。
  • handler:拒绝策略;当队列已满并且线程数量达到最大线程数量时,会调用该方法处理任务。

线程池的构造其实很简单,就是传入一堆参数,然后进行简单的赋值操作。

说完线程池的核心构造参数,接下来来讲解这些参数在线程池中是如何工作的。

线程池刚创建出来是什么样子呢,如下图:

没错,刚创建出来的线程池中只有一个构造时传入的阻塞队列,里面并没有线程,如果想要在执行之前创建好核心线程数,可以调用 prestartAllCoreThreads 方法来实现,默认是没有线程的。

当有线程通过 execute 方法提交了一个任务,会发生什么呢?

首先会去判断当前线程池的线程数是否小于核心线程数,也就是线程池构造时传入的参数 corePoolSize。

如果小于,那么就直接通过 ThreadFactory 创建一个线程来执行这个任务,如图

当任务执行完之后,线程不会退出,而是会去阻塞队列中获取任务,如下图

接下来如果又提交了一个任务,也会按照上述的步骤去判断是否小于核心线程数,如果小于,还是会创建线程来执行任务,执行完之后也会从阻塞队列中获取任务。

这里有个细节,就是提交任务的时候,就算有线程池里的线程从阻塞队列中获取不到任务,如果线程池里的线程数还是小于核心线程数,那么依然会继续创建线程,而不是复用已有的线程。

如果线程池里的线程数不再小于核心线程数呢?那么此时就会尝试将任务放入阻塞队列中,入队成功之后,如图

这样,阻塞的线程就可以获取到任务了。

但是,随着任务越来越多,队列已经满了,任务放入失败,怎么办呢?

此时会判断当前线程池里的线程数是否小于最大线程数,也就是入参时的 maximumPoolSize 参数

如果小于最大线程数,那么也会创建非核心线程来执行提交的任务,如图

所以,就算队列中有任务,新创建的线程还是会优先处理这个提交的任务,而不是从队列中获取已有的任务执行,从这可以看出,先提交的任务不一定先执行

假如线程数已经达到最大线程数量,怎么办呢?

此时就会执行拒绝策略,也就是构造线程池的时候,传入的 RejectedExecutionHandler 对象,来处理这个任务。

JDK 自带的 RejectedExecutionHandler 实现有 4 种

  • AbortPolicy:丢弃任务,抛出运行时异常
  • CallerRunsPolicy:由提交任务的线程来执行任务
  • DiscardPolicy:丢弃这个任务,但是不抛异常
  • DiscardOldestPolicy:从队列中剔除最先进入队列的任务,然后再次提交任务

线程池创建的时候,如果不指定拒绝策略就默认是 AbortPolicy 策略。

当然,你也可以自己实现 RejectedExecutionHandler 接口,比如将任务存在数据库或者缓存中,这样就可以从数据库或者缓存中获取被拒绝掉的任务了。

到这里,我们发现,线程池构造的几个参数 corePoolSize、maximumPoolSize、workQueue、threadFactory、handler 我们都在上述的执行过程中讲到了,那么还差两个参数 keepAliveTime 和 unit(unit 是 keepAliveTime 的时间单位)没讲到,所以 keepAliveTime 是如何起作用的呢,这个问题留到后面分析。

说完整个执行的流程,接下来看看 execute 方法的代码是如何实现的。

  • :判断是否小于核心线程数,是的话就通过 addWorker 方法,addWorker 用来添加线程并执行任务。
  • :尝试往阻塞队列中添加任务。添加失败就会再次调用 addWorker 尝试添加非核心线程来执行任务;如果还是失败了,就会调用 来拒绝这个任务。

再来另画一张图总结一下 execute 的执行流程

线程池的核心功能就是实现线程的重复利用,那么线程池是如何实现线程的复用呢?

线程在线程池内部其实被封装成了一个 Worker 对象

Worker 继承了 AQS,也就是具有一定锁的特性。

创建线程来执行任务的方法,上面提到了,是通过 addWorker 方法。在创建 Worker 对象的时候,会把线程和任务一起封装到 Worker 内部,然后调用 runWorker 方法来让线程执行任务,接下来我们就来看一下 runWorker 方法。

从这里就可以找出线程执行完任务不会退出的原因了,runWorker 内部使用了 while 死循环,当第一个任务执行完之后,会不断地通过 getTask 方法获取任务,只要能获取到任务,就会调用 run 方法继续执行任务,这就是线程能够复用的主要原因。

但是如果从 getTask 获取不到方法的话,就会调用 finally 中的 processWorkerExit 方法,将线程退出。

这里有个一个细节就是,因为 Worker 继承了 AQS,每次在执行任务之前都会调用 Worker 的 lock 方法,执行完任务之后,会调用 unlock 方法,这样做的目的就可以通过 Woker 的加锁状态判断出当前线程是否正在执行任务。

如果想知道线程是否正在执行任务,只需要调用 Woker 的 tryLock 方法,根据是否加锁成功就能判断,加锁成功说明当前线程没有加锁,也就没有执行任务了,在调用 shutdown 方法关闭线程池的时候,就时用这种方式来判断线程有没有在执行任务,如果没有的话,会尝试打断没有执行任务的线程。

前面我们讲到,线程在执行完任务之后,会继续从 getTask 方法中获取任务,获取不到就会退出。接下来我们就来看一看 getTask 方法的实现。

前面就是线程池的一些状态判断,这里有一行代码

这行代码是用来判断当前过来获取任务的线程是否可以超时退出。如果 allowCoreThreadTimeOut 设置为 true 或者线程池当前的线程数大于核心线程数,也就是 corePoolSize,那么该获取任务的线程就可以超时退出。

怎么做到超时退出呢,就是这行核心代码

会根据是否允许超时来选择调用阻塞队列 workQueue 的 poll 方法或者 take 方法。如果允许超时,则调用 poll 方法,传入 keepAliveTime,也就是构造线程池时传入的空闲时间,这个方法的意思就是从队列中阻塞 keepAliveTime 时间来获取任务,获取不到就会返回 null;如果不允许超时,就会调用 take 方法,这个方法会一直阻塞获取任务,直到从队列中获取到任务为止。

从这里就可以看到 keepAliveTime 是如何使用的了。

所以到这里,大家应该知道线程池中的线程为什么可以做到空闲一定时间就退出了吧?

其实最主要就是利用了阻塞队列的 poll 方法,这个方法可以指定超时时间,一旦线程达到了 keepAliveTime 还没有获取到任务,就会返回 null,一旦 getTask 方法返回 null,线程就会退出。

这里也有一个细节,就是判断当前获取任务的线程是否可以超时退出的时候,如果将 allowCoreThreadTimeOut 设置为 true,那么所有线程走到这个 timed 都是 true,所有线程包括核心线程都可以做到超时退出。如果线程池需要将核心线程超时退出,就可以通过 allowCoreThreadTimeOut 方法将 allowCoreThreadTimeOut 变量设置为 true。

整个 getTask 方法以及线程超时退出的机制如图所示

线程池内部有 5 个常量来代表线程池的五种状态

  • RUNNING:线程池创建时就是这个状态,能够接收新任务,以及对已添加的任务进行处理。
  • SHUTDOWN:调用 shutdown 方法,线程池就会转换成 SHUTDOWN 状态,此时线程池不再接收新任务,但能继续处理已添加的任务到队列中。
  • STOP:调用 shutdownNow 方法,线程池就会转换成 STOP 状态,不接收新任务,也不能继续处理已添加的任务到队列中任务,并且会尝试中断正在处理的任务的线程。
  • TIDYING:SHUTDOWN 状态下,任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态;线程池在 SHUTDOWN 状态,任务队列为空且执行中任务为空,线程池会变为 TIDYING 状态;线程池在 STOP 状态,线程池中执行中任务为空时,线程池会变为 TIDYING 状态。
  • TERMINATED:线程池彻底终止。线程池在 TIDYING 状态执行完 方法就会转变为 TERMINATED 状态。

线程池状态具体是存在 ctl 成员变量中的,ctl 中不仅存储了线程池的状态还存储了当前线程池中线程数的大小

最后画个图来总结一下这 5 种状态的流转

其实,在线程池运行过程中,绝大多数操作执行前都得判断当前线程池处于哪种状态,再来决定是否继续执行该操作。

线程池提供了 shutdown 和 shutdownNow 两个方法来关闭线程池。

shutdown 方法

就是将线程池的状态修改为 SHUTDOWN,然后尝试打断空闲的线程(如何判断空闲,上面在说 Worker 继承 AQS 的时候说过),也就是在阻塞等待任务的线程。

shutdownNow 方法

就是将线程池的状态修改为 STOP,然后尝试打断所有的线程,从阻塞队列中移除剩余的任务,这也是为什么 shutdownNow 不能执行剩余任务的原因。

所以也可以看出 shutdown 方法和 shutdownNow 方法的主要区别就是,shutdown 之后还能处理在队列中的任务,shutdownNow 直接就将任务从队列中移除,线程池里的线程就不再处理了。

在项目中使用线程池的时候,一般需要对线程池进行监控,方便出问题的时候快速定位。线程池本身提供了一些方法来获取线程池的运行状态。

  • getCompletedTaskCount:已经执行完成的任务数量
  • getLargestPoolSize:线程池里曾经创建过的最大的线程数量。这个主要是用来判断线程是否满过。
  • getActiveCount:获取正在执行任务的线程数据
  • getPoolSize:获取当前线程池中线程数量的大小

除了线程池提供的上述已经实现的方法,同时线程池也预留了很多扩展方法。比如在 runWorker 方法里面,执行任务之前会回调 beforeExecute 方法,执行任务之后会回调 afterExecute 方法,而这些方法默认都是空实现,小伙伴们可以自己继承 ThreadPoolExecutor 来重写这些方法,实现自己想要的功能。

在 Java 程序中,其实经常需要用到多线程来处理一些业务,但是不建议单纯继承 Thread 或者实现 Runnable 接口来创建线程,这样会导致频繁创建及销毁线程,同时创建过多的线程也可能引发资源耗尽的风险。

所以使用线程池是一种更合理的选择,方便管理任务,同时实现线程的重复利用。所以线程池一般适合需要异步或者多线程处理任务的场景。

以下是几个线程池使用场景的简单示例:

模拟一个简单的Web服务器,接受请求并使用线程池进行处理。

使用线程池进行并行的数值计算。

模拟处理异步任务。

在上面的示例中,我们使用了 JDK 内部提供的 Executors 工具类来快速创建线程池。

1)固定线程数量的线程池:核心线程数与最大线程数相等

2)单个线程数量的线程池

3)接近无限大线程数量的线程池

4)带定时调度功能的线程池

虽然 JDK 提供了快速创建线程池的方法,但其实不推荐使用 Executors 来创建线程池,因为从上面构造线程池的代码可以看出,newFixedThreadPool 线程池由于使用了 LinkedBlockingQueue,队列的容量默认无限大,实际使用中出现任务过多时会导致内存溢出;newCachedThreadPool 线程池由于核心线程数无限大,当任务过多的时候会导致创建大量的线程,可能机器负载过高导致服务宕机。

这也是面试常问的一道八股文,大家需要注意。

通过上面分析提到,通过 Executors 这个工具类来创建的线程池其实都无法满足实际的使用场景,那么在实际的项目中,到底该如何构造线程池呢,该如何合理的设置参数?

线程数的设置主要取决于业务是 IO 密集型还是 CPU 密集型。

CPU 密集型:指的是任务主要使用来进行大量的计算,没有什么导致线程阻塞。一般这种场景的线程数设置为 CPU 核心数+1。

IO 密集型:当执行任务需要大量的 io,比如磁盘 io,网络 io,可能会存在大量的阻塞,所以在 IO 密集型任务中使用多线程可以大大地加速任务的处理。一般线程数设置为 2*CPU 核心数

Java 中用来获取 CPU 核心数的方法是:

一般建议自定义线程工厂,构建线程的时候设置线程的名称,这样在查日志的时候就方便知道是哪个线程执行的代码。

一般需要设置有界队列的大小,比如 LinkedBlockingQueue 在构造的时候可以传入参数来限制队列中任务数据的大小,这样就不会因为无限往队列中扔任务导致系统的 oom。

OK,我们来通过自定义 ThreadPoolExecutor 改造一下前面使用 Executors 的例子。

Web服务器通常需要处理I/O操作,比如网络I/O,因此它们被视为I/O密集型任务。因此,我们将线程数设置为2 * CPU核心数。

并行计算任务主要用于计算,没有I/O阻塞,所以它们是CPU密集型的。线程数设置为CPU核心数 + 1。

异步任务通常涉及到I/O操作,比如数据库查询或文件读写,因此它们被视为I/O密集型任务。因此,我们将线程数设置为2 * CPU核心数。

本文主要介绍了线程池的原理以及使用场景,线程池主要是通过阻塞队列来实现的,线程池的使用场景主要是异步或者多线程处理任务的场景。线程池的使用可以通过 Executors 来快速创建,但是不推荐使用,因为 Executors 创建的线程池都有一些缺陷,比如无界队列可能导致内存溢出,无限大的线程数可能导致机器负载过高。所以在实际的项目中,建议自定义线程池 ThreadPoolExecutor,根据业务场景来合理的设置线程数,队列大小等参数。

编辑:沉默王二,部分内容来自于读者三友的公众号文章 https://mp.weixin..com/s/lVem8mGANea8aUYF3XCO7Q,写得非常不错,强烈推荐。


GitHub 上标星 10000+ 的开源知识库《二哥的 Java 进阶之路》第二份 PDF 《并发编程小册》终于来了!包括线程的基本概念和使用方法、Java的内存模型、sychronized、volatile、CAS、AQS、ReentrantLock、线程池、并发容器、ThreadLocal、生产者消费者模型等面试和开发必须掌握的内容,共计 15 万余字,200+张手绘图,可以说是通俗易懂、风趣幽默……详情戳:太赞了,二哥的并发编程进阶之路.pdf

加入二哥的编程星球,在星球的第二个置顶帖「知识图谱」里就可以获取 PDF 版本。

二哥的并发编程进阶之路获取方式
二哥的并发编程进阶之路获取方式

定时任务 类有两个用途:指定时间延迟后执行任务;周期性重复执行任务。

JDK 1.5 之前,主要使用类来完成定时任务,但是有以下缺陷:

  • Timer 是单线程模式;
  • 如果在执行任务期间某个 TimerTask 耗时较久,就会影响其它任务的调度;
  • Timer 的任务调度是基于绝对时间的,对系统时间敏感;
  • Timer 不会捕获执行 TimerTask 时所抛出的异常,由于 Timer 是单线程的,所以一旦出现异常,线程就会终止,其他任务无法执行。

于是 JDK 1.5 之后,开发者就抛弃了 ,开始使用。先通过下面这张图感受下。

假设我们有这样一个需求,指定时间给其他人发送消息。那么我们会将消息(包含发送时间)存储在数据库中,然后用一个定时任务,每隔 1 秒检查数据库在当前时间有没有需要发送的消息,那这个计划任务怎么完成呢?下面是一个 Demo:

下面截取一段输出(demo 会一直运行下去):

这就是 的一个简单运用,接下来我们来看看它的实现原理。

继承了ThreadPoolExecutor,并实现了接口。线程池 ThreadPoolExecutor 在之前介绍过了,相信大家都还有印象,接下来我们来看看 接口。

接口继承了 接口,并增加了几个定时相关的接口方法。前两个方法用于单次调度执行任务,区别是有没有返回值。

重点介绍一下后面两个方法:

scheduleAtFixedRate 方法在时长后第一次执行任务,以后每隔时长再次执行任务。注意,period 是从任务开始执行算起的。开始执行任务后,定时器每隔 period 时长检查该任务是否完成,如果完成则再次启动任务,否则等该任务结束后才启动任务。看下图:

该方法在时长后第一次执行任务,以后每当任务执行完成后,等待时长,再次执行任务。看下图。

相信大家能体会出来其中的差异。

我们先看看里面涉及到的几个类和接口的关系图谱:

接口很简单,继承了接口,表示对象是可以比较排序的。

回到方法中,它创建了一个对象,由上面的关系图可知,直接或者间接实现了很多接口,一起看看里面的实现方法吧。

构造方法

Delayed 接口的实现

Comparable 接口的实现

setNextRunTime

Runnable 接口实现

总结一下 run 方法的执行过程:

  1. 如果当前线程池运行状态不运行执行任务,那么就取消该任务,然后直接返回,否则执行步骤 2;
  2. 如果不是周期性任务,调用 FutureTask 中的 run 方法执行,会设置执行结果,然后直接返回,否则执行步骤 3;
  3. 如果是周期性任务,调用 FutureTask 中的 runAndReset 方法执行,不会设置执行结果,然后直接返回,否则执行步骤 4 和步骤 5;
  4. 计算下次执行该任务的具体时间;
  5. 重复执行任务。

方法是为任务多次执行而设计的。方法执行完任务后不会设置任务的执行结果,也不会去更新任务的状态,以及维持任务的状态为初始状态(NEW状态),这也是该方法和 FutureTask 方法的区别。

我们看一下代码:

这个方法和类似,不同点是方法内部创建的是,带有初始延时和固定周期的任务。

也是通过体现的,唯一不同的地方在于创建的不同。

前面讲到的、和最后都调用了方法,该方法是定时任务执行的主要方法。 一起来看看源码:

方法的逻辑也很简单,主要就是将任务添加到等待队列,然后调用方法。

方法主要是调用了 方法,线程池中的工作线程就是通过该方法来启动并执行任务的。相信大家都还有印象。

对于,添加到线程池后会在等待队列中等待获取任务,这点是和是一致的。但是 worker 是怎么从等待队列取定时任务的呢?

使用了 来保存等待的任务。

该等待队列的队首应该保存的是最近将要执行的任务,所以只关心队首任务,如果队首任务的开始执行时间还未到,worker 也应该继续等待。

DelayedWorkQueue 是一个无界优先队列,使用数组存储,底层使用堆结构来实现优先队列的功能。

可以转换成如下的数组:

在这种结构中,可以发现有如下特性。假设,索引值从 0 开始,子节点的索引值为 k,父节点的索引值为 p,则:

  • 一个节点的左子节点的索引为:k = p * 2 + 1;
  • 一个节点的右子节点的索引为:k = (p + 1) * 2;
  • 一个节点的父节点的索引为:p = (k - 1) / 2。

我们来看看 DelayedWorkQueue 的声明和成员变量:

当一个线程成为 leader,它只需等待队首任务的 delay 时间即可,其他线程会无条件等待。leader 取到任务返回前要通知其他线程,直到有线程成为新的 leader。每当队首的定时任务被其他更早需要执行的任务替换,leader 就设置为 null,其他等待的线程(被当前 leader 通知)和当前的 leader 重新竞争成为 leader。

所有线程都会有三种身份中的一种:leader、follower,以及一个干活中的状态:proccesser。它的基本原则是,永远最多只有一个 leader。所有 follower 都在等待成为 leader。线程池启动时会自动产生一个 Leader 负责等待网络 IO 事件,当有一个事件产生时,Leader 线程首先通知一个 Follower 线程将其提拔为新的 Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入 Follower 线程等待队列,等待下次成为 Leader。这种方法可以增强 CPU 高速缓存相似性,及消除动态内存分配和线程间的数据交换。

同时,定义了 ReentrantLock 锁 lock 和 Condition available 用于控制和通知下一个线程竞争成为 leader。

当一个新的任务成为队首,或者需要有新的线程成为 leader 时,available 监视器上的线程将会被通知,然后竞争成为 leader 线程。有些类似于生产者-消费者模式。

DelayedWorkQueue 是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时间复杂度是 。

接下来看看中几个比较重要的方法。

方法是什么时候调用的呢?

在讲解线程池的时候,我们介绍了方法,工作线程会循环从中取任务。但计划任务却不同,因为一旦方法取出了任务就开始执行了,而这时可能还没有到执行时间,所以在方法中,要保证只有到指定的执行时间,任务才可以被取走。

总结一下流程:

  1. 如果堆顶元素为空,在 available 上等待。
  2. 如果堆顶任务的执行时间已到,将堆顶元素替换为堆的最后一个元素并调整堆使其满足最小堆特性,同时设置任务在堆中索引为-1,返回该任务。
  3. 如果 leader 不为空,说明已经有线程成为 leader 了,其他线程都要在 available 监视器上等待。
  4. 如果 leader 为空,当前线程成为新的 leader,并等待直到堆顶任务执行时间到达。
  5. take 方法返回之前,将 leader 设置为空,并通知其他线程。

再来说一下 leader 的作用,这里的 leader 是为了减少不必要的定时等待,当一个线程成为 leader 时,它只等待下一个节点的时间间隔,但其它线程无限期等待。 leader 线程必须在或返回之前 signal 其它线程,除非其他线程成为了 leader。

举例来说,如果没有 leader,那么在执行 take 时,都要执行,假设当前线程执行了该段代码,这时还没有 signal,第二个线程也执行了该段代码,则第二个线程也要被阻塞。

但只有一个线程返回队首任务,其他的线程在之后,继续执行 for 循环,因为队首任务已经被返回了,所以这个时候的 for 循环拿到的队首任务是新的,又需要重新判断时间,又要继续阻塞。

所以,为了不让多个线程频繁的做无用的定时等待,这里增加了 leader,如果 leader 不为空,则说明队列中第一个节点已经在等待出队,这时其它的线程会一直阻塞,减少了无用的阻塞(注意,在中调用了来唤醒一个线程,而不是)。

该方法往队列插入一个值,返回是否成功插入。

offer 方法实现了向延迟队列插入一个任务的操作,并保证整个队列仍然满足最小堆的性质。

最小堆(Min Heap)是一个完全二叉树,其中每一个父节点的值都小于或等于其子节点的值。换句话说,在最小堆中,根节点(即树的顶部)是所有节点中的最小值。

前面我们也提到过最小堆。我们来看一下用于调整堆的 siftUp 方法。

代码很好理解,就是循环的根据key节点与它的父节点来判断,如果key节点的执行时间小于父节点,则将两个节点交换,使执行时间靠前的节点排列在队列的前面。

假设新入队的节点的延迟时间(调用getDelay()方法获得)是5,执行过程如下:

1、先将新的节点添加到数组的尾部,这时新节点的索引k为7:

2、计算新父节点的索引:,parent = 3,那么的时间间隔值为8,因为 ,将执行:

3、这时将k设置为3,继续循环,再次计算parent为1,的时间间隔为3,因为 ,这时退出循环,最终k为3:

可见,每次新增节点时,只是根据父节点来判断,而不会影响兄弟节点。

是一个定时任务的线程池,它的主要作用是周期性的执行任务。它的实现原理是通过来保存等待的任务,是一个无界优先队列,使用数组存储,底层使用堆结构来实现优先队列的功能。

编辑:沉默王二,原文内容来源于朋友小七萤火虫开源的这个仓库:深入浅出 Java 多线程,强烈推荐。其他参考链接如下:

  • ideabuffer
  • 博客园

推荐阅读:读者三友的 11 种延迟任务的实现方式


GitHub 上标星 10000+ 的开源知识库《二哥的 Java 进阶之路》第二份 PDF 《并发编程小册》终于来了!包括线程的基本概念和使用方法、Java的内存模型、sychronized、volatile、CAS、AQS、ReentrantLock、线程池、并发容器、ThreadLocal、生产者消费者模型等面试和开发必须掌握的内容,共计 15 万余字,200+张手绘图,可以说是通俗易懂、风趣幽默……详情戳:太赞了,二哥的并发编程进阶之路.pdf

加入二哥的编程星球,在星球的第二个置顶帖「知识图谱」里就可以获取 PDF 版本。

二哥的并发编程进阶之路获取方式
二哥的并发编程进阶之路获取方式

我们前面讲过 CAS,相信大家都还有印象,Java 中的原子操作类,如 AtomicInteger 和 AtomicLong,底层就是利用 CAS 来确保变量更新的原子性的。

像递增运算 count++ 就不是一个原子操作,在多线程环境下并不能得到正确的结果,因为 count++ 操作实际上分为三个步骤:

  1. 读取 count 变量的值;
  2. 将 count 变量的值加 1;
  3. 将 count 变量的值写入到内存中;

假定线程 A 正在修改 count 变量,为了保证线程 B 在使用 count 的时候是线程 A 修改过后的状态,可以用 synchronized 关键字同步一手。

但多个线程之间访问 方法是互斥的,线程 B 访问的时候必须要等待线程 A 访问结束,有没有更好的办法呢?

AtomicInteger 是 JDK 提供的一个原子操作的 Integer 类,它提供的加减操作是线程安全的。于是我们可以这样:

你看,这下是不是就舒服多了,不用加锁,也能保证线程安全。OK,接下来,我们来看看原子操作类都有哪些?

基本类型的原子操作主要有这些:

  1. AtomicBoolean:以原子更新的方式更新 boolean;
  2. AtomicInteger:以原子更新的方式更新 Integer;
  3. AtomicLong:以原子更新的方式更新 Long;

这几个类的用法基本一致,这里以 AtomicInteger 为例。

  1. :增加给定的 delta,并获取新值。
  2. :增加 1,并获取新值。
  3. :获取当前值,并将新值设置为 newValue。
  4. :获取当前值,并增加 1。

还有一些方法,可以直接查看 API,都很好理解。

为了能够弄懂 AtomicInteger 的实现原理,以 getAndIncrement 方法为例,来看下源码:

可以看出,该方法实际上是调用了 unsafe 对象的 getAndAddInt 方法,unsafe 对象是通过通过 UnSafe 类的静态方法 getUnsafe 获取的:

Unsafe 类我们在讲 CAS 的时候也讲过,包括 AtomicInteger 类,相信大家还有印象。

Unsafe 类是 Java 中的一个特殊类,用于执行低级、不安全的操作。getAndIncrement 方法就是利用了 Unsafe 类提供的 CAS(Compare-And-Swap)操作来实现原子的 increment 操作。CAS 是一种常用的无锁技术,允许在多线程环境中原子地更新值。

好,下面用一个简单的例子来说明 AtomicInteger 的用法:

输出结果:

AtomicLong 和 AtomicInteger 的实现原理基本一致,只不过一个针对的是 long 型,一个针对的是 int 型。

AtomicBoolean 类是怎样实现更新的呢?核心方法是 方法,其源码如下:

该方法尝试将当前值从expect设置为update,但这种设置只会在当前值确实为expect时成功。方法返回true表示更新成功,否则返回false。

如果需要原子更新数组里的某个元素,atomic 也提供了相应的类:

  1. AtomicIntegerArray:这个类提供了一些原子更新 int 整数数组的方法。
  2. AtomicLongArray:这个类提供了一些原子更新 long 型证书数组的方法。
  3. AtomicReferenceArray:这个类提供了一些原子更新引用类型数组的方法。

这几个类的用法一致,就以 AtomicIntegerArray 来总结下常用的方法:

  1. :以原子更新的方式将数组中索引为 i 的元素与输入值相加;
  2. :以原子更新的方式将数组中索引为 i 的元素自增加 1;
  3. :将数组中索引为 i 的位置的元素进行更新

可以看出,AtomicIntegerArray 与 AtomicInteger 的方法基本一致,只不过在 AtomicIntegerArray 的方法中会多一个数组索引 i。下面举一个简单的例子:

输出结果:

通过 getAndAdd 方法将位置为 1 的元素加 5,从结果可以看出索引为 1 的元素变成了 7,该方法返回的也是相加之前的数为 2。

如果需要原子更新引用类型的话,atomic 也提供了相关的类:

  1. AtomicReference:原子更新引用类型;
  2. AtomicReferenceFieldUpdater:原子更新引用类型里的字段;
  3. AtomicMarkableReference:原子更新带有标记位的引用类型;

这几个类的使用方法也是基本一样,以 AtomicReference 为例,来说明这些类的基本用法。下面是一个 demo:

输出结果:

首先将对象 User1 用 AtomicReference 进行封装,然后调用 getAndSet 方法进行赋值,从结果可以看出,该方法会原子更新 user 对象,变为 。

如果需要更新对象的某个字段,atomic 同样也提供了相应的原子操作类:

  1. AtomicIntegeFieldUpdater:原子更新整型字段类;
  2. AtomicLongFieldUpdater:原子更新长整型字段类;
  3. AtomicStampedReference:原子更新引用类型,这种更新方式会带有版本号,是为了解决 CAS 的 ABA 问题,ABA 问题我们前面也讲过。

使用原子更新字段需要两步:

  1. 通过静态方法创建一个更新器,并且设置想要更新的类和字段;
  2. 字段必须使用进行修饰;

以 AtomicIntegerFieldUpdater 为例来看看具体的使用:

输出结果:

从示例中可以看出,创建是通过它提供的静态方法进行创建的,方法会将指定的字段加上输入的值,并返回相加之前的值。user 对象中 age 字段原值为 1,加 5 之后变成了 6。

Java 中的 java.util.concurrent.atomic 包提供了一系列类,这些类支持原子操作(即线程安全而无需同步)在单个变量上,这大大减少了并发编程的复杂性。

原子操作类主要有这些:

  1. 原子操作的基本数据类型:AtomicBoolean、AtomicInteger、AtomicLong;
  2. 原子操作的数组类型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray;
  3. 原子操作的引用类型:AtomicReference、AtomicReferenceFieldUpdater、AtomicMarkableReference;

编辑:沉默王二,编辑前的内容主要来自于CL0610的 GitHub 仓库https://github.com/CL0610/Java-concurrency。推荐阅读:码农参上的Unsafe类详解


GitHub 上标星 10000+ 的开源知识库《二哥的 Java 进阶之路》第二份 PDF 《并发编程小册》终于来了!包括线程的基本概念和使用方法、Java的内存模型、sychronized、volatile、CAS、AQS、ReentrantLock、线程池、并发容器、ThreadLocal、生产者消费者模型等面试和开发必须掌握的内容,共计 15 万余字,200+张手绘图,可以说是通俗易懂、风趣幽默……详情戳:太赞了,二哥的并发编程进阶之路.pdf

加入二哥的编程星球,在星球的第二个置顶帖「知识图谱」里就可以获取 PDF 版本。

二哥的并发编程进阶之路获取方式
二哥的并发编程进阶之路获取方式

前面我们在讲 CAS 和原子操作 atomic 类的时候,都讲到了 Unsafe。

Unsafe 是 Java 中一个非常特殊的类,它为 Java 提供了一种底层、"不安全"的机制来直接访问和操作内存、线程和对象。正如其名字所暗示的,Unsafe 提供了许多不安全的操作,因此它的使用应该非常小心,并限于那些确实需要使用这些底层操作的场景。

首先我们来尝试获取一个 Unsafe 实例,如果按照的方式去创建,不好意思,编译器会直接报错:

查看 Unsafe 类的源码,可以发现它是被 final 修饰的,所以不允许被继承,并且构造方法为类型,即不允许我们直接 new 实例化。不过,Unsafe 在 static 静态代码块中,以单例的方式初始化了一个 Unsafe 对象:

Unsafe 类提供了一个静态方法,看上去貌似可以用它来获取 Unsafe 实例:

但是如果我们直接调用这个静态方法,也会抛出异常:

这是因为在方法中,会对调用者的进行检查,判断当前类是否由加载,如果不是的话就会抛出一个异常。

也就是说,只有启动类加载器加载的类才能够调用 Unsafe 类中的方法,这是为了防止这些方法在不可信的代码中被调用。

那么,为什么要对 Unsafe 类进行这么谨慎的使用限制呢?

说到底,还是因为它实现的功能过于底层,例如直接进行内存操作、绕过 jvm 的安全检查创建对象等等,概括的来说,Unsafe 类实现的功能可以被分为下面 8 类:

看到上面这些功能,你是不是已经有些迫不及待想要试一试了?

那么如果我们执意想要在自己的代码中调用 Unsafe 类的方法,应该怎么获取一个它的实例对象呢?

答案是利用反射获得 Unsafe 类中已经实例化完成的单例对象:

在获取到 Unsafe 的实例对象后,我们就可以使用它来为所欲为了,先来尝试使用它对一个对象的属性进行读写:

运行代码输出如下:

可以看到通过 Unsafe 类的方法获取到了对象中字段的偏移地址,这个偏移地址不是内存中的绝对地址而是一个相对地址,之后再通过这个偏移地址对类型字段的属性值进行读写操作,通过结果也可以看到 Unsafe 的方法和类中的方法获取到的值是相同的。

上面的例子中调用了 Unsafe 类的和方法,看一下源码中的方法:

先说作用,用于从对象的指定偏移地址处读取一个,用于在对象指定偏移地址处写入一个,并且即使类中的这个属性是类型的,也可以对它进行读写。

但是细心的小伙伴可能发现了,这两个方法相对于我们平常写的普通方法,多了一个native关键字修饰,并且没有具体的方法逻辑,那么它是怎么实现的呢?

native 方法我们讲过,这里简单回顾下。

方法,简单的说就是由 Java 调用非 Java 代码的接口,被调用的方法是由非 Java 语言实现的,例如它可以由 C 或 C++语言来实现,并编译成 DLL,然后直接供 Java 进行调用。方法是通过 JNI()实现调用的,从 Java 1.1 开始 JNI 标准就是 Java 平台的一部分,它允许 Java 代码和其他语言的代码进行交互。

Unsafe 类中的很多基础方法都属于方法,那么为什么要使用方法呢?原因可以概括为以下几点:

  • 需要用到 Java 中不具备的依赖于操作系统的特性,Java 在实现跨平台的同时要实现对底层的控制,需要借助其他语言发挥作用
  • 对于其他语言已经完成的一些现成功能,可以使用 Java 直接调用
  • 程序对时间敏感或对性能要求非常高时,有必要使用更加底层的语言,例如 C/C++甚至是汇编

包的很多并发工具类在实现并发机制时,都调用了方法,通过 native 方法可以打破 Java 运行时的界限,能够接触到操作系统底层的某些功能。

对于同一个方法,不同的操作系统可能会通过不同的方式来实现,但是对于使用者来说是透明的,最终都会得到相同的结果。

在对 Unsafe 的基础有了一定了解后,我们来看一下它的基本应用。

如果你写过或者,一定对内存操作不会陌生,而 Java 是不允许直接对内存进行操作的,对象内存的分配和回收都是由自己实现。但是在 Unsafe 中,提供的下列接口都可以直接进行内存操作:

使用下面的代码进行测试:

先看结果输出:

分析一下运行结果,首先使用方法申请 4 字节长度的内存空间,在循环中调用方法向每个字节写入内容为类型的 1,当使用 Unsafe 调用方法时,因为一个型变量占 4 个字节,会一次性读取 4 个字节,组成一个的值,对应的十进制结果为 ,可以通过图示理解这个过程:

代码中调用方法重新分配了一块 8 字节长度的内存空间,通过比较和可以看到和之前申请的内存地址是不同的。

在代码中的第二个 for 循环里,调用方法进行了两次内存的拷贝,每次拷贝内存地址开始的 4 个字节,分别拷贝到以和开始的内存空间上:

拷贝完成后,使用方法一次性读取 8 个字节,得到类型的值为 076673。

需要注意,通过这种方式分配的内存属于堆外内存,是无法进行垃圾回收的,需要我们把这些内存当做一种资源去手动调用方法进行释放,否则会产生内存泄漏。

通用的操作内存方式是在中执行对内存的操作,最后在块中进行内存的释放。

在介绍内存屏障前,需要知道编译器和 CPU 会在保证程序输出结果一致的情况下,会对代码进行重排序,从指令优化角度提升性能。

而指令重排序可能会带来一个不好的结果,导致 CPU 的高速缓存和内存中数据的不一致,而内存屏障()就是通过组织屏障两边的指令重排序从而避免编译器和硬件的不正确优化情况。

在硬件层面上,内存屏障是 CPU 为了防止代码进行重排序而提供的指令,不同的硬件平台上实现内存屏障的方法可能并不相同。

在 Java8 中,引入了 3 个内存屏障的方法,它屏蔽了操作系统底层的差异,允许在代码中定义、并统一由 jvm 来生成内存屏障指令,来实现内存屏障的功能。Unsafe 中提供了下面三个内存屏障相关方法:

内存屏障可以看做对内存随机访问操作中的一个同步点,使得此点之前的所有读写操作都执行后才可以开始执行此点之后的操作。

以方法为例,它会禁止读操作重排序,保证在这个屏障之前的所有读操作都已经完成,并且将缓存数据设为无效,重新从主存中进行加载。

看到这估计很多小伙伴们会想到 volatile 关键字了,如果在字段上添加了关键字,就能够实现字段在多线程下的可见性。

基于读内存屏障,我们也能实现相同的功能。下面定义一个线程方法,在线程中去修改标志位,注意这里的是没有被修饰的:

在主线程的循环中,加入内存屏障,测试是否能够感知到的修改变化:

运行结果:

而如果删掉上面代码中的方法,那么主线程将无法感知到发生的变化,会一直在中循环。可以用图来表示上面的过程:

了解 Java 内存模型()的小伙伴们应该清楚,运行中的线程不是直接读取主内存中变量的,只能操作自己工作内存中的变量,然后同步到主内存中,并且线程的工作内存是不能共享的。

上图中的流程就是子线程借助于主内存,将修改后的结果同步给了主线程,进而修改主线程中的工作空间,跳出循环。

01、对象成员属性的内存偏移量获取,以及字段属性值的修改,在上面的例子中我们已经测试过了。

除了前面的、方法外,Unsafe 提供了 8 种基础数据类型以及的和方法,并且所有的方法都可以越过访问权限,直接修改内存中的数据。

阅读 openJDK 源码中的注释可以发现,基础数据类型和的读写稍有不同,基础数据类型是直接操作的属性值(),而的操作则是基于引用值()。下面是的读写方法:

除了对象属性的普通读写外,Unsafe 还提供了volatile 读写有序写入方法。读写方法的覆盖范围与普通读写相同,包含了全部基础数据类型和类型,以类型为例:

相对于普通读写来说,读写具有更高的成本,因为它需要保证可见性和有序性。在执行操作时,会强制从主存中获取属性值,在使用方法设置属性值时,会强制将值更新到主存中,从而保证这些变更对其他线程是可见的。

有序写入的方法有以下三个:

有序写入的成本相对较低,因为它只保证写入时的有序性,而不保证可见性,也就是一个线程写入的值不能保证其他线程立即可见。

为了解决这里的差异性,需要对内存屏障的知识点再进一步进行补充,首先需要了解两个指令的概念:

  • :将主内存中的数据拷贝到处理器的缓存中
  • :将处理器缓存的数据刷新到主内存中

顺序写入与写入的差别在于,在顺序写时加入的内存屏障类型为类型,而在写入时加入的内存屏障是类型,如下图所示:

在有序写入方法中,使用的是屏障,该屏障确保立刻刷新数据到内存,这一操作先于以及后续的存储指令操作。

而在写入中,使用的是屏障,该屏障确保立刻刷新数据到内存,这一操作先于及后续的装载指令,并且,屏障会使该屏障之前的所有内存访问指令,包括存储指令和访问指令全部完成之后,才执行该屏障之后的内存访问指令。

综上所述,在上面的三类写入方法中,在写入效率方面,按照、、的顺序效率逐渐降低,

02、使用 Unsafe 的方法,允许我们使用非常规的方式进行对象的实例化,首先定义一个实体类,并且在构造方法中对其成员变量进行赋值操作:

分别基于构造方法、反射以及 Unsafe 方法的不同方式创建对象进行比较:

打印结果分别为 1、1、0,说明通过方法创建对象过程中,不会调用类的构造方法。

使用这种方式创建对象时,只用到了对象,所以说如果想要跳过对象的初始化阶段或者跳过构造器的安全检查,就可以使用这种方法。

在上面的例子中,如果将 A 类的构造方法改为类型,将无法通过构造方法和反射创建对象,但方法仍然有效。

在 Unsafe 中,可以使用方法获取数组中第一个元素的偏移地址,使用方法可以获取数组中元素间的偏移地址增量。使用下面的代码进行测试:

上面代码的输出结果为:

通过配合使用数组偏移首地址和各元素间偏移地址的增量,可以方便的定位到数组中的元素在内存中的位置,进而通过方法直接获取任意位置的数组元素。

需要说明的是,获取的并不是数组中元素占用的大小,而是地址的增量,按照 openJDK 中的注释,可以将它翻译为元素寻址的转换因子()。

在上面的例子中,第一个字符串长度为 11 字节,但其地址增量仍然为 4 字节。

那么,基于这两个值是如何实现寻址和数组元素的访问呢?

我们把上面例子中的 String 数组对象的内存布局画出来,方便大家理解:

在 String 数组对象中,对象头包含 3 部分,标记字占用 8 字节,类型指针占用 4 字节,数组对象特有的数组长度部分占用 4 字节,总共占用了 16 字节。

第一个 String 的引用类型相对于对象的首地址的偏移量是就 16,之后每个元素在这个基础上加 4,正好对应了我们上面代码中的寻址过程,之后再使用前面说过的方法,通过数组对象可以获得对象在堆中的首地址,再配合对象中变量的偏移量,就能获得每一个变量的引用。

在包的并发工具类中大量地使用了 CAS 操作,像在前面介绍的 synchronized 和 AQS 的文章中也多次提到了 CAS,其作为乐观锁在并发工具类中广泛发挥了作用。

在 Unsafe 类中,提供了、、方法来实现的对、、类型的 CAS 操作。以方法为例:

参数中为需要更新的对象,是对象中整形字段的偏移量,如果这个字段的值与相同,则将字段的值设为这个新值,并且此更新是不可被中断的,也就是一个原子操作。下面是一个使用的例子:

运行代码会依次输出:

在上面的例子中,使用两个线程去修改型属性的值,并且只有在的值等于传入的参数减一时,才会将的值变为,也就是实现对的加一的操作。流程如下所示:

需要注意的是,在调用方法后,会直接返回或的修改结果,因此需要我们在代码中手动添加自旋的逻辑。

在类的设计中,也是采用了将的结果作为循环条件,直至修改成功才退出死循环的方式来实现的原子性的自增操作。

Unsafe 类中提供了、、、、方法进行线程调度,在前面介绍 AQS 的文章中我们提到过使用 LockSupport 挂起或唤醒指定线程。这个类我们前面也讲到了,这里再回顾一下。

看一下的源码,可以看到它也是调用的 Unsafe 类中的方法:

LockSupport 的方法调用了 Unsafe 的方法来阻塞当前线程,此方法将线程阻塞后就不会继续往后执行,直到有其他线程调用方法唤醒当前线程。下面的例子对 Unsafe 的这两个方法进行测试:

程序输出为:

程序运行的流程也比较容易看懂,子线程开始运行后先进行睡眠,确保主线程能够调用方法阻塞自己,子线程在睡眠 5 秒后,调用方法唤醒主线程,使主线程能继续向下执行。整个流程如下图所示:

此外,Unsafe 源码中相关的三个方法已经被标记为,不建议被使用:

方法用于获得对象锁,用于释放对象锁,如果对一个没有被加锁的对象执行此方法,会抛出异常。方法尝试获取对象锁,如果成功则返回,反之返回。

Unsafe 对的相关操作主要包括类加载和静态变量的操作方法。

01、静态属性读取相关的方法:

创建一个包含静态属性的类,进行测试:

运行结果:

在 Unsafe 的对象操作中,我们学习了通过方法获取对象属性偏移量并基于它对变量的值进行存取,但是它不适用于类中的静态属性,这时候就需要使用方法。

在上面的代码中,获取对象需要依赖,而获取静态变量的属性时则不再依赖于。

在上面的代码中,首先创建一个对象,这是因为如果一个类没有被实例化,那么它的静态属性也不会被初始化,最后获取的字段属性将是。所以在获取静态属性前,需要调用方法,判断在获取前是否需要初始化这个类。如果删除创建 User 对象的语句,运行结果会变为:

02、使用方法允许程序在运行时动态地创建一个类,方法定义如下:

在实际使用过程中,可以只传入字节数组、起始字节的下标以及读取的字节长度,默认情况下,类加载器()和保护域()来源于调用此方法的实例。下面的例子中实现了反编译生成后的 class 文件的功能:

在上面的代码中,首先读取了一个文件并通过文件流将它转化为字节数组,之后使用方法动态的创建了一个类,并在后续完成了它的实例化工作,流程如下图所示,并且通过这种方式创建的类,会跳过 JVM 的所有安全检查。

除了方法外,Unsafe 还提供了一个方法:

使用该方法可以动态的创建一个匿名类,Lambda表达式中就是使用 ASM 动态生成字节码的,然后利用该方法定义实现相应的函数式接口的匿名类。

在 JDK 15 发布的新特性中,在隐藏类()一条中,指出将在未来的版本中弃用 Unsafe 的方法。

Unsafe 中提供的和方法用于获取系统信息,调用方法会返回系统指针的大小,如果在 64 位系统下默认会返回 8,而 32 位系统则会返回 4。调用 pageSize 方法会返回内存页的大小,值为 2 的整数幂。使用下面的代码可以直接进行打印:

执行结果:

这两个方法的应用场景比较少,在类中,在使用计算所需的内存页的数量时,调用了方法获取内存页的大小。另外,在使用方法拷贝内存时,调用了方法,检测 32 位系统的情况。

在本文中,我们首先介绍了 Unsafe 的基本概念、工作原理,并在此基础上,对它的 API 进行了说明与实践。

相信大家通过这一过程,能够发现 Unsafe 在某些场景下,确实能够为我们提供编程便利。但在使用这些便利时,确实存在着一些安全上的隐患,在我看来,一项技术具有不安全因素并不可怕,可怕的是它在使用过程中被滥用。

尽管之前有传言说会在 Java9 中移除 Unsafe 类,不过它还是照样已经存活到了 JDK 16,按照存在即合理的逻辑,只要使用得当,它还是能给我们带来不少的帮助,因此最后还是建议大家,在使用 Unsafe 的过程中一定要做到使用谨慎使用、避免滥用。

编辑:沉默王二,编辑前的内容主要来自于朋友码农参上的公众号文章,写得非常好,推荐关注。


GitHub 上标星 10000+ 的开源知识库《二哥的 Java 进阶之路》第二份 PDF 《并发编程小册》终于来了!包括线程的基本概念和使用方法、Java的内存模型、sychronized、volatile、CAS、AQS、ReentrantLock、线程池、并发容器、ThreadLocal、生产者消费者模型等面试和开发必须掌握的内容,共计 15 万余字,200+张手绘图,可以说是通俗易懂、风趣幽默……详情戳:太赞了,二哥的并发编程进阶之路.pdf

加入二哥的编程星球,在星球的第二个置顶帖「知识图谱」里就可以获取 PDF 版本。

二哥的并发编程进阶之路获取方式
二哥的并发编程进阶之路获取方式

JDK 中提供了一些并发编程中常用的通信工具类以供我们开发者使用,比如说 CountDownLatch,Semaphore,Exchanger,CyclicBarrier,Phaser。

它们都在 JUC 包下。先总体概括一下都有哪些工具类,它们有什么作用,然后再分别介绍它们的主要使用方法和原理。

类 作用 Semaphore 限制线程的数量 Exchanger 两个线程交换数据 CountDownLatch 线程等待直到计数器减为 0 时开始工作 CyclicBarrier 作用跟 CountDownLatch 类似,但是可以重复使用 Phaser 增强的 CyclicBarrier

Semaphore 翻译过来是信号的意思。顾名思义,这个工具类提供的功能就是多个线程彼此“传信号”。而这个“信号”是一个类型的数据,也可以看成是一种“资源”。

可以在构造方法中传入初始资源总数,以及是否使用“公平”的同步器。默认情况下,是非公平的。

最主要的方法是 acquire 方法和 release 方法。方法会申请一个 permit,而 release 方法会释放一个 permit。当然,你也可以申请多个 或者释放多个 。

每次 acquire,permits 就会减少一个或者多个。如果减少到了 0,再有其他线程来 acquire,那就要阻塞这个线程直到有其它线程 release permit 为止。

Semaphore 往往用于资源有限的场景中,去限制线程的数量。举个例子,我想限制同时只能有 3 个线程在工作:

输出:

可以看到,在这次运行中,最开始是 1, 0, 6 这三个线程获得了资源,而其它线程进入了等待队列。然后当某个线程释放资源后,就会有等待队列中的线程获得资源。

当然,Semaphore 默认的 acquire 方法是会让线程进入等待队列,且抛出异常中断。但它还有一些方法可以忽略中断或不进入阻塞队列:

Semaphore 内部有一个继承了 AQS 的同步器 Sync,重写了方法。在这个方法里,会去尝试获取资源。

如果获取失败(想要的资源数量小于目前已有的资源数量),就会返回一个负数(代表尝试获取资源失败)。然后当前线程就会进入 AQS 的等待队列。

Exchanger 类用于两个线程交换数据。它支持泛型,也就是说你可以在两个线程之间传送任何数据。先来一个案例看看如何使用,比如两个线程之间想要传送字符串:

输出:

可以看到,当一个线程调用 exchange 方法后,会处于阻塞状态,只有当另一个线程也调用了 exchange 方法,它才会继续执行。

看源码可以发现它是使用 park/unpark 来实现等待状态切换的,但是在使用 park/unpark 方法之前,使用了 CAS 检查,估计是为了提高性能。

因为 Exchanger 支持泛型,所以我们可以传输任何的数据,比如 IO 流或者 IO 缓存。根据 JDK 里面注释的说法,可以总结为一下特性:

  • 此类提供对外的操作是同步的;
  • 用于成对出现的线程之间交换数据;
  • 可以视作双向的同步队列;
  • 可应用于基因算法、流水线设计等场景。

Exchanger 类还有一个有超时参数的方法,如果在指定时间内没有另一个线程调用 exchange,就会抛出一个超时异常。

那么问题来了,Exchanger 只能是两个线程交换数据吗?那三个调用同一个实例的 exchange 方法会发生什么呢?答案是只有前两个线程会交换数据,第三个线程会进入阻塞状态。

需要注意的是,exchange 是可以重复使用的。也就是说。两个线程可以使用 Exchanger 在内存中不断地再交换数据。

先来解读一下 CountDownLatch 这个类名的意义。CountDown 代表计数递减,Latch 是“门闩”的意思。也有人把它称为“屏障”。而 CountDownLatch 这个类的作用也很贴合这个名字的意义,假设某个线程在执行任务之前,需要等待其它线程完成一些前置任务,必须等所有的前置任务都完成,才能开始执行本线程的任务。

CountDownLatch 的方法也很简单,如下:

我们知道,玩游戏的时候,在游戏真正开始之前,一般会等待一些前置任务完成,比如“加载地图数据”,“加载人物模型”,“加载背景音乐”等等。只有当所有的东西都加载完成后,玩家才能真正进入游戏。下面我们就来模拟一下这个 demo。

输出:

其实 CountDownLatch 类的原理挺简单的,内部同样是一个继承了 AQS 的实现类 Sync,且实现起来还很简单,可能是 JDK 里面 AQS 的子类中最简单的实现了,有兴趣的小伙伴可以去看看这个内部类的源码。

需要注意的是构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且 CountDownLatch没有提供任何机制去重新设置这个计数值

CyclicBarrirer 从名字上来理解是“循环屏障”的意思。前面提到了 CountDownLatch 一旦计数值被降为 0 后,就不能再重新设置了,它只能起一次“屏障”的作用。而 CyclicBarrier 拥有 CountDownLatch 的所有功能,还可以使用方法重置屏障。

如果参与者(线程)在等待的过程中,Barrier 被破坏,就会抛出 BrokenBarrierException。可以用方法检测 Barrier 是否被破坏。

  1. 如果有线程已经处于等待状态,调用 reset 方法会导致已经在等待的线程出现 BrokenBarrierException 异常。并且由于出现了 BrokenBarrierException,将会导致始终无法等待。
  2. 如果在等待的过程中,线程被中断,会抛出 InterruptedException 异常,并且这个异常会传播到其他所有的线程。
  3. 如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,其他线程会抛出 BrokenBarrierException,屏障被损坏。
  4. 如果超出指定的等待时间,当前线程会抛出 TimeoutException 异常,其他线程会抛出 BrokenBarrierException 异常。

我们同样用玩游戏的例子。如果玩一个游戏有多个“关卡”,那使用 CountDownLatch 显然不太合适,因为需要为每个关卡都创建一个实例。那我们可以使用 CyclicBarrier 来实现每个关卡的数据加载等待功能。

输出:

注意这里跟 CountDownLatch 的代码有一些不同。CyclicBarrier 没有分为和,而是只有单独的一个方法。

一旦调用 await 方法的线程数量等于构造方法中传入的任务总量(这里是 3),就代表达到屏障了。CyclicBarrier 允许我们在达到屏障的时候可以执行一个任务,可以在构造方法传入一个 Runnable 类型的对象。

上述案例就是在达到屏障时,输出“本关卡所有前置任务完成,开始游戏...”。

CyclicBarrier 虽说功能与 CountDownLatch 类似,但是实现原理却完全不同,CyclicBarrier 内部使用的是 Lock + Condition 实现的等待/通知模式。详情可以查看这个方法的源码:

Phaser 是 Java 7 中引入的一个并发同步工具,它提供了对动态数量的线程的同步能力,这与 CyclicBarrier 和 CountDownLatch 不同,因为它们都需要预先知道等待的线程数量。Phaser 是多阶段的,意味着它可以同步不同阶段的多个操作。

前面我们介绍了 CyclicBarrier,可以发现它在构造方法里传入了“任务总量”之后,就不能修改这个值了,并且每次调用方法也只能消耗一个计数。但 Phaser 可以动态地调整任务总量!

Phaser 是阶段性的,所以它有一个内部的阶段计数器。每当我们到达一个阶段的结尾时,Phaser 会自动前进到下一个阶段。

名词解释:

  • Party:Phaser 的上下文中,一个 party 可以是一个线程,也可以是一个任务。当我们在 Phaser 上注册一个 party 时,Phaser 会递增它的参与者数量。
  • arrive:对应一个 party 的状态,初始时是 unarrived,当调用或者 进入 arrive 状态,可以通过获取当前未到达的数量。
  • register:注册一个新的 party 到 Phaser。
  • deRegister:减少一个 party。
  • phase:阶段,当所有注册的 party 都 arrive 之后,将会调用 Phaser 的方法来判断是否要进入下一阶段。

Phaser 的终止有两种途径,Phaser 维护的线程执行完毕或者返回。

还是游戏的案例。假设我们游戏有三个关卡,但只有第一个关卡有新手教程,需要加载新手教程模块。但后面的第二个关卡和第三个关卡都不需要。我们可以用 Phaser 来做这个需求。

代码:

输出:

这里要注意关卡 1 的输出,在“加载新手教程”线程中调用了减少一个 party 之后,后面的线程使用得到的是已经被修改后的 parties 了。但是当前这个阶段(phase),仍然是需要 4 个 parties 都 arrive 才触发屏障的。从下一个阶段开始,才需要 3 个 parties 都 arrive 就触发屏障。

Phaser 类用来控制某个阶段的线程数量很有用,但它并不在意这个阶段具体有哪些线程 arrive,只要达到它当前阶段的 parties 值,就触发屏障。所以我这里的案例虽然制定了特定的线程(加载新手教程)来更直观地表述 Phaser 的功能,但其实 Phaser 是没有分辨具体是哪个线程的功能的,它在意的只是数量,这一点需要大家注意。

Phaser 类的原理相比起来要复杂得多。它内部使用了两个基于 Fork-Join 框架的原子类辅助:

有兴趣的小伙伴可以去看看 JDK 源代码,这里不做过多叙述。

总的来说,CountDownLatch,CyclicBarrier,Phaser 是一个比一个强大,但也一个比一个复杂,需要根据自己的业务需求合理选择。

编辑:沉默王二,部分内容来源于朋友小七萤火虫开源的这个仓库:深入浅出 Java 多线程


GitHub 上标星 10000+ 的开源知识库《二哥的 Java 进阶之路》第二份 PDF 《并发编程小册》终于来了!包括线程的基本概念和使用方法、Java的内存模型、sychronized、volatile、CAS、AQS、ReentrantLock、线程池、并发容器、ThreadLocal、生产者消费者模型等面试和开发必须掌握的内容,共计 15 万余字,200+张手绘图,可以说是通俗易懂、风趣幽默……详情戳:太赞了,二哥的并发编程进阶之路.pdf

加入二哥的编程星球,在星球的第二个置顶帖「知识图谱」里就可以获取 PDF 版本。

二哥的并发编程进阶之路获取方式
二哥的并发编程进阶之路获取方式

并发编程领域的任务可以分为三种:简单并行任务、聚合任务和批量并行任务,见下图。

这些模型之外,还有一种任务模型被称为“分治”。分治是一种解决复杂问题的思维方法和模式;具体而言,它将一个复杂的问题分解成多个相似的子问题,然后再将这些子问题进一步分解成更小的子问题,直到每个子问题变得足够简单从而可以直接求解。

从理论上讲,每个问题都对应着一个任务,因此分治实际上就是对任务的划分和组织。分治思想在许多领域都有广泛的应用。例如,在算法领域,我们经常使用分治算法来解决问题(如归并排序和快速排序都属于分治算法,二分查找也是一种分治算法)。在大数据领域,MapReduce 计算框架背后的思想也是基于分治。

由于分治这种任务模型的普遍性,Java 并发包提供了一种名为 Fork/Join 的并行计算框架,专门用于支持分治任务模型的应用。

分治任务模型可分为两个阶段:一个阶段是 任务分解,就是迭代地将任务分解为子任务,直到子任务可以直接计算出结果;另一个阶段是 结果合并,即逐层合并子任务的执行结果,直到获得最终结果。下图是一个简化的分治任务模型图,你可以对照着理解。

在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,我们往往都采用递归算法。

Fork/Join 是一个并行计算框架,主要用于支持分治任务模型。在这个计算框架中,Fork 代表任务的分解,而 Join 代表结果的合并。

Fork/Join 计算框架主要由两部分组成:分治任务的线程池 ForkJoinPool 和分治任务 ForkJoinTask。

这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 之间的关系,都是用于提交任务到线程池的,只不过分治任务有自己独特的类型 ForkJoinTask。

ForkJoinTask 是一个抽象类,其中有许多方法,其中最核心的是 方法和 方法。fork 方法用于异步执行一个子任务,而 join 方法通过阻塞当前线程来等待子任务的执行结果。

ForkJoinTask 有两个子类:RecursiveAction 和 RecursiveTask。

从它们的名字就可以看出,都是通过递归的方式来处理分治任务的。这两个子类都定义了一个抽象方法 ,不同之处在于 RecursiveAction 的 compute 方法没有返回值,而 RecursiveTask 的 compute 方法有返回值。这两个子类也都是抽象类,在使用时需要创建自定义的子类来扩展功能。

接下来,让我们使用 Fork/Join 并行计算框架来计算斐波那契数列(下面的代码示例源自 Java 官方示例)。

首先,我们需要创建一个 ForkJoinPool 线程池以及一个用于计算斐波那契数列的 Fibonacci 分治任务。然后,通过调用 ForkJoinPool 线程池的 方法来启动分治任务。

由于计算斐波那契数列需要返回结果,所以我们的 Fibonacci 类继承自 RecursiveTask。Fibonacci 分治任务需要实现 compute 方法,在这个方法中,逻辑与普通计算斐波那契数列的方法非常相似,只是在计算 时使用了异步子任务,这是通过 语句来实现。

运行程序,我们会得到如下的结果:

Fork/Join 并行计算的核心组件是 ForkJoinPool。下面简单介绍一下 ForkJoinPool 的工作原理。

当我们通过 ForkJoinPool 的 invoke 或 submit 方法提交任务时,ForkJoinPool 会根据一定的路由规则将任务分配到一个任务队列中。如果任务执行过程中创建了子任务,那么子任务会被提交到对应工作线程的任务队列中。

ForkJoinPool 中有一个数组形式的成员变量 ,其对应一个队列数组,每个队列对应一个消费线程。丢入线程池的任务,根据特定规则进行转发。

当工作线程的任务队列为空时,它是否无事可做呢?

不是的。ForkJoinPool 引入了一种称为"任务窃取"的机制。当工作线程空闲时,它可以从其他工作线程的任务队列中"窃取"任务。

例如,下图中线程 T2 的任务队列已经为空,它可以窃取线程 T1 任务队列中的任务。这样,所有的工作线程都能保持忙碌的状态。

ForkJoinPool 中的任务队列采用双端队列的形式。工作线程从任务队列的一个端获取任务,而"窃取任务"从另一端进行消费。这种设计能够避免许多不必要的数据竞争。

ForkJoinPool 与 ThreadPoolExecutor 有很多相似之处,例如都是线程池,都是用于执行任务的。但是,它们之间也有很多不同之处。

首先,ForkJoinPool 采用的是"工作窃取"的机制,而 ThreadPoolExecutor 采用的是"工作复用"的机制。这两种机制各有优劣,ForkJoinPool 的优势在于能够充分利用 CPU 的多核能力,而 ThreadPoolExecutor 的优势在于能够避免线程间的上下文切换。

其次,ForkJoinPool 采用的是分治任务模型,而 ThreadPoolExecutor 采用的是简单并行任务模型。这两种任务模型各有优劣,ForkJoinPool 的优势在于能够处理分治任务,而 ThreadPoolExecutor 的优势在于能够处理简单并行任务。

最后,ForkJoinPool 采用的是 LIFO 的任务队列,而 ThreadPoolExecutor 采用的是 FIFO 的任务队列。这两种任务队列各有优劣,ForkJoinPool 的优势在于能够避免数据竞争,而 ThreadPoolExecutor 的优势在于能够保证任务的顺序性。

假设:我们要计算 1 到 1 亿的和,为了加快计算的速度,我们自然想到算法中的分治原理,将 1 亿个数字分成 1 万个任务,每个任务计算 1 万个数值的综合,利用 CPU 的并发计算性能缩短计算时间。

由于 ThreadPoolExecutor 可以通过 Future 获取到执行结果,因此利用 ThreadPoolExecutor 也是可行的。

当然 ForkJoinPool 实现也是可以的。下面我们将这两种方式都实现一下,看看这两种实现方式有什么不同。

无论哪种实现方式,其大致思路都是:

  1. 按照线程池里线程个数 N,将 1 亿个数划分成 N 等份,随后丢入线程池进行计算。
  2. 每个计算任务使用 Future 接口获取计算结果,最后积加即可。

我们先使用 ThreadPoolExecutor 实现。

首先,定义一个 Calculator 接口,表示计算数字总和这个动作,如下所示。

接着,我们定义一个使用 ThreadPoolExecutor 线程池实现的类,如下所示。

如上面代码所示,我们实现了一个计算单个任务的类 SumTask,在该类中对数值进行累加。其次,我们在 方法中,对 1 亿的数字进行拆分,接着扔给线程池计算,最后阻塞等待计算结果,最终累加起来。

我们运行上面的代码,可以得到顺利得到最终结果,如下所示。

接着我们使用 ForkJoinPool 来实现。

我们首先实现 SumTask 继承 RecursiveTask 抽象类,并在 compute 方法中定义拆分逻辑及计算。最后在 sumUp 方法中调用 pool 方法进行计算,代码如下所示。

运行上面的代码,结果为:

对比 ThreadPoolExecutor 和 ForkJoinPool 这两者的实现,可以发现它们都有任务拆分的逻辑,以及最终合并数值的逻辑。但 ForkJoinPool 相比 ThreadPoolExecutor 来说,做了一些实现上的封装,例如:

  • 不用手动去获取子任务的结果,而是使用 join 方法直接获取结果。
  • 将任务拆分的逻辑,封装到 RecursiveTask 实现类中,而不是裸露在外。

因此对于没有父子任务依赖,但是希望获取到子任务执行结果的并行计算任务,就可以使用 ForkJoinPool 来实现。在这种情况下,使用 ForkJoinPool 实现更多是代码实现方便,封装做得更加好。

MapReduce 是一个编程模型,同时也是一个处理和生成大数据集的处理框架。它源于 Google,用于支持在大型数据集上的分布式计算。这个框架主要由两个步骤组成:Map 步骤和 Reduce 步骤,这也是它名字的由来。

Fork/Join 并行计算框架通常被用来实现学习 MapReduce 的入门程序,该程序用于统计文件中每个单词的数量。

首先,我们可以使用二分法递归地将文件拆分为更小的部分,直到每个部分只有一行数据。然后,在每个部分中统计单词的数量,并逐级汇总结果。你可以参考之前提到的简化版分治任务模型图来理解该过程。

现在,让我们开始实现。下面的代码使用了字符串数组来模拟文件内容,其中每个元素与文件中的行数据一一对应。关键代码位于方法中,这是一个递归方法。它将前半部分数据 fork 一个递归任务进行处理(关键代码:),而后半部分数据在当前任务中递归处理()。

这个示例程序是对 Fork/Join 模型的简化,实际上在真正的 MapReduce 框架中,还涉及到数据划分、映射阶段、归约阶段等更多的步骤。但是通过此示例,大家可以初步了解如何使用 Fork/Join 并行计算框架来处理类似的任务。

Fork/Join 并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的 MapReduce,所以你可以把 Fork/Join 看作单机版的 MapReduce。

Fork/Join 并行计算框架的核心组件是 ForkJoinPool。ForkJoinPool 支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。

Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的。不过需要注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。

所以 建议用不同的 ForkJoinPool 执行不同类型的计算任务

编辑:沉默王二,部分内容来源于这篇文章:分而治之思想Forkjoin,还有一部分内容来源于朋友陈树义的这篇文章,内容很顶,强烈推荐。还有一部分图片来自于朋友「日拱一兵」的文章。


GitHub 上标星 10000+ 的开源知识库《二哥的 Java 进阶之路》第二份 PDF 《并发编程小册》终于来了!包括线程的基本概念和使用方法、Java的内存模型、sychronized、volatile、CAS、AQS、ReentrantLock、线程池、并发容器、ThreadLocal、生产者消费者模型等面试和开发必须掌握的内容,共计 15 万余字,200+张手绘图,可以说是通俗易懂、风趣幽默……详情戳:太赞了,二哥的并发编程进阶之路.pdf

加入二哥的编程星球,在星球的第二个置顶帖「知识图谱」里就可以获取 PDF 版本。

二哥的并发编程进阶之路获取方式
二哥的并发编程进阶之路获取方式

生产者-消费者模式是一个十分经典的多线程并发协作模式,弄懂生产者-消费者问题能够让我们对并发编程的理解加深。

所谓的生产者-消费者,实际上包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中获取数据,不需要关心生产者的行为。

这个共享数据区域中应该具备这样的线程间并发协作功能:

  1. 如果共享数据区已满的话,阻塞生产者继续生产数据;
  2. 如果共享数据区为空的话,阻塞消费者继续消费数据;

在实现生产者消费者问题时,可以采用三种方式:

  1. 使用 Object 的 wait/notify 的消息通知机制;
  2. 使用 Lock Condition 的 await/signal 消息通知机制;
  3. 使用 BlockingQueue 实现。

可以通过 Object 对象的 wait 方法和 notify 方法或 notifyAll 方法来实现线程间的通信。

调用 wait 方法将阻塞当前线程,直到其他线程调用了 notify 方法或 notifyAll 方法进行通知,当前线程才能从 wait 方法处返回,继续执行下面的操作。

这些知识我们在讲 Condition 的时候其实讲到过,相信大家都还有印象。

01、wait

该方法用来将当前线程置入休眠状态,直到接到通知或被中断为止。

在调用 wait 之前,线程必须获得该对象的监视器锁,即只能在同步方法或同步块中调用 wait 方法。调用 wait 方法之后,当前线程会释放锁。如果调用 wait 方法时,线程并未获取到锁的话,则会抛出 IllegalMonitorStateException异常。如果再次获取到锁的话,当前线程才能从 wait 方法处成功返回。

02、notify

该方法也需要在同步方法或同步块中调用,即在调用前,线程也必须获得该对象的对象级别锁,如果调用 notify 时没有持有适当的锁,也会抛出 IllegalMonitorStateException

该方法会从 WAITTING 状态的线程中挑选一个进行通知,使得调用 wait 方法的线程从等待队列移入到同步队列中,等待机会再一次获取到锁,从而使得调用 wait 方法的线程能够从 wait 方法处退出。

调用 notify 后,当前线程不会马上释放该对象锁,要等到程序退出同步块后,当前线程才会释放锁。

03、notifyAll

该方法与 notify 方法的工作方式相同,重要的一点差异是:notifyAll 会使所有原来在该对象上 wait 线程统统退出 WAITTING 状态,使得他们全部从等待队列中移入到同步队列中去,等待下一次获取到对象监视器锁的机会。

不过,wait/notify 消息通知存在这样一些问题。

notify 通知的遗漏,即 threadA 还没开始 wait,threadB 已经 notify 了,这样,threadB 通知是没有任何响应的,当 threadB 退出 synchronized 代码块后,threadA 再开始 wait,便会一直阻塞等待,直到被别的线程打断。

下面的示例代码就模拟出了 notify 早期通知带来的问题:

示例中开启了两个线程,一个是 WaitThread,另一个是 NotifyThread。NotifyThread 会先启动调用 notify 方法。然后 WaitThread 线程才启动,调用 wait 方法,但由于通知过了,wait 方法就无法再获取到相应的通知,因此 WaitThread 会一直在 wait 方法处阻塞,这种现象就是通知过早的现象。

针对这种问题的解决方法是,添加一个状态标志,让 waitThread 调用 wait 方法前先判断状态是否已经改变了,如果通知已经发出,WaitThread 就不再去 wait。对上面的代码进行优化如下:

这段代码只增加了一个状态,NotifyThread 调用 notify 方法后会对状态进行更新,WaitThread 调用 wait 方法之前会先对状态进行判断。

该示例中,调用 notify 后将状态改变为 false,因此,在 WaitThread 中 while 对 isWait 判断后就不会执行 wait 方法,从而避免了 Notify 过早通知造成遗漏的情况。

总结:在使用线程的等待/通知机制时,一般都要配合一个 boolean 变量值,在 notify 之前改变该 boolean 变量的值,让 wait 返回后能够退出 while 循环,或在通知被遗漏后不会被阻塞在 wait 方法处。

如果线程在等待时接收到了通知,但是之后等待的条件发生了变化,并没有再次对等待条件进行判断,也会导致程序出现错误。

下面用一个例子来说明这种情况。

会报异常:

在这个例子中,一共开启了 3 个线程,Consumer1,Consumer2 以及 Productor。

Consumer1 调用了 wait 方法后,线程处于了 WAITTING 状态,并且将对象锁释放。

此时,Consumer2 获取到对象锁,进入到同步代块中,当执行到 wait 方法时,同样的也会释放对象锁。

然后 productor 获取到对象锁,进入到同步代码块中,向 list 中插入数据,通过 notifyAll 方法通知处于 WAITING 状态的 Consumer1 和 Consumer2 线程。

consumer1 得到对象锁后,从 wait 方法处退出,删除一个元素让 List 为空,方法执行结束,退出同步块,释放掉对象锁。

这个时候 Consumer2 获取到对象锁后,从 wait 方法退出,继续往下执行,这个时候 Consumer2 再执行就会出错,因为 List 已经为空了。

解决方案: 通过上面的分析,可以看出 Consumer2 报错是因为线程从 wait 方法退出之后没有对 wait 条件进行判断,但此时的 wait 条件已经发生了变化。解决办法就是在 wait 退出之后再对条件进行判断。

上面的代码与之前的代码相比,仅仅只是将 wait 外围的 if 语句改为了 while 循环,这样当 list 为空时,线程便会继续等待,而不会继续去执行删除 list 中元素中的代码。

总结:在使用线程的等待/通知机制时,一般都要在 while 循环中调用 wait 方法,因此需要配合一个 boolean 变量,满足 while 循环的条件时进入 while 循环,执行 wait 方法,不满足 while 循环条件时,跳出循环,执行后面的代码。

现象:如果是多消费者和多生产者情况,使用 notify 方法可能会出现“假死”的情况,即所有的线程都处于等待状态,无法被唤醒。

原因分析:假设当前有多个生产者线程调用了 wait 方法阻塞等待,其中一个生产者线程获取到对象锁之后使用 notify 通知处于 WAITTING 状态的线程,如果唤醒的仍然是生产者线程,就会造成所有的生产者线程都处于等待状态。

解决办法:将 notify 方法替换成 notifyAll 方法,如果使用的是 lock 的话,就将 signal 方法替换成 signalAll 方法。

总结:Object 提供的消息通知机制应该遵循如下这些条件:

  1. 永远在 while 循环中对条件进行判断而不是在 if 语句中进行 wait 条件的判断;
  2. 使用 NotifyAll 而不是使用 notify。

基本的使用范式如下:

利用 wait/notifyAll 实现生产者和消费者代码如下:

输出结果:

参照 Object 的 wait 和 notify/notifyAll 方法,Condition 也提供了同样的方法,即 await 方法和 signal/signalAll 方法。这部分知识我们前面在讲 Condition 的时候也讲到过,相信大家都还有印象。

那如果采用 Conditon 的消息通知原理来实现生产者-消费者模型,原理同使用 wait/notifyAll 一样。直接上代码:

输出结果:

在讲 BlockingQueue 的时候,我们就讲过,BlockingQueue 非常适合用来实现生产者-消费者模型。

其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。

有了这个队列,生产者就只需要关注生产,而不用管消费者的消费行为,更不用等待消费者线程执行完;消费者也只管消费,不用管生产者是怎么生产的,更不用等着生产者生产。

下面直接上代码:

输出结果:

可以看出,使用 BlockingQueue 来实现生产者-消费者很简洁,这正是 BlockingQueue 的优势所在。

生产者-消费者模式一般用于将生产数据的一方和消费数据的一方分割开来,将生产数据与消费数据的过程解耦开来。

通过将任务的提交和任务的执行解耦开来,提交任务的操作相当于生产者,执行任务的操作相当于消费者。

例如使用 Excutor 构建 Web 服务器,用于处理线程的请求:生产者将任务提交给线程池,线程池创建线程处理任务,如果需要运行的任务数大于线程池的基本线程数,那么就把任务扔到阻塞队列(通过线程池+阻塞队列的方式比只使用一个阻塞队列的效率高很多,因为消费者能够处理就直接处理掉了,不用每个消费者都要先从阻塞队列中取出任务再执行)

双十一的时候,会产生大量的订单,那么不可能同时处理那么多的订单,需要将订单放入一个队列里面,然后由专门的线程处理订单。

这里用户下单就是生产者,处理订单的线程就是消费者;再比如 12306 的抢票功能,先由一个容器存储用户提交的订单,然后再由专门处理订单的线程慢慢处理,这样可以在短时间内支持高并发服务。

比如上传附件并处理,那么这个时候可以将用户上传和处理附件分成两个过程,用一个队列暂时存储用户上传的附件,然后立刻返回用户上传成功,然后有专门的线程处理队列中的附件。

生产者-消费者模式的优点:

  • 解耦:将生产者类和消费者类进行解耦,消除代码之间的依赖性,简化工作负载的管理
  • 复用:通过将生产者类和消费者类独立开来,对生产者类和消费者类进行独立的复用与扩展
  • 调整并发数:由于生产者和消费者的处理速度是不一样的,可以调整并发数,给予慢的一方多的并发数,来提高任务的处理速度
  • 异步:对于生产者和消费者来说能够各司其职,生产者只需要关心缓冲区是否还有数据,不需要等待消费者处理完;对于消费者来说,也只需要关注缓冲区的内容,不需要关注生产者,通过异步的方式支持高并发,将一个耗时的流程拆成生产和消费两个阶段,这样生产者因为执行 put 的时间比较短,可以支持高并发
  • 支持分布式:生产者和消费者通过队列进行通讯,所以不需要运行在同一台机器上,在分布式环境中可以通过 redis 的 list 作为队列,而消费者只需要轮询队列中是否有数据。同时还能支持集群的伸缩性,当某台机器宕掉的时候,不会导致整个集群宕掉

本文主要讲解了线程的等待/通知机制,包括 wait/notify/notifyAll 方法的使用,以及使用 wait/notifyAll 实现生产者-消费者模型的示例代码。

还有 Condition 的 await/signalAll 方法的使用,以及使用 Condition 的 await/signalAll 实现生产者-消费者模型的示例代码。最后还讲解了使用 BlockingQueue 实现生产者-消费者模型的示例代码。

编辑:沉默王二,部分内容来自于CL0610的 GitHub 仓库https://github.com/CL0610/Java-concurrency,部分图片和内容来资源知乎这篇帖子。


GitHub 上标星 10000+ 的开源知识库《二哥的 Java 进阶之路》第二份 PDF 《并发编程小册》终于来了!包括线程的基本概念和使用方法、Java的内存模型、sychronized、volatile、CAS、AQS、ReentrantLock、线程池、并发容器、ThreadLocal、生产者消费者模型等面试和开发必须掌握的内容,共计 15 万余字,200+张手绘图,可以说是通俗易懂、风趣幽默……详情戳:太赞了,二哥的并发编程进阶之路.pdf

加入二哥的编程星球,在星球的第二个置顶帖「知识图谱」里就可以获取 PDF 版本。

二哥的并发编程进阶之路获取方式
二哥的并发编程进阶之路获取方式

版权声明


相关文章:

  • 什么专业就业前景好2026-02-02 19:00:59
  • 数学建模图论模型2026-02-02 19:00:59
  • mtr文件2026-02-02 19:00:59
  • swing技术2026-02-02 19:00:59
  • windows软件管家2026-02-02 19:00:59
  • 神经网络模型cnn2026-02-02 19:00:59
  • ubuntu安装ibus中文输入法2026-02-02 19:00:59
  • memtest参数2026-02-02 19:00:59
  • tftp功能2026-02-02 19:00:59
  • rapid程序实例2026-02-02 19:00:59