并发编程

1. 认识线程

1.1 什么是线程和进程?

  • 进程:进程是程序的一次执行过程,是系统运行程序的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。在 Java 中,当我们启动 main 函数时其实就是启动了一个 JVM 的进程,而 main 函数所在的线程就是这个进程中的一个线程,也称主线程。
  • 线程:线程与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中可以产生多个线程。与进程不同的是同类的多个线程共享进程的堆和方法区资源,但每个线程有自己的程序计数器、虚拟机栈和本地方法栈,所以系统在产生一个线程,或是在各个线程之间做切换工作时,负担要比进程小得多,也正因为如此,线程也被称为轻量级进程。

1.2 线程和进程的关系、区别、优缺点

线程是进程划分成的更小的运行单位。线程和进程最大的不同在于基本上各进程是独立的,而各线程则不一定,因为同一进程中的线程极有可能会相互影响。线程执行开销小,但不利于资源的管理和保护;而进程正相反。

1.3 线程的生命周期

Java 线程在运行的生命周期中的指定时刻只可能处于下面 6 种不同状态的其中一个状态:

  • NEW: 初始状态,线程被创建出来但没有被调用 start() 。
  • RUNNABLE: 运行状态,线程被调用了 start()等待运行的状态。
  • BLOCKED:阻塞状态,需要等待锁释放。
  • WAITING:等待状态,表示该线程需要等待其他线程做出一些特定动作(通知或中断)。
  • TIME_WAITING:超时等待状态,可以在指定的时间后自行返回而不是像 WAITING 那样一直等待。
  • TERMINATED:终止状态,表示该线程已经运行完毕。

image.png

线程创建之后它将处于 NEW(新建) 状态,调用 start() 方法后开始运行,线程这时候处于 READY(可运行) 状态。可运行状态的线程获得了 CPU 时间片(timeslice)后就处于 RUNNING(运行) 状态。

在操作系统层面,线程有 READY 和 RUNNING 状态;而在 JVM 层面,只能看到 RUNNABLE 状态。因为现在的时分多任务系统多采用时间分片方式抢占式轮转调度,时间分片通常很小,所以操作系统会频繁地进行线程切换,大概只有 0.01s 这个量级,也就没什么必要区分这两种状态了。

1.4 为什么要使用多线程?

  • 从计算机底层来说: 线程可以比作是轻量级的进程,是程序执行的最小单位,线程间的切换和调度的成本远远小于进程。另外,多核 CPU 时代意味着多个线程可以同时运行,这减少了线程上下文切换的开销。
  • 从当代互联网发展趋势来说: 现在的系统并发量高,多线程并发编程正是开发高并发系统的基础,利用好多线程机制可以大大提高系统整体的并发能力以及性能。

1.5 多线程效率一定高吗?

  • CPU 密集型:CPU 密集型的线程主要进行计算和逻辑处理,需要占用大量的 CPU 资源。
  • IO 密集型:IO 密集型的线程主要进行输入输出操作,如读写文件、网络通信等,需要等待 IO 设备的响应,而不占用太多的 CPU 资源。

对于单核 CPU 来说,如果任务是 CPU 密集型的,那么开很多线程会影响效率;如果任务是 IO 密集型的,那么开多线程会提高效率。

1.6 死锁

多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止。

死锁的四个必要条件:

  • 互斥条件:该资源任意一个时刻只由一个线程占用。
  • 请求与保持条件:一个线程因请求资源而阻塞时,对已获得的资源保持不放。
  • 不剥夺条件:线程已获得的资源在未使用完之前不能被其他线程强行剥夺,只有自己使用完毕后才释放资源。
  • 循环等待条件:若干线程之间形成一种头尾相接的循环等待资源关系。

预防死锁:

  • 破坏请求与保持条件:一次性申请所有的资源。
  • 破坏不剥夺条件:占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源。
  • 破坏循环等待条件:靠按序申请资源来预防。按某一顺序申请资源,释放资源则反序释放。破坏循环等待条件。

避免死锁:
避免死锁就是在资源分配时,借助于算法(比如银行家算法)对资源分配进行计算评估,使其进入安全状态。系统能够按照某种线程推进顺序(P1、P2、P3……Pn)来为每个线程分配所需资源,直到满足每个线程对资源的最大需求,使每个线程都可顺利完成。

2. 线程使用

2.1 线程的创建

2.1.1 继承 Thread 类

1
2
3
4
5
6
7
8
9
10
class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
// getName() 获取线程名称, 此方法继承自 Thread 类
System.out.println(getName() +" " + i);

}
}
}

创建线程对象,调用 start 方法启动线程。

1
2
3
4
5
6
7
8
9
10
11
public class Main {
public static void main(String[] args){
MyThread thread1 = new MyThread();
// 设置线程名称
thread1.setName("Thread1");
MyThread thread2 = new MyThread();
thread2.setName("Thread2");
thread1.start();
thread2.start();
}
}

2.1.2 实现 Runnable 接口

自己定义一个类实现 Runnable 接口,重写 run 方法。

1
2
3
4
5
6
7
8
9
class MyRunnable implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
// Thread.currentThread() 获取当前线程对象
System.out.println(Thread.currentThread().getName() + " " + i);
}
}
}

创建类对象并调用 start 方法启动线程。

1
2
3
4
5
6
7
8
9
10
11
public class Main {
public static void main(String[] args){
MyRunnable r = new MyRunnable();
Thread t1 = new Thread(r);
t1.setName("Thread 1");
Thread t2 = new Thread(r);
t2.setName("Thread 2");
t1.start();
t2.start();
}
}

2.1.3 实现 Callable 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class MyCallable implements Callable<Integer> {
private int n;
public MyCallable(int n){
this.n = n;
}
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i < n; i++) {
sum += i;
}
return sum;
}
}

