corePoolSize: the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
maximumPoolSize: the maximum number of threads to allow in the pool keepAliveTime when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
unit : the time unit for the keepAliveTime argument
workQueue: the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
handler: the handler to use when execution is blocked because the thread bounds and queue capacities are reache

  1. corePoolSize(核心线程数)

线程池中会始终保持的最小线程数
即使这些线程处于空闲状态,也不会被销毁(除非设置了allowCoreThreadTimeOut为true)
这些线程响应速度最快,因为不需要创建新线程

  1. maximumPoolSize(最大线程数)

线程池允许创建的最大线程数
当工作队列满了,且当前线程数小于maximumPoolSize时,会创建新线程
超过这个数量的任务会触发拒绝策略

  1. keepAliveTime(空闲线程存活时间)

非核心线程空闲超过这个时间后会被销毁
如果allowCoreThreadTimeOut为true,核心线程也会受这个参数影响
用于减少资源浪费

  1. unit(时间单位)

keepAliveTime的时间单位
可以是SECONDS、MINUTES等TimeUnit枚举值

  1. workQueue(工作队列)

用于存放待执行的任务
常用队列类型:

ArrayBlockingQueue:有界队列
LinkedBlockingQueue:无界队列
SynchronousQueue:无缓冲的等待队列
PriorityBlockingQueue:优先级队列

  1. handler(拒绝策略)

当队列满且线程数达到maximumPoolSize时的处理策略
默认策略类型:

AbortPolicy:抛出异常(默认)
CallerRunsPolicy:在调用者线程执行
DiscardPolicy:直接丢弃
DiscardOldestPolicy:丢弃最早的任务

Process UML

planUML描述如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@startuml
title ThreadPoolExecutor执行流程

skinparam backgroundColor #FFFFFF
skinparam handwritten false

start

:提交新任务;

if (当前线程数 < 核心线程数?) then (是)
:创建新的核心线程;
:执行任务;
else (否)
if (工作队列未满?) then (是)
:将任务放入工作队列;
note right
队列类型:
- ArrayBlockingQueue (有界)
- LinkedBlockingQueue (无界)
- SynchronousQueue (同步)
- PriorityBlockingQueue (优先级)
end note
else (否)
if (当前线程数 < 最大线程数?) then (是)
:创建新的临时线程;
note right
临时线程在空闲超过
keepAliveTime后销毁
end note
:执行任务;
else (否)
:执行拒绝策略;
note right
拒绝策略:
- AbortPolicy (默认)
- CallerRunsPolicy
- DiscardPolicy
- DiscardOldestPolicy
end note
endif
endif
endif

fork
:核心线程池;
note right: 保持运行
fork again
:临时线程池;
note right: 空闲超时销毁
fork again
:等待队列;
note right: 任务缓存
end fork

stop

legend right
参数说明:
====
corePoolSize: 核心线程数
maximumPoolSize: 最大线程数
keepAliveTime: 空闲线程存活时间
workQueue: 工作队列
handler: 拒绝策略
endlegend

@enduml

Comment and share

一. 同步结构

提供了比synchronized更加高级的同步结构:countDownLatch、CyclicBarrier、Semaphore等,可以实现更加丰富的多线程操作。

  1. Semaphore:作为资源控制器限制同时进行工作的线程数量,java版本的信号量实现。

    通过Semaphore实现车站调度demo

  2. CountDownLatch:允许一个或者多个线程等待某些操作完成。

  3. CyclicBarrier:一种辅助性的同步结构,语序多个线程等待到达某个屏障。

    CountDownLatch和CyclicBarrier的区别:

    i. CountDownLatch是不可以重置,所以无法重用,而CyclicBarrier则没有这种限制,可以重用。

    ii. CountDownLatch的基本操作组合是countDown/await。调用await的线程阻塞等待countDown足够的次数,不管你是在一个线程还是多个线程里countDown,只要次数足够即可。CountDownLatch操作的是事件。

    iii. CyclicBarrier的基本操作组合,则就是await,当所有的伙伴(parties)都调用了await,才会继续进行任务,并自动进行重置。注意,正常情况下,CyclicBarrier的重置都是自动发生的,如果我们调用reset方法,单还有线程在等待,就会导致等待线程发生干扰,抛出BrokenBarrierException异常。CyclicBarrier侧重点是线程,而不是调用事件, 它的典型应用场景是用来等待并发线程结束。

二. 线程安全容器

java.util.concurrent 包提供的容器(Queue、List、Set)、Map,从命名上可以大概区分为 Concurrent*、CopyOnWrite和 Blocking

Map形式的:

  1. ConcurrentHashMap:jdk8以前使用分段锁,jdk8后采用CAS

  2. ConcunrrentSkipListMap:

  3. ConcurrentSkipListMap:是TreeMap的线程安全版本。

List形式的:

CopyOnWriteArrayList:通过快照实现,适用于读多写少的场景。在对其实例进行修改操作(add/remove等)会新建一个数据并修改,修改完毕之后,再将原来的引用指向新的数组。

Set形式的:

CopyOnWriteArraySet:同上CopyOnWriteArrayList

Queue形式的:

Queue类图

ArrayBlockQueue: 有界队列,需要显示的指定队列大小

LinkedBlockQueue: 被认为无界,其实有

我在介绍 ReentrantLock 的条件变量用法的时候分析过 ArrayBlockingQueue,不知道你有没有注意到,其条件变量与 LinkedBlockingQueue 版本的实现是有区别的。notEmpty、notFull 都是同一个再入锁的条件变量,而 LinkedBlockingQueue 则改进了锁操作的粒度,头、尾操作使用不同的锁,所以在通用场景下,它的吞吐量相对要更好一些。 — 引用

PriorityBlockQueue: 无边界的优先级队列

三. 强大的Executor框架

  1. newCachedThreadPool(),它是一种用来处理大量短时间工作任务的线程池,具有几个鲜明特点:它会试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程;如果线程闲置的时间超过 60 秒,则被终止并移出缓存;长时间闲置时,这种线程池,不会消耗什么资源。其内部使用 SynchronousQueue 作为工作队列。
  2. newSingleThreadExecutor(),它的特点在于工作线程数目被限制为 1,操作一个无界的工作队列,所以它保证了所有任务的都是被顺序执行,最多会有一个任务处于活动状态,并且不允许使用者改动线程池实例,因此可以避免其改变线程数目。
  3. newSingleThreadScheduledExecutor() 和 newScheduledThreadPool(int corePoolSize),创建的是个 ScheduledExecutorService,可以进行定时或周期性的工作调度,区别在于单一工作线程还是多个工作线程。
  4. newWorkStealingPool(int parallelism),这是一个经常被人忽略的线程池,Java 8 才加入这个创建方法,其内部会构建。

四. 框架图

并发图谱

参考作品

《Java Core 卷一》

JUC常用4大并发工具类 https://mp.weixin.qq.com/s/Ixz8V0oMHyRvCzJ3EsZkFA

Java并发干货 https://mp.weixin.qq.com/s/Sxnf5teW1vehkhBfkV7E-g

Comment and share

  • page 1 of 1
Author's picture

Topsion

Fullstack Developer


Coder


Xi'an China