线程池及实现原理
在Java实际开发当中,多线程开发很常见,提起多线程自然少不了线程池,下面来学习一下线程池的原理以及实际应用。
什么是线程池
线程池是一种多线程处理的形式,通过把处理的任务添加到队列中,然后在创建线程后自动执行这些任务。帮我们重复管理线程,避免创建大量的线程增加开销。
有了线程池除了降低开销以外,还可以提高响应的速度,在JVM
中一个对象的创建经过以下几个步骤:
- 检查对应类是否已经被加载、解析和初始化。
- 类加载后,为新生对象分配内存。
- 将分配到的内存空间初始为0。
- 对对象进行关键信息的设置,比如对象的哈希码等。
- 然后执行init方法初始化对象。
线程池的构造
创建线程池需要使用ThreadPoolExecutor
类来创建,
1 | public ThreadPoolExecutor(int corePoolSize, //核心线程的数量 |
在ThreadPoolExecutor
这个类的构造函数中,这些参数所起到的作用:
corePoolSize
:线程池中核心线程的数量- 在线程数少于核心数量时,有新任务进来就新创建一个线程,即使空闲的线程。
- 等超出核心数量后,就不会再新创建线程了,这时空闲的队列就开始到队列中执行任务了。
maximumPoolSize
:线程池中最大线程的数量- 包括核心线程池数量+核心以外的数量
- 如果任务队列满了,并且线程池中的线程小于最大线程数量,会再创建新的线程去执行任务。
keepAliveTime
:核心线程以外的线程存活的时间- 如果给线程池设置
allowCoreThreadTimeOut(true)
,则核心线程在空闲时也会进入到存活倒计时。 - 如果任务是多而且容易执行的,可以调大这个参数,这样就可以在存活的时间里执行更多的任务。
- 如果给线程池设置
workQueue
:保存待执行任务的阻塞队列- 不同的任务类型有不同的选择
threadFactory
:生成新的线程工厂类handler
:线程饱和策略CallerRunsPolicy
:中要线程池没关闭,就直接用调用者所在线程来执行任务。AbortPoliy
:直接抛出RejectedExecutionExceptioin
异常。DiscardPolicy
:悄悄的把任务放生了,不执行了DiscardOldestPolicy
:把队列里待最久的那个任务丢弃,然后再调用execute()。- 我们也可以实现
RejectedExecutionHandler
接口来自定义策略。
我们再结合着源码,看看这些个参数起到了什么作用:
1 | public void execute(Runnable command) { |
从上面可以看出,线程池处理一个任务主要分为三步来完成的。
保存任务的队列
当线程池中的核心线程数已满时,任务就要保存到队列中了。
线程池中使用的队列是BlockingQueue
接口,常用的实现有如下几种:
ArrayBlockingQueue
:是基于数组、有界队列,有助于防止资源耗尽,但是可能较难调整和控制,按FIFO(先进先出)原则对元素进行排序。LinkedBlockingQueue
:是基于链表、无界队列,创建的线程数不会超过corePoolSizes
,当线程正忙时,任务进入队列等待,按照FIFO原则对元素进行排序- 吞吐量通常要高于
ArrayBlockingQueue
Executors.newFixedThreadPool()
使用了这个队列
- 吞吐量通常要高于
SynchronousQueue
:不存储元素的阻塞队列- 每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
- 吞吐量通常要高于
LinkedBlockingQueue
Executors.newCachedThreadPool
使用了这个队列
PriorityBlockingQueue
:具有优先级的、无限阻塞队列
常用的几种线程池
JDK
中为我们提供了几种常见的线程池的实现,都可以通过Executors
这个静态工厂来创建。
newFixedThreadPool
1
2
3
4
5public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}我们可以看到,
FixedThreadPool
的核心线程和最大线程数都是指定值,也就是说当线程池中的线程数量超过核心线程数后 ,任务都会被放到阻塞队列中。而
keepAliveTime
为0,也就是多余的空闲线程会被立即终止。这里选用的阻塞队列是
LinkedBlockingQueue
,使用的是默认容量Integer.MAX_VALUE
,相当于没有上限。即这个线程池执行任务的流程是:
- 线程数少于核心线程数,也就是设置的线程数时,新建线程去执行任务。
- 线程数等于核心线程数后,将任务加入阻塞队列,由于队列容量无限大,一直可以加。
- 执行完任务的线程反复去队列中取任务执行。
FixedThreadPool
用于负载比较重的服务器,为了资源的合理利用,需要限制当前线程的数量。newSingleThreadExecutor
1
2
3
4
5
6public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}SingleThreadExecutor
相当于是特殊的FixedThreadPool
,它的执行流程如下:- 线程池中没有线程时,新建一个线程执行任务。
- 有一个线程以后,将任务加到阻塞队列,不停的加
- 唯一的这一个线程不停地去队列里取任务执行。
SingleThreadExecutor
用于串行执行任务的场景,每个任务必须按顺序执行,不需要并发执行。newCachedThreadPool
1
2
3
4
5public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}CachedThreadPool
没有核心线程,非核心线程数无上限,每个空闲的线程存活的时间是60s,超出之后会立刻被回收。CachedThreadPool
使用的队列是SynchronousQueue
,这个队列的作用就是传递任务,并不会保存。因此当提交任务的速度大于处理任务的速度时,每提交一个任务就会创建一个线程,在极端情况下会创建过多的线程,耗尽CPU和内存资源。
它的执行流程:
- 没有核心线程,直接向
SynchronousQueue
中提交任务。 - 如果有空闲线程,就去取出任务执行;如果没有空闲线程就创建一人新的。
- 执行完任务的线程有60s的存活时间,如果这个时间内可以接到新任务,就可以继续存活,否则就被回收了。
由于空闲60s的线程会被终止,长时间保持空闲的
CachedThreadPool
不会占用任何资源,CachedThreadPool
用于并发执行大量短期的小任务,或者负载较轻的服务器- 没有核心线程,直接向
newScheduledThreadPool
1
2
3
4
5
6
7
8
9public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;ScheduledThreadPoolExecutor
继承自ThreadPoolExecutor
,最多线程数为Integer.MAX_VALUE
,使用DeplayedWorkQueue
作为任务队列。ScheduledThreadPoolExecutor
添加任务和执行任务的机制与ThreadPoolExecutor
有所不同。ScheduledThreadPoolExecutor
添加任务提供了两个方法:scheduleAtFixedRate()
:按某种速率周期执行scheduleWithFixedDelay()
:在某个延迟后执行
两个方法的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0L)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period),
sequencer.getAndIncrement());
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0L)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
-unit.toNanos(delay),
sequencer.getAndIncrement());
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}从上可以看到,这两个方法都是创建了一个
ScheduleRutureTask
对象,调用decorateTask()
方法转换成RunnableScheduledFuture
对象,然后添加到队列中。ScheduledFutureTask
的主要属性:1
2
3
4
5
6
7
8
9
10
11
12
13
14private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
//添加到队列中的顺序
private final long sequenceNumber;
//何时执行这个任务
private volatile long time;
//执行的间隔周期
private final long period;
//实际被添加到队列中的 task
RunnableScheduledFuture<V> outerTask = this;
//在 delay queue 中的索引,便于取消时快速查找
int heapIndex;
//...
}DelayQueue
中封装了一个优先级队列,这个队列会对队列中的ScheduledFutureTask
进行排序,两个任务的执行时间不同时,time
小的先执行;否则比较添加到队列中的顺序sequenceNumber
,先提交的先执行。ScheduledThreadPoolExecutor
的执行流程如下:- 调用上面两个方法添加一个任务
- 线程池中的线程人
DelayQueue
中取任务 - 然后执行任务
具体执行任务的步骤是比较复杂的:
- 线程从
DelayQueue
中获取time
大于等于当前时间的ScheduledFutureTask
,DelayQueue.take()
- 执行完后修改这个
task
的time
为下次被执行的时间。 - 然后再把这个
task
放回队列中DelayQueue.add()
ScheduledThreadPoolExecutor
用于需要多个后台线程执行周期任务,同时需要限制线程数量的场景。
执行任务的两种方法
ExecutorService
提供了两种提交任务的方法:
execute()
:提交不需要返回值的任务submit()
:提交需要返回值的任务
execute()
1 | void execute(Runnable command); |
execute()
的参数是一个Runnable,也没有返回值。因此提交后无法判断该任务是否被线程池执行成功。
1 | ExecutorService executor = Executors.newCachedThreadPool(); |
submit()
1 | <T> Future<T> submit(Callable<T> task); |
submit()
有三种重载的方法,参数可以是Callable
,也可以是Runnable
。
同时它会返回一个Future
对象,通过它我们可以判断任务是否执行成功。
获得执行结果调用Future.get()
方法,这个方法会阻塞当前线程直到任务完成。
提交一个Callable
任务时,需要通过FutureTask
:
1 | FutureTask futureTask = new FutureTask(new Callable<String>() { //创建 Callable 任务 |
线程池的关闭
线程池即使不执行任务也会占用一些资源,所以在我们要退出任务时最好关闭线程池。
有两种关闭线程池的方法:
shutdown()
将线程池的状态设置为
SHUTDOWN
,然后中断所有没有正在执行的线程。shutdownNow()
将线程池设置为
STOP
,然后尝试停止所有线程,并返回等待执行任务的列表
它们的共同点是,都是通过遍历线程池中的工作线程,逐个调用Thread.interrup()
来中断线程,所以一些无法响应中断的任务可能永远无法停止。
小结
了解了JDK提供的几种线程池实现,在实际开发当中根据任务的类型决定选择使用哪个线程池。
CachedThreadPool
用于并发执行大量短期的小任务,或者是负载较轻的服务器。FixedThreadPool
用于负载比较重的服务器,为了资源的合理利用,需要限制当前线程的数量。SingleThreadExecutor
用于串行执行任务的场景,每个任务必须按顺序执行,不需要并发执行。ScheduledThreadPoolExecutor
用于需要多个后台线程执行周期任务,同时需要限制线程数量的场景。
为了防止创建过多的线程导致系统奔溃,建议使用有边界的队列,因为在无法添加更多任务时会拒绝任务,这样可以提前预警,避免影响整个系统。
执行时间、顺序有要求的话可以选择优先级队列,同时也要保证低优先级的任务有机会被执行。
原文作者: dgb8901,yinxing
原文链接: https://www.itwork.club/2018/07/15/thread-pool/
版权声明: 转载请注明出处
为您推荐
体验小程序「简易记账」
关注公众号「特想学英语」