多线程(multithreading),是指从软件或者硬件上实现多个线程并发执行的技术。具有多线程能力的计算机因有硬件支持而能够在同一时间执行多于一个线程,进而提升整体处理性能。具有这种能力的系统包括对称多处理机多核心处理器以及芯片级多处理同时多线程处理器。在一个程序中,这些独立运行的程序片段叫作“线程”(Thread),利用它编程的概念就叫作“多线程处理” 。本文简单记录一些关于线程、多线程的问题。

参考文章:

多线程下如何保证事务的一致性

Spring在多线程环境下如何确保事务一致性

详解Spring多线程下如何保证事务的一致性

全网最完整Java学习笔记

基本介绍

并行和并发的区别

并行:多个处理器或多核处理器同时处理多个任务。
并发:多个任务在同一个 CPU 核上,按细分的时间片轮流(交替)执行,从逻辑上来看那些任务是同时执行。

线程和进程的关系

进程:是操作系统分配资源的基本单位,有独立的地址空间(内存空间的一部分,用于存储进程中的代码、数据和堆栈等信息)和内存空间,进程之间不能共享资源,上下文切换慢,并发低,能独立执行(有程序入口、执行序列、出口),更健壮(因为进程崩溃后不会影响其他进程)。

线程:是操作系统调度的基本单位,没有独立的地址空间和内存空间(只有自己的堆栈和局部变量,只能共享所在进程的内存空间),线程之间可以共享进程内的资源,上下文切换快,并发高,不能独立执行(应用程序控制多线程执行,进程通过管理线程优先级间接控制线程执行),不健壮(因为一个线程崩溃会导致整个进程崩溃)。

关系:一个程序运行后至少包括一个进程,一个进程至少有一个线程,一个进程下也可以有多个线程来增加程序的执行速度。

运行时数据区包括本地方法栈、虚拟机栈、方法区、堆、程序计数器。每个线程都有独自的本地方法栈、虚拟机栈、程序计数器。各线程共享进程的方法区和堆。

JVM运行时数据区参考:什么是JVM的内存模型?详细阐述Java中局部变量、常量、类名等信息在JVM中的存储位置_jvm中主要用于存储类的元数据(类型信息(类的描述信息 类的元数据))、静态变量、常-CSDN博客

线程状态(state)

1
2
3
4
5
6
NEW 线程创建但尚未启动
RUNNABLE 就绪或者正在执行中
BLOCKED 阻塞的(被同步锁或者IO锁阻塞),等待获取锁
WAITING 永久等待状态,等待其他线程唤醒(如wait())
TIMED_WAITING 等待指定的时间重新被唤醒的状态,定时等待(如sleep(long))
TERMINATED 线程执行完成

image-20250820125237038

守护线程(Daemon Thread)

守护线程是运行在后台的一种特殊进程。它独立于控制终端并且周期性地执行某种任务或等待处理某些发生的事件。在 Java 中垃圾回收线程就是特殊的守护线程。

  • 作用:为其他线程提供服务(如垃圾回收线程)
  • 特性:当所有非守护线程结束时,守护线程自动终止。
  • 设置:thread.setDaemon(true)必须在start()前调用

中断(Interruption)

  • 作用:通知线程应该终止,但线程可选择忽略。

  • 方法

    • thread.interrupt():中断线程。

    • Thread.interrupted():检查并清除中断状态。

    • thread.isInterrupted():检查中断状态。

多线程

一个程序运行后至少包括一个进程,一个进程至少有一个线程,一个进程下有多个线程并发地处理任务,称为多线程。

多线程的好处:当一个线程进入阻塞或者等待状态时,其他的线程可以获取CPU的执行权,提高了CPU的利用率。

多线程的缺点:

  • 死锁:多个进程或线程相互等待对方释放所持有的资源,从而无法继续执行的情况。若无外力作用,它们都将无法推进下去。死锁用占用CPU、内存等系统资源,导致资源浪费,死锁会导致程序无法正常退出,导致系统性能差。
  • 上下文频繁切换:频繁的上下文切换可能会造成资源的浪费;
  • 串行:如果因为资源的限制,多线程串行执行,可能速度会比单线程更慢。

线程的优先级:java是抢占式调度模型,每一个 Java 线程都有一个优先级,优先级是一个整数,其取值范围是 1 (Thread.MIN_PRIORITY ) - 10 (Thread.MAX_PRIORITY )。默认情况下,每一个线程都会分配一个优先级 NORM_PRIORITY(5)。

注意:优先级高的线程只是获取CPU时间片的几率高,但并不能保证先执行。

ThreadLocal 及使用场景

ThreadLocal 为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。ThreadLocal 的经典使用场景是数据库连接和 session 管理等。

TransactionSynchronizationManager类内部默认提供了下面六个ThreadLocal属性,分别保存当前线程对应的不同事务资源:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 保存当前事务关联的资源
// 默认只会在新建事务的时候保存当前获取到的DataSource和当前事务对应Connection的映射关系
// 这里Connection被包装为了ConnectionHolder
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
// 事务监听者
// 在事务执行到某个阶段的过程中,会去回调监听者对应的回调接口(典型观察者模式的应用)
// 默认为空集合
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
// 存放当前事务名字
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
// 存放当前事务是否是只读事务
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");
// 存放当前事务的隔离级别
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");
// 存放当前事务是否处于激活状态
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");

需要多线程的业务场景

场景 实现 优势或示例
高并发Web服务器 处理大量HTTP请求,每个请求独立处理 使用线程池处理请求,避免频繁创建线程 示例:Tomcat、Netty等服务器的线程模型。
批处理任务 批量处理大量数据(如ETL作业) 将数据分片,每个线程处理一部分数据 优势:显著提高处理速度
异步IO操作 文件读写、网络通信等IO密集型操作 使用异步线程执行IO操作,主线程继续处理其他任务 示例:数据库查询、HTTP请求调用
定时任务调度 定期执行任务(如数据备份、统计报表生成) 使用ScheduledExecutorService或Spring的@Scheduled注解 示例:每天凌晨执行数据同步任务
实时数据处理 实时分析数据流(如日志分析、监控数据处理) 使用多线程并行处理数据流 示例:电商平台实时计算商品销量排行
图形界面应用 保持UI响应性的同时执行耗时操作 将耗时操作放在后台线程执行 示例:文件下载进度显示、复杂计算
分布式缓存更新 缓存失效时,异步更新缓存数据 使用后台线程重新加载数据到缓存 优势:避免用户请求等待缓存更新
消息队列消费者 从消息队列(如Kafka、RabbitMQ)消费消息 多线程并行消费,提高吞吐量 示例:订单处理、日志收集
搜索引擎索引构建 构建大规模索引(如Elasticsearch索引) 多线程并行处理文档,加速索引构建 优势:缩短索引构建时间,提高搜索服务可用性
游戏服务器 处理多个玩家的并发操作 每个玩家会话由独立线程处理 示例:多人在线游戏的服务器端逻辑

创建线程方法

简介

创建线程有4种方式:

  • 继承Thread类:继承Thread类,重写run()方法;然后创建线程对象调用start()方法开启线程。start()方法里包括了run()方法,用于开启线程。注意如果直接调用run()方法的话,将是普通方法调用,无法起到开启线程的效果
  • 实现Runnable接口:实现Runnable接口并重写run()方法,将实现类作为构造参数创建Thread对象。推荐,因为Java是单继承,线程类实现接口的同时,还可以继承其他类实现其他接口。
  • 实现Callable:实现Callable接口,重写带返回值的call()方法;将实现类对象作为构造参数创建FutureTask对象;将FutureTask对象作为构造参数创建Thread对象。所以此方法可以获取线程执行完后的返回值,而前两种方式不能。
  • ExecutorService的submit或execute方法:execute和submit都是ExecutorService接口的方法,用于线程池提交任务。所有线程池都直接或间接实现ExecutorService接口。
    • execute:参数只能是Runnable,没有返回值
    • submit:参数可以是Runnable、Callable,返回值是FutureTask

