本篇文章重点介绍JUC(java.util.concurrent)

JUC是”java.util.concurrent”包的简称,它是Java提供的一个并发工具包,旨在简化多线程编程,提供了丰富的类和接口来帮助开发者更高效、更安全地编写并发程序。JUC包增强了Java对并发的支持,解决了传统多线程编程中的一些难题,如死锁、竞争条件和资源管理等。

img点击并拖拽以移动

原子变量

img点击并拖拽以移动 基本类型原子变量

AtomicInteger

  • 提供对整型值的原子操作,如加法、减法等。
  • 方法示例:incrementAndGet(), decrementAndGet(), addAndGet(int delta), compareAndSet(int expect, int update)。

AtomicLong

  • 类似于AtomicInteger,但是针对长整型(long)值。
  • 方法与AtomicInteger相似,适用于需要处理较大数值的情况。

AtomicBoolean

  • 支持布尔类型的原子操作。
  • 方法示例:get(), set(boolean newValue), compareAndSet(boolean expect, boolean update)。

对象引用原子变量

AtomicReference

  • 用于对象引用的原子更新。
  • 方法示例:get(), set(V newValue), compareAndSet(V expect, V update)。

AtomicStampedReference

  • 带版本号的对象引用原子变量,解决ABA问题。
  • 方法示例:get(), set(V newValue, int newStamp), compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp)。

AtomicMarkableReference

  • 带标记位的对象引用原子变量,适用于需要记录是否发生过变化的情况。
  • 方法示例:getReference(), isMarked(), compareAndSet(V expectedReference, V newReference, boolean expectedMark, boolean newMark)。

数组类型的原子变量

AtomicIntegerArray

  • 提供对整型数组元素的原子操作。
  • 方法示例:get(int i), set(int i, int newValue), incrementAndGet(int i), compareAndSet(int i, int expect, int update)。

AtomicLongArray

  • 类似于AtomicIntegerArray,但针对长整型数组。
  • 方法与AtomicIntegerArray相似。

AtomicReferenceArray

  • 提供对对象引用数组元素的原子操作。
  • 方法示例:get(int i), set(int i, V newValue), compareAndSet(int i, V expect, V update)。

字段更新器

AtomicIntegerFieldUpdater

  • 提供对现有对象字段进行原子更新的能力,无需直接使用原子变量。
  • 使用时需注意字段必须是volatile类型,并且不能是private。

AtomicLongFieldUpdater

  • 类似于AtomicIntegerFieldUpdater,但适用于长整型字段。

AtomicReferenceFieldUpdater<T, V>

  • 提供对对象引用字段的原子更新能力。

AtomicStampedReference

介绍:

解决了ABA问题的一种原子引用实现,通过引入版本号(称为“邮票”)来区分即使是相同的对象引用也可能是不同的状态。

典型应用场景:解决复杂的并发控制问题,特别是在可能出现ABA问题的场合。
常用方法:getReference() 和 getStamp() 获取引用及其版本号;compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) 进行带有版本检查的更新。

ABA问题的描述

假设有一个共享变量A,其初始值为A。现在有两个线程Thread 1和Thread 2同时操作这个变量:

  • Thread 1读取变量的值A。
  • 在Thread 1执行其他操作期间,Thread 2首先将变量的值从A改为B,然后又改回A。
  • 当Thread 1尝试使用类似compareAndSet(A, newValue)的操作来更新变量时,它会发现当前变量的值仍然是A,因此认为在这段时间内没有其他线程修改过该变量,并继续执行更新操作。
  • 但实际上,在这段时间里,变量已经经历了从A到B再到A的变化过程,这可能导致程序逻辑上的错误或数据不一致的情况。

ABA问题的具体场景

设想有三个线程:Thread 1、Thread 2 和 Thread 3,以及初始栈结构如下:

栈顶 -> A(10) -> B(20) -> null

现在考虑以下执行顺序:

