Java线程池分析

引言

在并发编程中,我们经常会使用到线程池,当然我们也可以手动一个一个创建线程,那么为何我们还是推崇大家使用线程池进行并发编程呢?借用《Java并发编程的艺术》提到的来说一下使用线程池的优点有3个

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

ThreadPoolExecutor类

在日常使用中,大多数情况我们都会使用JDK提供的Executors去创建线程池,Executors利用工厂模式向我们提供了4种线程池实现方式,但是最新的阿里Java开发手册中明确说明了不建议使用Executors去创建线程池,原因是使用Executors创建线程池会使用很多默认值,默认使用的参数有时候是不合理,但是开发者往往会忽略。所以我们应该尽量使用ThreadPoolExecutor类来显示的创建线程池。

下面我们先来看下ThreadPoolExecutor类的继承结构

image.png

  • Executor接口只含有一个execute(Runnable command)方法,加入一个新的线程
  • ExecutorService接口相比Executor多个几个方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
     public interface ExecutorService extends Executor {
        void shutdown();
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
        <T> Future<T> submit(Callable<T> task);
        <T> Future<T> submit(Runnable task, T result);
        Future<?> submit(Runnable task);
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
     
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }

我们直接看ThreadPoolExecutor所提供的构造方法

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

现在我们来解释一下各个参数的含义

  • corePoolSize 核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系
  • maximumPoolSize 线程池最大线程数
  • keepAliveTime 空闲线程的存活时间,默认只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用
  • unit 参数keepAliveTime的时间单位
  • workQueue 一个阻塞队列,用来存储等待执行的任务
  • threadFactory 线程工厂,主要用来创建线程
  • handler 表示当拒绝处理任务时的策略,有以下四种取值
1
2
3
4
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

下面再简单再介绍一下ThreadPoolExecutor中几个重要的方法

  • execute 这个方法其实就是对Executor接口的实现,也是最核心的方法,用于向线程池中提交一个任务执行
  • submit 该方法也是用于向线程池里提交任务,区别在于submit提交可以有返回值,会返回一个Futrue类型来异步获取执行结果
  • shutdown 用于关闭线程池
  • shutdownNow 也是关闭线程池,与shutdown区别在于shutdownNow会尝试关闭正在执行的线程任务

通过源码分析 ThreadPoolExecutor

上文我们简单的分析了ThreadPoolExecutor,下面我们将根据源码来进一步分析ThreadPoolExecutor核心功能的实现

先从其核心方法execute开始解读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void execute(Runnable command) {
// 如果传入空对象 则抛出空指针异常
if (command == null)
throw new NullPointerException();
int c = ctl.get();

/**
* 如果当前正在执行任务的线程数小于 corePoolSize
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/**
* 如果当前正在执行任务的线程数大于corePoolSize,且workQueue未满
*/
if (isRunning(c) && workQueue.offer(command)) {
// 重复检查一次,防止正好此时线程池被shutdown
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/**
* 如果当前正在执行的任务线程数大于corePoolSize,且workQueue已满,则使用 * maximumPoolSize参数创建线程,如果已经到了maximumPoolSize最大值,则启用拒绝 * 策略
*/
else if (!addWorker(command, false))
reject(command);
}

上面的代码简单总结一下就是

  1. 当前正在执行任务的线程数小于corePoolSize时,正常加入执行
  2. 当前正在执行任务的线程数大于corePoolSize,且workQueue未满时,则将任务加入等待队列workQueue
  3. 如果当前正在执行的任务线程数大于corePoolSize,且workQueue已满,则会临时扩充线程数,根据maximumPoolSize最大线程数值
  4. 如果当前正在执行的任务线程数大于maximumPoolSize 这时已经超出最大可承受的线程数值了,会启用拒绝策略,也就是上文所配置的四种策略之一,默认是抛出异常,加入任务失败

workQueue 任务队列

上文有提到任务队列,也就是在等待执行的任务队列。workQueue的本质是一种阻塞队列BlockingQueue,列举三种常用类型

  1. 基于数组的先进先出队列,此队列创建时必须指定大小
  2. 基于链表的先进先出队列
  3. 同步队列 该队列不存储元素,每个插入操作必须等待另一个线程调用移除操作,否则插入操作会一直阻塞

核心功能分析

上文中我们通过查看源码知道了新任务加入线程池的策略,下面我们继续往下看,再分析之前,我们可以先思考几个问题

  1. 任务等待队列是在何时被执行?
  2. 线程是如何实现重复利用的?,毕竟这是线程池最重要的功能了
  3. 空闲线程根据keepAliveTime参数是在哪里被回收的?

接下来我们继续通过源码来解读这三个问题
根据上文execute方法源码,可以看到调用了addWorker方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
// 这里判断当前线程数是否大于最大值,大于了则返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建了一个 Worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 把当前这个工作线程加入到 workers 集合中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 开始执行此worker
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
  • 根据参数core 判断当前线程数是否已经大于corePoolSizemaximumPoolSize,大于则直接返回false
  • 初始化了一个Worker对象,并且加入到了workers集合中,workers是一个Set类型的集合
  • 启动当前这个Worker线程任务

经过上面的分析,可能大概会有疑惑,Worker对象是做啥的?下面我们就来看下Worker类的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}

