一、 线程池作用
- 方便管理,复用线程
- 避免重复的创建和销毁线程,减少开销
- 避免无限创建线程引起的OutOfMemoryError【简称OOM】
二、创建
JDK中Executors提供了四个静态方法快速创建线程池。
- newFixedThreadPool
- newSingleThreadExecutor
- newCachedThreadPool
- newSingleThreadScheduledExecutor
同步分析可以得知,以上几种线程池都是创建的ThreadPoolExecutor对象,过程如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor类构造函数参数含义:
1、 corePoolSize:核心线程池数量
- 当前线程数少于核心数量时,有新任务进来就新建一个线程
- 当前线程数大于核心数量时,新任务会加到任务队列中或者创建非核心线程
1、 maximumPoolSize:最大线程数量
- 包括核心线程池数量 + 核心以外的数量
- 如果任务队列满了,并且池中线程数小于最大线程数,会再创建新的线程执行任务
1、 keepAliveTime:非核心线程的最大空闲时间
- 如果给线程池设置 allowCoreThreadTimeOut(true),这个参数也可以作用于核心线程
1、 workQueue:保存待执行任务的阻塞队列,JDK提供了几种实现,后面讨论
2、 threadFactory:创建线程的方法,默认使用DefaultThreadFactory
3、 handler:任务拒绝策略
- CallerRunsPolicy:只要线程池没关闭,就直接用调用者所在线程来运行任务
- AbortPolicy:直接抛出 RejectedExecutionException 异常
- DiscardPolicy:没有任何提示,放弃任务
- DiscardOldestPolicy:丢弃队列中最旧的任务
- 我们也可以实现自己的 RejectedExecutionHandler 接口自定义策略
现在,我们着重分析一下上面提到的四种创建线程的方式:
- newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
可以看到,核心线程和最大线程相同,即线程数量是固定的。keepAliveTime=0,空闲的线程会立即终止。阻塞队列使用的是 LinkedBlockingQueue,使用的是其无参构造方法,没有指定队列的容量,那么它的容量就是Integer.MAX_VALUE
,基本相当于没有上限。
- newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
核心线程和最大线程相同,只有一个线程。。keepAliveTime=0,空闲的线程会立即终止。阻塞队列和newFixedThreadPool一样。
- newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
没有核心线程,最大线程数为Integer.MAX_VALUE
,队列是SynchronousQueue,不储存元素。有新任务时,没有空闲线程则创建线程。
- newSingleThreadScheduledExecutor(以后解析)
使用的是子类ScheduledThreadPoolExecutor,另外提供了两种执行任务的方式:
scheduleAtFixedRate(Runnable command,long initialDelay, long 1, TimeUnit.MINUTES)
, 以固定的速率执行,比如每隔一分钟执行一次。scheduleWithFixedDelay(Runnable command,long initialDelay, long 1,TimeUnit.MINUTES)
, 上次任务结束后1分钟,下次任务再执行。
至此,线程池创建完毕,接下来是添加任务到线程池中。
ExecutorService 提供了两种提交任务的方法:
1、 execute():提交不需要返回值的任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
其步骤可分为:
1、 当前线程数小于核心线程数,创建一个线程
2、 如果超过核心线程且任务队列未满,则加入队列中等待执行
3、 如果加入队列失败,则创建非核心线程执行任务(不得超过最大线程数量)
这里把添加任务 addWorker() 的代码贴出来:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker() 前面一部分是关于线程池数量限制的运行状态的判断,这里不做分析,需要注意的是,添加任务的时候用到了 final ReentrantLock mainLock = this.mainLock; mainLock.lock();
这是一个同步的操作。
1、 submit():提交需要返回值的任务 在AbstractExecutorService中定义了 submit() 的重载方法:
Future<?> submit(Runnable task)
Future<T> submit(Runnable task, T result);
Future<T> submit(Callable<T> task)
同时它会返回一个 FutureTask 对象,通过它我们可以判断任务是否执行成功。
获得执行结果调用 FutureTask.get() 方法(也可以使用支持超时的方法),这个方法会阻塞当前线程直到任务完成,阻塞的过程如下:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
awaitDone() 方法内通过不断的循环,形成阻塞,直到返回结果(超时)。
三、关闭线程池
- shutdown()
- shutdownNow()
四、其他
Excetors提供的方法容易造成OOM分析:
1、 FixedThreadPool和SingleThreadExecutor => 允许的请求队列长度为 Integer.MAX_VALUE
,可能会堆积大量的请求,从而引起OOM异常
2、 CachedThreadPool => 允许创建的线程数为 Integer.MAX_VALUE
,可能会创建大量的线程,从而引起OOM异常
本文使用 tech.souyunku.com 排版