1.Thread 1 开始执行pop()操作:

  • 它读取到当前栈顶为A(10)。
  • 在获取A.next之前被抢占。

2.Thread 2 执行两次pop()操作

  • 弹出了A(10)和B(20),然后又压入了C(30)和A(10),使得栈变为:栈顶 -> A(10) -> C(30) -> null

3.Thread 1 恢复执行:

  • 它继续执行,发现A.next指向的是B(20)(这是它最初看到的状态),但实际上现在的A.next指向的是C(30)。
  • 当它尝试使用CAS将栈顶从A(10)改为B(20)时,由于此时栈顶确实是A(10),CAS操作成功。
  • 结果是,栈变成了B(20) -> C(30) -> null,而实际上应该保持为A(10) -> C(30) -> null。

源码分析

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 初始化一个带有时间戳的原子引用对象
*
* @param initialRef 初始引用对象,表示要被引用的对象
* @param initialStamp 初始时间戳,用于检测和防止虚假冲突
*
* 使用Pair.of方法来创建并初始化一个Pair对象,该Pair对象包含了引用对象和时间戳
* 这个构造方法允许用户在创建AtomicStampedReference对象时指定初始的引用对象和时间戳
*/
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}

点击并拖拽以移动

这个pair通过volitaile修饰的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 一个泛型类,用于存储一个引用及其关联的版本戳。
* 主要用途是封装对实际对象的引用和一个表示版本的整数,
* 允许程序通过比较版本戳来确定引用是否被修改。
*
* @param <T> 引用的类型,表示该类可以用于任何对象类型。
*/
private static class Pair<T> {
final T reference; // 存储的引用对象
final int stamp; // 关联的版本戳

/**
* 构造函数,用于创建 Pair 对象。
*
* @param reference 要存储的对象引用。
* @param stamp 关联的版本戳,用于表示对象的状态或版本。
*/
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}

/**
* 静态工厂方法,用于创建 Pair 对象。
* 该方法提供了一种更灵活的方式来创建 Pair 对象,简化了实例化过程。
*
* @param reference 要存储的对象引用。
* @param stamp 关联的版本戳。
* @return 返回一个包含给定引用和版本戳的 Pair 对象。
*/
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}

/**
* 一个 volatile 的 Pair 对象,用于存储对象引用及其对应的版本戳。
* 声明为 volatile 以确保多个线程可以安全地访问这个 Pair 对象,
* 维护对该变量的写操作的可见性和顺序性。
*/
private volatile Pair<V> pair;

点击并拖拽以移动

compareAndSet

/**
* 使用原子方式比较并设置引用和戳
* 此方法用于在当前引用和戳与预期值匹配时,将引用和戳更新为新值
* 它是实现非阻塞算法的关键,特别是在并发控制中
*
* @param expectedReference 预期的引用值
* @param newReference 新的引用值
* @param expectedStamp 预期的戳值
* @param newStamp 新的戳值
* @return 如果更新成功则返回true,否则返回false
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
// 获取当前的引用和戳对
Pair current = pair;
// 检查当前的引用和戳是否与预期值匹配
// 如果匹配,则进一步检查新值是否与当前值相同,或尝试使用CAS操作更新值
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}

weakCompareAndSet

/**
* 使用弱一致性比较并设置引用和戳
* 此方法与compareAndSet方法类似,但它使用弱一致性,这意味着在某些情况下,
* 它可能返回false,即使比较成功,这通常是为了提高性能
*
* @param expectedReference 预期的引用值
* @param newReference 新的引用值,如果预期引用匹配则设置此值
* @param expectedStamp 预期的戳值
* @param newStamp 新的戳值,如果预期戳匹配则设置此值
* @return 如果设置操作成功则返回true,否则返回false
*/
public boolean weakCompareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
return compareAndSet(expectedReference, newReference,
expectedStamp, newStamp);
}

attemptStamp