方法1:继承Thread类

创建并启动线程的步骤:

  1. 创建一个继承了 Thread类的线程类,重写的run()方法是线程执行体。
  2. 创建这个类的对象。
  3. 调用线程对象的start()方法来启动该线程(之后Java虚拟机会调用该线程run方法)。

run()和start()区别:

  • run():封装线程执行的代码,直接调用相当于普通方法的调用。
  • start():启动线程,虚拟机调用该线程的**run()**方法。

构造方法:

  • Thread(): 创建一个新的线程对象。
  • Thread(String name): 创建一个新的线程对象并将其名称设置为指定的名称。
  • Thread(Runnable target): 创建一个新的线程对象并将其目标设置为指定的 Runnable 对象。主要用于后面通过Runable接口创建线程。
  • Thread(Runnable target, String name): 创建一个新的线程对象,将其目标设置为指定的 Runnable 对象,并将其名称设置为指定的名称。

常用方法:

  • void start(): 使线程开始执行;Java 虚拟机调用此线程的 run 方法。
  • void run(): 如果此线程是使用独立的 Runnable 运行对象构造的,则调用该 Runnable 对象的 run 方法;否则,此方法不执行任何操作并返回。
  • **void join():**等待该线程执行完成。A线程调用B线程的join()方法,A线程将被阻塞,直到B线程执行完。可以用于线程之间的通信。
  • void join(long millis): 等待该线程终止的时间最长为 millis 毫秒。
  • void join(long millis, int nanos): 等待该线程终止的时间最长为 millis 毫秒 + nanos 纳秒。
  • void interrupt(): 中断该线程。
  • boolean isInterrupted(): 测试当前线程是否已中断。
  • boolean isAlive(): 测试线程是否处于活动状态。
  • static void sleep(long millis): 使当前正在执行的线程休眠(暂停执行)指定的毫秒数。
  • static void sleep(long millis, int nanos): 使当前正在执行的线程休眠(暂停执行)指定的毫秒数加指定的纳秒数。

属性方法:

  • void setName(String name): 改变线程名称,使之与参数 name 相同。
  • String getName(): 返回该线程的名称。
  • void setPriority(int newPriority): 更改该线程的优先级。
  • int getPriority(): 返回该线程的优先级。
  • Thread.State getState(): 返回该线程的状态。
  • void setDaemon(boolean on): 将该线程标记为守护线程或用户线程。
  • boolean isDaemon(): 测试该线程是否为守护线程。用户线程是普通的线程,它们通常是应用程序执行任务的主要线程。守护线程为其他线程提供后台支持。当所有用户线程结束时,JVM 会自动退出,无论守护线程是否仍在运行。

代码示例继承 Thread 重写 run 方法:代码简单,但该类无法集成别的类

1
2
3
4
5
6
7
8
9
10
11
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("线程执行中...");
}

public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start();
}
}

代码时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 1.主线程设置名字并查看
public static void main(String[] args) {
Thread.currentThread().setName("主线程");
System.out.println(Thread.currentThread().getName());
}

// 2.创建并启动线程
// 线程类:打印数字线程类
public class PrintNumberThread extends Thread{
// 打印1-100
@Override
public void run(){
for(int i=0;i<100;i++) {
System.out.println(getName()+":"+i);
}
}
// 构造方法 @param name 线程名
public PrintNumberThread(String name) {
super(name);
}
}
// 创建并启动线程
public class Test {
public static void main(String[] args) {
PrintNumberThread a = new PrintNumberThread ("a"), b = new PrintNumberThread ("b");
a.start();
b.start();
}
}
// 运行结果:
// 两个线程是随机交替打印的,因为它们获取CPU的调度是随机的

方法2:实现 Runnable 接口

步骤:

  1. 定义Runnable接口的实现类,并实现该接口的run()方法,该方法将作为线程执行体。
  2. 创建Runnable实现类的实例,并将其作为参数来创建Thread对象,Thread对象为线程对象。
  3. 调用线程对象的start()方法来启动该线程。

这种办法更好,优点:

  • 避免Java 单继承局限性:Java是单继承,使用这种方法,线程类实现接口的同时,还可以继承其他类、实现其他接口。
  • 逻辑和数据更好分离:通过实现 Runnable 接口的方法创建多线程更加适合同一个资源被多段业务逻辑并行处理的场景。在同一个资源被多个线程逻辑异步、并行处理的场景中,通过实现 Runnable 接口的方式设计多个 target 执行目标类可以更加方便、清晰地将执行逻辑和数据存储分离,更好地体现了面向对象的设计思想。

代码示例实现 Runnable 接口:继承其他类;同一实现该接口的实例可以共享资源。但代码复杂。

1
2
3
4
5
6
7
8
9
10
11
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("线程执行中...");
}

public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}

代码实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 打印数字Runnable
public class PrintNumberRunnable implements Runnable{
@Override
public void run(){
for(int i=0;i<100;i++){
System.out.println(Thread.currentThread().getName()+":"+i);
}
}
}
// 测试
public class Test {
public static void main(String[] args) {
// 方法1:使用普通方式实现Runnable接口
PrintNumberRunnable runnable = new PrintNumberRunnable();
Thread a = new Thread(runnable, "a"), b = new Thread(runnable, "b");
// 方法2:使用Lambda表达式实现Runnable接口,无需再创建PrintNumberRunnable类
Thread d = new Thread(() -> {
for (int i = 0; i < 100; i++) {
System.out.println(Thread.currentThread().getName() + ":" + i);
}
},"d");
a.start();
b.start();
d.start();
}
}

方法3:实现 Callable接口

通过实现Callable接口来创建线程的步骤如下

  • 实现Callable接口,重写带返回值的call()方法;
  • 将实现类对象作为构造参数创建FutureTask对象;
  • 将FutureTask对象作为构造参数创建Thread对象。

相比于前两种方法,此方法可以获取线程执行完后的返回值,而前两种方式不能,因为call()方法是有返回值的。

代码示例实现 Callable 接口:可以获得异步任务的返回值

1
2
3
4
5
6
7
8
9
class MyCallable implements Callable {
@Override
public Object call() throws Exception {
return null;
}
}
// 使用
FutureTask task = new FutureTask(new MyCallable());
new Thread(task).start();

代码实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.util.concurrent.*;
public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
return "执行结果";
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(new MyCallable());
System.out.println(future.get());
executor.shutdown();
}
}

方法4:线程池

线程池(Thread Pool)是一种多线程处理方式,用于减少创建和销毁线程的开销,提高系统资源利用率和处理效率。

线程池作用:

  • 管理线程数量:它可以管理线程的数量,可以避免无节制的创建线程,导致超出系统负荷直至崩溃。
  • 让线程复用:它还可以让线程复用,可以大大地减少创建和销毁线程所带来的开销。

线程池的两种创建方法:

  • 执行器工具类Executors;
  • 自定义线程池ThreadPoolExecutor

线程池两种提交任务的方法

execute和submit都是ExecutorService接口的方法,用于线程池提交任务。所有线程池都直接或间接实现ExecutorService接口。

  • execute:参数只能是Runnable,没有返回值
  • submit:参数可以是Runnable、Callable,返回值是FutureTask

