线程池及实现原理

在Java实际开发当中,多线程开发很常见,提起多线程自然少不了线程池,下面来学习一下线程池的原理以及实际应用。

什么是线程池

线程池是一种多线程处理的形式,通过把处理的任务添加到队列中,然后在创建线程后自动执行这些任务。帮我们重复管理线程,避免创建大量的线程增加开销。

有了线程池除了降低开销以外,还可以提高响应的速度,在JVM中一个对象的创建经过以下几个步骤:

  1. 检查对应类是否已经被加载、解析和初始化。
  2. 类加载后,为新生对象分配内存。
  3. 将分配到的内存空间初始为0。
  4. 对对象进行关键信息的设置,比如对象的哈希码等。
  5. 然后执行init方法初始化对象。

线程池的构造

创建线程池需要使用ThreadPoolExecutor类来创建,

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,	//核心线程的数量
int maximunPoolSize, //最大线程数量
long keepAliveTime, //超出核心线程数量以外的线程空余存活时间
TimeUnit unit, //设定存活时间的单位
BlockingQueue<Runnable> workQueue, //保存执行任务的队列
ThreadFactory threadFactory, //创建新线程使用的工厂
RejectedExecutionHandler handler //当任务无法执行的处理器
){}

ThreadPoolExecutor这个类的构造函数中,这些参数所起到的作用:

  1. corePoolSize:线程池中核心线程的数量
    • 在线程数少于核心数量时,有新任务进来就新创建一个线程,即使空闲的线程。
    • 等超出核心数量后,就不会再新创建线程了,这时空闲的队列就开始到队列中执行任务了。
  2. maximumPoolSize:线程池中最大线程的数量
    • 包括核心线程池数量+核心以外的数量
    • 如果任务队列满了,并且线程池中的线程小于最大线程数量,会再创建新的线程去执行任务。
  3. keepAliveTime:核心线程以外的线程存活的时间
    • 如果给线程池设置allowCoreThreadTimeOut(true),则核心线程在空闲时也会进入到存活倒计时。
    • 如果任务是多而且容易执行的,可以调大这个参数,这样就可以在存活的时间里执行更多的任务。
  4. workQueue:保存待执行任务的阻塞队列
    • 不同的任务类型有不同的选择
  5. threadFactory:生成新的线程工厂类
  6. handler:线程饱和策略
    • CallerRunsPolicy:中要线程池没关闭,就直接用调用者所在线程来执行任务。
    • AbortPoliy:直接抛出RejectedExecutionExceptioin异常。
    • DiscardPolicy:悄悄的把任务放生了,不执行了
    • DiscardOldestPolicy:把队列里待最久的那个任务丢弃,然后再调用execute()。
    • 我们也可以实现RejectedExecutionHandler接口来自定义策略。

我们再结合着源码,看看这些个参数起到了什么作用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
//1.当前池中线程比核心数少,新建一个线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2.核心线程池已满,但任务队列未满,添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //如果这时被关闭了,拒绝任务
reject(command);
else if (workerCountOf(recheck) == 0) //如果之前的线程已被销毁完,新建一个线程
addWorker(null, false);
}
//3.核心池已满,队列已满,试着创建一个新线程
else if (!addWorker(command, false))
reject(command); //如果创建新线程失败了,说明线程池被关闭或者线程池完全满了,拒绝任务
}

从上面可以看出,线程池处理一个任务主要分为三步来完成的。

保存任务的队列

当线程池中的核心线程数已满时,任务就要保存到队列中了。

线程池中使用的队列是BlockingQueue接口,常用的实现有如下几种:

  1. ArrayBlockingQueue:是基于数组、有界队列,有助于防止资源耗尽,但是可能较难调整和控制,按FIFO(先进先出)原则对元素进行排序。

  2. LinkedBlockingQueue:是基于链表、无界队列,创建的线程数不会超过corePoolSizes,当线程正忙时,任务进入队列等待,按照FIFO原则对元素进行排序

    • 吞吐量通常要高于ArrayBlockingQueue
    • Executors.newFixedThreadPool()使用了这个队列
  3. SynchronousQueue:不存储元素的阻塞队列

    • 每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
    • 吞吐量通常要高于LinkedBlockingQueue
    • Executors.newCachedThreadPool使用了这个队列
  4. PriorityBlockingQueue:具有优先级的、无限阻塞队列