...
}
  • Worker实现了Runnable接口
  • 构造方法中通过ThreadFactory初始化了一个新的线程对象
  • run()方法调用了ThreadPoolExecutor的runWorker()

重点在runWorker方法中

线程池重复利用机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 从任务队列中循环获取任务执行 getTask方法有可能会阻塞
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 空实现,可以通过继承`ThreadPoolExecutor`重写此方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

代码解析到这里,基本可以回答上文所提出的三个问题了

  • 加入到任务队列的任务会在这里被执行,因为Worker对象执行完一个任务后,并不会立刻结束,而是会通过循环调用getTask()方法从任务队列中获取最新任务来执行,这样子就实现了线程的重复利用,而不必每次都重新创建一个Worker对象
  • getTask()方法无法获取到最新的任务后,该Worker线程才会被回收。所以第三个问题keepAliveTime的机制必然是在getTask()中实现的

getTask 超时策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// workers大于corePoolSize,或则允许corePoolSize设置空闲超时时间
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 当前线程数已经大于maximumPoolSize或则已经超时过一次,则直接返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 获取任务,如果timed为true,则等待一定时间(keepAliveTime)未返回的话,会返回null,如果timed未设置为true,则会一直阻塞,直到有数据
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
  • 先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null
  • 判断当前线程池的线程数是否已经大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用workQueue.poll(time,timeUnit)来取任务,这个方法会阻塞等待一定的时间,如果取不到任务就返回null,否则则会调用workQueue.take()来取任务,这个方法会wait释放CPU一直阻塞。

总结一下,只有当我们设置allowCoreThreadTimeOut(核心池线程空闲超时时间)或则当前线程数大于corePoolSize时,keepAliveTime机制才会生效

整个流程到这里大概就分析完了,下图基本绘制了线程池提交任务,执行任务的整个流程

image.png

Executors 中几个常用的线程池

虽然阿里Java开发规约中不建议我们使用Executors类直接创建线程池,但还是有必要简单介绍几个其中常用的方法

  • Executors.newCachedThreadPool() 创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE. 将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUEworkQueue使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程
  • Executors.newSingleThreadExecutor() 创建容量为1的缓冲池,将corePoolSizemaximumPoolSize都设置为1,workQueue使用的LinkedBlockingQueue
  • Executors.newFixedThreadPool(int) 创建固定容量大小的缓冲池,空闲线程不会销毁。corePoolSizemaximumPoolSize值是相等的,workQueue使用的是LinkedBlockingQueue

总结

本篇文章主要从对线程池的配置使用,以及源码实现做了分析,总体上比较全面的分析了Java线程池的实现。

Powered by Hexo and Hexo-theme-hiker

Copyright © 2013 - 2020 王俊男的技术杂谈 All Rights Reserved.

访客数 : | 访问量 :