/**
* 尝试更新引用对象的版本号
* 此方法旨在更新引用对象的版本号(stamp),以实现更复杂的同步或版本控制逻辑
* 它首先检查当前引用对象是否与预期的引用对象匹配,然后尝试更新版本号如果版本号不匹配,
* 它会尝试使用CAS(Compare-And-Swap)操作来更新引用对象和版本号
*
* @param expectedReference 预期的引用对象,即我们期望当前引用对象所指向的对象
* @param newStamp 新的版本号,我们尝试更新引用对象到这个版本号
* @return 如果成功更新引用对象的版本号,则返回true;否则返回false
*/
public boolean attemptStamp(V expectedReference, int newStamp) {
// 获取当前的引用对象和版本号对
Pair current = pair;
// 检查预期的引用对象是否与当前引用对象匹配,并尝试更新版本号
return
expectedReference == current.reference &&
(newStamp == current.stamp ||
casPair(current, Pair.of(expectedReference, newStamp)));
}

Unsafe机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Unsafe 机制
// 使用 sun.misc.Unsafe 进行低级别的操作,绕过 Java 语言的安全检查。
// 注意:sun.misc.Unsafe 的使用通常不被推荐,因为它可能导致稳定性与安全性问题,并且它不属于标准 Java API。

private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
// 获取 AtomicStampedReference 中 "pair" 字段的内存偏移量
private static final long pairOffset =
objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);

/**
* 尝试原子地更新当前实例的 pair 字段。
* 该方法使用 Unsafe 执行比较并交换(CAS)操作,这是实现无锁算法的关键部分。
* 它通过确保更新是原子的来保证线程安全:仅当当前值与预期值相同时,更新才会成功。
*
* @param cmp 预期的旧值
* @param val 要设置的新值
* @return 如果更新成功则返回 true,否则返回 false
*/
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}

/**
* 获取指定类中指定字段的内存偏移量。
* 该方法通过查找字段的确切位置,为低级别操作做准备。
* 它是一个辅助函数,简化了获取字段偏移量的过程。
*
* @param UNSAFE 用于低级别操作的 Unsafe 实例
* @param field 目标字段的名称
* @param klazz 包含目标字段的类
* @return 字段的内存偏移量
* @throws NoSuchFieldError 如果找不到指定字段,则抛出此错误
*/
static long objectFieldOffset(sun.misc.Unsafe UNSAFE,
String field, Class<?> klazz) {
try {
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
} catch (NoSuchFieldException e) {
// 将异常转换为对应的错误
NoSuchFieldError error = new NoSuchFieldError(field);
error.initCause(e);
throw error;
}
}

点击并拖拽以移动

并发集合

集合

ConcurrentHashMap

  • 线程安全的哈希表实现,支持高并发读写操作。

CopyOnWriteArrayList

  • 线程安全的列表,适用于读多写少的场景,写操作时创建底层数组的新副本。

CopyOnWriteArraySet

  • 基于CopyOnWriteArrayList实现的线程安全集合,保证元素唯一性。

ConcurrentLinkedQueue

  • 无界线程安全队列,基于链表结构,支持高效的插入和移除操作。

ConcurrentLinkedDeque

  • 类似于ConcurrentLinkedQueue,但支持双端队列操作。

BlockingQueue接口及其实现类

  • ArrayBlockingQueue:有界的阻塞队列。
  • LinkedBlockingQueue:可选容量限制的阻塞队列。
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
  • DelayQueue:元素只有在延迟期满后才能从队列中取出。
  • SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等待相应的删除操作,反之亦然。
  • LinkedTransferQueue:实现了TransferQueue接口的无界阻塞队列。

ConcurrentSkipListMap

  • 线程安全的可排序映射,基于跳表数据结构,允许范围查询。

ConcurrentSkipListSet

  • 基于ConcurrentSkipListMap实现的线程安全集合,支持排序。

ConcurrentHashMap.KeySetView

  • ConcurrentHashMap的键视图,提供了一种方式来使用ConcurrentHashMap作为集合。

ConcurrentHashMap