常用的几种线程池

JDK中为我们提供了几种常见的线程池的实现,都可以通过Executors这个静态工厂来创建。

  1. newFixedThreadPool

    1
    2
    3
    4
    5
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }

    我们可以看到,FixedThreadPool的核心线程和最大线程数都是指定值,也就是说当线程池中的线程数量超过核心线程数后 ,任务都会被放到阻塞队列中。

    keepAliveTime为0,也就是多余的空闲线程会被立即终止。

    这里选用的阻塞队列是LinkedBlockingQueue,使用的是默认容量Integer.MAX_VALUE,相当于没有上限。

    即这个线程池执行任务的流程是:

    • 线程数少于核心线程数,也就是设置的线程数时,新建线程去执行任务。
    • 线程数等于核心线程数后,将任务加入阻塞队列,由于队列容量无限大,一直可以加。
    • 执行完任务的线程反复去队列中取任务执行。

    FixedThreadPool用于负载比较重的服务器,为了资源的合理利用,需要限制当前线程的数量。

  2. newSingleThreadExecutor

    1
    2
    3
    4
    5
    6
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }

    SingleThreadExecutor相当于是特殊的FixedThreadPool,它的执行流程如下:

    • 线程池中没有线程时,新建一个线程执行任务。
    • 有一个线程以后,将任务加到阻塞队列,不停的加
    • 唯一的这一个线程不停地去队列里取任务执行。

    SingleThreadExecutor用于串行执行任务的场景,每个任务必须按顺序执行,不需要并发执行。

  3. newCachedThreadPool

    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }

    CachedThreadPool没有核心线程,非核心线程数无上限,每个空闲的线程存活的时间是60s,超出之后会立刻被回收。

    CachedThreadPool使用的队列是SynchronousQueue,这个队列的作用就是传递任务,并不会保存。

    因此当提交任务的速度大于处理任务的速度时,每提交一个任务就会创建一个线程,在极端情况下会创建过多的线程,耗尽CPU和内存资源。

    它的执行流程:

    • 没有核心线程,直接向SynchronousQueue中提交任务。
    • 如果有空闲线程,就去取出任务执行;如果没有空闲线程就创建一人新的。
    • 执行完任务的线程有60s的存活时间,如果这个时间内可以接到新任务,就可以继续存活,否则就被回收了。

    由于空闲60s的线程会被终止,长时间保持空闲的CachedThreadPool不会占用任何资源,CachedThreadPool用于并发执行大量短期的小任务,或者负载较轻的服务器

  4. newScheduledThreadPool

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
    DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
    new DelayedWorkQueue());
    }
    private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;

    ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,最多线程数为Integer.MAX_VALUE,使用DeplayedWorkQueue作为任务队列。

    ScheduledThreadPoolExecutor添加任务和执行任务的机制与ThreadPoolExecutor有所不同。

    ScheduledThreadPoolExecutor添加任务提供了两个方法:

    • scheduleAtFixedRate():按某种速率周期执行
    • scheduleWithFixedDelay():在某个延迟后执行

    两个方法的代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
    long initialDelay,
    long period,
    TimeUnit unit) {
    if (command == null || unit == null)
    throw new NullPointerException();
    if (period <= 0L)
    throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
    new ScheduledFutureTask<Void>(command,
    null,
    triggerTime(initialDelay, unit),
    unit.toNanos(period),
    sequencer.getAndIncrement());
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
    }
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
    long initialDelay,
    long delay,
    TimeUnit unit) {
    if (command == null || unit == null)
    throw new NullPointerException();
    if (delay <= 0L)
    throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
    new ScheduledFutureTask<Void>(command,
    null,
    triggerTime(initialDelay, unit),
    -unit.toNanos(delay),
    sequencer.getAndIncrement());
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
    }

    从上可以看到,这两个方法都是创建了一个ScheduleRutureTask对象,调用decorateTask()方法转换成RunnableScheduledFuture对象,然后添加到队列中。

    ScheduledFutureTask的主要属性:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private class ScheduledFutureTask<V>
    extends FutureTask<V> implements RunnableScheduledFuture<V> {
    //添加到队列中的顺序
    private final long sequenceNumber;
    //何时执行这个任务
    private volatile long time;
    //执行的间隔周期
    private final long period;
    //实际被添加到队列中的 task
    RunnableScheduledFuture<V> outerTask = this;
    //在 delay queue 中的索引,便于取消时快速查找
    int heapIndex;
    //...
    }

    DelayQueue中封装了一个优先级队列,这个队列会对队列中的ScheduledFutureTask进行排序,两个任务的执行时间不同时,time小的先执行;否则比较添加到队列中的顺序sequenceNumber,先提交的先执行。

    ScheduledThreadPoolExecutor的执行流程如下:

    • 调用上面两个方法添加一个任务
    • 线程池中的线程人DelayQueue中取任务
    • 然后执行任务

    具体执行任务的步骤是比较复杂的:

    • 线程从DelayQueue中获取time大于等于当前时间的ScheduledFutureTaskDelayQueue.take()
    • 执行完后修改这个tasktime为下次被执行的时间。
    • 然后再把这个task放回队列中DelayQueue.add()

    ScheduledThreadPoolExecutor用于需要多个后台线程执行周期任务,同时需要限制线程数量的场景。