代码示例两种创建线程池的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 线程池工具类,创建固定大小的线程池:
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("当前线程"+Thread.currentThread());
}
});
// 自定义线程池:实现自动化装配,易于管理,循环利用资源。
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
200, // 最大线程数量,控制资源并发
10, // 存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingDeque<>( 100000), // 任务队列,大小100000个
Executors.defaultThreadFactory(), // 线程的创建工厂
new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略
// 任务1
executor.execute(() -> {
try {
Thread.sleep(3 * 1000);
System.out.println("--helloWorld_001--" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
});

参数作用 :

  • corePoolSize:线程池的基本大小,当提交的任务数小于此值时,直接创建新线程执行任务。
  • maximumPoolSize:线程池允许的最大线程数,当任务队列满且线程数小于此值时,会创建新线程
  • keepAliveTime:当线程数大于核心线程数时,多余的空闲线程在终止前等待新任务的最长时间
  • workQueue:用于保存等待执行的任务的阻塞队列,常见类型有:
    • ArrayBlockingQueue:有界队列
    • LinkedBlockingQueue:无界队列(需注意OOM风险)
    • SynchronousQueue:直接提交队列
  • threadFactory:创建线程的工厂,可自定义线程名称、优先级等。
  • handler:当任务队列和线程池都满时的拒绝策略,默认有四种:
    • AbortPolicy:直接抛出异常(默认)
    • CallerRunsPolicy:由调用线程处理任务
    • DiscardPolicy:丢弃最新的任务
    • DiscardOldestPolicy:丢弃最老的任务

知识加油站

线程生命周期

Java线程在运行的生命周期中,在任意给定的时刻,只能处于下列6种状态之一:

  • NEW :初始状态,线程被创建,但是还没有调用start方法。
  • RUNNABLE:可运行状态,等待调度或运行。线程正在JVM中执行,但是有可能在等待操作系统的调度。
  • BLOCKED :阻塞状态,线程正在等待获取监视器锁。
  • WAITING :等待状态,线程正在等待其他线程的通知或中断。线程等待状态不占用 CPU 资源,被唤醒后进入可运行状态(等待调度或运行)。
  • TIMED_WAITING:超时等待状态,在WAITING的基础上增加了超时时间,即超出时间自动返回。Thread.sleep(1000);让线程超时等待1s。
  • TERMINATED:终止状态,线程已经执行完毕。

线程的运行过程:

线程在创建之后默认为NEW(初始状态),在调用start方法之后进入RUNNABLE(可运行状态)。

注意:可运行状态不代表线程正在运行,它有可能正在等待操作系统的调度。

WAITING (等待状态)的线程需要其他线程的通知才能返回到可运行状态,而TIMED_WAITING(超时等待状态)相当于在等待状态的基础上增加了超时限制,除了他线程的唤醒,在超时时间到达时也会返回运行状态。

此外,线程在执行同步方法时,在没有获取到锁的情况下,会进入到BLOCKED(阻塞状态)。线程在执行完run方法之后,会进入到TERMINATED(终止状态)。

image-20250723122520457

等待状态如何被唤醒

Object类:

  • wait()方法让线程进入等待状态
  • notify()唤醒该对象上的随机一个线程
  • notifyAll()唤醒该对象上的所有线程。

这3个方法必须处于synchronized代码块或方法中,否则会抛出IllegalMonitorStateException异常。因为调用这三个方法之前必须拿要到当前锁对象的监视器(Monitor对象),synchronized基于对象头和Monitor对象。

另外,也可以通过Condition类的 await/signal/signalAll方法实现线程的等待和唤醒,从而实现线程的通信,令线程之间协作处理任务。这两个方法依赖于Lock对象。

notify() 和 notifyAll() 的区别

notifyAll()会唤醒所有的线程,notify()之后唤醒一个线程。
notifyAll() 调用后,会将全部线程由等待池移到锁池,然后参与锁的竞争,竞争成功则继续执行,如果不成功则留在锁池等待锁被释放后再次参与竞争。而 notify()只会唤醒一个线程,具体唤醒哪一个线程由虚拟机控制。

sleep() 和 wait() 的区别

  • 类的不同:sleep() 来自 Thread,wait() 来自 Object。
  • 释放锁:sleep() 不释放锁;wait() 释放锁。
  • 用法不同:sleep() 时间到会自动恢复;wait() 可以使用 notify()/notifyAll() 直接唤醒。

线程的通信方式

线程通信:用于多个线程之间协作工作,共同完成某个任务多个线程在并发执行的时候,他们在CPU中是随机切换执行的,想多个线程一起来完成一件任务,就需要线程之间的通信。

image-20250723122705785

线程通信方式:

  • 通过 volatile 关键字:多个线程同时监听一个volatile变量,当这个变量发生变化的时候 ,线程能够感知并执行相应的业务。利用了volatile可见性,即一旦修改变量则立即刷新到共享内存中。
  • 通过Object类的 wait/notify/notifyAll 方法:当我们使用synchronized同步时就会使用Monitor来实现线程通信,这里的Monitor其实就是锁对象,其利用Object类的wait,notify,notifyAll等方法来实现线程通信。Monitor是Java虚拟机实现锁的一种底层机制,用于控制线程对共享资源的访问。
  • 通过Condition类的 await/signal 方法:而使用Lock进行同步时就是使用Condition对象来实现线程通信,Condition对象通过Lock的lock.newCondition()方法创建,使用其await,sign或signAll方法实现线程通信。Condition 是一个与锁 Lock 相关联的条件对象,可以让等待线程在某个条件被满足时被唤醒,从而达到线程协作的目的。
  • 通过Semaphore的acquire/release方法: Semaphore是一个计数信号量,用于控制同时访问某个资源的线程数量。线程可以通过acquire()方法获取许可,release()方法释放许可。
  • 通过Thread类的join()方法:join() 方法等待该线程执行完成。A线程调用B线程的join()方法,A线程将被阻塞,直到B线程执行完。

应用场景:

  • 线程交替打印:在多线程交替打印A/B、或者交替打印1到100时,需要在锁中使用线程通信。如果不使用lock.notify()和lock.wait(),可能导致当前线程释放锁后立刻又拿回锁(因为多线程是CPU随机切换的),从而达不到交替打印的效果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    //第一个线程,例如打印A
    new Thread(
    () -> {
    while (true) {
    synchronized (lock) {
    // 1.临界值校验:到临界值唤醒其他线程,防止其他线程永远等待;
    // 2.打印判断:如果需要打印,则打印、操作原子类。
    // 如果用的当前行值原子类,则加1;如果用的总行数原子类,则减1
    // 4.线程通信:唤醒、等待。
    // 如果删除下面两行代码,可能导致当前线程释放锁后立刻又拿到锁了,从而达不到交替打印的效果
    lock.notifyAll();
    try-catch{ lock.wait(); }
    }
    }
    }
    ).start();
    //另一个线程,例如打印B...

线程池

作用

为了对多线程进行统一的管理,Java引入了线程池,它通过限制并发线程的数量、将待执行的线程放入队列、销毁空闲线程,来控制资源消耗,使线程更合理地运行,避免系统因为创建过多线程而崩溃。

线程池作用:

  • 管理线程数量:它可管理线程的数量,可避免无节制的销毁、创建线程,导致额外的性能损耗、或线程数超出系统负荷直至崩溃。
  • 提高性能:当有新任务到来时,可直接从线程池中取出一个空闲线程来执行任务,而不需要等待创建新线程,从而减少了响应时间。
  • 让线程复用:它还可以让线程复用,可以大大地减少创建和销毁线程所带来的开销。
  • 合理的拒绝策略:线程池提供了多种拒绝策略,当线程池队列满了时,可采用不同的策略进行处理,如抛出异常、丢弃任务或调用者运行等。

生命周期

通常线程池的生命周期包含5个状态,对应状态值分别是:-1、0、1、2、3,这些状态只能由小到大迁移,不可逆。

  1. RUNNING:运行。线程池处于正常状态,可以接受新的任务,同时会按照预设的策略来处理已有任务的执行。
  2. SHUTDOWN:关闭。线程池处于关闭状态,不再接受新的任务,但是会继续执行已有任务直到执行完成。执行线程池对象的shutdown()时进入该状态。
  3. STOP:停止。线程池处于关闭状态,不再接受新的任务,同时会中断正在执行的任务,清空线程队列。执行shutdownNow()时进入该状态。
  4. TIDYING:整理。所有任务已经执行完毕,线程池进入该状态会开始进行一些结尾工作,比如及时清理线程池的一些资源。
  5. TERMINATED:终止。线程池已经完全停止,所有的状态都已经结束了,线程池处于最终的状态。

创建线程池的方式1:线程池工具类Executors

执行器工具类Executors创建线程池: 底层都是return new ThreadPoolExecutor(…)。一般不使用这种方式,参数配置死了不可控。

  • **newCachedThreadPool()**:缓存线程池(无限大)。一种用来处理大量短时间工作任务的线程池。
    • 核心线程数是0,最大线程数无限大:最大线程数Integer.MAX_VALUE。线程数量可以无限扩大,所有线程都是非核心线程。
    • 空闲线程存活时间60s:keepAliveTime为60S,空闲线程超过60s会被杀死。
    • 同步队列:因为最大线程数无限大,所以也用不到阻塞队列,所以设为没有存储空间的SynchronousQueue同步队列。它会试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程。这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
  • newFixedThreadPool(int nThreads):固定大小的线程池。重用指定数目(nThreads)的线程,其背后使用的是无界的工作队列,任何时候最多有 nThreads 个工作线程是活动的。
    • 核心线程数:所有线程都是核心线程(通过构造参数指定),最大线程数=核心线程数。
    • 存活时间0s:因为所有线程都是核心线程,所以用不到存活时间,线程都会一直存活。keepAliveTime为0S。
    • 链表阻塞队列:超出的线程会在LinkedBlockingQueue队列中等待空闲线程出现。即若有工作线程退出,将会有新的工作线程被创建,以补足指定的数目 nThreads
  • newScheduledThreadPool(int corePoolSize):定时任务线程池。创建定长线程池, 支持定时及周期性任务执行。可指定核心线程数,最大线程数。和 newSingleThreadScheduledExecutor() 类似,创建的是个 ScheduledExecutorService,区别在于单一工作线程还是多个工作线程。
  • newSingleThreadExecutor():单线程化的线程池。核心线程数与最大线程数都只有一个,操作一个无界的工作队列,不回收。后台从LinkedBlockingQueue队列中获取任务。创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务, 保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
  • newWorkStealingPool(int parallelism):这是一个经常被人忽略的线程池,Java 8 才加入这个创建方法,其内部会构建ForkJoinPool,利用Work-Stealing算法,并行地处理任务,不保证处理顺序;
  • ThreadPoolExecutor():是最原始的线程池创建,上面的创建方式几乎都是对 ThreadPoolExecutor 的封装。(最核心)
1
2
3
4
5
6
7
ExecutorService executorService = Executors.newFixedThreadPool(10);

//源码
FixedThredPool: new ThreadExcutor(n, n, 0L, ms, new LinkedBlockingQueue<Runable>()
SingleThreadExecutor: new ThreadExcutor(1, 1, 0L, ms, new LinkedBlockingQueue<Runable>())
CachedTheadPool: new ThreadExcutor(0, max_valuem, 60L, s, new SynchronousQueue<Runnable>());
ScheduledThreadPoolExcutor: ScheduledThreadPool, SingleThreadScheduledExecutor.

注意: 一般要搭配计数器CountDownLatch,await(时间)让主线程等待,直到任务线程都执行完(计数器减为零),或者到达超时时间,防止无线等待。

异步计算的结果(Future)

Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。
当我们把 Runnable接口Callable 接口 的实现类提交给 ThreadPoolExecutorScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)

Executor 框架的使用

  • 主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。
  • 把创建完成的实现 Runnable/Callable接口的 对象直接交给 ExecutorService 执行: ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task)ExecutorService.submit(Callable <T> task))。
  • 如果执行 ExecutorService.submit(…)ExecutorService 将返回一个实现Future接口的对象(我们刚刚也提到过了执行 execute()方法和 submit()方法的区别,submit()会返回一个 FutureTask 对象)。由于 FutureTask 实现了 Runnable,我们也可以创建 FutureTask,然后直接交给 ExecutorService 执行。
  • 最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