ConcurrentHashMap 是 Java 中 java.util.concurrent 包提供的一个高效且线程安全的哈希表实现。它在多线程环境下提供了比传统的同步集合(如 Collections.synchronizedMap() 或者 Hashtable)更好的并发性能,同时避免了传统锁机制带来的瓶颈问题

特点

1.分段锁机制(Segment Locking)

  • 在早期版本(Java 7及之前),ConcurrentHashMap 使用了一种称为“分段锁”的技术来减少锁竞争。整个哈希表被划分为多个段(segments),每个段实际上是一个小的哈希表,拥有自己的锁。这样,在理想情况下,不同的线程可以同时访问不同段的数据而不会发生冲突。
  • 从Java 8开始,这种设计被替换为更细粒度的锁机制——CAS(Compare-And-Swap)操作结合同步标记(synchronized blocks),这使得大部分读写操作可以在不加锁的情况下完成,进一步提高了并发性能。

2.无阻塞读取

  • ConcurrentHashMap 支持完全并发的读操作,即使有其他线程正在进行写操作或结构化修改(如增加/删除节点)。这是因为它的内部结构允许在不锁定整个表的情况下进行读取。
  • 高效的扩容机制
  • 当哈希表需要扩容时,ConcurrentHashMap 并不是一次性锁定整个表来进行复制和迁移工作,而是将这个过程分散到多次增量更新中完成,从而减少了对系统性能的影响。

3.弱一致性迭代器

  • 迭代器不会抛出 ConcurrentModificationException,并且它们是弱一致性的:它们反映的是某个时间点上的快照,并可能包含已删除的元素,但绝不会返回从未添加过的元素。

4.支持原子操作的方法

  • 提供了一系列支持原子操作的方法,如 putIfAbsent, remove(Object key, Object value), replace(K key, V oldValue, V newValue) 等,这些方法可以帮助开发者编写更加简洁和高效的代码。

扩容机制

触发条件:

扩容通常发生在插入新键值对时,如果当前哈希表的负载因子(即元素数量与桶数组长度的比例)超过了预定阈值。具体来说,当ConcurrentHashMap中的元素数量超过了容量乘以负载因子(loadFactor)时,就会触发扩容操作。默认情况下,负载因子设置为0.75。

扩容过程

1.创建新数组:

  • 首先确定需要扩容后的新容量(通常是原容量的两倍),然后创建一个新的桶数组。

2.迁移数据:

  • 对于旧数组中的每个非空桶(包括链表或红黑树结构),需要将其重新分配到新数组中。由于新数组的大小是原来的两倍,所以每个桶的数据可能被分配到新数组中的两个位置之一。
  • 具体来说,对于一个位于索引i的桶,在新数组中它可能会被放置在i或i + oldCapacity的位置上(其中oldCapacity是旧数组的容量)。

3.并行迁移:

  • 不同于一次性完成所有数据的迁移,ConcurrentHashMap采用了一种增量迁移的方式。这意味着不是由单个线程负责整个迁移过程,而是任何访问到未迁移桶的线程都可以帮助完成这部分桶的迁移工作。
  • 这种设计允许多个线程同时参与扩容过程,从而加速了迁移速度,并减少了对系统性能的影响。

4.完成扩容:

  • 当所有的桶都成功迁移到新的数组中后,旧的数组将被废弃,新的数组成为ConcurrentHashMap的实际存储结构。
  • 在此过程中,ConcurrentHashMap仍然能够处理查询、插入等操作,保证了系统的高可用性。

Executor框架

Executor框架主要组成

Executor接口:

  • 这是最基础的接口,只有一个方法void execute(Runnable command);。它代表一个对象,其职责是执行提交给它的Runnable任务。

ExecutorService接口:

  • 继承自Executor接口,增加了管理服务的功能,如终止现有任务、等待所有已提交任务完成等。提供了更丰富的控制能力,比如提交带返回值的任务(Callable)、批量提交任务等。