执行任务的两种方法

ExecutorService提供了两种提交任务的方法:

  • execute():提交不需要返回值的任务
  • submit():提交需要返回值的任务

execute()

1
void execute(Runnable command);

execute()的参数是一个Runnable,也没有返回值。因此提交后无法判断该任务是否被线程池执行成功。

1
2
3
4
5
6
7
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable() {
@Override
public void run() {
//do something
}
});

submit()

1
2
3
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

submit()有三种重载的方法,参数可以是Callable,也可以是Runnable

同时它会返回一个Future对象,通过它我们可以判断任务是否执行成功。

获得执行结果调用Future.get()方法,这个方法会阻塞当前线程直到任务完成。

提交一个Callable任务时,需要通过FutureTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
FutureTask futureTask = new FutureTask(new Callable<String>() {    //创建 Callable 任务
@Override
public String call() throws Exception {
String result = "";
//do something
return result;
}
});
Future<?> submit = executor.submit(futureTask); //提交到线程池
try {
Object result = submit.get(); //获取结果
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}

线程池的关闭

线程池即使不执行任务也会占用一些资源,所以在我们要退出任务时最好关闭线程池。

有两种关闭线程池的方法:

  1. shutdown()

    将线程池的状态设置为SHUTDOWN,然后中断所有没有正在执行的线程。

  2. shutdownNow()

    将线程池设置为STOP,然后尝试停止所有线程,并返回等待执行任务的列表

它们的共同点是,都是通过遍历线程池中的工作线程,逐个调用Thread.interrup()来中断线程,所以一些无法响应中断的任务可能永远无法停止。

小结

了解了JDK提供的几种线程池实现,在实际开发当中根据任务的类型决定选择使用哪个线程池。

  1. CachedThreadPool用于并发执行大量短期的小任务,或者是负载较轻的服务器。
  2. FixedThreadPool用于负载比较重的服务器,为了资源的合理利用,需要限制当前线程的数量。
  3. SingleThreadExecutor用于串行执行任务的场景,每个任务必须按顺序执行,不需要并发执行。
  4. ScheduledThreadPoolExecutor用于需要多个后台线程执行周期任务,同时需要限制线程数量的场景。

为了防止创建过多的线程导致系统奔溃,建议使用有边界的队列,因为在无法添加更多任务时会拒绝任务,这样可以提前预警,避免影响整个系统。

执行时间、顺序有要求的话可以选择优先级队列,同时也要保证低优先级的任务有机会被执行。

原文作者: dgb8901,yinxing

原文链接: https://www.itwork.club/2018/07/15/thread-pool/

版权声明: 转载请注明出处

为您推荐

体验小程序「简易记账」

关注公众号「特想学英语」

了解synchronized和lock