线程池基本
一、优势
- 避免频繁的创建和销毁线程
- 提供运行效率
- 合理设置线程池大小,避免因线程数超过硬件资源瓶颈带来的问题
二、类型
- Executors.newFixedThreadPool() 创建固定线程长度的线程池
- Executors.newCachedThreadPool() 创建不限个数的线程池,空闲线程会在 60s 后被回收
- Executors.newSingleThreadExecutor() 创建只有一个线程的线程池,只有一个工作线程在工作,FIFO
- Executors.newScheduledThreadPool() 创建一个可以指定数量的线程池,但这个带有延迟性和可以周期性执行任务的功能
- Executors.newWorkStealingPool()
三、使用
public class Demo implements Runnable {
@Override
public void run() {
System.err.println("哈哈哈哈");
}
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(3);
for (int i = 0 ; i<= 3; i++) {
service.execute(new Demo());
}
}
}
四、ThreadPool 分析
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
我们可以发现,大部分的线程池的创建操作都是通过 new ThreadPollExecutor 进行创建的,我们找到它的构造函数
public ThreadPoolExecutor(int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 线程的空闲时间,超过时间后将会被回收
TimeUnit unit, // 线程超时的时间单位
BlockingQueue<Runnable> workQueue, // 阻塞队列
RejectedExecutionHandler handler) { // 拒绝策略
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
4.1 newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 在构建该线程池的时候,会传递一个线程数,这个传递过来的值,代表了核心线程数和最大线程数,通过使用 LinkedBlockingQueue 队列来进行线程的存放,最大容量是 Integer.MAX_VALUE,相当于没有上限
- 执行流程
- 首先判断当前的工作线程是否少于核心线程数
- 如果大于核心线程数,将其放入阻塞队列中
- 如果少于核心线程数,则直接创建 Worker 进行执行
- 当线程执行完毕,从阻塞队列中拉取,执行
- 适用于负载比较大的场景,但为了资源的合理利用,要合理的设置线程数
4.2 newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- cachedThreadPool ,默认创建一个可缓存的线程池,keepAliveTime 设置为 60,代表线程空闲 60S 之后,就会被回收
- 执行流程
- 没有核心线程,会直接将任务放到 SynchronousQueue 中
- 如果有空闲线程则直接运行,若没有空闲线程,则新建一个
- 执行完毕后的线程会继续接任务,如果60S后还没有接到任务,则会被直接回收
4.3 newSingleThreadExecutor
简单的创建一个 先进先出的单线程的线程池
五、执行流程
当我们调用 execute 后
5.1 execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 如果当前的工作线程小于核心线程,则直接创建线程执行
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 核心线程池已满,添加到队列中
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // 如果线程池已经停止运行,并将其移除队列,则执行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0) // 队列已满,创建非核心线程
addWorker(null, false);
}
else if (!addWorker(command, false)) // 都满,拒绝
reject(command);
}
ctl 的话可以说是一个标记为
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 右移29位-1表示最大线程容量
// 高3位表示运行状态,
private static final int RUNNING = -1 << COUNT_BITS; // 接收新任务,执行队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接收新任务,执行队列中的任务
private static final int STOP = 1 << COUNT_BITS; // 不接收新任务,停止执行队列中的任务
private static final int TIDYING = 2 << COUNT_BITS; // 所有任务执行完毕,线程池工作线程数量为0,等待调用 terminated()
private static final int TERMINATED = 3 << COUNT_BITS; // terminated() 方法执行完毕
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
通过对 int 值进行位运算,让高3位表示线程状态,低29位表示线程数
5.2 addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); // 获得当前工作线程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false; // 大于最大容量,拒绝添加
if (compareAndIncrementWorkerCount(c)) // 进行计数,失败则继续进行
break retry;
c = ctl.get(); // 检查一下状态,若状态发生改变,则重新执行
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false; // 是否成功启动
boolean workerAdded = false; // 是否成功添加
Worker w = null;
try {
w = new Worker(firstTask); // 将当前任务封装为 工作线程(Worker)
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 加锁
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 添加到工作队列中,HashSet
int s = workers.size(); // 获取当前队列大小
if (s > largestPoolSize) // largestPoolSize 是出现过得最大数
largestPoolSize = s;
workerAdded = true; // 标记成功
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 启动
workerStarted = true; // 标记启动成功
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); // 添加失败
}
return workerStarted;
}
主要实现两个功能:
- 添加工作线程数,计数
- 创建工作线程,并调用 start 方法启动
5.3 Worker
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable{
final Thread thread; // 当前线程
Runnable firstTask; // 首先要执行的任务
volatile long completedTasks; // 计数
Worker(Runnable firstTask) {
setState(-1); // 初始状态 -1,防止在调用 runWorker() 方法前被中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
其实这里面逻辑也比较简单,Worker 类继承了 AQS,实现了 Runnable ,最终的执行是调用 runWroker() 方法,
当新建一个 worker 的时候,首先设置一个标记为 state = -1, 通过线程工厂创建出来线程,将最初要执行的task 设置为当前 task
5.4 addWorkerFailed
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
// 如果 worker 构建好了,就移除
workers.remove(w);
// 原子递减
decrementWorkerCount();
// 尝试结束线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
5.5 runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 解锁为了允许进行中断,因为 new Worker 默认的 state=-1,将 state 设置为0,只有为0才能进行中断
boolean completedAbruptly = true;
try {
// 如果 task 等于空,通过 getTask() 从阻塞队列中取
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); // 中断
try {
beforeExecute(wt, task); // 空实现
Throwable thrown = null;
try {
task.run(); // 调用 run 方法,执行
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;// 设置为 Null , 继续从阻塞队列中取值
w.completedTasks++; // 计数
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 将 worker 移除,计数递减
// 根据布尔值 allowCoreThreadTimeOut 来决定是否补充新的 Worker线程
processWorkerExit(w, completedAbruptly);
}
}
5.6 getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// timed 变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果需要控制超时,则通过 poll 进行获取
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r; //如果拿到的任务不为空,则直接返回给 worker 进行处理
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
其实简单来说,如果当前的工作线程大于核心线程数,并小于最大线程数,且阻塞队列已经满了,这时还是可以增加工作线程,但这时如果超时没有获取到任务,就代表阻塞队列已经空了,也就代表可以清除核心线程之外的工作线程,当getTask 为null的时候,会跳出循环,runWorker 方法执行完毕,由JVM对线程进行回收
六、拒绝策略
默认的拒绝策略为 : AbortPolicy
- AbortPolicy : 抛出异常
- CallerRunsPolicy : 通过当前线程,直接运行
- DiscardOldestPolicy :丢弃阻塞队列中靠最前的任务,并执行当前任务
- DiscardPolicy :不作为
我们可以通过实现 RejectedExecutionHandler , 实现自己的拒绝策略
七、线程池的基本操作
通过 Executors 进行线程池的创建,因为好多参数我们不需要去知道是什么意思,所以就可能会导致出现各种各样的问题,所以我们可以通过 new ThreadExecutor 的方式去进行创建
- 当我们创建出来线程池的时候,里面是没有线程的,我们可以通过 prestartCoreThread,prestartAllCoreThread,一个初始化一个核心线程,一个初始化全部核心线程
- 当我们要关闭线程池的时候,shutdown,shutdownNow,一个调用之后,会执行完毕才会关闭,一个会停止执行,清空缓存队列,返回尚未执行完毕的任务
- 我们可以通过 setCorePoolSize() , setMaximumPoolSize(),一个设置核心池大小,一个设置最大线程数
八、阻塞队列
workQueue 中存放待执行的任务,类型为 BlockingQueue,通常可取:
- ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
- LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默 Integer.MA_VALUE;
- SynchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个 线程来执行新来的任务。
九、execute 与 submit 的区别
- execute
- execute 接收 Runnable 类型参数
- execute 执行报错会抛出异常
- execute 无返回值
- submit
- 可以接收 Runnable 和 Callable 两种类型的参数
- submit 执行报错不会抛出异常,除非调用 Future.get
- 若传入的是 Callable 类型的参数,则可以获得一个 Future 类型的返回值
十、Callable 和 Future
- 实例
@Override
public String call() throws Exception {
return "hello world";
}
public static void main(String[] args) throws ExecutionException,
InterruptedException {
Demo callableDemo = new Demo();
FutureTask futureTask = new FutureTask(callableDemo);
new Thread(futureTask).start();
System.out.println(futureTask.get());
}
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
RunnableFuture 是一个接口,实现了 Future 和 Runnable
Future 的话表示一个任务的生命周期,并提供相关的判断,来查看线程执行的一些状态
public interface Future<V> {
// 取消
boolean cancel(boolean mayInterruptIfRunning);
// 判断是否取消
boolean isCancelled();
// 判断是否结束
boolean isDone();
// 获取,如果当前线程还没执行完毕,则等待执行完毕
V get() throws InterruptedException, ExecutionException;
// 有超时的获取
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
FutureTask 可以说是 Future 和 Runnable 的结合,我们可以将 Runnable 看作是生产者,Future 当做是消费者,生产者通过 run 方法计算结果,消费者通过 get 获取结果
Future 有一些状态值
private static final int NEW = 0;// 新建状态,表示这个 FutureTask 还没有开始运行
private static final int COMPLETING = 1;// 完成状态,表示 FutureTask 任务已经计算完成了
private static final int NORMAL = 2;// 正常执行完毕
private static final int EXCEPTIONAL = 3;// 执行完毕,有异常
private static final int CANCELLED = 4;// 执行完毕,被取消
private static final int INTERRUPTING = 5;// 执行完毕,发起了中断请求
private static final int INTERRUPTED = 6;// 执行完毕,已经完成中断请求
- run
public void run() {
// 如果当前不是昔年状态,且设置运行标识失败,则直接 return
// 保证了只有一个线程可以执行
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable; // 设置 callable
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 调用 callable.call()
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); // 设置异常
}
if (ran)
// 结果封装
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
- get
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
get方法就是阻塞获取线程执行结果,这里主要做了两个事情
1. 判断当前的状态,如果状态小于等于 COMPLETING,表示 FutureTask 任务还没有完结, 所以调用awaitDone方法,让当前线程等待。
2. 1. report返回结果值或者抛出异常
* awaitDone
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false; // 节点是否添加
for (;;) {
// 如果被中断了,则直接移除,并抛出异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) { // 表示已经执行完毕
if (q != null)
q.thread = null; // 设置为 Null
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
// 构建一个 waitNode ,加入到队列中
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
// 有超时的挂起
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
// 挂起
LockSupport.park(this);
}
}
run 方法执行完毕后会进行释放