AbstractExecutorService类:

  • 为ExecutorService接口提供了一个默认的实现模板,方便开发者创建自定义的执行服务。

ScheduledExecutorService接口:

  • 继承自ExecutorService接口,支持延迟和周期性任务的执行。

Executors工厂类:提供了静态方法用于创建不同类型的线程池,包括但不限于:

  • newFixedThreadPool(int nThreads):创建一个固定大小的线程池。
  • newCachedThreadPool():创建一个可根据需要创建新线程的线程池。
  • newSingleThreadExecutor():创建一个单线程的线程池。
  • newScheduledThreadPool(int corePoolSize):创建一个支持定时及周期性任务执行的线程池。

线程池

概念

线程池是一种用于管理和复用一组线程的技术,它避免了频繁创建和销毁线程所带来的开销。通过预先创建一定数量的线程并将其保持在一个池中,可以显著提升应用程序的性能。

线程池的创建

在JUC中,可以通过多种方式创建线程池,最常用的方式是使用Executors工厂类提供的静态方法,如:

  • newFixedThreadPool(int nThreads):创建一个固定大小的线程池。
  • newCachedThreadPool():创建一个根据需要创建新线程的线程池,但在以前构造的线程可用时将重用它们。
  • newSingleThreadExecutor():创建一个单线程的线程池。
  • newScheduledThreadPool(int corePoolSize):创建一个支持定时及周期性任务执行的线程池。

当然,也可以直接使用ThreadPoolExecutor或ScheduledThreadPoolExecutor来更灵活地配置线程池参数。

线程池工作流程

1.任务提交:

  • 当你向线程池提交一个新的任务(通过调用execute(Runnable command)方法),该任务会被放入队列等待执行。

2.任务分配:

  • 如果当前运行的线程少于核心线程数(corePoolSize),则创建新的线程来处理任务。
  • 如果当前运行的线程数等于或大于核心线程数,则将任务加入到任务队列等待空闲线程处理。
  • 如果任务队列已满,并且当前运行的线程数小于最大线程数(maximumPoolSize),则会创建新的线程来处理任务。
  • 如果线程数已经达到了最大线程数并且任务队列也满了,则根据拒绝策略处理无法执行的任务。

3.任务执行:线程从任务队列中取出任务并执行。
4.线程回收:

  • 当线程完成任务后,如果此时线程池中的线程数超过了核心线程数,并且这些多余的线程在一段时间内(keepAliveTime)没有新的任务可执行,则这些线程将会被终止以节省资源。

5.拒绝策略:

当线程池无法接受新的任务时,会按照预设的拒绝策略进行处理。JUC提供了几种默认的拒绝策略,如:

  • AbortPolicy:抛出RejectedExecutionException异常。
  • CallerRunsPolicy:由提交任务的线程自己运行该任务。
  • DiscardPolicy:直接丢弃任务,不抛出异常。
  • DiscardOldestPolicy:丢弃队列中最老的任务,然后尝试重新提交此任务。

ThreadPoolExecutor

案例引入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 创建一个ThreadPoolExecutor线程池
// 核心线程数为2,最大线程数为5
// 空闲线程存活时间为1秒
// 使用ArrayBlockingQueue作为任务队列,容量为3
// 当任务队列已满且线程数达到最大时,使用丢弃策略处理无法执行的任务
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 3, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3),
new ThreadPoolExecutor.DiscardPolicy()
);

for (int i = 0; i < 10; i ++) {
int finalI = i;
Runnable task = new Runnable() {
@Override
public void run() {
log.debug("running... task{}", finalI);
}
};
executor.submit(task);
}

System.out.println(executor.getTaskCount());
executor.shutdown();

点击并拖拽以移动

img点击并拖拽以移动

参数说明:

img点击并拖拽以移动

1. corePoolSize(核心线程数)

  • 表示线程池中保持的最小线程数量。
  • 即使这些线程处于空闲状态,也不会被销毁(除非设置了 allowCoreThreadTimeOut(true))。
  • 当提交任务时,如果当前运行的线程数小于 corePoolSize,即使有空闲线程,也会创建新线程来执行任务。