image-20250820131029731

创建线程池的方式2:自定义线程池(推荐)

线程池执行器ThreadPoolExecutor创建自定义线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 
5, // 核心线程数
200, // 最大线程数量,控制资源并发
10, // 存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingDeque<>( 100000), // 任务队列,大小100000个
Executors.defaultThreadFactory(), // 线程的创建工厂
new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { // 开启异步编排,有返回值
return 1;
}, threadPoolExecutor).thenApplyAsync(res -> { // 串行化,接收参数并有返回值
return res+1;
}, threadPoolExecutor);
Integer integer = future.get(); // 获取返回值

七个参数:

  • corePoolSize:核心线程数。创建以后,会一直存活到线程池销毁,空闲时也不销毁。任务队列未达到队列容量时,最大可以同时运行的线程数量。

  • maximumPoolSize:最大线程数量。任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。

  • keepAliveTime: 存活时间。释放空闲时间超过“存活时间”的线程,仅留核心线程数量的线程。线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁。

  • TimeUnitunit:时间单位

  • workQueue: 任务队列,储存等待执行任务的队列。新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。只要有线程空闲,就会去队列取出新的任务执行。new LinkedBlockingDeque()队列大小默认是Integer的最大值,内存不够,所以建议指定队列大小。

    • SynchronousQueue是一个同步队列,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。

    • LinkedBlockingQueue是一个无界队列,可以缓存无限多的任务。由于其无界特性,因此需要合理地处理好任务的生产速率和线程池中线程的数量,以避免内存溢出等异常问题。无限缓存,拒绝策略就能随意了。

    • ArrayBlockingQueue是一个有界(容量固定)队列,只能缓存固定数量的任务。通过固定队列容量,可以避免任务过多导致线程阻塞,保证线程池资源的可控性和稳定性。推荐,有界队列能增加系统的稳定性和预警能力。可以根据需要设大一点,比如几千,新任务丢弃后未来重新入队。

    • PriorityBlockingQueue是一个优先级队列,能够对任务按照优先级进行排序,当任务数量超过队列容量时,会根据元素的Comparable或Comparator排序规则进行丢弃或抛异常。

      1
      new PriorityBlockingQueue<>((o1, o2) -> o1.length() - o2.length());
  • threadFactory:线程的创建工厂。可以使用默认的线程工厂Executors.defaultThreadFactory(),也可以自定义线程工厂(实现ThreadFactory接口)

  • RejectedExecutionHandler handler:拒绝策略。如果任务队列和最大线程数量满了,按照指定的拒绝策略执行任务。

    • Abort(默认):直接抛异常(拒绝执行异常RejectedExecutionException)
    • CallerRuns:直接同步调用线程run()方法,不创建线程了,但如果执行程序已关闭,则会丢弃该任务
    • DiscardOldest:丢弃最老任务,即最早的未处理的任务请求
    • Discard:直接丢弃新任务
    • 实现拒绝执行处理器接口(RejectedExecutionHandler),自定义拒绝策略。

线程池的原理

任务加入时判断的顺序:核心线程数 、阻塞队列、最大线程数、拒绝策略。

线程池执原理:

  1. 新加入任务,判断corePoolSize是否到最大值;如果没到最大值就创建核心线程执行新任务,如果到最大值就判断是否有空闲的核心线程;

  2. 如果有空闲的核心线程,则空闲核心线程执行新任务,如果没空闲的核心线程,则尝试加入FIFO阻塞队列;

  3. 若加入成功,则等待空闲核心线程将队头任务取出并执行,若加入失败(如队列满了),则判断maximumPoolSize是否到最大值;

  4. 如果没到最大值就创建非核心线程执行新任务,如果到了最大值就执行丢弃策略,默认丢弃新任务;

  5. 线程数大于corePoolSize时,空闲线程将在keepAliveTime后回收,直到线程数等于核心线程数。这些核心线程也不会被回收。

实际上线程本身没有核心和非核心的概念,都是靠比较corePoolSize和当前线程数判断一个线程是不是能看作核心线程。

可能某个线程之前被看作是核心线程,等它空闲了,线程池又有corePoolSize个线程在执行任务,这个线程到keepAliveTime后还是会被回收。

image-20250820132547061

如何为线程池设置合适的线程数

下面的参数只是一个预估值,适合初步设置,具体的线程数需要经过压测确定,压榨(更好的利用)CPU的性能。

