Java线程池详解
# Java线程池详解
# 一、Java线程池简介
线程池是为了提升多线程应用的性能和资源利用效率而提出的一种技术方案。它通过池化思想来管理和复用线程,避免了频繁创建和销毁线程所带来的开销。池化思想的核心在于复用已创建的资源(线程),从而提高系统的效率和响应速度。
# 池化思想
池化思想(Pooling)是一种资源管理的技术,通过维护一个预定义的资源池来提高效率和性能。其主要目标是减少资源的创建和销毁开销,同时提升资源的利用率和响应速度。池化思想的应用不仅限于线程池,还包括数据库连接池、对象池等。
池化思想的核心
资源复用: 池化思想通过创建一个资源池,已经创建的资源可以被重复使用,从而减少了每次使用资源时的开销。例如,线程池中的线程在执行完任务后不会被销毁,而是被回收到线程池中,以备下一个任务使用。
预分配资源: 在使用池化技术时,系统通常会预先创建一定数量的资源,并将其维护在池中。这些资源可以随时被请求和使用,从而减少了动态创建资源的时间。
管理资源生命周期: 资源池负责管理资源的生命周期,包括资源的创建、分配、回收和销毁。通过有效的管理,资源池能够确保资源的高效利用,避免资源的泄漏和过度使用。
控制资源数量: 池化技术通过限制资源池中的资源数量,避免了资源的过度创建和消耗。例如,线程池中的线程数量可以设置核心线程数和最大线程数,从而控制系统中线程的总数。
# 池化思想的优点
性能提升: 通过复用资源,减少了资源创建和销毁的开销,从而提高了系统的性能。
资源管理: 池化技术能够控制资源的数量和使用,避免了资源的浪费和过度消耗。
响应速度: 由于资源是预先创建和管理的,系统可以更快地响应资源请求,从而提高了响应速度。
资源限制: 通过限制资源池中的资源数量,可以防止系统资源的耗尽,提高系统的稳定性和可靠性。
# 二、线程池的实现原理分析
# 实现线程池需要考虑哪些问题?
结合上面说的池化思想来看:
①、要预分配资源就需要先创建线程,通过什么方式创建线程比较好?
②、要实现资源复用就要考虑线程在执行完任务后放在哪,是让线程阻塞等待还是继续空转等待?
③、要控制资源数量就要考虑如何制定一套合理高效的线程调度规则,资源池中线程的数量是设计成动态的还是固定的?
④、每次提交新任务,是放入队列等待执行,还是直接使用线程执行,如果线程资源不够了怎么处理?
带着上面几个问题我们一个一个来说。
对于问题①,JDK中的线程池比如:ThreadPoolExecutor
或者 ScheduledThreadPoolExecutor
都是使用JDK自己定义的ThreadFactory
接口的实例来创建线程,通过ThreadFactory
提供统一的接口,能够非常灵活的自定义自己的线程工厂来定制线程属性,方便后期对线程的管理。
比如:
实现一个最简单的线程工厂用来创建线程。
class MyThreadFactory implements ThreadFactory {
private final String name;
private final AtomicInteger threadNum = new AtomicInteger();
// 设置线程名称
public MyThreadFactory(String name){
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = newThread(r);
// 设置线程名称和编号
thread.setName(name + " [#" + threadNum.incrementAndGet() + "]");
return thread;
}
}
对于问题②、实现资源复用,ThreadPoolExecutor
是将线程和任务封装为Worker
对象 , Worker
继承AQS
,并利用 AQS 的功能来实现线程的阻塞和唤醒,每个 Worker 对象维护一个线程,通过调用 Worker.run 方法来执行任务。后面会详细介绍,这里先简单说明一下。
对于问题③、制定一套合理高效的线程调度规则,那么线程池就要能实现灵活的线程配置,在ThreadPoolExecutor
类中线程池资源数量的灵活性主要体现在以下几个方面:
- 核心线程数(Core Pool Size)
- 最大线程数(Maximum Pool Size)
- 线程存活时间(Keep-Alive Time)
对于问题④、每次提交新任务,肯定是放入阻塞队列处理更加方便灵活,要不然ThreadPoolExecutor
可能就得在自己类的内部实现线程的调度管理了,这样很麻烦又冗余。因为JDK本身实现了一些非常有用的阻塞队列,在线程池的实现上正好能派上用场。
如果线程资源不够了怎么处理?
在线程资源不足时,线程池会根据设置的核心线程数、最大线程数和任务队列的状态来决定是否创建新线程或使用拒绝策略处理任务。
# 线程池的简单使用示例
还是拿狗吃骨头举例
一共三只狗,假设每只狗吃1秒骨头,且每只狗互不影响,单线程执行的话需要3秒,多线程并发执行只要1秒多(需要线程调度所以会大于1秒)。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class TestA {
private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
4, // 核心线程数
8, // 最大线程数
0, // 非核心线程的存活时间
TimeUnit.SECONDS, // 存活时间单位
new LinkedBlockingDeque<>(), // 存放任务的 阻塞队列
new MyThreadFactory("DogEatBones"), // 创建线程的 工厂
new ThreadPoolExecutor.AbortPolicy() // 线程池的拒绝策略
);
public static void main(String[] args) throws Exception {
String dog1 = "秀逗";
String dog2 = "四眼";
String dog3 = "大黄";
// 单线程执行
eat(dog1);
eat(dog2);
eat(dog3);
System.out.println("========= 单线程执行完了");
// 使用线程池执行
Future<?> future1 = threadPoolExecutor.submit(() -> eat(dog1));
Future<?> future2 = threadPoolExecutor.submit(() -> eat(dog2));
Future<?> future3 = threadPoolExecutor.submit(() -> eat(dog3));
// 等待每个任务执行完
future1.get();
future2.get();
future3.get();
System.out.println("========= 线程池执行完了");
// 关闭线程池
threadPoolExecutor.shutdown();
}
public static void eat(String name) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + "吃了骨头!");
}
}
class MyThreadFactory implements ThreadFactory {
private final String name;
private final AtomicInteger threadNum = new AtomicInteger();
// 设置线程名称
public MyThreadFactory(String name) {
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
// 设置线程名称和编号
thread.setName(name + " [#" + threadNum.incrementAndGet() + "]");
return thread;
}
}
执行结果:
这个例子能很好的体现出利用程池执行非关联异步任务的效率优势。
当执行多个不相关联的耗时任务时,使用线程池的话可让多个不相关联的任务同时执行,相比于单线程顺序执行通常会有不错的效率提升。
# 线程池原理的简单图示
# 三、Executor详解
# Executor简介
在没有接触线程池之前,我们通过Thread类的start方法来开启一个线程执行任务。
在JDK1.5 引入了Executor
,并通过Executor
及其相关的接口和实现类提供了一种更方便的方式来管理和控制线程的生命周期,使得开发者能够更加专注于业务逻辑而不是底层的线程操作。 并且Java线程池的实现也是基于Executor
及其相关的接口和实现类。
可以把 Executor
理解为一种框架,它的目的就是简化线程的管理和控制。
# Executor框架的继承结构
下图挑了几个常见常用的ExecutorService
的实现
# 总结
# Executor
Executor
:定义了一个执行任务的方法execute(Runnable command)
,这是执行任务的基本接口。
public interface Executor {
void execute(Runnable command);
}
# ExecutorService
ExecutorService
:继承自Executor
,提供了更全面的线程池管理功能,如提交任务、关闭线程池等。 我们使用具体的线程池时,比如ThreadPoolExecutor
和ScheduledThreadPoolExecutor
调用的基本上都是ExecutorService
定义的方法。
public interface ExecutorService extends Executor {
/**
* 启动有序关闭,其中之前提交的任务会被执行,但不再接受新的任务。
* 如果已经关闭,调用此方法没有其他效果。
*
* 这个方法不会等待之前提交的任务完成。可以使用 {@link #awaitTermination} 来等待任务完成。
*
* @throws SecurityException 如果存在安全管理器,并且关闭此 ExecutorService 可能会操作
* 调用者不允许修改的线程,因为它没有持有 {@link
* java.lang.RuntimePermission}{@code ("modifyThread")} 权限,或者安全管理器的 {@code checkAccess} 方法
* 拒绝访问。
*/
void shutdown();
/**
* 尝试停止所有正在执行的任务,停止处理等待中的任务,并返回一个待执行任务的列表。
*
* 这个方法不会等待正在执行的任务终止。可以使用 {@link #awaitTermination} 来等待任务终止。
*
* 对于正在执行的任务,停止操作只是尽力而为,可能会通过 {@link Thread#interrupt} 进行取消,但有些任务可能不会响应中断。
*
* @return 未开始执行的任务列表
* @throws SecurityException 如果存在安全管理器,并且关闭此 ExecutorService 可能会操作
* 调用者不允许修改的线程,因为它没有持有 {@link
* java.lang.RuntimePermission}{@code ("modifyThread")} 权限,或者安全管理器的 {@code checkAccess} 方法
* 拒绝访问。
*/
List<Runnable> shutdownNow();
/**
* 如果此执行器已经关闭,则返回 {@code true}。
*
* @return 如果此执行器已经关闭,则返回 {@code true}
*/
boolean isShutdown();
/**
* 如果所有任务在关闭后都已完成,则返回 {@code true}。
* 注意,{@code isTerminated} 只有在 {@code shutdown} 或 {@code shutdownNow} 被调用后才会为 {@code true}。
*
* @return 如果所有任务在关闭后都已完成,则返回 {@code true}
*/
boolean isTerminated();
/**
* 阻塞直到所有任务在关闭请求后完成执行,或者超时发生,或者当前线程被中断,以先发生者为准。
*
* @param timeout 等待的最大时间
* @param unit 超时时间的单位
* @return 如果此执行器终止,则返回 {@code true};如果超时发生而未终止,则返回 {@code false}
* @throws InterruptedException 如果在等待时被中断
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交一个返回值的任务进行执行,并返回一个表示任务结果的 Future 对象。
*
* 如果需要立即阻塞等待任务,可以使用类似 {@code result = exec.submit(aCallable).get();} 的构造。
*
* @param task 要提交的任务
* @param <T> 任务结果的类型
* @return 一个表示任务结果的 Future 对象
* @throws RejectedExecutionException 如果任务无法被调度执行
* @throws NullPointerException 如果任务为 null
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一个 Runnable 任务进行执行,并返回一个表示任务结果的 Future 对象。
* 返回的 Future 对象会在任务成功完成后返回给定的结果。
*
* @param task 要提交的任务
* @param result 完成任务时返回的结果
* @param <T> 结果的类型
* @return 一个表示任务结果的 Future 对象
* @throws RejectedExecutionException 如果任务无法被调度执行
* @throws NullPointerException 如果任务为 null
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一个 Runnable 任务进行执行,并返回一个表示任务的 Future 对象。
* 返回的 Future 对象会在任务成功完成后返回 null。
*
* @param task 要提交的任务
* @return 一个表示任务的 Future 对象
* @throws RejectedExecutionException 如果任务无法被调度执行
* @throws NullPointerException 如果任务为 null
*/
Future<?> submit(Runnable task);
/**
* 执行给定的任务集合,返回一个包含所有任务的 Future 列表。
*
* @param tasks 任务集合
* @param <T> 任务返回值的类型
* @return 任务的 Future 列表,与给定任务集合的顺序相同
* @throws InterruptedException 如果在等待过程中被中断,未完成的任务会被取消
* @throws NullPointerException 如果任务或任务集合中的元素为 null
* @throws RejectedExecutionException 如果任何任务无法被调度执行
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 执行给定的任务集合,返回一个包含所有任务的 Future 列表,直到超时发生或所有任务完成为止。
* 超时后,未完成的任务会被取消。
*
* @param tasks 任务集合
* @param timeout 最大等待时间
* @param unit 超时时间的单位
* @param <T> 任务返回值的类型
* @return 任务的 Future 列表,与给定任务集合的顺序相同。如果操作未超时,所有任务都会完成;如果超时,有些任务可能未完成。
* @throws InterruptedException 如果在等待过程中被中断,未完成的任务会被取消
* @throws NullPointerException 如果任务、任务集合中的元素或单位为 null
* @throws RejectedExecutionException 如果任何任务无法被调度执行
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 执行给定的任务集合,返回其中一个成功完成的任务的结果。如果没有任务成功完成,返回结果是未定义的。
* 超时或任务完成后,未完成的任务会被取消。
*
* @param tasks 任务集合
* @param <T> 任务返回值的类型
* @return 成功完成的任务返回的结果
* @throws InterruptedException 如果在等待过程中被中断
* @throws NullPointerException 如果任务集合或其中的元素为 null
* @throws IllegalArgumentException 如果任务集合为空
* @throws ExecutionException 如果没有任务成功完成
* @throws RejectedExecutionException 如果任务无法被调度执行
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 执行给定的任务集合,返回其中一个成功完成的任务的结果,如果在给定的超时时间内有任务成功完成。
* 超时后,未完成的任务会被取消。
*
* @param tasks 任务集合
* @param timeout 最大等待时间
* @param unit 超时时间的单位
* @param <T> 任务返回值的类型
* @return 成功完成的任务返回的结果
* @throws InterruptedException 如果在等待过程中被中断
* @throws NullPointerException 如果任务集合、单位或任务集合中的元素为 null
* @throws TimeoutException 如果在超时时间内没有任务成功完成
* @throws ExecutionException 如果没有任务成功完成
* @throws RejectedExecutionException 如果任务无法被调度执行
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
- 线程池实现:
Java 的线程池实现基于 Executor 框架中的接口和类。常用的线程池实现包括
ThreadPoolExecutor
、ScheduledThreadPoolExecutor
、ForkJoinPool
。其中ForkJoinPool
计划在后续文章中再详细说明。
# 四、ThreadPoolExecutor 详解
# 线程池的七个参数
ThreadPoolExecutor带七个参数的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- 参数1
int corePoolSize // 线程池核心线程数量
线程池中会维护一个最小的线程数量,即使这些线程处于空闲状态,也不会被销毁,除非设置了 allowCoreThreadTimeOut=true。这里的最小线程数量即是corePoolSize。
private volatile boolean allowCoreThreadTimeOut;
// 控制是否允许核心线程在空闲时被回收
// 默认情况下,核心线程在空闲时不会被销毁,除非调用了此方法并将 value 设置为 true。
// 允许核心线程超时的前提是 keepAliveTime 必须大于0
// 当启用核心线程超时时,通过调用 interruptIdleWorkers() 方法中断空闲线程,从而促使它们按照超时时间被销毁
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
参数2
int maximumPoolSize // 线程池最大线程数量
一个任务被提交到线程池以后,首先会找有没有空闲存活线程,如果有则直接执行,如果没有则会缓存到工作队列中,如果工作队列满了,才会创建一个新线程,然后从工作队列的头部取出一个任务交由新线程来处理,而将刚提交的任务放入工作队列尾部。线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize的数量减去corePoolSize的数量来确定,最多能达到maximunPoolSize即最大线程池线程数量。参数3
long keepAliveTime // 空闲线程存活时间
一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,默认情况下在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定。(默认情况是指allowCoreThreadTimeOut=false的情况)。参数4
TimeUnit unit // 空闲线程存活时间单位
keepAliveTime参数的时间计量单位参数5
BlockingQueue < Runnable > workQueue // 工作队列
新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。
对于阻塞队列的知识可参考 BlockingQueue详解 (opens new window) 这篇文章。
下面简单介绍下jdk中提供的几种常见的工作队列:
①、ArrayBlockingQueue
基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。
②、LinkedBlockingQuene
基于链表的无界阻塞队列(其实最大容量为Integer.MAX_VALUE),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的,除非JVM内存能装下Integer.MAX_VALUE个任务且不发生OOM。
③、SynchronousQuene
一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
④、PriorityBlockingQueue
具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
⑤、DelayQueue详解
DelayQueue 是 Java 并发包(java.util.concurrent)中的一个特殊队列,用于在指定的延迟时间之后处理元素。
详细内容可参考 DelayQueue详解 (opens new window)
- 参数6
ThreadFactory threadFactory // 线程工厂
创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon(守护)线程等等
关于守护线程可参考https://blog.csdn.net/weixin_40304387/article/details/80507340 (opens new window)
- 参数7
RejectedExecutionHandler handler // 拒绝策略
当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢。这里的拒绝策略,就是解决这个问题的,jdk中提供了4种拒绝策略:
①、CallerRunsPolicy
该策略下,在调用者线程中直接执行被拒绝任务的run方法。除非线程池已经shutdown,则直接抛弃任务,也就是说,当线程池已经关闭时,不会再将任务交给调用者线程执行。
创建CallerRunsPolicy 实例:
RejectedExecutionHandler callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy ();
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
②、AbortPolicy
该策略下,直接丢弃任务,并抛出RejectedExecutionException异常。
创建AbortPolicy实例:
RejectedExecutionHandler abortPolicy = new ThreadPoolExecutor.AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
③、DiscardPolicy
该策略下,直接丢弃任务,什么都不做。
创建DiscardPolicy实例:
RejectedExecutionHandler discardPolicy= new ThreadPoolExecutor.DiscardPolicy();
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
④、DiscardOldestPolicy
创建DiscardPolicy实例:
RejectedExecutionHandler discardOldestPolicy= new ThreadPoolExecutor.DiscardOldestPolicy();
该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
# 线程池的任务调度流程图示
图片参考https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
# WorkThread模式和Future模式
下面内容摘自《图解Java多线程设计模式》
WorkThread模式
Worker 的意思是工作的人、劳动者。在 Worker Thread模式中,工人线程(worker thread)会逐个取回工作并进行处理。当所有工作全部完成后,工人线程会等待新的工作到来。Worker Thread模式也被称为 Background Thread(背景线程)模式。另外,如果从“保存多个工人线程的场所”这一点来看,我们也可以称这种模式为Thread Pool(线程池)模式。
Future 模式
假设我们去蛋糕店买蛋糕。下单后,店员一边递给我们提货单,一边说“请您傍晚再来取蛋糕”。到了傍晚,我们就拿着提货单去取蛋糕。这时,店员会先和我们说“您的蛋糕已经做好了”然后将蛋糕递给了我们。
Future 的意思是未来、期货(经济学用语)。假设有一个方法需要花费很长时间才能获取运行结果。那么,与其一直等待结果,不如先拿一张“提货单”。获取提货单并不耗费时间。这里的“提货单”我们就称为Future角色。
获取Future 角色的线程会在稍后使用Future 角色来获取运行结果。这与凭着提货单去取蛋糕非常相似。
如果运行结果已经出来了,那么直接领取即可;如果运行结果还没有出来,那么需要等待结果出来。Future 角色是购买蛋糕时的提货单、预购单、预约券,是“未来”可以转化为实物的凭证。
ExecutorService
提供两种类型的提交任务方法:
①、无返回值:
void execute(Runnable command);
这个方法用于提交一个实现了 Runnable 接口的任务。该任务在执行完后没有返回结果,也不会抛出异常。②、返回Future: 比如
<T> Future<T> submit(Callable<T> task);
或者<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
等。
具体还是看上面对于ExecutorService类里方法的详细注释。
# 线程池的状态和内部结构
线程池的状态
// 使用 AtomicInteger 存储线程池的状态和工作线程数量
// AtomicInteger类型是 32 位二进制表示,则其中高 3 位用来表示线程池状态,后面29 位用来记录线程池线程个数。
// 其中的(高3位)用来表示线程池状态,(低29位)用来表示线程个数
// 默认是RUNNING状态,线程个数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 控制位掩码的位数 为 32-3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 最大线程数量(最大工作线程数) (低29位全是1的情况) 000 11111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// RUNNING: 线程池正在运行
// 高3位是 111
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN: 线程池正在关闭,拒绝接受新任务,但处理队列中的任务
// 高3位是 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP: 线程池正在停止,拒绝新任务和正在处理的任务,并尝试中断工作线程
// 高3位是 001
private static final int STOP = 1 << COUNT_BITS;
// TIDYING: 线程池已完全停止,所有任务都已完成,正在清理资源
// 高3位是 010
private static final int TIDYING = 2 << COUNT_BITS;
// TERMINATED: 线程池完全终止,所有资源都已清理
// 高3位是 011
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取当前状态的高位部分(高3位), 也就是线程池的状态
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
// 获取当前线程池中工作的线程数量 (低29位)
private static int workerCountOf(int c) {
return c & CAPACITY;
}
// 将线程池状态(runState)和工作线程数量(workerCount)打包成一个整数
// 便于在 ThreadPoolExecutor 中以原子操作的方式存储和更新这些信息
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
内部结构
// 阻塞队列,用于存储待执行的任务
private final BlockingQueue<Runnable> workQueue;
// 用于在对 ThreadPoolExecutor 内部状态(如任务队列和工作线程集合)进行修改时提供线程安全(互斥访问)
private final ReentrantLock mainLock = new ReentrantLock();
// 用于存储线程池中的工作线程(Worker 对象)
// Worker 是 ThreadPoolExecutor 的内部类,表示线程池中的一个线程实例
private final HashSet<Worker> workers = new HashSet<Worker>();
# 线程池的关闭
当关闭一个线程池的时候,会有一些复杂的情况要考虑,比如有的线程还正在执行某个任务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。因此,关闭所有的这些任务需要一个过程,这就涉及线程池的完整生命周期管理。
ExecutorService提供了两种关闭线程池的方法:
①、void shutdown();
作用:
shutdown() 方法用于平稳地关闭线程池。它会进行一种渐进的关闭过程,使线程池停止接收新任务,但会继续执行队列中已有的任务和正在执行的任务。
过程:
禁止接收新任务: 一旦调用 shutdown(),线程池将不再接收新的任务。即使调用 submit 或 execute 方法提交新任务,这些新任务也会被拒绝。
继续处理队列任务: 线程池会继续处理任务队列中尚未执行的任务。已经提交的任务将会被逐个处理,直到队列为空。
结束线程: 当队列中的任务执行完毕后,工作线程会终止。线程池将进入 SHUTDOWN 状态,等待所有任务完成后,最终进入 TERMINATED 状态。
②、List<Runnable> shutdownNow();
作用:
shutdownNow()
方法用于立即关闭线程池。它会尽力停止正在执行的任务,取消队列中的所有待执行任务,并返回那些尚未执行的任务列表。
过程:
禁止接收新任务: 与 shutdown()
方法一样,shutdownNow() 也会立即禁止线程池接收新任务。
中断正在执行的任务: 线程池会尝试中断正在执行的任务。这是通过调用 Thread.interrupt()
实现的。请注意,任务需要能够处理中断请求,以便被正确停止。
取消队列任务: 从任务队列中移除所有未执行的任务。未被处理的任务将不会被执行。
返回未执行的任务列表: shutdownNow()
方法返回一个 List<Runnable>
,包含所有从队列中移除但尚未执行的任务。这个列表可以用于进一步处理,例如记录或重试这些任务。
结束线程: 尽管 shutdownNow()
会尝试中断任务和线程,但线程池的工作线程最终会终止。线程池会进入 STOP
状态,并在所有任务完全处理后,最终进入 TERMINATED
状态。
# 线程池的状态转换图
下图参考《Java并发编程之美》
正确关闭线程池示例:
public static void main(String[] args) throws Exception {
String dog1 = "秀逗";
// 使用线程池执行任务
threadPoolExecutor.execute(() -> eat(dog1));
// 关闭线程池
threadPoolExecutor.shutdown();
// 或者 threadPoolExecutor.shutdownNow();
// 一直判端是否关闭
// while (!threadPoolExecutor.isTerminated()) {
// }
// 或者使用awaitTermination 隔短时间判断一次
while (!threadPoolExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
}
System.out.println("线程池已关闭");
}
# 工作线程Worker
Worker
是 ThreadPoolExecutor
的内部类,表示线程池中的一个线程实例。
Worker
继承 AQS 实现了不可重入的互斥锁用于控制线程的状态和同步。
// `Worker` 类表示线程池中的一个工作线程
private final class Worker
extends AbstractQueuedSynchronizer // 继承自 AQS,用于控制线程的状态和同步
implements Runnable { // 实现 Runnable 接口,以便能够作为线程执行任务
// 当前工作线程实例
final Thread thread;
// 初始任务,在工作线程启动时执行
Runnable firstTask;
// 已完成的任务数量
volatile long completedTasks;
// 构造函数
Worker(Runnable firstTask) {
// 设置 AQS 的状态为 -1,以禁止中断,直到 `runWorker` 方法执行
setState(-1);
// 初始化 `firstTask`,这是构造 `Worker` 时传入的初始任务
this.firstTask = firstTask;
// 通过线程工厂创建一个新的线程,并将当前 `Worker` 作为任务传递给线程
this.thread = getThreadFactory().newThread(this);
}
// `run` 方法是 `Runnable` 接口中的方法,实现了线程的实际执行逻辑
@Override
public void run() {
// 执行 `runWorker` 方法,处理任务的实际逻辑
runWorker(this);
}
}
# execute
方法详解
ThreadPoolExecutor
类的execute
方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException(); // 检查任务是否为null,如果是,抛出空指针异常
int c = ctl.get(); // 获取当前状态
// 如果当前工作线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 尝试添加核心线程
return; // 如果成功添加,直接返回
c = ctl.get(); // 重新获取状态
}
// 如果线程池处于运行状态并且队列可以容纳任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); // 再次检查状态
if (!isRunning(recheck) && remove(command)) // 如果线程池不再运行且任务被移除
reject(command); // 拒绝任务
else if (workerCountOf(recheck) == 0) // 如果没有线程在工作
addWorker(null, false); // 添加非核心线程
} else if (!addWorker(command, false)) { // 如果无法添加线程(非核心线程)
reject(command); // 拒绝任务
}
}
总结:
execute 方法接收一个 Runnable 任务并开始处理它。
首先检查任务是否为 null,然后检查当前工作线程数是否少于核心线程数。
如果是,尝试添加一个核心线程。
如果线程池处于运行状态并且队列能够接受任务,它将任务添加到队列中。
如果线程池状态变化或没有工作线程,它将尝试添加额外的线程。
如果上述步骤失败,它将尝试添加一个非核心线程。
如果仍然无法添加线程,它将拒绝任务。
ThreadPoolExecutor
类的addWorker
方法:
// core如果是true 则添加核心线程 如果是false 则添加 非核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get(); // 获取当前状态
int rs = runStateOf(c); // 获取线程池运行状态
// 如果线程池状态是SHUTDOWN且任务不为空,返回false
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); // 获取当前工作线程数
// 如果工作线程数超出最大限制,返回false
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry; // 尝试增加工作线程数,如果成功则跳出重试
c = ctl.get(); // 重新获取状态
if (runStateOf(c) != rs)
continue retry; // 如果状态发生变化,重新尝试
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 创建新的工作线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 检查线程池状态,并确保线程未被启动
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException(); // 如果线程已经存活,抛出异常
workers.add(w); // 将工作线程添加到线程池中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // 更新最大线程池大小
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 启动线程
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w); // 如果线程未成功启动,处理失败情况
}
return workerStarted;
}
总结:
addWorker 方法尝试添加一个新的工作线程。
它首先检查线程池的状态和工作线程数,然后尝试增加工作线程数。
如果线程成功增加,它创建一个新的 Worker 对象并启动线程。
如果线程未成功启动,它会调用 addWorkerFailed 方法处理失败情况。
ThreadPoolExecutor
类的addWorkerFailed
方法:
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w); // 移除未成功添加的工作线程
decrementWorkerCount(); // 减少工作线程计数
tryTerminate(); // 尝试终止线程池(如果需要)
} finally {
mainLock.unlock();
}
}
# Worker
的run
方法
上分析addWorker
方法,如果线程成功增加,会创建一个新的 Worker 对象并启动线程。
就会调用 Worker
的run
方法。
public void run() {
// Worker 的 run 方法被调用时,实际执行 runWorker 方法,传入当前 Worker 实例。
runWorker(this);
}
总结:
Worker
的run
方法调用 runWorker(this) 来执行 Worker 的任务处理逻辑。
runWorker
方法,负责执行任务并处理工作线程的生命周期
final void runWorker(Worker w) {
// 获取当前线程(即 Worker 线程)
Thread wt = Thread.currentThread();
// 从 Worker 中获取第一个任务,并将 firstTask 置为 null
Runnable task = w.firstTask;
w.firstTask = null;
// 解锁 Worker,允许中断操作
w.unlock();
boolean completedAbruptly = true; // 标记线程是否异常结束
try {
// 主循环:只要有任务可执行,就执行
while (task != null || (task = getTask()) != null) {
w.lock(); // 锁定 Worker,防止并发修改
// 如果线程池的状态至少是 STOP 状态,且线程没有中断,则中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 在执行任务之前调用 beforeExecute 方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 在任务执行后调用 afterExecute 方法
afterExecute(task, thrown);
}
} finally {
// 任务执行完成后清空 task,并解锁 Worker
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false; // 如果正常完成,标记为 false
} finally {
// 处理 Worker 的退出逻辑
processWorkerExit(w, completedAbruptly);
}
}
总结:
runWorker
方法负责 Worker 线程的任务执行。
处理任务队列中的任务。
管理任务的执行前后钩子方法 (beforeExecute 和 afterExecute)。
处理线程中断和异常。
完成任务后更新任务计数,并处理 Worker 退出逻辑。
private Runnable getTask() {
boolean timedOut = false; // 上一次 poll() 是否超时
for (;;) {
int c = ctl.get(); // 获取线程池的控制状态
int rs = runStateOf(c); // 获取线程池的运行状态
// 如果线程池状态是 SHUTDOWN 或更高状态,且队列为空,则减少 Worker 数量,并返回 null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c); // 获取当前工作线程的数量
// 是否需要考虑核心线程超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果工作线程数超出最大线程数或已超时,且工作队列为空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 尝试减少工作线程数
if (compareAndDecrementWorkerCount(c))
return null;
continue; // 继续循环,尝试重新获取任务
}
try {
// 尝试从工作队列中获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 超时情况下尝试获取任务
workQueue.take(); // 不超时情况下尝试获取任务
if (r != null)
return r; // 如果成功获取到任务,返回任务
timedOut = true; // 如果没有获取到任务且 timed 为 true,标记超时
} catch (InterruptedException retry) {
timedOut = false; // 如果获取任务过程中被中断,标记未超时
}
}
}
总结:
getTask 方法负责从工作队列中获取任务,它执行以下操作:
①、检查线程池状态:
如果线程池的状态是 SHUTDOWN 或更高状态,并且队列为空,则减少 Worker 数量并返回 null。这意味着线程池正在关闭或已经关闭,无法再获取任务。
②、判断是否需要考虑核心线程超时:
如果线程池允许核心线程超时或当前 Worker 数量超过核心线程池大小,则 timed 标记为 true。
③、检查是否需要减少 Worker 数量:
如果当前工作线程数超过最大线程数,或者超时并且工作队列为空,且 Worker 数量大于 1,则尝试减少 Worker 数量。如果成功减少,则返回 null。
④、获取任务:
如果未超时,则调用 workQueue.take() 阻塞地等待任务。如果超时,则调用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
尝试获取任务。如果获取到任务,则返回任务;否则标记为超时并继续循环尝试。
⑤、处理中断情况:
如果在获取任务过程中被中断,标记为未超时,并继续循环尝试获取任务。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果线程异常结束,则 workerCount 没有被调整
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 更新 completedTaskCount 和移除 Worker
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试终止线程池,如果需要的话
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
// 如果线程池状态低于 STOP 状态,检查是否需要添加新的 Worker
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // 如果当前线程数大于或等于最小线程数,不需要添加新的 Worker
}
// 添加新的 Worker
addWorker(null, false);
}
}
总结:
处理 Worker 线程的退出逻辑。
更新任务计数和 Worker 列表。
根据线程池的状态和条件决定是否需要添加新的 Worker。
# submit
方法详解
submit
方法是在AbstractExecutorService
类中实现的 ,下面三个submit方法就不赘述了。
详细的方法注释都在上面ExecutorService
接口中。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
可以看到,其最终还是调用execute方法来执行任务。
具体的步骤如下:
检查任务是否为空: 如果 task
为 null
,抛出 NullPointerException
。
创建 RunnableFuture
对象: 调用 newTaskFor
方法,将 task
封装为 RunnableFuture
对象。这个方法通常会根据具体的实现生成一个 RunnableFuture
实例。
执行任务: 调用 execute
方法来提交 RunnableFuture
对象到线程池执行。
返回 Future
对象: 返回封装了任务的 RunnableFuture
对象,允许调用者对任务进行管理(如取消、等待任务完成等)。
newTaskFor方法
// 用于创建一个 FutureTask 实例,将 Runnable 任务和一个预定义的结果 value 关联在一起
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
// 用于创建一个 FutureTask 实例,将 Callable 任务封装在一起
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
通过submit
和newTaskFor
方法,ExecutorService
可以灵活地将不同类型的任务封装为 FutureTask
对象,然后提交给线程池执行,从而提供任务执行的异步处理能力和结果管理功能。
# 适配器RunnableFuture
RunnableFuture
是一个接口,继承自 Runnable
和 Future
接口,旨在同时兼容这两种接口。它的主要作用是允许通过 Runnable
任务接口来处理结果,这种设计使得任务可以同时被线程池执行和结果获取。
// 这个接口结合了 Runnable 和 Future 的功能
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
实现类 FutureTask
这里仅简单解释下(FutureTask的详细分析计划后续再写一篇博客)
FutureTask 是 RunnableFuture 的一个常用实现,它可以封装 Runnable 或 Callable 任务,提供执行和结果获取的功能。
public class FutureTask<V> implements RunnableFuture<V> {}
# 五、ScheduledThreadPoolExecutor详解
ScheduledThreadPoolExecutor
是 Java 并发框架中用于执行定时任务和周期任务的一个类。它扩展了 ThreadPoolExecutor
,并实现了 ScheduledExecutorService
接口,提供了在给定的延迟后执行任务和周期性执行任务的能力。
# 类的继承体系
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {}
# 三种特殊执行方式
①、schedule方法
/**
* 计划一个一次性任务,在指定的延迟后执行。
* 任务将在指定的延迟时间后执行一次,适用于需要在延迟后执行的单次任务。
*
* @param command 要执行的任务,必须是 Runnable 类型
* @param delay 从现在起的延迟时间,任务将在此时间后执行
* @param unit 延迟时间的单位,例如 TimeUnit.SECONDS
* @return 一个 ScheduledFuture 对象,代表已计划的任务
* @throws NullPointerException 如果 command 或 unit 为 null
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException(); // 确保 command 和 unit 不为 null
// 创建一个 ScheduledFutureTask 对象,设置指定的延迟时间
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 延迟执行任务
delayedExecute(t);
return t; // 返回代表已计划任务的 ScheduledFuture 对象
}
/**
* 计划一个一次性任务,在指定的延迟后执行。
* 任务将在指定的延迟时间后执行一次,适用于需要在延迟后执行的单次任务,并能返回计算结果。
*
* @param callable 要执行的任务,必须是 Callable 类型
* @param delay 从现在起的延迟时间,任务将在此时间后执行
* @param unit 延迟时间的单位,例如 TimeUnit.SECONDS
* @return 一个 ScheduledFuture 对象,代表已计划的任务,并且可以获取任务的结果
* @throws NullPointerException 如果 callable 或 unit 为 null
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException(); // 确保 callable 和 unit 不为 null
// 创建一个 ScheduledFutureTask 对象,设置指定的延迟时间
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
// 延迟执行任务
delayedExecute(t);
return t; // 返回代表已计划任务的 ScheduledFuture 对象
}
②、scheduleAtFixedRate方法
/**
* 计划一个任务按固定频率执行。
* 任务会在初始延迟之后开始执行,然后按照固定的周期时间间隔重复执行,周期时间间隔不受任务执行时间的影响。
* 适用于需要定期执行的任务,例如每隔固定时间更新状态。 (任务的执行时长必须小于周期时长)
*
* @param command 要执行的任务,必须是 Runnable 类型
* @param initialDelay 第一次执行前的延迟时间
* @param period 后续执行之间的周期时间
* @param unit initialDelay 和 period 参数的时间单位,例如 TimeUnit.SECONDS
* @return 一个 ScheduledFuture 对象,代表已计划的任务
* @throws NullPointerException 如果 command 或 unit 为 null
* @throws IllegalArgumentException 如果 period 小于或等于零
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException(); // 确保 command 和 unit 不为 null
if (period <= 0)
throw new IllegalArgumentException(); // 确保 period 大于零
// 创建一个 ScheduledFutureTask 对象,设置初始延迟和固定周期
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
// 包装任务并设置外部任务引用
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 延迟执行任务
delayedExecute(t);
return t; // 返回代表已计划任务的 ScheduledFuture 对象
}
③、scheduleWithFixedDelay方法
/**
* 计划一个任务按固定延迟执行。
* 任务会在初始延迟之后开始执行,然后在每次任务结束后的固定延迟后再次执行。延迟时间从任务结束时开始计算,因此任务执行时间长度会影响下次执行的时间。
* 适用于需要任务完成后再等待固定时间再执行的场景,例如每次处理完成后等待一段时间。
*
* @param command 要执行的任务,必须是 Runnable 类型
* @param initialDelay 第一次执行前的延迟时间
* @param delay 后续执行之间的延迟时间
* @param unit initialDelay 和 delay 参数的时间单位,例如 TimeUnit.SECONDS
* @return 一个 ScheduledFuture 对象,代表已计划的任务
* @throws NullPointerException 如果 command 或 unit 为 null
* @throws IllegalArgumentException 如果 delay 小于或等于零
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException(); // 确保 command 和 unit 不为 null
if (delay <= 0)
throw new IllegalArgumentException(); // 确保 delay 大于零
// 创建一个 ScheduledFutureTask 对象,设置初始延迟和固定延迟(负值表示固定延迟)
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
// 包装任务并设置外部任务引用
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 延迟执行任务
delayedExecute(t);
return t; // 返回代表已计划任务的 ScheduledFuture 对象
}
# 使用示例
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestA {
private static final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
public static void main(String[] args) {
String dog1 = "秀逗";
String dog2 = "四眼";
String dog3 = "大黄";
// 5秒后 秀逗吃骨头
executor.schedule(() -> System.out.println(dog1 + "吃了骨头!"),
5,
TimeUnit.SECONDS);
// 四眼吃骨头第一次不延时 以后固定周期每两秒吃一次骨头
// 如果吃骨头时间小于两秒 则下一次执行需要等待到2秒才能执行
// 如果吃骨头时间超过了两秒 则下一次执行需要等待上次执行完毕后执行
executor.scheduleAtFixedRate(() -> {
System.out.println(dog2 + "吃了骨头!");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
BlockingQueue<Runnable> queue = executor.getQueue();
System.out.println(queue.size());
},
0,
2,
TimeUnit.SECONDS);
// 大黄吃骨头第一次不延时 以后每次等上次吃骨头结束后再等2秒执行
// 和具体的任务时间有关
executor.scheduleWithFixedDelay(() -> System.out.println(dog3 + "吃了骨头!"),
0,
2,
TimeUnit.SECONDS);
}
}
# 延时执行原理分析
在Java中延时执行的工具有好几种:
①、延时队列DelayQueue,详细可参考DelayQueue详解 (opens new window) 。
②、java.util.Timer类。
③、还有本次说的ScheduledThreadPoolExecutor。
对比:
工具 | 使用场景 | 实现方式 |
---|---|---|
DelayQueue | 适用于需要延迟执行的单个任务,例如缓存过期 | 实现了 BlockingQueue 接口,通过 Delayed 元素实现延迟队列 |
java.util.Timer | 适用于简单的定时任务和周期性任务 | 使用单线程调度任务,通过 TimerTask 实现定时执行 |
ScheduledThreadPoolExecutor | 适用于高并发定时和周期性任务 | 扩展自 ThreadPoolExecutor ,使用 DelayedWorkQueue 实现任务调度 |
# DelayedWorkQueue
ScheduledThreadPoolExecutor
内部实现了一个DelayedWorkQueue
延时队列。
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {}
ScheduledThreadPoolExecutor
的一个构造方法:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
总结:
核心线程数:线程池的核心线程数由 corePoolSize 参数指定。
最大线程数:没有最大线程数限制(设置为 Integer.MAX_VALUE)。
线程空闲时间:在这个构造方法中无实际意义(设置为 0)。表示线程不会空闲等待。
时间单位:在这里设置为纳秒(NANOSECONDS),但不对空闲时间起作用。
任务队列:使用 DelayedWorkQueue,支持任务的延迟和定时调度。直接初始化一个DelayedWorkQueue实例。
再看下DelayedWorkQueue存的是什么?
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
可以看到DelayedWorkQueue底层是RunnableScheduledFuture数组。
可以看到RunnableScheduledFuture继承了Delayed接口, DelayQueue保存的对象也是要实现Delayed接口的。
都是要利用getDelay方法获取延迟时间。
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
# 延迟执行: schedule
方法
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
// 检查参数是否为 null。如果是,则抛出 NullPointerException 异常。
if (command == null || unit == null)
throw new NullPointerException();
// 创建一个 ScheduledFutureTask 对象。ScheduledFutureTask 是实现了 ScheduledFuture 的任务。
// decorateTask 方法用于对任务进行装饰(例如,可能添加一些额外功能),
// 这里的 ScheduledFutureTask 以 command 为任务,使用 triggerTime 方法计算触发时间。
RunnableScheduledFuture<?> t = decorateTask(
command, // 任务命令
new ScheduledFutureTask<Void>(command, null, // 创建 ScheduledFutureTask 实例,传入任务命令和触发时间
triggerTime(delay, unit))
);
// 调用 delayedExecute 方法,将任务提交到延迟队列中,以便在指定的延迟时间后执行。
delayedExecute(t);
// 返回创建的 ScheduledFutureTask 实例,允许调用者通过它来获取任务的执行结果和状态。
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 如果线程池已经关闭,则拒绝任务的添加。
if (isShutdown())
reject(task);
else {
// 将任务添加到任务队列中。
super.getQueue().add(task);
// 检查线程池是否已经关闭,并且当前线程池状态不允许运行此任务。
// 如果任务是周期性的,且在关闭后任务仍然在队列中,则取消任务。
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 确保线程池有足够的核心线程来处理任务。
ensurePrestart();
}
}
// ThreadPoolExecutor里的 ensurePrestart方法
void ensurePrestart() {
// 获取当前工作线程数量。
int wc = workerCountOf(ctl.get());
// 如果工作线程数少于核心线程数,则添加新的工作线程。
if (wc < corePoolSize)
addWorker(null, true);
// 如果工作线程数为 0,则添加一个新的工作线程。
else if (wc == 0)
addWorker(null, false);
}
可以看到schedule方法就是把提交的 Runnable 任务加上delay时间,转换成ScheduledFutureTask对象,放入DelayedWorkerQueue中。后面任务的执行过程还是复用的ThreadPoolExecutor,延迟的控制是在DelayedWorkerQueue内部完成的。
# 周期执行: withFixedDelay
和atFixedRate
周期执行整体步骤和延迟执行步骤基本一致,不同点在于ScheduledFutureTask
的内部实现。
scheduleWithFixedDelay
方法内创建ScheduledFutureTask
对象时传入的延迟时间是负值unit.toNanos(-delay)
。
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
scheduleAtFixedRate
方法内创建ScheduledFutureTask
对象时传入的周期时间是正值unit.toNanos(period))
。
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
正是正值和负值的区别觉得这两个方法的功能区别。
在 ScheduledFutureTask 类的 run 方法和 setNextRunTime 方法中,正值和负值的使用会影响任务的调度逻辑。
public void run() {
boolean periodic = isPeriodic();
// 如果当前状态不允许运行此任务,则取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果任务是一次性的,则执行任务
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果任务是周期性的,则重置任务并重新调度
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period;
// scheduleAtFixedRate方法 固定周期
// 周期为正值,下一次开始执行时间 = 上一次开始执行时间 + 周期时间
if (p > 0)
time += p;
else
//scheduleWithFixedDelay 方法
// 周期为负值, 通过 triggerTime(-p) 计算下一次运行时间 = 当前时间 + 延迟时间
time = triggerTime(-p);
}
# 六、Executors工具类
下面主要讲一下Executors提供的几种线程池:
- 1、Executors.newSingleThreadExecutor() 具体实现
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 2、ExecutorService threadPool2 = Executors.newFixedThreadPool(int nThreads); 具体实现
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 3、Executors.newCachedThreadPool(); 具体实现
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
- 4、Executors.newScheduledThreadPool(int corePoolSize); 具体实现
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
总结:
上面的四种线程池都是Executors工具类提供的。并且均是阿里的代码规范禁止使用的。
阿里的代码规范:
【强制】线程池不允许使用Executors 去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明: Executors返回的线程池对象的弊端如下:
FixedThreadPool和 SingleThreadPool:
允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。也可能导致栈溢出。
CachedThreadPool和 ScheduledThreadPool:
允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。也可能导致栈溢出。
# 七、线程池定义策略和监控
(仅供参考 实际情况并非这么简单,下面说的线程数量相关的配置方式仅为参考值,具体需要根据业务场景的负载进行配置)
# maximumPoolSize(最大线程数)
在大部分资料上面都是建议看具体任务类型来配置:
- ①、CPU密集型任务
CPU密集型任务是指那些主要依赖中央处理器(CPU)计算能力的任务。
比如业务中需要对已经查询出的大量数据进行排序,汇总报表等操作。大量数据的金额或者公式计算操作等。
这种情况设置maximumPoolSize等于 运行该程序的物理机CPU核心数量+1 例如服务器CPU为下图配置则 maximumPoolSize = 33+1 = 34。
这种方式可以保持CPU的效率最高,并且额外多出来的一个线程,即使当计算密集型的线程偶尔由于页缺失故障或者其他原因而暂停时,这个“额外”的线程也能确保CPU的时钟周期不会被浪费。
在java代码中 Runtime.getRuntime().availableProcessors() 可返回CPU核心数 注意 该方法有时候并不一定能准确的返回真实的CPU核心数 参考https://blog.csdn.net/zhanghongzheng3213/article/details/83376571 (opens new window)
- ②、IO密集型 IO(Input/Output)密集型是指程序在执行过程中受到输入输出操作限制的类型。这类程序的特点是它们大部分时间都在等待数据的输入或输出,而不是在进行计算。IO密集型任务往往涉及大量的文件读写、网络通信或其他类型的外部数据交换。
对于IO密集型任务,可以配置更多的线程,因为这些任务在等待IO操作完成时会被阻塞,这时阻塞的线程并不会占用CPU资源。这样做的好处是可以充分利用线程池中的线程在等待IO的同时处理其他任务,从而提高系统的吞吐量和效率。
一般可以设置为 CPU核心数量*2。
还有一种根据公式计算的方式:
参考《Java并发编程实战》
这种算出来一般是几倍于核心数的一个值。 但是实际可操作性不大。
总结:
最大线程数的设置:
对于CPU密集型任务一般设置为CPU核心数+1。
对于IO密集型任务可以尝试设置为2*CPU核心数。
上面的设置方式也只是理论值,具体需要考虑真正的任务情况和负债情况来设置更加合适的值。
# 核心线程数 (corePoolSize):
可以设置为较小的值,通常是较少的线程数来处理常见的请求。比如1 ~ 0.8*CPU核心数。
# 线程工厂
比较重要的是设置线程名称,最好是设置和业务相关的名称,如果出问题了方便排查。
# 非核心线程的存活时间和单位
单位一般就用秒TimeUnit.SECONDS
,如果机器的内存比较吃紧,或者线程引用的任务比较占内存,存活时间可以设置为0。
如果机器内存比较充足,存活时间设置为几百秒也可以。
# 工作队列
建议使用有界队列,比如ArrayBlockingQueue
或者LinkedBlockingQueue
(手动传入队列最大容量值)。
# 拒绝策略
一般使用 new ThreadPoolExecutor.AbortPolicy()
或者new ThreadPoolExecutor.CallerRunsPolicy ()
,主要目的就是为了让任务得到处理,或者即使任务得不到处理也要告知调用者没有处理成功。
# 监控线程池
这个属于比较高级的用法了,至少我在生产中没有监控过或者动态调整过线程池的参数。
主要是因为我们生产负载没有太高,按照平时测试情况和经验设置的线程池参数运行起来还算稳定。
如果应用并发较高或者对线程池使用量比较大的应用需要进一步优化线程池使用时,就需要对线程池进行监控或者动态调整线程池参数。 具体可以参考美团技术团队的做法 https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html (opens new window)
ThreadPoolExecutor也提供了一些方法来获取线程池的一些状态参数:
/**
* 返回线程池中已提交的任务的近似数量。
*/
public long getTaskCount();
/**
* 返回线程池中已完成任务的近似数量。
*/
public long getCompletedTaskCount();
/**
* 返回当前正在执行任务的线程的近似数量。
*/
public int getActiveCount();
/**
* 返回线程池中的当前线程数量。
*/
public int getPoolSize();
/**
* 返回线程池中曾经同时存在的最大线程数量。
*/
public int getLargestPoolSize();
/**
* 返回线程池允许的最大线程数量。
*/
public int getMaximumPoolSize();
// ...
我们可以利用这些方法来实现一个监控程序。 当我们能想到这一步的时候,一定已经有人把这事都做好了。
比如 https://hippo4j.cn/ (opens new window)
看着还不错,感兴趣的可以研究研究。
当然这种轮子不可能只有一家, 还有个https://dynamictp.cn/ (opens new window) 感谢兴趣也可以去看看。
# 八、实践 手动创建一个生产用线程池
为一个CPU密集型的业务场景设计一个线程池。
# 直接在类的内部创建,将线程池作为类的成员变量
直接创建 private static final ThreadPoolExecutor
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class TestA {
// 假设应用运行的服务器是8核
private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
4, // 核心线程数
9, // 最大线程数
0, // 非核心线程的存活时间
TimeUnit.SECONDS, // 存活时间单位
new LinkedBlockingDeque<>(20000), // 存放任务的 阻塞队列 设置成2w
new MyThreadFactory(Executors.defaultThreadFactory(),"自定义线程名称"), // 创建线程的 工厂
new ThreadPoolExecutor.AbortPolicy() // 线程池的拒绝策略
);
public static void main(String[] args) {
}
}
/**
* 可自定义名称的线程池工厂
*/
class MyThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger();
private final ThreadFactory delegate;
private final String name;
public MyThreadFactory(ThreadFactory delegate, String name) {
this.delegate = delegate;
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = delegate.newThread(r);
t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
return t;
}
}
# Spring项目中可以把线程池注入容器
将线程池注入到Spring容器中,并通过依赖注入的方式使用它。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Configuration
@EnableAsync
public class ThreadPoolConfig {
@Bean("DefaultExecutorService")
public ExecutorService cpuIntensiveTaskExecutor() {
return new ThreadPoolExecutor(
4, // 核心线程数
9, // 最大线程数
0, // 非核心线程的存活时间
TimeUnit.SECONDS, // 存活时间单位
new LinkedBlockingDeque<>(20000), // 存放任务的 阻塞队列 设置成2w
new MyThreadFactory(Executors.defaultThreadFactory(),"自定义线程名称"), // 创建线程的 工厂
new ThreadPoolExecutor.AbortPolicy() // 线程池的拒绝策略
);
}
}
/**
* 可自定义名称的线程池工厂
*/
class MyThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger();
private final ThreadFactory delegate;
private final String name;
public MyThreadFactory(ThreadFactory delegate, String name) {
this.delegate = delegate;
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
Thread t = delegate.newThread(r);
t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
return t;
}
}
使用起来很方便
不过建议一个线程池还是只用于一种异步任务比较好,这样出问题了好排查。
@Resource(name = "DefaultExecutorService")
private ExecutorService executorService;
Spring也提供了线程池的实现ThreadPoolTaskExecutor
,创建过程更简单。感兴趣的也可以研究研究,这里不再赘述了。
最后简单的线程池使用需求,还是建议直接在类的内部创建private static final ThreadPoolExecutor
,将线程池作为类的成员变量使用就行了。
参考资料:
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
https://javaguide.cn/java/concurrent/java-thread-pool-best-practices.html
https://pdai.tech/md/java/thread/java-thread-x-juc-executor-ThreadPoolExecutor.html
《Java性能权威指南》
《Java并发编程之美》
《Java并发编程的艺术》
《图解Java多线程设计模式》