2. maximumPoolSize(最大线程数)

  • 线程池中允许的最大线程数量。
  • 当任务队列已满,并且当前运行的线程数小于 maximumPoolSize 时,线程池会创建新的线程来处理任务,直到达到 maximumPoolSize。
  • 如果线程数已经达到 maximumPoolSize,并且任务队列也满了,则根据拒绝策略处理无法执行的任务。

3. keepAliveTime(空闲线程存活时间)

  • 表示当线程池中的线程数超过 corePoolSize 时,多余空闲线程在终止前等待新任务的最长时间。
  • 时间单位由 unit 参数指定。
  • 默认情况下,keepAliveTime 只会影响超出核心线程数的线程;但可以通过调用 allowCoreThreadTimeOut(true) 方法,使得核心线程也遵循这个规则。

4. unit(时间单位)
keepAliveTime 的时间单位。
常见的时间单位包括:

  • TimeUnit.NANOSECONDS:纳秒
  • TimeUnit.MICROSECONDS:微秒
  • TimeUnit.MILLISECONDS:毫秒
  • TimeUnit.SECONDS:秒
  • TimeUnit.MINUTES:分钟
  • TimeUnit.HOURS:小时
  • TimeUnit.DAYS:天

5. workQueue(任务队列)
用于保存等待执行的任务的阻塞队列。
常见的任务队列类型:

  • ArrayBlockingQueue:基于数组的有界阻塞队列。
  • LinkedBlockingQueue:基于链表的无界阻塞队列(默认容量为 Integer.MAX_VALUE)。
  • SynchronousQueue:不存储元素的队列,每个插入操作必须等待另一个线程的对应移除操作。
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列。

队列的选择直接影响线程池的工作机制,例如任务排队和线程创建的策略。
6. threadFactory(线程工厂)

  • 用于创建新线程的工厂。
  • 默认使用 Executors.defaultThreadFactory(),它创建的线程具有相同的优先级(NORM_PRIORITY)并设置为非守护线程。
  • 自定义线程工厂可以用来设置线程的名称、优先级、是否为守护线程等。

7. handler(拒绝策略)
当线程池和任务队列都满了时使用的处理策略。
常见的拒绝策略:

  • AbortPolicy:抛出 RejectedExecutionException 异常(默认策略)。
  • CallerRunsPolicy:由提交任务的线程自己执行该任务。
  • DiscardPolicy:直接丢弃任务,不做任何处理。
  • DiscardOldestPolicy:丢弃队列中最老的任务,然后尝试重新提交被拒绝的任务。

也可以自定义拒绝策略,只需要实现 RejectedExecutionHandler 接口。

1. 提交任务

  • execute(Runnable command):用于执行不需要返回结果的任务(即实现了 Runnable 接口的任务)。该方法没有返回值。
  • submit(Runnable task):提交一个不需要返回结果的任务,并返回一个 Future 对象,可以通过这个对象来管理任务的状态或尝试取消任务。
  • submit(Runnable task, T result):类似于 submit(Runnable task),但允许指定一个结果对象,在任务完成时可以获取到这个结果(虽然对于 Runnable 任务,这个结果通常是 null)。
  • submit(Callable task):提交一个需要返回结果的任务(即实现了 Callable 接口的任务),并返回一个 Future 对象,通过这个对象可以获取任务执行的结果。

2. 线程池管理

  • shutdown():启动一次顺序关闭,在这个方法调用之后,线程池不再接受新的任务,但是会继续执行已经在队列中的任务。
  • shutdownNow():试图停止所有正在执行的任务,并暂停处理等待中的任务,返回等待执行的任务列表。
  • awaitTermination(long timeout, TimeUnit unit):阻塞当前线程直到线程池中的所有任务都完成执行,或者超过了指定的时间限制。