CPU核心数为N;

核心线程数:

  • CPU密集型:N+1。数量与CPU核数相近是为了不浪费CPU,并防止频繁的上下文切换,加1是为了有线程被阻塞后还能不浪费CPU的算力。
  • I/O密集型:2N,或N/(1-阻塞系数)。I/O密集型任务CPU使用率并不是很高,可以让CPU在等待I/O操作的时去处理别的任务,充分利用CPU,所以数量就比CPU核心数高一倍。
  • 有些公司会考虑阻塞系数,阻塞系数是任务线程被阻塞的比例,一般是0.8~0.9。
  • 实际开发中更适合的公式:N*((线程等待时间+线程计算时间)/线程计算时间)

最大线程数:设成核心线程数的2-4倍。数量主要由CPU和IO的密集性、处理的数据量等因素决定。

需要增加线程的情况:jstack打印线程快照,如果发现线程池中大部分线程都等待获取任务、则说明线程够用。如果大部分线程都处于运行状态,可以继续适当调高线程数量。

jstack:打印指定进程此刻的线程快照。定位线程长时间停顿的原因,例如死锁、等待资源、阻塞。如果有死锁会打印线程的互相占用资源情况。线程快照:该进程内每条线程正在执行的方法堆栈的集合。

多线程实现的四种方式

  1. 继承Thread类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class TestThread {
    public static void main(String[] args) {
    ThreadDemo threadDemo = new ThreadDemo();
    threadDemo.start();
    }
    }

    class ThreadDemo extends Thread {
    @Override
    public void run() {
    boolean flag = false;
    for(int i = 3 ; i < 100 ; i ++) {
    flag = false;
    for(int j = 2; j <= Math.sqrt(i) ; j++) {
    if(i % j == 0) {
    flag = true;
    break;
    }
    }
    if(flag == false) {
    System.out.print(i+" ");
    }
    }
    }
    }
  2. 实现Runnable接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class TestRunnable {
    public static void main(String[] args) {
    RunnableDemo runnableDemo = new RunnableDemo();
    new Thread(runnableDemo).start();
    }
    }

    class RunnableDemo implements Runnable{
    @Override
    public void run() {
    boolean flag = false;
    for(int i = 3 ; i < 100 ; i ++) {
    flag = false;
    for(int j = 2; j <= Math.sqrt(i) ; j++) {
    if(i % j == 0) {
    flag = true;
    break;
    }
    }
    if(flag == false) {
    System.out.print(i+" ");
    }
    }
    }
    }
  3. 实现Callable接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;

    public class TestCallable1 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
    CallableDemo callableDemo = new CallableDemo();
    FutureTask futureTask = new FutureTask<>(callableDemo);
    new Thread(futureTask).start();
    List<Integer> lists = (List<Integer>)futureTask.get(); //获取返回值
    for (Integer integer : lists) {
    System.out.print(integer + " ");
    }
    }
    }

    class CallableDemo implements Callable<List<Integer>>{
    @Override
    public List<Integer> call() throws Exception {
    boolean flag = false;
    List<Integer> lists = new ArrayList<>();
    for(int i = 3 ; i < 100 ; i ++) {
    flag = false;
    for(int j = 2; j <= Math.sqrt(i) ; j++) {
    if(i % j == 0) {
    flag = true;
    break;
    }
    }
    if(flag == false) {
    lists.add(i);
    }
    }
    return lists;
    }
    }
  4. 线程池:提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应的速度。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    package com.lxj.juc;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class TestThreadPool {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    List<Future<List<Integer>>> ints = new ArrayList<>();
    for(int i = 0 ; i < 5; i ++) {
    Future<List<Integer>> future = executorService.submit(new Callable<List<Integer>>() {
    @Override
    public List<Integer> call() throws Exception {
    boolean flag = false;
    System.out.println(Thread.currentThread().getName()+" ");
    List<Integer> lists = new ArrayList<>();
    for(int i = 3 ; i < 100 ; i ++) {
    flag = false;
    for(int j = 2; j <= Math.sqrt(i) ; j++) {
    if(i % j == 0) {
    flag = true;
    break;
    }
    }
    if(flag == false) {
    lists.add(i);
    }
    }
    return lists;
    }
    });
    ints.add(future);
    }

    for (Future<List<Integer>> future : ints) {
    System.out.println(future.get());
    }
    }
    }

    class ThreadPoolDemo {

    }

线程池中 submit() 和 execute() 方法的区别

execute():只能执行 Runnable 类型的任务。
submit():可以执行 Runnable 和 Callable 类型的任务。
Callable 类型的任务可以获取执行的返回值,而 Runnable 执行无返回值。

线程池配置

  1. 手动配置线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    import java.util.concurrent.*;

    public class ThreadPoolConfig {
    public static ExecutorService createThreadPool() {
    return new ThreadPoolExecutor(
    5, // 核心线程数
    10, // 最大线程数
    60, // 空闲线程存活时间
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100), // 任务队列大小
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
    );
    }
    }
  2. Spring Boot自动配置:在Spring Boot项目中,可通过配置文件设置线程池参数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    spring:
    task:
    execution:
    pool:
    core-size: 5
    max-size: 10
    queue-capacity: 100
    keep-alive: 60s
    thread-name-prefix: my-task-
  3. Spring Cloud中的线程池配置:在微服务架构中,线程池配置需考虑服务间调用的特性

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    @Configuration
    public class AsyncConfig {
    @Bean(name = "asyncExecutor")
    public ThreadPoolTaskExecutor asyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(50);
    executor.setQueueCapacity(200);
    executor.setKeepAliveSeconds(300);
    executor.setThreadNamePrefix("cloud-async-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.initialize();
    return executor;
    }
    }

    使用@Async注解启用异步方法:
    @Async 是 Spring 框架提供的注解,用于标记一个方法为异步方法。当调用该方法时,Spring 会将其提交到线程池执行,而不是由调用线程同步执行。这在处理耗时操作时非常有用,可以避免阻塞主线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;

    @Service
    public class MyService {
    @Async("asyncExecutor")
    public CompletableFuture<String> processAsync() {
    // 异步处理逻辑
    return CompletableFuture.completedFuture("处理完成");
    }
    }

练习:多线程交替打印A/B/C,每个打印3次

核心逻辑:创建线程,循环加锁,执行以下逻辑:

  1. 临界值判断:到达临界值后唤醒其他线程并结束锁;
  2. 打印判断:如果需要打印,则打印、操作原子类(只有打印后才操作原子类,否则就是不满足条件,需要下一步的唤醒等待后,进入下一轮的循环);
  3. 线程通信:唤醒、等待。

坑点:

  • 临界值判断不能放到while里:防止最后一个线程无法唤醒其他线程,从而导致死锁(其他线程没人唤醒了)。
  • 必须用线程通信:防止当前线程释放锁后立刻又拿回锁(因为多线程是CPU随机切换的),从而达不到交替打印的效果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// 多线交替打印A/B/C