创建类对象并调用 start 方法启动线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建MyCallable对象(表示要执行的任务)
MyCallable mc = new MyCallable(10);
// 创建FutureTask对象(管理多线程运行的结果)
FutureTask<Integer> ft = new FutureTask<>(mc);
// 创建线程的对象
Thread t = new Thread(ft);
// 启动线程
t.start();
// 获取线程的返回值
System.out.println(ft.get());

}
}

2.2 线程常用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 可以传入字符串作为线程的名字,但是需要调用父类的构造方法
MyThread thread1 = new MyThread();
// 1.获取线程的名字,默认是 Thread-X
System.out.println(thread1.getName());
// 2.设置线程的名字
thread1.setName("线程1");
// 3.获取当前线程对象
Thread thread = Thread.currentThread();
// 4.线程休眠。哪条线程执行到这个方法,哪条线程就休眠。单位是ms
Thread.sleep(1000);
// 5.获取线程的优先级[1, 10], 越大优先级越高,默认是5
System.out.println(thread.getPriority());
// 6.设置守护线程, 当其他线程执行完毼,守护线程也会结束。
thread1.setDaemon(true);
// 7.礼让线程,让当前线程暂停,让其他线程执行 [不一定成功] Thread.yield();
// 8.插入线程,让其他线程等待当前线程执行完毕再执行 thread1.join()
}
}

2.3 线程的状态

2.4 线程同步

2.4.1 synchronized

2.4.1.1 同步代码块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

class MyThread extends Thread {
static int ticket = 0;
// 锁对象一定要唯一
static Object obj = new Object();
@Override
public void run() {
while(true){
synchronized (obj){
if(ticket > 100){
break;
}
ticket++;
System.out.println(Thread.currentThread().getName() + "卖票" + ticket);
}
}
}
}

public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyThread mt1 = new MyThread();
MyThread mt2 = new MyThread();
MyThread mt3 = new MyThread();
mt1.start();
mt2.start();
mt3.start();

}
}

锁对象一定要唯一,否则会出现线程安全问题。
synchronized 一定要写在 while 循环里面,否则只有一个线程能卖票。

2.4.1.2 同步方法

需求:三个窗口卖票,总票数为 100 张。

  • 同步方法是是锁方法里面的全部代码
  • 非静态方法锁对象是 this,静态方法锁对象是当前类的字节码对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyRunnable r = new MyRunnable();
Thread t1 = new Thread(r);
Thread t2 = new Thread(r);
Thread t3 = new Thread(r);
t1.setName("窗口1");
t2.setName("窗口2");
t3.setName("窗口3");
t1.start();
t2.start();
t3.start();


}
}

class MyRunnable implements Runnable {
// 只创建一个对象,所以票数是共享的
int ticket = 0;
@Override
public void run() {
while(true){
synchronized (MyRunnable.class){
if (method()) break;
}
}
}

// this
private synchronized boolean method() {
if (ticket == 100) {
return true;
} else {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ticket++;
System.out.println(Thread.currentThread().getName() + "卖票" + ticket);
}
return false;
}
}

2.5 Lock

Lock 是一个接口,需要实现类 ReentrantLock。Lock 锁是一个显式锁,需要通过 lock() 方法上锁,必须通过 unlock() 方法释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