3. 状态检查

  • isShutdown():判断线程池是否已经启动了关闭程序(调用了 shutdown() 或 shutdownNow() 方法)。
  • isTerminated():如果调用了 shutdown() 或 shutdownNow() 方法后,所有任务都已完成,则返回 true。

4. 其他实用方法

  • getActiveCount():返回线程池中正在积极执行任务的线程数量。
  • getCompletedTaskCount():返回已执行完毕的任务数。
  • getPoolSize():返回当前线程池中的线程数量,包括空闲线程。
  • getLargestPoolSize():返回线程池曾经创建的最大线程数量。
  • getTaskCount():估计已执行的任务总数加上仍在队列中等待执行的任务数。
  • setThreadFactory(ThreadFactory threadFactory):设置用于创建新线程的工厂。
  • setRejectedExecutionHandler(RejectedExecutionHandler handler):设置当任务无法被提交执行时使用的拒绝策略。

Fork/Join

Fork/Join 框架是 Java 7 引入的一个用于并行执行任务的框架,它特别适用于那些可以递归分解成更小任务的问题。这个框架旨在高效地利用多核处理器来加速计算密集型操作。以下是 Fork/Join 框架的主要概念、工作原理以及如何使用它。

主要概念

  • Fork:将一个大任务拆分为多个子任务的过程。
  • Join:等待所有子任务完成并将它们的结果合并的过程。

核心类

  • ForkJoinPool:执行 ForkJoinTask 的线程池。它管理着一组工作线程,并提供了一种机制来执行任务及其子任务。
  • ForkJoinTask:这是一个抽象类,代表可以在 ForkJoinPool 中执行的任务。通常情况下,你不需要直接继承这个类,而是使用它的两个具体实现之一:
  • RecursiveAction:用于没有返回结果的任务。
  • RecursiveTask:用于有返回结果的任务(V 是返回值的类型)。

AQS

概念

全称AbstractQueuedSynchronizer(抽象队列同步器),是Java并发包(java.util.concurrent)中的一个核心组件。它提供了一个框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关的同步器(如信号量、事件等)。AQS的设计简化了开发高效且可靠的同步器的过程。

读写锁(ReentrantReadWriteLock)

基本操作:

1
2
3
static ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
static ReentrantReadWriteLock.ReadLock r = rw.readLock();
static ReentrantReadWriteLock.WriteLock w = rw.writeLock();

点击并拖拽以移动

ReentrantReadWriteLock.ReadLock r = rw.readLock() : 返回用于读操作的锁

ReentrantReadWriteLock.WriteLock w = rw.writeLock() : 返回用于写操作的锁

基本方法

1. 获取读锁和写锁

  • readLock() :返回一个用于读操作的锁(Lock 对象)。
  • writeLock():返回一个用于写操作的锁(Lock 对象)。

2. 锁的基本操作
读锁(Read Lock)方法

  • void lock() :获取读锁。如果当前没有线程持有写锁,则可以成功获取;否则会阻塞,直到写锁被释放。
  • void unlock() :释放读锁。必须在持有读锁的线程中调用,否则会抛出llegalMonitorStateException。
  • boolean tryLock() : 尝试非阻塞地获取读锁。如果当前没有线程持有写锁,则立即获取并返回 true,否则返回 false。
  • boolean tryLock(long timeout, TimeUnit unit):尝试在指定时间内获取读锁。如果成功获取则返回 true,超时或被中断则返回 false。

写锁(Write Lock)方法

  • void lock():获取写锁。如果当前没有其他线程持有读锁或写锁,则可以成功获取;否则会阻塞,直到锁可用。
  • void unlock():释放写锁。必须在持有写锁的线程中调用,否则会抛出 IllegalMonitorStateException。
  • boolean tryLock():尝试非阻塞地获取写锁。如果当前没有其他线程持有读锁或写锁,则立即获取并返回 true,否则返回 false。
  • boolean tryLock(long timeout, TimeUnit unit):尝试在指定时间内获取写锁。如果成功获取则返回 true,超时或被中断则返回 false。