// Object类的wait()和notifyAll()方案、不抽取方法:
public class Test2 {
private static AtomicInteger index = new AtomicInteger(0); // 当前行值
private static final int count = 9; // 总打印行数
public static void main(String[] args) {
Object lock = new Object();
// 下面创建三个线程可以抽取成一个方法,这里方便理解所以拆开
new Thread(() -> {
// tip:这里条件没必要index.get()<count,因为where不在锁里。
// 如果临界值判断加到这里,会导致最后一个线程无法唤醒其他线程,从而导致死锁(其他线程没人唤醒了)。
while (true) {
synchronized (lock) {
// 1.临界值判断:到达临界值后唤醒其他线程并结束锁;
if(index.get()>=count){
lock.notifyAll();
break;
}
// 2.打印判断:如果需要打印,则打印、操作原子类
if (index.get() % 3 == 0) {
System.out.println("A");
// 只有打印后才操作原子类,否则就是不满足条件,需要下一步的唤醒等待后,进入下一轮的循环
index.getAndIncrement();
}
// 3.线程通信:唤醒、等待
// 3.1 唤醒其他线程:不管能不能整除,结束后都唤醒其他线程
// notifyAll()唤醒该对象上的所有线程
lock.notifyAll();
// 3.2 当前线程等待:Object类的wait()让线程等待,直到其他线程调用notify()或notifyAll()方法唤醒
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}, "线程1打印A").start();
new Thread(() -> {
while (true) {
synchronized (lock) {
if(index.get()>=count){
lock.notifyAll();
break;
}
if (index.get() % 3 == 1) {
System.out.println("B");
index.getAndIncrement();
}
lock.notifyAll();
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}, "线程2打印B").start();
new Thread(() -> {
while (true) {
synchronized (lock) {
if(index.get()>=count){
lock.notifyAll();
break;
}
if (index.get() % 3 == 2) {
System.out.println("C");
index.getAndIncrement();
}
lock.notifyAll();
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}, "线程3打印C").start();
}
}
// 创建和启动线程抽取方法
import java.util.concurrent.atomic.AtomicInteger;
public class Test2 {
private static AtomicInteger index = new AtomicInteger(0); // 当前行值
private static final int count = 9; // 总打印行数

public static void main(String[] args) {
Object lock = new Object();
createAndStartThread("线程1打印A", lock, 0, "A");
createAndStartThread("线程2打印B", lock, 1, "B");
createAndStartThread("线程3打印C", lock, 2, "C");
}
private static void createAndStartThread(String threadName, Object lock, int remainder, String output) {
new Thread(() -> {
while (true) {
synchronized (lock) {
if (index.get() >= count) {
lock.notifyAll();
break;
}
if (index.get() % 3 == remainder) {
System.out.println(output);
index.getAndIncrement();
}
lock.notifyAll();
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}, threadName).start();
}
}

其他线程通信方式:

  • Object类的wait()和notifyAll()(采用)
  • Conditon的await,sign或signAll方法:创建三个Conditon对象A/B/C,A.await()就是让A线程等待;
  • Semaphore的acquire和release方法:使用三个Semaphore对象,分别初始化为1、0、0,表示A、B、C三个线程的初始许可数。每个线程在打印字母之前,需要调用对应的Semaphore对象的acquire方法,获取许可。每个线程在打印字母之后,需要调用下一个Semaphore对象的release方法,释放许可。

线程安全

基本介绍

当多个线程访问共享资源时,若不采取同步措施,可能导致数据不一致或其他异常。常见的线程安全问题包括:

  • 竞态条件Race Condition):多个线程竞争同一资源导致结果不确定。
  • 内存可见性:一个线程修改了共享变量,其他线程可能无法立即看到最新值。
  • 指令重排序:编译器或处理器为优化性能而重新排序指令,可能影响多线程执行顺序。

线程安全:程序在多线程环境下可以持续进行正确的处理,不会产生数据竞争(例如死锁)和不一致的问题。解决方案:原子类、volatile、锁、线程安全的集合

线程安全的解决方案:按照资源占用情况由轻到重排列:

  • 原子类:AtomicIntegerAtomicLong等,具有原子操作特征(化学中原子是最小单位、不可分割)的类,只能保证单个共享变量的线程安全
  • volatile:只能保证单个共享变量的线程安全
  • 锁:可以保证临界区内的多个共享变量线程安全。

原子类

原子类是具有原子操作特征(化学中原子是最小单位、不可分割)的类,原子是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。

在java.util.concurrent.atomic包下,有一系列“Atomic”开头的类,统称为原子类。例如AtomicInteger替代int ,底层采用CAS原子指令实现,内部的存储值使用volatile修饰,因此多线程之间是修改可见的。

以AtomicInteger为例,某线程调用该对象的incrementAndGet()方式自增时采用CAS尝试修改它的值,若此时没有其他线程操作该值便修改成功否则反复执行CAS操作直到修改成功。

CAS:不断对变量进行原子性比较和交换,从而解决单个变量的线程安全问题。比较内存中值和预期值,如果相等则交换,如果不相等就代表被其他线程改了则重试。

AtomicInteger常用方法:

  • 构造方法:
    • AtomicInteger (): 创建一个初始值为0的 AtomicInteger。
    • AtomicInteger(int initialValue): 创建一个初始值为 initialValue 的 AtomicInteger。
    • 获取和设置:
      • int get(): 获取当前的值。
      • void set(int newValue): 设置为 newValue。
      • int getAndSet(int newValue): 获取当前值,并设置为 newValue。
  • 原子更新:
    • boolean compareAndSet(int expect, int update): 如果当前值等于 expect,则更新为 update。
    • int getAndIncrement(): 以原子方式将当前值加1,返回的是旧值。
    • int incrementAndGet(): 以原子方式将当前值加1,返回的是新值。
    • int getAndDecrement(): 以原子方式将当前值减1,返回的是旧值。
    • int decrementAndGet(): 以原子方式将当前值减1,返回的是新值。
    • int getAndAdd(int delta): 以原子方式将当前值加上 delta,返回的是旧值。
    • int addAndGet(int delta): 以原子方式将当前值加上 delta,返回的是新值。
  • 其他方法:
    • int getAndUpdate(IntUnaryOperator updateFunction): 获取当前值,并按更新函数计算新值设置。
    • int updateAndGet(IntUnaryOperator updateFunction): 按更新函数计算新值设置,并返回新值。
    • int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction): 获取当前值,并按累加函数计算新值设置。
    • int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction): 按累加函数计算新值设置,并返回新值。

验证原子类的线程安全:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Test {
public static int num = 0;
public static void main(String[] args) throws InterruptedException {
AtomicInteger atomicInteger = new AtomicInteger(0);
// 创建10个线程,分别对atomicInteger进行操作
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 10000; j++) {
atomicInteger.incrementAndGet();
num++;
}
}).start();
}
// 阻塞主线程1s,保证10个线程执行完毕
Thread.sleep(1000);
System.out.println(atomicInteger);
System.out.println(num);
}
}
// 运行之后,可以看到原子类正常加到100000,而num没有

volatile关键字

volatile是一个关键字,被volatile声明的变量存在共享内存中,所有线程要读取、修改这个变量,都是从内存中读取、修改,并且修改操作是原子性的,所以它能保证线程安全。

volatile特性:

  • 有序性:被volatile声明的变量之前的代码一定会比它先执行,而之后的代码一定会比它慢执行。底层是在生成字节码文件时,在指令序列中插入内存屏障防止指令重排序。
  • 可见性:一旦修改变量则立即刷新到共享内存中,当其他线程要读取这个变量的时候,最终会去内存中读取,而不是从自己的工作空间中读取。每个线程自己的工作空间用于存放堆栈(存方法的参数和返回地址)和局部变量。
  • 原子性:volatile变量不能保证完全的原子性,只能保证单次的读/写操作具有原子性(在同一时刻只能被一个线程访问和修改),自增减、复合操作(+=,/=等)则不具有原子性。这也是和synchronized的区别。

读写内存语义:

  • 写内存语义:当写一个volatile变量时,JMM(Java内存模型)会把该线程本地内存中的共享变量的值刷新到主内存中。
  • 读内存语义:当读一个volatile变量时,JMM会把该线程本地内存置为无效,使其从主内存中读取共享变量。

有序性实现机制:

volatile有序性是通过内存屏障来实现的。内存屏障就是在编译器生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。

机器指令:JVM包括类加载子系统、运行时数据区、执行引擎。 执行引擎负责将字节码指令转为操作系统能识别的本地机器指令。

指令重排序:处理器为了提高运算速度会对指令重排序,重排序分三种类型:编译器优化重排序、处理器指令级并行重排序、内存系统重排序。