class MyThread extends Thread {
static int ticket = 0;
// 锁对象一定要唯一
static Lock lock = new ReentrantLock();

@Override
public void run() {
while (true) {
lock.lock();
try {
if (ticket >= 100) {
break;
}
ticket++;
System.out.println(Thread.currentThread().getName() + "卖票" + ticket);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
}

2.6 生产者和消费者

2.6.1 wait 和 notify

Cook 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package Cook;

public class Cook extends Thread{
@Override
public void run() {
while(true){
synchronized(Desk.lock){
if(Desk.count == 0){
break;
} else{
if(Desk.foodFlag == 1){
try {
Desk.lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else{
Desk.count--;
System.out.println("厨师正在做面,还能做" + Desk.count + "碗");
Desk.lock.notifyAll();
Desk.foodFlag = 1;
}
}
}
}
}
}

Foodie 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package Cook;

public class Foodie extends Thread{
@Override
public void run() {
while(true){
synchronized (Desk.lock){
if(Desk.count == 0){
break;
} else{
if(Desk.foodFlag == 0){
try {
Desk.lock.wait(); // 当前线程和锁绑定
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else{
Desk.count--;
System.out.println("吃货正在吃面,还能再吃" + Desk.count + "碗");
Desk.lock.notifyAll(); // 唤醒和锁绑定的所有线程
// 修改桌子上的食物状态
Desk.foodFlag = 0;
}
}
}
}
}
}

Desk 类

1
2
3
4
5
6
7
8
9
package Cook;

public class Desk {
// 0表示没有食物,>=1表示有食物
public static int foodFlag = 0;
public static int count = 10;
public static Object lock = new Object();
}

Demo 类

1
2
3
4
5
6
7
8
9
10
11
12
13
package Cook;

public class ThreadDemo {
public static void main(String[] args) {
Cook cook = new Cook();
Foodie foodie = new Foodie();
cook.setName("厨师");
foodie.setName("吃货");
cook.start();
foodie.start();
}
}

2.6.2 BlockingQueue(阻塞队列)

take() 和 put() 方法是阻塞的,当队列为空时,调用 take() 方法会阻塞线程,直到队列中有元素;当队列满时,调用 put() 方法会阻塞线程,直到队列中有空间。这两个方法会在内部上锁。
Cook 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package blockqueue;

import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

public class Cook extends Thread{

ArrayBlockingQueue<String> queue;
public Cook(ArrayBlockingQueue<String> queue){
this.queue = queue;
}
@Override
public void run() {
// 不停的往队列中放入食物
while(true){
try{
queue.put("面条");
System.out.println("厨师做了一碗面条");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}

Foodie 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package blockqueue;

import java.util.concurrent.ArrayBlockingQueue;

public class Foodie extends Thread{
ArrayBlockingQueue<String> queue;
public Foodie(ArrayBlockingQueue<String> queue){
this.queue = queue;
}
@Override
public void run() {
// 不停的从队列中取出食物
while(true){
try{
String food = queue.take();
System.out.println("吃货吃了" + food);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}

Demo 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package blockqueue;

import java.util.concurrent.ArrayBlockingQueue;

public class Demo {
public static void main(String[] args) {
// 创建一个大小为1的阻塞队列
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
// 创建一个厨师线程
Cook cook = new Cook(queue);
// 创建一个吃货线程
Foodie foodie = new Foodie(queue);
// 启动线程
cook.start();
foodie.start();
}
}

2.7 线程池

线程池是管理一系列线程的资源池。当有任务要处理时,直接从线程池中获取线程来处理,处理完之后线程并不会立即被销毁,而是等待下一个任务。

使用线程池的好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

2.7.1 创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo {
public static void main(String[] args) {
// 获取线程池对象
// 1.创建没有上限的线程池
// ExecutorService pool = Executors.newCachedThreadPool();
// 2.创建固定大小的线程池
ExecutorService pool = Executors.newFixedThreadPool(3);
// 3.创建单线程的线程池
// ExecutorService pool = Executors.newSingleThreadExecutor();
// 4.创建定时任务的线程池
// ExecutorService pool = Executors.newScheduledThreadPool(3);
// 提交任务
pool.submit(new MyRunnable());
pool.submit(new MyRunnable());
pool.submit(new MyRunnable());
// 关闭线程池
pool.shutdown();
}
}

2.7.2 自定义线程池

  • 核心线程满时, 再提交任务就会排队
  • 核心线程满了,队列满了,再提交任务就会创建新的线程
  • 核心线程满了,队列满了,线程池满了,再提交任务就会执行拒绝策略
1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) {
// 定义线程池对象
ThreadPoolExecutor pool = new ThreadPoolExecutor(
3, // 核心线程数
6, // 最大线程数
60, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<>(3), // 任务队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);

}
  • CPU 密集型任务(N+1)
  • I/O 密集型任务(2N)
    最佳线程数 = N(CPU 核心数)∗(1+WT(线程等待时间)/ST(线程计算时间)),其中 WT(线程等待时间)= 线程运行总时间 - ST(线程计算时间)。

3. 进阶内容

3.1 CompletableFuture

3.1.1 Future 接口

Future 接口(FutureTaks 实现类) 定义了操作异步任务执行的一些方法,如获取任务执行结果、判断任务是否执行完成、取消任务等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

package pool;

import java.util.concurrent.*;

public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(new MyCallable());
Thread t1 = new Thread(futureTask);
t1.start();
// 获取执行结果
System.out.println(futureTask.get());
// 也可以设置超时时间
// System.out.println(futureTask.get(3, TimeUnit.SECONDS));

// get方法在前面会阻塞主线程
System.out.println("干别的了");
}
}

class MyCallable implements Callable<String> {

@Override
public String call() throws Exception {
System.out.println("Come in");
// 休眠5s
Thread.sleep(5000);
return "Hello";
}
}

Future 对于获取结果不友好,只能阻塞或者轮询(isDone 方法),直到任务执行完成。

3.1.2 CompletableFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

runAsync 无返回值

```java
public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
// runAsync没有返回值, 默认使用ForkJoinPool.commonPool()作为线程池,但是也可以传入自定义线程池
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + "runAsync");
}, executorService);
// 返回null
System.out.println(completableFuture.get());
executorService.shutdown();
}
}

supplyAsync 有返回值
使用 join 方法获取结果不需要抛出异常,get 方法需要抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() ->{
return "Hello";
}, executorService);

System.out.println(completableFuture.get());
executorService.shutdown();
}
}

whenComplete 方法, 对结果进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

public class Demo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
// 创建线程池, 默认的线程池是守护线程,程序执行完毕,线程池会自动关闭因此获取不到结果
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() ->{
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Hello";
}, executorService).whenComplete((result, exception) -> {
if (exception == null) {
System.out.println("成功拿到结果: " + result);
} else {
System.out.println("出错了: " + exception);
}
});

System.out.println("我先干别的了!!");
executorService.shutdown();
}
}

3.1.3 常用方法

获取结果:

  • getNow()方法会立即返回结果,如果没有结果,就返回默认值
  • join()方法会阻塞当前线程,直到任务完成
  • get()方法会阻塞当前线程,直到任务完成,返回结果或者抛出异常

处理结果:

  • thenRun 方法,执行 A 之后执行 B,不需要上一个任务的结果。

  • thenApply 方法,对结果进行处理,返回一个新的结果
    当前步骤错误,会导致后续步骤不执行

  • handle 方法,对结果进行处理,返回一个新的结果
    当前步骤错误,可以根据异常信息进行处理。后续步骤会继续执行

消费结果:

  • thenAccept 方法,对结果进行消费,没有返回值

thenRun 方法 用指定的线程池
thenRunAsync 方法 使用默认的线程池

对计算速度进行选用:

  • applyToEither 方法,两个任务中任意一个完成,就执行下一步

对计算结果进行合并:

  • thenCombine 方法,两个任务都完成,对结果进行合并

3.2 锁

3.2.1 乐观锁和悲观锁

3.2.1.1 悲观锁

如 Synchronized、ReentrantLock。
悲观锁总是假设最坏的情况,认为共享资源每次被访问的时候就会出现问题(比如共享数据被修改),所以每次在获取资源操作的时候都会上锁,这样其他线程想拿到这个资源就会阻塞直到锁被上一个持有者释放。也就是说,共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程。

应用场景:悲观锁通常多用于写比较多的情况(多写场景,竞争激烈),这样可以避免频繁失败和重试影响性能,悲观锁的开销是固定的。不过,如果乐观锁解决了频繁失败和重试这个问题的话(比如 LongAdder),也是可以考虑使用乐观锁的,要视实际情况而定。

锁的使用:

  • 方法上加了 synchronized 关键字,锁的是当前对象 this,被锁定后,其他的线程无法访问这个对象的其他 synchronized 方法。但是多个对象之间不影响。
  • 普通方法和同步锁无关。
  • 静态方法上加了 synchronized 关键字,锁的是当前类的字节码对象,被锁定后,其他的线程无法访问这个类的其他 synchronized 静态方法。

底层是通过 monitorenter 和 monitorexit 指令实现的。
一个 monitorenter 和两个 monitorexit 指令,一个是正常退出,一个是异常退出。
调用指令先检查方法的 ACC_SYNCHRONIZED 访问位是否被设置,如果设置了执行线程将先持有 monitor 对象,然后再去执行方法体,最后执行 monitorexit 指令释放 monitor 对象。
ACC_STATIC 表示是静态方法,ACC_SYNCHRONIZED 表示是同步方法。

3.2.1.2 乐观锁

乐观锁总是假设最好的情况,认为共享资源每次被访问的时候不会出现问题,线程可以不停地执行,无需加锁也无需等待,只是在提交修改的时候去验证对应的资源(也就是数据)是否被其它线程修改了(具体方法可以使用版本号机制或 CAS 算法)。

应用场景:乐观锁通常多用于写比较少的情况(多读场景,竞争较少),这样可以避免频繁加锁影响性能。不过,乐观锁主要针对的对象是单个共享变量

3.2.2 公平锁和非公平锁

1
2
3
4
// 传入一个 boolean 值,true 时为公平锁,false 时为非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
  • 公平锁 : 锁被释放之后,先申请的线程先得到锁。性能较差一些,因为公平锁为了保证时间上的绝对顺序,上下文切换更频繁。
  • 非公平锁:锁被释放之后,后申请的线程可能会先获取到锁,是随机或者按照其他优先级排序的。性能更好,但可能会导致某些线程永远无法获取到锁。

3.2.3 可重入锁

又称递归锁,可重入锁是指同一个线程可以多次获取同一把锁,而不会出现死锁。ReentrantLock 就是一个可重入锁。

可重入锁的实现原理是,每个锁关联一个线程持有者和计数器,当计数器为 0 时表示该锁没有被任何线程持有,那么任何线程都可能获得该锁而调用相应的方法;当一个线程请求一个未被持有的锁时,JVM 将记下锁的持有者,并且将计数器置为 1;如果同一个线程再次请求这个锁,计数器将递增;当线程退出同步代码块时,计数器递减,如果计数器为 0,则释放该锁。

3.2.4 隐式锁和显式锁

隐式锁:synchronized 关键字是隐式锁,不需要手动释放锁,当 synchronized 代码块执行完毕后,锁会自动释放。

显示锁:ReentrantLock 是显式锁,需要手动释放锁,如果不释放锁,可能会导致死锁。

3.2.5 自旋锁 spinlock

CAS 经常会用到自旋操作来进行重试,也就是不成功就一直循环执行直到成功。如果长时间不成功,会给 CPU 带来非常大的执行开销。

3.3 LockSupport

3.3.1 线程中断

在 Java 中没办法立即停止一条线程,然而停止线程是非常重要的,因为线程可能会进入死锁状态,或者线程可能会执行一个长时间的任务,而我们又想要取消它。为了解决这个问题,Java 提供了一个中断机制,但是是否中断由线程自己决定

void interrupt():中断线程,设置中断标志位为 true。
boolean isInterrupted():判断线程是否被中断,不清除中断标志位。
static boolean interrupted():判断线程是否被中断,并清除中断标志位。

2 3 调用的实际上是同一个方法,只是 interrupted() 方法调用时多传了个参数

如果线程被阻塞(wait、sleep、join),调用 interrupt() 方法会清空中断标识并抛出 InterruptedException 异常,如果线程没有被阻塞,调用 interrupt() 方法只是设置中断标志位为 true。

3.3.2 LockSupport

3.3.2.1 线程阻塞和唤醒

sychronized 和 wait/notify 方法是一种阻塞和唤醒线程的方式,但是这种方式有一些缺点:

  • wait/notify 方法必须在 synchronized 方法或者代码块中使用。
  • notify 放在 wait 之前,可能会导致线程一直阻塞。

Condition 接口中的 await() 和 signal() 方法也是一种阻塞和唤醒线程的方式,但是 Condition 接口是在 Lock 接口中的,必须先获取锁才能使用。也需要先 signal 再 await,否则会导致线程一直阻塞。

LockSupport 的 park() 和 unpark() 方法可以阻塞和唤醒线程,不需要获取锁,也不需要在 synchronized 方法或者代码块中使用。先后顺序也没影响。

许可证机制:park() 方法会消耗许可证,unpark() 方法会释放许可证,许可证最多只有一个,多次调用 unpark() 方法也只会有一个许可证。

3.4 JMM

JMM(Java 内存模型)主要定义了对于一个共享变量,当另一个线程对这个共享变量执行写操作后,这个线程对这个共享变量的可见性。

3.4.1 为什么需要 JMM

CPU 和物理内存之间的速度差异很大,为了解决这个问题,引入了缓存,每个 CPU 都有自己的缓存,缓存中存储了主内存中的一部分数据,当 CPU 对数据进行操作时,会先将数据加载到缓存中,然后再进行操作,操作完成后再将数据写回主内存。
JMM 就是为了解决这个问题,定义了一套规范,保证了多线程之间对共享变量的可见性。屏蔽了底层硬件和操作系统的差异。

3.4.2 JMM 规范下,三大特性

  • 原子性:一次操作或者多次操作,要么所有的操作全部都得到执行并且不会受到任何因素的干扰而中断,要么都不执行。在 Java 中,可以借助 synchronized、各种 Lock 以及各种原子类实现原子性。synchronized 和各种 Lock 可以保证任一时刻只有一个线程访问该代码块,因此可以保障原子性。各种原子类是利用 CAS (compare and swap) 操作(可能也会用到 volatile 或者 final 关键字)来保证原子操作。

  • 可见性:当一个线程对共享变量进行了修改,那么另外的线程都是立即可以看到修改后的最新值。在 Java 中,可以借助 synchronized、volatile 以及各种 Lock 实现可见性。如果我们将变量声明为 volatile ,这就指示 JVM,这个变量是共享且不稳定的,每次使用它都到主存中进行读取。

  • 有序性:由于指令重排序问题,代码的执行顺序未必就是编写代码时候的顺序。指令重排序可以保证串行语义一致,但是没有义务保证多线程间的语义也一致 ,所以在多线程下,指令重排序可能会导致一些问题。在 Java 中,volatile 关键字可以禁止指令进行重排序优化。

3.4.3 happens-before 规则

总原则:

  • 如果一个操作 happpens-before 另一个操作,那么第一个操作的执行结果对第二个操作是可见的,即第一个操作的执行顺序排在第二个操作之前。
  • 两个操作之间存在 happens-before 关系,并不意味着前一个操作必须要在后一个操作之前执行!如果重排序之后的执行结果,与按 happens-before 关系来执行的结果一致,那么这种重排序并不非法。

八条规则:

  1. 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作。
  2. 锁定规则:一个 unLock 操作先行发生于后面对同一个锁的 lock 操作。
  3. volatile 变量规则:对一个 volatile 变量的写操作先行发生于后面对这个变量的读操作。
  4. 传递性规则:如果 A 先行发生于 B,且 B 先行发生于 C,那么 A 先行发生于 C。
  5. 线程启动规则:Thread 对象的 start() 方法先行发生于此线程的每一个动作。
  6. 线程中断规则:对线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生。
  7. 线程终结规则:线程中所有操作都先行发生于线程的终结,可以通过 isAlive()判断线程是否已经终结。
  8. 对象终结规则:一个对象的初始化完成(构造函数执行结束)先行发生于它的 finalize() 方法的开始。

3.4.4 volatile 关键字

3.4.4.1 volatile 的两个特性

两个特性: 可见性有序性(禁止指令重排序)
在 Java 中,volatile 关键字可以保证变量的可见性,如果我们将变量声明为 volatile ,这就指示 JVM,这个变量是共享且不稳定的,每次使用它都到主存中进行读取。

简单来说就是 volatile 写内存是直接刷新到主内存当中,而读内存是把本地内存设置为无效并直接从主内存中读取。

内存屏障(Memory Barrier,或有时叫做内存栅栏,Memory Fence)是一种 CPU 指令,用来禁止处理器指令发生重排序(像屏障一样),从而保障指令执行的有序性。另外,为了达到屏障的效果,它也会使处理器写入、读取值之前,将主内存的值写入高速缓存,清空无效队列,从而保障变量的可见性。

3.4.4.2 volatile 没有原子性

因为 sum++不是原子操作,它包括读取变量的值,对这个值加 1,然后将结果写回到内存。如果多个线程同时执行 sum++,就会出现问题。
如下代码,多个线程同时执行 sum++,最后的结果小于 10000。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class LockDemo {
public static int sum = 0;
public static void inc() {
sum++;
}
public static void main(String[] args) throws InterruptedException {

for(int j = 0; j < 10; j++)
{
new Thread(() ->{
for(int i = 0; i < 1000; i++) inc();
}).start();
}
Thread.sleep(1000);

System.out.println(sum);
}
}

3.4.4.3 volatile 的使用场景

  • 单一赋值可以,但是复合运算不可以。
  • 状态标记量。
  • 开销较低的读,写锁策略
  • double check。

3.4.4.4 小结

  • volatile 写之前的操作,都禁止重排序到 volatile 写之后。
  • volatile 读之后的操作,都禁止重排序到 volatile 读之前。
  • volatile 写之后 volatile 读之前,都禁止重排序。

3.5 CAS

3.5.1 CAS 介绍

CAS 的全称是 Compare And Swap(比较与交换) ,用于实现乐观锁,被广泛应用于各大框架中。CAS 的思想很简单,就是用一个预期值和要更新的变量值进行比较,两值相等才会进行更新。

CAS 涉及到三个操作数:

  • V:要更新的变量值(Var)
  • E:预期值(Expected)
  • N:拟写入的新值(New)

当且仅当 V 的值等于 E 时,CAS 通过原子方式用新值 N 来更新 V 的值。如果不等,说明已经有其它线程更新了 V,则当前线程放弃更新。

CAS 是一个原子操作,底层依赖于一条 CPU 的原子指令。
原子操作 即最小不可拆分的操作,也就是说操作一旦开始,就不能被打断,直到操作完成。

使用示例

1
2
3
4
5
6
7
8
9
10
import java.util.concurrent.atomic.AtomicInteger;

public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(5);
System.out.println(atomicInteger.compareAndSet(5, 2025) + "\t current data: " + atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(5, 2025) + "\t current data: " + atomicInteger.get());
}
}

输出结果

1
2
true	 current data: 2025
false current data: 2025

3.5.2 CAS 原理

3.5.3.1 unsafe 类

是 CAS 的核心类,由于 Java 方法无法直接访问底层系统,需要通过本地(native)方法来访问,unsafe 类提供了这种机制。

Unsafe 类位于 sun.misc 包下,是一个提供低级别、不安全操作的类。由于其强大的功能和潜在的危险性,它通常用于 JVM 内部或一些需要极高性能和底层访问的库中,而不推荐普通开发者在应用程序中使用。

3.5.3.2 ABA 问题

如果一个变量 V 初次读取的时候是 A 值,并且在准备赋值的时候检查到它仍然是 A 值,那我们就能说明它的值没有被其他线程修改过了吗?很明显是不能的,因为在这段时间它的值可能被改为其他值,然后又改回 A,那 CAS 操作就会误认为它从来没有被修改过。这个问题被称为 CAS 操作的 “ABA”问题。

解决:版本号机制

ABA 问题的解决思路是在变量前面追加上版本号或者时间戳。JDK 1.5 以后的 AtomicStampedReference 类就是用来解决 ABA 问题的,其中的 compareAndSet() 方法就是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

3.6 原子类

原子类是一种提供了原子操作的类,可以保证线程安全。原子类的底层是通过 CAS 操作来保证线程安全的。

3.6.1 基本类型原子

  • AtomicInteger:整型原子类
  • AtomicLong:长整型原子类
  • AtomicBoolean:布尔型原子类
    他们的方法用法类似
1
2
3
4
5
6
7
public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
public final void lazySet(int newValue)//最终设置为newValue, lazySet 提供了一种比 set 方法更弱的语义,可能导致其他线程在之后的一小段时间内还是可以读到旧的值,但可能更高效。

顺便介绍一下 countDownLatch 使用:
CountDownLatch 是一个同步辅助类,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。它通过一个计数器来实现,计数器的初始值为线程的数量。每当一个线程完成了它的工作后,计数器的值就会减一。当计数器的值变为零时,表示所有线程都已经完成了工作,等待的线程就可以继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class MyNumber{
AtomicInteger atomicInteger = new AtomicInteger();
public void add(){
atomicInteger.getAndIncrement();
}
}

public class CASDemo {
private static final int SIZE = 50;
public static void main(String[] args) throws InterruptedException {
MyNumber myNumber = new MyNumber();
CountDownLatch countDownLatch = new CountDownLatch(SIZE);
for(int i = 1; i <= SIZE; i++){
new Thread(() ->{
try {
for (int j = 0; j < 10000; j++) {
myNumber.add();
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + " " + myNumber.atomicInteger.get());
}
}

3.6.2 数组类型原子类

使用原子的方式更新数组里的某个元素

  • AtomicIntegerArray:整形数组原子类
  • AtomicLongArray:长整形数组原子类
  • AtomicReferenceArray:引用类型数组原子类

常用方法, 以 AtomicIntegerArray 为例:

1
2
3
4
5
6
7
public final int get(int i) //获取 index=i 位置元素的值
public final int getAndSet(int i, int newValue)//返回 index=i 位置的当前的值,并将其设置为新值:newValue
public final int getAndIncrement(int i)//获取 index=i 位置元素的值,并让该位置的元素自增
public final int getAndDecrement(int i) //获取 index=i 位置元素的值,并让该位置的元素自减
public final int getAndAdd(int i, int delta) //获取 index=i 位置元素的值,并加上预期的值
boolean compareAndSet(int i, int expect, int update) //如果输入的数值等于预期值,则以原子方式将 index=i 位置的元素值设置为输入值(update)
public final void lazySet(int i, int newValue)//最终 将index=i 位置的元素设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

3.6.3 引用类型原子类

基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用 引用类型原子类。

  • AtomicReference:引用类型原子类
  • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。(修改过几次)
  • AtomicMarkableReference:原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来(是否修改过)

以 AtomicReference 为例,常用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Person 类
class Person {
private String name;
private int age;
//省略getter/setter和toString
}


// 创建 AtomicReference 对象并设置初始值
AtomicReference<Person> ar = new AtomicReference<>(new Person("SnailClimb", 22));

// 打印初始值
System.out.println("Initial Person: " + ar.get().toString());

// 更新值
Person updatePerson = new Person("Daisy", 20);
ar.compareAndSet(ar.get(), updatePerson);

// 打印更新后的值
System.out.println("Updated Person: " + ar.get().toString());

// 尝试再次更新
Person anotherUpdatePerson = new Person("John", 30);
boolean isUpdated = ar.compareAndSet(updatePerson, anotherUpdatePerson);

// 打印是否更新成功及最终值
System.out.println("Second Update Success: " + isUpdated);
System.out.println("Final Person: " + ar.get().toString());

3.6.4 对象的属性修改类型原子类

如果需要原子更新某个类里的某个字段时,需要用到对象的属性修改类型原子类。

  • AtomicIntegerFieldUpdater:原子更新整形字段的更新器
  • AtomicLongFieldUpdater:原子更新长整形字段的更新器
  • AtomicReferenceFieldUpdater:原子更新引用类型里的字段的更新器

要想原子地更新对象的属性需要两步。
第一步,因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
第二步,更新的对象属性必须使用 public volatile 修饰符。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class Account{
//int money = 0;
public volatile int money = 0;
public void add(){
money++;
}
AtomicIntegerFieldUpdater<Account> atomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Account.class, "money");
public void transMonney(Account account){
atomicIntegerFieldUpdater.getAndIncrement(account);
}
}

public class CASDemo {
private static final int SIZE = 50;
public static void main(String[] args) throws InterruptedException {
Account account = new Account();
CountDownLatch countDownLatch = new CountDownLatch(SIZE);
for(int i = 1; i <= SIZE; i++){
new Thread(() ->{
try {
for (int j = 0; j < 10000; j++) {
account.transMonney(account);
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + " " + account.money);
}
}

3.6.5 增强原子类

优点: 高并发性能更好,适用于高并发场景。

  • LongAdder:Long 类型的原子累加器
  • DoubleAdder:Double 类型的原子累加器
  • LongAccumulator:Long 类型的累加器
  • DoubleAccumulator:Double 类型的累加器

LongAdder 和 LongAccumulator 的区别:

  • LongAdder:适用于高并发下的累加操作,内部会维护一个 Cell 数组,每个 Cell 里面维护一个数值,多线程更新时,会通过 Hash 算法映射到具体的 Cell 上,然后更新 Cell 的值,最后将所有 Cell 的值累加得到最终的结果。
  • LongAccumulator:适用于高并发下的累加操作,内部会维护一个函数 function 和一个初始值 identity,多线程更新时,会通过函数 function 将当前值和新值进行计算,最后得到最终的结果。

3.7 ThreadLocal

3.7.1 ThreadLocal 介绍

通常情况下,我们创建的变量可以被任何一个线程访问和修改。这在多线程环境中可能导致数据竞争和线程安全问题。那么,如果想让每个线程都有自己的专属本地变量,该如何实现呢?

JDK 中提供的 ThreadLocal 类正是为了解决这个问题。ThreadLocal 类允许每个线程绑定自己的值,可以将其形象地比喻为一个“存放数据的盒子”。每个线程都有自己独立的盒子,用于存储私有数据,确保不同线程之间的数据互不干扰。当你创建一个 ThreadLocal 变量时,每个访问该变量的线程都会拥有一个独立的副本。这也是 ThreadLocal 名称的由来。线程可以通过 get() 方法获取自己线程的本地副本,或通过 set() 方法修改该副本的值,从而避免了线程安全问题

3.7.2 ThreadLocal 使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.Random;

class House{
int saleCount = 0;
public synchronized void sale(){
saleCount++;
}
ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);
public void saleVolume(){
threadLocal.set(1 + threadLocal.get());
}
}

public class ThreadLocalDemo {
public static void main(String[] args) {
House house = new House();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
int size = new Random().nextInt(5) + 1;
for (int j = 0; j < size; j++) {
house.saleVolume();
house.sale();
}
// 获取ThreadLocal中的值
System.out.println(Thread.currentThread().getName() + " saleCount = " + house.threadLocal.get());
} finally {
house.threadLocal.remove(); // 防止内存泄漏, 用完一定要记得remove
}
}, String.valueOf(i)).start();
}

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("总计销售量:" + house.saleCount);
}
}


3.7.3 ThreadLocal 原理

1
2
3
4
5
6
7
8
9
public class Thread implements Runnable {
//......
//与此线程有关的ThreadLocal值。由ThreadLocal类维护
ThreadLocal.ThreadLocalMap threadLocals = null;

//与此线程有关的InheritableThreadLocal值。由InheritableThreadLocal类维护
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
//......
}

从上面 Thread 类 源代码可以看出 Thread 类中有一个 threadLocals 和 一个 inheritableThreadLocals 变量,它们都是 ThreadLocalMap 类型的变量,我们可以把 ThreadLocalMap 理解为 ThreadLocal 类实现的定制化的 HashMap。默认情况下这两个变量都是 null,只有当前线程调用 ThreadLocal 类的 set 或 get 方法时才创建它们,实际上调用这两个方法的时候,我们调用的是 ThreadLocalMap 类对应的 get()、set()方法。

最终的变量是放在了当前线程的 ThreadLocalMap 中,并不是存在 ThreadLocal 上,ThreadLocal 可以理解为只是 ThreadLocalMap 的封装,传递了变量值。
ThrealLocal 类中可以通过 Thread.currentThread()获取到当前线程对象后,直接通过 getMap(Thread t)可以访问到该线程的 ThreadLocalMap 对象。
每个 Thread 中都具备一个 ThreadLocalMap,而 ThreadLocalMap 可以存储以 ThreadLocal 为 key ,Object 对象为 value 的键值对。

3.7.4 内存泄漏

ThreadLocal 内存泄漏的根本原因在于其内部实现机制。每个线程维护一个名为 ThreadLocalMap 的 map。 当你使用 ThreadLocal 存储值时,实际上是将值存储在当前线程的 ThreadLocalMap 中,其中 ThreadLocal 实例本身作为 key,而你要存储的值作为 value。

ThreadLocalMap 的 key 和 value 引用机制:

  • key 是弱引用:ThreadLocalMap 中的 key 是 ThreadLocal 的弱引用 (WeakReference<ThreadLocal<?>>)。 这意味着,如果 ThreadLocal 实例不再被任何强引用指向,垃圾回收器会在下次 GC 时回收该实例,导致 ThreadLocalMap 中对应的 key 变为 null。
  • value 是强引用:ThreadLocalMap 中的 value 是强引用。 即使 key 被回收(变为 null),value 仍然存在于 ThreadLocalMap 中,被强引用,不会被回收。

所以,用完 ThreadLocal 后,一定要记得调用 remove() 方法。remove 方法中有一个方法调用 expungeStaleEntry(),会将 key 为 null 的 Entry 的 value 设置为 null,这样就可以被 GC 回收了。

各种引用类型的特点:
强引用:只要强引用存在,垃圾回收器永远不会回收被引用的对象。
软引用:只有在内存不足的时候,才会回收软引用指向的对象。
弱引用:只要发生垃圾回收,无论内存是否充足,都会回收弱引用指向的对象。
虚引用:用来跟踪对象被垃圾回收器回收的活动,不能单独使用,必须与引用队列(ReferenceQueue)联合使用。它的引用形同虚设,就是在对象被回收时,会收到一个通知或者进一步的处理,比 finalize 更灵活。

3.7.5 最佳实践

  • 推荐定义成 static,这样 ThreadLocal 变量只有一个实例,节省内存。
  • 初始化,避免空指针异常
  • 使用完 ThreadLocal 后,记得调用 remove() 方法,清除数据,避免内存泄漏。

3.8 synchronized 与 锁升级

(比较难的部分, 之后再补充)
无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁

3.9 AQS(AbstractQueuedSynchronizer, 抽象队列同步器)

AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是基于 CLH 锁 (Craig, Landin, and Hagersten locks) 进一步优化实现的。
CLH 锁 对自旋锁进行了改进,是基于单链表的自旋锁。在多线程场景下,会将请求获取锁的线程组织成一个单向队列,每个等待的线程会通过自旋访问前一个线程节点的状态,前一个节点释放锁之后,当前节点才可以获取锁。CLH 锁 的队列结构如下图所示。

AQS 中使用的 等待队列 是 CLH 锁队列的变体(接下来简称为 CLH 变体队列)。
AQS 的 CLH 变体队列是一个双向队列,会暂时获取不到锁的线程将被加入到该队列中,CLH 变体队列和原本的 CLH 锁队列的区别主要有两点:

  • 由 自旋 优化为 自旋 + 阻塞 :自旋操作的性能很高,但大量的自旋操作比较占用 CPU 资源,因此在 CLH 变体队列中会先通过自旋尝试获取锁,如果失败再进行阻塞等待。
  • 由 单向队列 优化为 双向队列 :在 CLH 变体队列中,会对等待的线程进行阻塞操作,当队列前边的线程释放锁之后,需要对后边的线程进行唤醒,因此增加了 next 指针,成为了双向队列。

AQS 将每条请求共享资源的线程封装成一个 CLH 变体队列的一个结点(Node)来实现锁的分配。在 CLH 变体队列中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。

AQS 使用 int 成员变量 state 表示同步状态,通过内置的 线程等待队列 来完成获取资源线程的排队工作。
另外,状态信息 state 可以通过 protected 类型的 getState()、setState()和 compareAndSetState() 进行操作。并且,这几个方法都是 final 修饰的,在子类中无法被重写。

3.10 ReentrantLock、ReentrantReadWriteLock、StampedLock

3.10.1 ReentrantLock

ReentrantLock 实现了 Lock 接口,是一个可重入且独占式的锁,和 synchronized 关键字类似。不过,ReentrantLock 更灵活、更强大,增加了轮询、超时、中断、公平锁和非公平锁等高级功能。

问题:只允许一个线程读取共享资源,其他线程必须等待,效率较低。

3.10.2 ReentrantReadWriteLock

ReentrantReadWriteLock 实现了 ReadWriteLock ,是一个可重入的读写锁,既可以保证多个线程同时读的效率,同时又可以保证有写入操作时的线程安全。

  • 一般锁进行并发控制的规则:读读互斥、读写互斥、写写互斥。
  • 读写锁进行并发控制的规则:读读不互斥、读写互斥、写写互斥(只有读读不互斥)。

ReentrantReadWriteLock 其实是两把锁,一把是 WriteLock (写锁),一把是 ReadLock(读锁) 。读锁是共享锁,写锁是独占锁。读锁可以被同时读,可以同时被多个线程持有,而写锁最多只能同时被一个线程持有。和 ReentrantLock 一样,ReentrantReadWriteLock 底层也是基于 AQS 实现的。

存在问题:允许多个线程读取资源,但是存在写锁饥饿问题,即写线程一直获取不到写锁,导致读线程一直获取到读锁,写线程一直获取不到写锁,导致写线程一直无法执行。

锁降级:

  • 写锁可以降级为读锁,但是读锁不能升级为写锁。
  • 一个线程占有了写锁,可以获取读锁,释放写锁,这样就实现了锁降级。
  • 读没有完成时,写是不能获取锁的。必须等待读锁释放。

3.10.3 StampedLock

StampedLock 是 JDK 1.8 引入的性能更好的读写锁,不可重入且不支持条件变量 Condition。不同于一般的 Lock 类,StampedLock 并不是直接实现 Lock 或 ReadWriteLock 接口,而是基于 CLH 锁 独立实现的(AQS 也是基于这玩意)。

StampedLock 提供了三种模式的读写控制模式:读锁、写锁和乐观读。

  • 写锁:独占锁,一把锁只能被一个线程获得。当一个线程获取写锁后,其他请求读锁和写锁的线程必须等待。类似于 ReentrantReadWriteLock 的写锁,不过这里的写锁是不可重入的。
  • 读锁 (悲观读):共享锁,没有线程获取写锁的情况下,多个线程可以同时持有读锁。如果己经有线程持有写锁,则其他线程请求获取该读锁会被阻塞。类似于 ReentrantReadWriteLock 的读锁,不过这里的读锁是不可重入的。
  • 乐观读:允许多个线程获取乐观读以及读锁。同时允许一个写线程获取写锁。但是需要对获取的数据进行校验,如果数据被修改,则获取失败,需要重试。

相比于传统读写锁多出来的乐观读是 StampedLock 比 ReadWriteLock 性能更好的关键原因。StampedLock 的乐观读允许一个写线程获取写锁,所以不会导致所有写线程阻塞,也就是当读多写少的时候,写线程有机会获取写锁,减少了线程饥饿的问题,吞吐量大大提高。

参考资料:


并发编程
https://kongshuilinhua.github.io/2025/03/13/并发编程/
作者
FireFLy
发布于
2025年3月13日
许可协议