3. 公平性相关方法

  • boolean isFair():判断锁是否是公平锁。如果是公平锁,等待时间最长的线程优先获得锁;否则是非公平锁,默认为非公平模式。
  • Thread getOwner():返回当前持有写锁的线程。如果没有线程持有写锁,则返回 null。
  • int getQueueLength():返回正在等待获取写锁的线程数量。
  • Collection getQueuedThreads():返回所有正在等待获取写锁的线程集合。
  • int getReadLockCount():返回当前持有读锁的线程数(包括重入次数)。
  • boolean hasQueuedThreads():判断是否有线程正在等待获取写锁。

4. 状态检查方法

  • boolean isWriteLocked():判断写锁是否被任何线程持有。
  • boolean isWriteLockedByCurrentThread():判断当前线程是否持有写锁。
  • int getWriteHoldCount():返回当前线程持有写锁的重入次数。如果当前线程未持有写锁,则返回 0。
  • int getReadHoldCount()返回当前线程持有读锁的重入次数。如果当前线程未持有读锁,则返回 0。

5. 条件变量(Condition)支持
虽然 ReentrantReadWriteLock 支持条件变量,但需要注意:

  • 读锁不支持条件变量:尝试使用读锁创建条件变量会导致 UnsupportedOperationException。
  • 写锁支持条件变量:可以使用写锁创建条件变量。

读读锁可以并发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
static ReentrantReadWriteLock.ReadLock r = rw.readLock();
static ReentrantReadWriteLock.WriteLock w = rw.writeLock();

public static void read() {
r.lock();
try {
log.debug("获取read锁, {}", System.currentTimeMillis());
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放read锁, {}", System.currentTimeMillis());
r.unlock();
}
}

public static void write() {
w.lock();
try {
log.debug("获取write锁, {}", System.currentTimeMillis());
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放write锁, {}", System.currentTimeMillis());
w.unlock();
}
}
public static void main(String[] args) {

new Thread(Test5::read, "t1").start();
new Thread(Test5::read, "t2").start();
}

点击并拖拽以移动

img点击并拖拽以移动

读写锁互相阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
static ReentrantReadWriteLock.ReadLock r = rw.readLock();
static ReentrantReadWriteLock.WriteLock w = rw.writeLock();

public static void read() {
r.lock();
try {
log.debug("获取read锁, {}", System.currentTimeMillis());
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放read锁, {}", System.currentTimeMillis());
r.unlock();
}
}

public static void write() {
w.lock();
try {
log.debug("获取write锁, {}", System.currentTimeMillis());
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放write锁, {}", System.currentTimeMillis());
w.unlock();
}
}
public static void main(String[] args) {

new Thread(Test5::read, "t1").start();
new Thread(Test5::write, "t2").start();
}

点击并拖拽以移动

img点击并拖拽以移动

写写锁互相阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
static ReentrantReadWriteLock.ReadLock r = rw.readLock();
static ReentrantReadWriteLock.WriteLock w = rw.writeLock();

public static void read() {
r.lock();
try {
log.debug("获取read锁, {}", System.currentTimeMillis());
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放read锁, {}", System.currentTimeMillis());
r.unlock();
}
}

public static void write() {
w.lock();
try {
log.debug("获取write锁, {}", System.currentTimeMillis());
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放write锁, {}", System.currentTimeMillis());
w.unlock();
}
}
public static void main(String[] args) {

new Thread(Test5::write, "t1").start();
new Thread(Test5::write, "t2").start();
}

点击并拖拽以移动

img点击并拖拽以移动

写锁是独占的,读锁是共享的。


如有错误,欢迎指正!!!

碎碎念:Java并发编程JUC篇,目前来说是个基础篇,更深入的都还没学,后续有时间会进一步完善qwq,juc内容很多,如果想要深入学习,还是要看一下源码深入理解一下底层原理。。。