  • 编译器优化的重排序:编译器在不改变单线程程序的语义前提下,可以重新安排语句的执行顺序。
  • 指令级并行的重排序:现在处理器采用了指令集并行技术,将多条指令重叠执行。如果不存在依赖性,处理器可以改变语句对应的机器指令的执行顺序。
  • 内存系统的重排序:由于处理器使用缓存和读写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。

加锁的方式有两种,分别是synchronized关键字和Lock接口(在JUC包下)。

synchronized锁是互斥锁,可以作用于实例方法、静态方法、代码块,能够保证同一个时刻只有一个线程执行该段代码,保证线程安全。 在执行完或者出现异常时自动释放锁。synchronized锁基于对象头和Monitor对象,在1.6之后引入轻量级锁、偏向锁等优化。

lock锁接口可以通过lock、unlock方法锁住一段代码,Lock实现类都是基于AQS实现的。Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断。

1
2
3
4
5
6
7
8
9
10
Lock lock = new ReentrantLock();
lock.lock();
try {
System.out.println("获得锁");
} catch(Exception e){
// ToDo: handle exception
} finally {
System. out.println(“释放锁");
lock. unlock();
}

线程安全的集合

  1. Collections工具类:Collections工具类的synchronizedXxx()方法将ArrayList等集合类包装成线程安全的集合类。
  2. 古老api:java.util包下性能差的古老api,如Vector、Hashtable
  3. 降低锁粒度的并发容器:JUC包下Concurrent开头的、以降低锁粒度来提高并发性能的容器,如ConcurrentHashMap。
  4. 复制技术实现的并发容器:JUC包下以CopyOnWrite开头的、采用写时复制技术实现的并发容器,如CopyOnWriteArrayList。

分布式场景下的线程安全

在分布式系统中,仅靠JVM级别的同步机制无法保证线程安全,需引入分布式锁:

  1. Redis分布式锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import redis.clients.jedis.Jedis;

    public class RedisLock {
    private static final String LOCK_KEY = "distributed_lock";
    private static final String RELEASE_SCRIPT =
    "if redis.call('get', KEYS[1]) == ARGV[1] then " +
    " return redis.call('del', KEYS[1]) " +
    "else " +
    " return 0 " +
    "end";

    private Jedis jedis;

    public RedisLock(Jedis jedis) {
    this.jedis = jedis;
    }

    public boolean acquireLock(String requestId, int expireTime) {
    String result = jedis.set(LOCK_KEY, requestId, "NX", "PX", expireTime);
    return "OK".equals(result);
    }

    public boolean releaseLock(String requestId) {
    Object result = jedis.eval(RELEASE_SCRIPT, 1, LOCK_KEY, requestId);
    return 1L.equals(result);
    }
    }
  2. ZooKeeper分布式锁:使用Apache Curator框架

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class ZookeeperLock {
    private static final String LOCK_PATH = "/distributed_lock";
    private InterProcessMutex lock;

    public ZookeeperLock(String zkConnectString) {
    CuratorFramework client = CuratorFrameworkFactory.newClient(
    zkConnectString,
    new ExponentialBackoffRetry(1000, 3)
    );
    client.start();
    lock = new InterProcessMutex(client, LOCK_PATH);
    }

    public void acquire() throws Exception {
    lock.acquire();
    }

    public void release() throws Exception {
    lock.release();
    }
    }

线程同步

基本介绍

多条语句共享数据时,多线程程序会出现数据安全问题

线程同步:即当有一个线程在对内存进行操作时,其他线程都不可以对这个内存地址进行操作,直到该线程完成操作, 其他线程才能对该内存地址进行操作,而其他线程又处于等待状态。

Java主要通过加锁的方式实现线程同步,而锁有两类,分别是synchronized关键字和Lock接口(在JUC包下)。具体见

对比线程安全和线程同步:线程同步是实现线程安全的一种手段

  • 线程安全:程序在多线程环境下可以持续进行正确的处理,不会产生数据竞争(例如死锁)和不一致的问题。解决方案:原子类、volatile、锁、线程安全的集合
  • 线程同步:确保多个线程正确、有序地访问共享资源。解决方案:锁

同步代码块

同步代码块作用在代码块上,则需要在关键字后面的小括号里,显式指定锁对象,例如this、Xxx.class。

同步代码块简单来说就是将一段代码用一把锁给锁起来, 只有获得了这把锁的线程才访问, 并且同一时刻, 只有一个线程能持有这把锁, 这样就保证了同一时刻只有一个线程能执行被锁住的代码。

1
2
3
synchronized(同步对象) {
//多条语句操作共享数据的代码
}

同步代码块的好处:解决了多线程的数据安全问题

弊端:线程很多时,每个线程都会去判断锁,这是很耗费资源和时间的。例如,共有100张票,三个窗口卖票,通过加锁防止超卖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class SellTicket implements Runnable {
private int tickets = 100;
private final Object obj = new Object();
@Override
public void run() {
while (true) {
synchronized (obj) {
if (tickets > 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 正在出售第 " + tickets + " 张票");
tickets--;
} else {
break;
}
}
}
}
public static void main(String[] args) {
SellTicket sellTicket = new SellTicket();
Thread t1 = new Thread(sellTicket, "窗口1");
Thread t2 = new Thread(sellTicket, "窗口2");
Thread t3 = new Thread(sellTicket, "窗口3");
t1.start();
t2.start();
t3.start();
}
}

同步方法

  1. 作用在静态方法上,则锁是当前类的Class对象。

  2. 作用在普通方法上,则锁是当前的实例(this)。

非静态同步方法的锁对象为this。下面代码是相同功能的同步方法和同步代码块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 锁的粒度是当前对象
// 方法1:实例方法,使用this对象锁
private void sellTicket1() {
synchronized (this) {
if (tickets > 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 正在出售第 " + tickets + " 张票");
tickets--;
}
}
}

// 方法2:实例方法,使用this对象锁
private void sellTicket2() {
synchronized (this) {
if (tickets > 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 正在出售第 " + tickets + " 张票");
tickets--;
}
}
}

// 锁的粒度是整个类:
// 静态同步方法的锁对象为:类名.class。下面代码是相同功能的同步方法和同步代码块
// 方法3:静态方法,使用类对象锁
private static synchronized void sellTicket3() {
if (tickets > 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 正在出售第 " + tickets + " 张票");
tickets--;
}
}

// 方法4:静态方法,使用类对象锁
private static void sellTicket4() {
synchronized (SellTicket.class) {
if (tickets > 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 正在出售第 " + tickets + " 张票");
tickets--;
}
}
}

异步执行时保证事务有效性

Spring的事务管理基于线程绑定的TransactionSynchronizationManager,而异步方法会在独立线程中执行,导致事务上下文丢失。

问题根源

Spring事务依赖于线程上下文传递事务信息。当使用@Async时,方法在新线程中执行,与调用线程不在同一个事务上下文:

  1. 事务管理器失效:新线程没有绑定事务上下文。
  2. 数据库连接丢失:每个线程使用独立的数据库连接。
  3. 异常回滚失效:异步线程的异常无法触发调用线程的事务回滚。

解决方案

独立事务(推荐)

  • 为每个异步方法创建独立的事务,适用于可容忍部分失败的场景(如批量处理)

  • 特点

    • 每个异步任务独立提交/回滚
    • 适合批量处理大量数据,部分失败不影响整体
  • 配置示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    @Service
    public class AsyncService {
    @Async("asyncExecutor")
    @Transactional(propagation = Propagation.REQUIRES_NEW) // 创建新事务
    public CompletableFuture<Void> processData(Long recordId) {
    // 数据库操作
    repository.updateStatus(recordId, "PROCESSING");
    try {
    // 业务逻辑
    complexProcessing(recordId);
    repository.updateStatus(recordId, "SUCCESS");
    return CompletableFuture.completedFuture(null);
    } catch (Exception e) {
    repository.updateStatus(recordId, "FAILED");
    throw new RuntimeException("处理失败", e); // 触发当前事务回滚
    }
    }
    }

事件驱动架构

  • 将异步操作转为事件,主线程提交事务后再处理事件,确保数据一致性。

  • 特点

    • 事务提交后才触发异步处理
    • 适合耗时操作不影响主线程事务的场景
  • 实现步骤

    • 定义事件

      1
      2
      3
      4
      5
      public class DataProcessEvent {
      private final Long recordId;
      public DataProcessEvent(Long recordId) { this.recordId = recordId; }
      // getter
      }
    • 发布事件(在事务内)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      @Service
      public class MainService {
      @Autowired
      private ApplicationEventPublisher eventPublisher;
      @Transactional
      public void createAndProcessData() {
      // 创建记录(事务内)
      Long recordId = repository.save(new Record()).getId();
      // 发布事件(事务提交后触发)
      eventPublisher.publishEvent(new DataProcessEvent(recordId));
      }
      }
    • 异步监听事件

      1
      2
      3
      4
      5
      6
      7
      8
      9
      @Component
      public class DataProcessListener {
      @Async
      @EventListener
      public void handleDataProcessEvent(DataProcessEvent event) {
      // 异步处理(无事务)
      processData(event.getRecordId());
      }
      }

手动管理事务(高级)

  • 在异步方法中手动获取和管理事务,适用于强一致性要求的场景。

  • 特点

    • 完全控制事务边界
    • 代码复杂度高,需谨慎处理异常
  • 示例代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    @Service
    public class ManualTransactionService {
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Autowired
    private TransactionDefinition transactionDefinition;

    @Async("asyncExecutor")
    public CompletableFuture<Void> processWithManualTx(Long recordId) {
    TransactionStatus status = transactionManager.getTransaction(transactionDefinition);
    try {
    // 数据库操作
    repository.updateStatus(recordId, "PROCESSING");
    complexProcessing(recordId);

    // 手动提交事务
    transactionManager.commit(status);
    return CompletableFuture.completedFuture(null);
    } catch (Exception e) {
    // 手动回滚事务
    transactionManager.rollback(status);
    throw new RuntimeException("处理失败", e);
    }
    }
    }

补偿事务(最终一致性)

  • 通过补偿机制保证最终一致性,适用于分布式系统

  • 特点

    • 保证最终一致性,而非强一致性
    • 适合跨服务、跨系统的操作
  • 实现方案

    • 记录操作日志:在主事务中记录所有操作。
    • 异步执行:调用外部服务或执行复杂逻辑。
    • 补偿逻辑:若异步操作失败,根据日志执行反向操作
  • 示例代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    @Service
    public class CompensationService {
    @Transactional
    public void createOrderWithCompensation(Order order) {
    // 1. 创建订单(主事务)
    Order savedOrder = orderRepository.save(order);
    // 2. 记录补偿日志(主事务)
    compensationLogRepository.save(new CompensationLog(
    savedOrder.getId(), "CREATE_ORDER", savedOrder
    ));
    // 3. 异步处理库存、支付等(无事务)
    asyncService.processOrderAsync(savedOrder.getId());
    }
    }

    @Service
    public class AsyncService {
    @Async
    public void processOrderAsync(Long orderId) {
    try {
    // 扣减库存、调用支付等操作
    inventoryService.debitStock(orderId);
    paymentService.processPayment(orderId);
    } catch (Exception e) {
    // 触发补偿逻辑
    compensationService.rollbackOrder(orderId);
    }
    }
    }

最佳实践总结

  1. 优先使用独立事务:为每个异步任务创建独立事务,通过状态跟踪失败记录。
  2. 避免长事务:将耗时操作移出事务,减少锁持有时间。
  3. 使用可靠消息队列:如RabbitMQ、Kafka,确保事件不丢失。
  4. 实现幂等性:异步操作需支持重试(如唯一索引、状态校验)。
  5. 监控与告警:记录异步任务状态,及时发现并处理失败。

常见误区

  1. 错误配置传播行为

    • 使用Propagation.REQUIRED(默认)会导致异步方法加入调用者的事务(但实际上无法加入)。

    • 必须使用Propagation.REQUIRES_NEW创建新事务。

  2. 忽略异步异常

    • 未捕获的异常会导致事务无法回滚。
    • 确保在异步方法中处理异常或使用CompletableFuture的异常处理。
  3. 过度依赖同步事务

    • 在分布式系统中,强一致性难以实现,考虑最终一致性方案

多线程如何保证事务一致性

‌在多线程环境下保证事务一致性主要通过编程式事务管理、线程资源隔离、分布式事务框架及数据库锁机制等方案实现‌,具体选择需根据业务场景和技术架构灵活适配。‌‌

核心实现方案:编程式事务管理‌

  1. 通过代码手动控制事务边界,将多个子线程操作纳入统一事务管理框架

  2. 使用Spring的TransactionTemplate在子线程中执行数据库操作。

  3. 主线程等待所有子线程执行完成后统一提交或回滚事务。‌‌‌‌

  4. 示例代码结构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    List<Future<?>> futures = new ArrayList<>();
    for (Task task : tasks) {
    futures.add(executor.submit(() ->
    transactionTemplate.execute(status -> {
    // 事务操作
    });
    ));
    }
    // 检查所有子线程结果后决策提交/回滚 (自己判断是提交还是回滚)

关键技术支撑

  • 事务资源隔离机制

    • 突破默认的ThreadLocal绑定模式,通过自定义TransactionSynchronizationManager实现跨线程Connection共享。‌‌

    • 使用@Transactional(propagation = REQUIRES_NEW)创建独立事务上下文。

  • 并发控制策略

    • 数据库锁机制:结合SELECT ... FOR UPDATE实现行级锁,配合事务隔离级别(如REPEATABLE_READ)防止脏写。‌‌

    • Java同步工具:采用CountDownLatch确保所有子线程就绪后统一提交。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      @Slf4j
      static class PersonService {
      @Resource
      private JdbcTemplate jdbcTemplate;
      @Resource
      private DataSource dataSource ;

      @Transactional
      public void save() throws Exception {
      CountDownLatch cdl = new CountDownLatch(2) ;
      AtomicBoolean txRollback = new AtomicBoolean(false) ;
      CompletableFuture.runAsync(() -> {
      Person person = new Person();
      person.setAge(1);
      person.setName("张三");
      transactionTemplate.execute(status -> {
      int result = 0;
      try {
      result = jdbcTemplate.update( "insert into t_person (age, name) values (?, ?)",
      person.getAge(), person.getName() );
      // TODO
      // log.info(1 / 0) ;
      } catch (Exception e) {
      txRollback.set(true); // 当发生异常后将状态该为 true
      }
      try {
      cdl.countDown(); // 计数减一
      cdl.await(); // 继续等待其它线程结束
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      if (txRollback.get()) { // 如果回滚状态为 true 说明有线程发生了异常,需要事务回滚
      status.setRollbackOnly(); // 标记当前事务回滚
      }
      log.info("%s Insert Operator Result: %d 次%n", Thread.currentThread().getName(), result);
      return result ;
      }) ;
      }) ;
      transactionTemplate.execute(status -> {
      Person person = new Person();
      person.setAge(2);
      person.setName("李四");
      int result = 0 ;
      try {
      result = jdbcTemplate.update("insert into t_person (age, name) values (?, ?)",
      person.getAge(), person.getName()) ;
      // TODO
      TimeUnit.SECONDS.sleep(3) ;
      } catch (Exception e) {
      txRollback.set(true) ;
      }
      try {
      cdl.countDown() ;
      cdl.await() ;
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      if (txRollback.get()) {
      status.setRollbackOnly(); // 回滚
      }
      log.info("%s Insert Operator Result: %d 次%n", Thread.currentThread().getName(), result);
      return result ;
      }) ;
      cdl.await() ;
      log.info("Operator Complete...") ;
      }
      }
    • 分布式锁:通过RedisZooKeeper实现跨进程锁协调。‌‌

扩展方案选型‌ :分布式事务补偿‌

  • Saga模式:拆分事务为多个可补偿的本地事务。

  • TCC(Try-Confirm-Cancel)模式:通过预留资源保证最终一致性。

  • 整合Seata等分布式事务框架实现全局事务管理