Toc
  1. 线程池的创建
  2. 向线程池提交任务
  3. 线程池的终止
  4. 举个栗子
  5. 线程池的分类
    1. 1. CachedThreadPool
    2. 2. FixedThreadPool
    3. 3. ScheduledThreadPool
    4. 4. SingleThreadExecutor
    5. 5. SingleThreadScheduledExecutor
    6. 6. WorkStealingPool(JDK 1.8 之后)
Toc
0 results found
关于线程池的一切
2020/03/14 Java Java Thread

线程池(Thread Pool)对于 限制应用程序中同一时刻运行的线程数 很有用。因为每启动一个新线程都会有相应的性能开销,每个线程都需要给栈分配一些内存等等。

我们可以把并发执行的任务传递给一个线程池,来替代为每个并发执行的任务都启动一个新的线程。只要池里 有空闲的线程 ,任务就会分配给一个线程执行。在线程池的内部,任务被插入一个 阻塞队列 (Blocking Queue ),线程池里的线程会去 取这个队列里的任务。当一个新任务插入队列时,一个空闲线程就会成功的从队列中取出任务并且执行它。

线程池经常应用在多线程服务器上。每个通过网络到达服务器的连接都被包装成一个任务并且传递给线程池。线程池的线程会并发的处理连接上的请求。

合理利用线程池能够带来三个好处。

  1. 降低资源消耗。通过重复利用已创建的线程 降低线程创建和销毁造成的消耗
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能 立即执行
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池 可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

线程池的创建

java.util.concurrent包中提供了一个叫 ThreadPoolExecutor 的类,用来创建线程池:

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}

它在创建时最多可以传入 7 个参数,分别是:

corePoolSize:线程池的 核心线程 数量。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到 需要执行的任务数大于线程池核心线程数量时就不再创建 。如果调用了线程池的prestartAllCoreThreads 方法,线程池会提前创建并启动所有基本线程。核心线程不会自动回收,但如果调用了线程池的 allowCoreThreadTimeOut() 方法,核心线程空闲时间达到 keepAliveTime 之后,就会被回收。

maximumPoolSize:线程池的 最大 大小。也即线程池内能容纳的 最大的线程数量 线程池最大数量 - 核心线程数 = 非核心线程数 。非核心线程在空间时间达到keepAliveTime 之后,就会被回收。

keepAliveTimeunit:当线程的数量大于corePoolSize 的时候,线程的存活时间。keepAliveTimeunit 两个参数要配合使用。unit的可选值有DAYS|HOURS|MINUTES|SECONDS|MILLISECONDS|MICROSECONDS|NANOSECONDS,超时后就会回收。无论选用哪种unit,在传入后都会被转成 NanoSeconds。

workQueue:持有等待被执行的任务的队列,也即我们常说的 阻塞队列 。这个队列只会持有通过调用execute() 方法提交的 Runnable 任务。BlockingQueue 实例可选的有以下几种:

  • ArrayBlockingQueue:是一个基于 数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
  • LinkedBlockingQueue:一个基于 链表结构的阻塞队列 ,此队列按 FIFO (先进先出) 排序元素,吞吐量通常要高于 ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool() 使用了这个队列。
  • SynchronousQueue:一个 不存储元素的阻塞队列 。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool 使用了这个队列。
  • PriorityBlockingQueue:一个 具有优先级有无限阻塞队列
  • DelayedWorkQueue: 一个 具有延迟执行功能的队列

threadFactory:执行器创建一个新的线程时会用到的工厂。可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug 和定位问题时非常又帮助。

handler:饱和策略。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy,表示无法处理新任务时抛出异常。以下是 JDK1.5 提供的四种策略。

  • AbortPolicy:直接抛出异常
  • CallerRunsPolicy:只用调用者所在线程 来运行任务。
  • DiscardOldestPolicy:丢弃队列里 最近的一个任务,并执行当前任务。
  • DiscardPolicy:不处理,丢弃掉。

当然也可以根据应用场景需要来实现 RejectedExecutionHandler 接口自定义策略。如记录日志或持久化不能处理的任务。

在 Android 的 AsyncTask 中就使用了 RejectedExecutionHandler 来实现了备用线程池。

向线程池提交任务

向线程池提交任务有两个方法:execute()submit(),我们来看看两个方法的源码,比较下有什么不同:

execute()

 /**
* 在未来的某个时间执行给定的任务。该任务有可能在一个新的线程或者是已存在的线程中执行。
*
* 如果任务无法被添加,要么就是因为它的执行器被回收了,要么就是达到了线程池的最大容量,该任务就会触发拒绝策略。
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获得当前线程的生命周期对应的二进制状态码
int c = ctl.get();
// 判断当前线程数量是否小于核心线程数量,如果小于就直接创建线程执行任务,创建成功直接跳出,失败则接着往下走.
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 判断线程池是否为 RUNNING 状态,并且将任务添加至队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 审核线程池的状态,如果不是 RUNNING 状态,直接移出队列中
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程数量为 0,则单独创建线程,而不指定任务.
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果不满足上述条件,尝试创建一个非核心线程来执行任务,如果创建失败,调用 reject() 方法.
else if (!addWorker(command, false))
reject(command);
}

addWorker()方法是创建线程主要的方法,我们简单看一下:

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// 获得当前线程的状态
int c = ctl.get();
int rs = runStateOf(c);

// 只在必要的情况下检查队列是否为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
// 获取正在工作的线程数量
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建一个 Worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 使用 Lock 机制向池中添加工作线程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

submit()

public  Future submit(Callable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

protected RunnableFuture newTaskFor(Callable callable) {
return new FutureTask(callable);
}


public class FutureTask<V> implements RunnableFuture<V> {
...
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

可以看出,submit()最终还是会去调用 execute() 方法来执行任务。不同的地方在于,submit()会返回一个包装好的任务,是一个 Future 对象。所以,通常情况下,在 不需要线程执行返回结果值时,我们使用 execute() 方法 。 而当我们 需要返回值时,则使用 submit() 方法 ,他会返回一个 Future 对象。Future 不仅仅可以获得一个结果,他还可以被取消,我们可以通过调用Future.cancel() 方法,取消一个 Future 的执行。

线程池的终止

线程 run 方法跑完之后,就正常结束了,但是线程池也是有关闭的办法的。

  1. shutdown():关闭线程池,不再接收新的任务,但已经添加的任务不受影响。
  2. shutdownNow():尝试停止所有正在执行的任务,停止处理正在等待的任务,并且返回一个正在等待执行的任务列表。

举个栗子

举个例子来看看线程池的用法:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {
private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
3, 5, 1,
TimeUnit.MINUTES,
new ArrayBlockingQueue(50));

public void test() {
for (int i = 0; i < 10; i++) {
final int j = i;
threadPoolExecutor.execute(() -> {
try {
Thread.sleep(3000);
String info = String.format("第 %d 次 - 当前线程 %s", j, Thread.currentThread().getName());
System.out.println(info);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

((Runnable) () -> {
try {
Thread.sleep(5000);
threadPoolExecutor.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).run();
}

public static void main(String[] args) {
new ThreadPoolTest().test();
}
}

输出结果如下:

第 0 次 - 当前线程 pool-1-thread-1
第 1 次 - 当前线程 pool-1-thread-2
第 2 次 - 当前线程 pool-1-thread-3
第 3 次 - 当前线程 pool-1-thread-1
第 5 次 - 当前线程 pool-1-thread-2
第 4 次 - 当前线程 pool-1-thread-3
第 8 次 - 当前线程 pool-1-thread-3
第 6 次 - 当前线程 pool-1-thread-1
第 7 次 - 当前线程 pool-1-thread-2
第 9 次 - 当前线程 pool-1-thread-3

从结果可以看出,虽然是按照结果执行了 10 次任务,但是并不是按照放入的顺序执行的。

线程池的分类

默认的线程池是 ThreadPoolExecutor,那么系统是不是提供了一些个性化的线程池供我们在各种情况下使用呢?

答案是有的。我们看看下图,有 6 种线程池可供选择:

我们分别来简单介绍一下:

1. CachedThreadPool

它的声明如下:

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}

根据它的构造方法,我们可以看出它的特性:

  • 没有核心线程。
  • 60 秒 timeout。
  • 采用 SynchronousQueue 队列

如果线程处理任务速度小于提交任务的速度,则会不断地创建新的线程,这时需要注意不要过度创建,应采取措施调整双方速度,不然线程创建太多会影响性能。从其特点可以看出,CachedThreadPool 适用于有大量需要立即执行的耗时少的任务的情况。比较适合 执行很多短期异步的小程序或者负载较轻的服务器

2. FixedThreadPool

它的声明如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}

根据它的构造方法,我们可以看出它的特性:

  • 无论线程池多大,全部是核心线程,没有非核心线程。
  • 因为全部是核心线程,所以不会回收。
  • 采用 LinkedBlockingQueue 队列。

这个非常适合 执行长期任务,因为线程不会回收,能更快地响应执行任务的请求,性能会比较好。

3. ScheduledThreadPool

它的声明如下:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
}

根据它的构造方法,我们可以看出它的特性:

  • 核心线程数量固定,非核心线程数量无上限
  • 非核心线程在工作完成后会立刻被回收。
  • 采用 DelayedWorkQueue 队列。

ScheduledThreadPool 适合 执行延时任务以及有固定周期的重复任务

4. SingleThreadExecutor

它的声明如下:

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}

static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}

根据它的构造方法,我们可以看出它的特性:

  • 只有一个线程,且为核心线程。
  • 线程不会被回收。
  • 采用 LinkedBlockingQueue 队列。
  • 在调用 finalize() 方法触发 GC 时,会关掉线程池。

适合 一个任务一个任务按执行的场景

5. SingleThreadScheduledExecutor

它的声明如下:

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}

它最终还是创建了一个 ScheduledThreadPoolExecutor,只不过核心线程数为 1,它的特性也很明显:

  • 只有一个核心线程,非核心线程数量无上限
  • 非核心线程在工作完成后会立刻被回收。
  • 采用 DelayedWorkQueue 队列。
  • 可以延期执行任务

它综合了 SingleThreadExecutor 和 ScheduledThreadPool,适合 执行一个接一个的延期任务

6. WorkStealingPool(JDK 1.8 之后)

它的声明如下:

public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

作为 JDK 1.8 之后加入的『后起之秀』,这个线程池必然有一些过人之处。文档上介绍的是这样的:

创建一个含有足够多线程的线程池,来维持相应的 并行级别,它会通过工作窃取的方式,使得多核的 CPU 不会闲置,总会有活着的线程让 CPU 去运行。

实际的线程数量可能会动态地增加或减少。

工作窃取线程池无法保证任务的执行顺序与添加顺序一致。

工作窃取不是什么 Java 独有的东西,.NET 的 TPL 库早就存在好几年了。所谓工作窃取,指的是闲置的线程去处理本不属于它的任务。

每个处理器核,都有一个队列存储着需要完成的任务。对于多核的机器来说,当一个核对应的任务处理完毕后,就可以去帮助其他的核处理任务。

它的本质是 ForkJoinPool,在 JDK 1.7 版本中加入的。

打赏
支付宝
微信
本文作者:CodingRabbit
版权声明:本文首发于CodingRabbit的博客,转载请注明出处!