专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

ThreadPoolExecutor 原理及源码详细分析

总览线程池工作机制

57_1.png

新任务来到的时候,先去判断核心线程池数量(corePoolSize)满了没有,如果没有满那么创建一个 Worker 去执行这个任务。

某一时刻发现核心线程池数量满了,那么这个任务就暂时放到 workQueue 中去了。后续只要线程池中的 Worker 空闲下来了(任务执行完了)就去 workQueue 中取任务执行。

随着任务不断的增加,核心线程池中 Worker 的消费能力跟不上了,即 workQueue 中堆积的任务开始变得越来越多,某一时刻 workQueue 满了。

这个时候新来的任务就要去检查线程池的允许创建的最大容量是多少了(maximumPoolSize),比如核心线程池数量限制为 5 它满了,队列长度限制为 10 也满了,最大线程池数量限制为 10,则允许再创建 5 个非核心的 Worker 去执行任务。

某一时刻 maximumPoolSize 也满了,那么新来的任务就将被采取对应的拒绝策略来执行了

这时候线程池逐渐闲下来了(比如到了半夜之类的用户请求很少了),此时队列里面的任务可能已经被消费完了,或者是很少,即需要执行的任务远远小于线程池 Worker 数量,那么最初创建的 10 个 Worker 中大部分的 Worker 都无法取得任务执行了,这个时候就要将部分闲置的 Worker 释放了比如说某个 Worker 在 keepAliveTime 的时间还没有获取到任务执行,那么就认为它空闲时间满足释放要求了,释放对应的 Worker

需要注意的是在底层实现中 Worker,并没有一个状态标识他是核心还是非核心的,在线程池这里仅仅是一个数量的概念,即在释放的时候会看情况释放某几个 Worker 而不管他创建的时候是否是核心 Worker,如果此时已经没有任务需要执行了,那么释放 Worker 后就能保证线程池中剩下的 worker 数量与 corePoolSize 一致。但是有一点需要注意的是如果将 allowCoreThreadTimeOut 设置为 true 了后,核心线程池中的 Worker 也会释放即最终 worker 数量可能为 0,但是 allowCoreThreadTimeOut 一般是默认为 false 的

一些关键字段含义

// 线程池状态,高 3 位为线程池状态,低 29 位为线程池容量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池最大容量为 2^29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 取出线程池的运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 取出目前的线程数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 获得 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }    

线程池状态 程序移位 高 3 位 低 29 位 说明
RUNNING -1 << 29 111 000… 运行状态
SHUTDOWN 0 << 29 000 000… 关闭状态
STOP 1 << 29 001 000… 停止状态
TIDYING 2 << 29 010 000… 所有线程终止了后调用 terminated()
TERMINATED 3 << 29 011 000… terminated() 调用结束

其中高 3 位表示线程池目前处于什么状态,低 29 位用来表示线程池目前线程的数量

构造方法

    // 核心线程池数量
    private volatile int corePoolSize;
    // 最大线程池数量
    private volatile int maximumPoolSize;
    // 非核心线程空闲时间,超过该时间后会被回收
    private volatile long keepAliveTime;
    // 工作队列,当核心线程池数量满了后,新的任务会放入工作队列中
    private final BlockingQueue<Runnable> workQueue;
    // 拒绝策略,当线程池无法再执行新的任务的时候调用
    private volatile RejectedExecutionHandler handler;
    // 默认的拒绝策略
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

Worker

线程池中执行任务的线程,它会将 Runnable 封装进去,当执行完了某一个任务后,它会继续从队列里面取任务执行,可以通过 prestartAllCoreThreads() 预先初始化创建好 corePoolSize 个数量的 Worker,后续任务来了就直接用创建好的 Worker 执行就好,或者懒加载的方式每当一个任务过来时候创建一个 Worker 去执行。

当池中 Worker 数量达到 corePoolSize 的时候,新来任务的那么就直接添加到 workQueue 当中(注意这时候只是添加的 Runnable 没有新的 Worker 产生,即 Worker 是用来执行任务的线程),这时线程池里面所有的 Worker 只要谁闲下来了那么就去工作队列中取出任务来执行即可。

当 workQueue 满了后(意味着核心线程池里面的 Worker 们忙不过来了,都在不停的执行任务都执行不完),那么就会添加非核心的 Worker 去执行任务。

然后任务继续过来达到 maximumPoolSize 限制的线程池最大容量后(核心 Worker 和非核心 Worker 全部都忙不过来了无法处理新的任务了)这个时候就选择对应的策略处理,拒绝新的任务,还是将其忽略等等。

非核心 Worker 如果空闲时间超过了 keepAliveTime 那么就会释放这个 Worker,这里需要注意的一点是,释放的时候只是保证释放一个 Worker 并不一定是释放之前 addWorker(task, false) 的这个,即如果目前比较空闲的话,会释放几个空闲的 Worker 最终保证核心 Worker 数量与 corePoolSize 一致即可

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
        // 用以执行任务的线程
        final Thread thread;
        // 需要执行的任务
        Runnable firstTask;
        // 统计每个 Worker 完成了多少任务
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            // 新创建的 Worker 没有执行是不允许中断的
            // 后面会提到为什么 setState(-1); 就无法中断
            setState(-1);
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            // 执行任务
            runWorker(this);
        }
}

这里先大概介绍下 Worker 因为任务执行的时候需要用它,便于理解后续的分析,runWorker(this) 后面会详细分析是如何运行,如何释放非核心 Worker 等

执行线程池的任务

    public void execute(Runnable command) {
        if (command == null)
                throw new NullPointerException();
        int c = ctl.get();
        // 检测正在运行的 Worker 数量是否小于核心 Worker 数
        if (workerCountOf(c) < corePoolSize) {
            // 如果核心 worker 数没有到 corePoolSize 那么创建一个新的 Worker 来执行任务
            if (addWorker(command, true))
                return;
            // 如果核心 worker 添加失败则再次获取一下线程池状态值
            // 为什么会失败?addWorker() 中会说明
            c = ctl.get();
        }
        // 如果线程池处于运行状态,并且任务能够放入 workQueue 中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次检查线程池的状态如果已经不处于活动状态
            // 删除刚刚放入队列里面的任务
            // 并且使用对应的拒绝策略拒绝任务
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果说在任务添加队列成功后,没有 Worker 了
            // 那么就添加一个非核心的 Worker 用来取队列里面的任务来执行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 虽然队列已经满了但是如果还没有达到最大线程数 maximum 那么继续添加 Worker
        // 如果达到 maximum 无法添加了则采取对应的拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

addWorker()

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            // 获取线程池状态
            int c = ctl.get();
            // 获取线程池运行状态
            int rs = runStateOf(c);
            // @1
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 获取线程池 worker 数量
                int wc = workerCountOf(c);
                // 如果 worker 数量达到了最大线程池允许的数量则拒绝添加 worker
                // 否则若是添加核心线程 worker 的话不能超过 corePoolSize 核心线程数
                // 若不是核心线程的话则不能超过限制的 maximumPoolSize 最大线程池数
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 线程池中 worker 数量 + 1
                // 如果添加成功的话则跳出准备执行 worker 线程
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 检查运行状态如果和之前的状态不一致那么重试添加
                if (runStateOf(c) != rs)
                    continue retry;
                // 如果状态一直并且添加失败则说明同时有有很多线程进来那么 CAS 循环重试直到成功
            }
        }

        // worker 启动成功标识
        boolean workerStarted = false;
        // worker 添加成功标识
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 创建 Worker
            w = new Worker(firstTask);
            // 获取对应的线程
            final Thread t = w.thread;
            // 线程不为 Null 的话
            if (t != null) {
                // 重入锁锁住一个一个的添加并且启动
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    // 再次检查状态
                    // 线程池处于运行状态或者
                    // 线程池处于 shutdown 但是 firstTask 为 Null
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 预检查线程是否可以启动不能的话抛出异常
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // 添加 worker
                        workers.add(w);
                        // 记录 worker 数量并且标识为添加成功
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // worker 添加成功那么则将 worker 标识为启动成功
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果 worker 启动失败
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

这个方法主要的功能是添加核心、非核心 Worker 同时校验其合法性,并且增加 ctl 中 worker 计数,如果添加成功的话就执行对应的 Worker 来处理任务,如果添加失败的话则需要保证从 worker 池中移除并且 ctl 中 worker 统计数量 -1.

我们来看下 @1:

  • 线程池此时的状态为 > SHUTDOWN 的状态即 STOP、TIDYING、TERMINATED 不允许添加新的 Worker
  • 线程池此时的状态为 SHUTDOWN 但是却添加了一个新的任务要来执行,即 firstTask != null 这种情况是不允许的
  • 线程池此时的状态为 SHUTDOWN 添加了一个 Worker (firstTask == null) 可能需要用来执行队列中还没有终止的任务,但是却发现队列里面没有任务了!(队列里面没有任务了还添加新的 Worker 来做什么都 SHUTDOWN 了,我都准备关闭了,所以添加失败)

addWorkerFailed()

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 从线程池 workers 中删除对应的 worker
            if (w != null)
                workers.remove(w);
            // 线程池 worker 数量 -1
            decrementWorkerCount();
            // 尝试去终止线程池后面再分析
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

这个方法比较简单,在 addWorker() 中分别对线程池 worker 数量+1 然后将 worker 放入池中这两部,如果后者添加失败的话就从 workers 中移除,然后保证 worker 数量 -1,然后去尝试调用 tryTerminate() 终止线程池,这个方法后面再分析

runWorker()

Worker 添加成功并且成功启动后就会调用 runWorker 了就是调用上面 Worker 的 run() 方法然后调用 runWorker()

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 取出 worker 的第一个任务准备执行
        Runnable task = w.firstTask;
        w.firstTask = null;
        // @1 unlock 允许中断
        w.unlock(); // allow interrupts
        // 标识任务是不是被异常终止的默认是
        boolean completedAbruptly = true;
        try {
            // 执行添加的任务或者从队列里面取出任务不断的执行
            while (task != null || (task = getTask()) != null) { // @2
                // lock 锁住一个个的执行任务
                w.lock();
                // @3
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 执行之前可以做一些事情留给子类调用
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 运行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 执行任务之后可以做一些事情留给子类调用
                        afterExecute(task, thrown);
                    }
                } finally {
                    // task 置位 null
                    task = null;
                    // 完成任务 + 1
                    w.completedTasks++;
                    // 解锁
                    w.unlock();
                    // 开启又一轮循环取任务执行等过程
                }
            }
            // 任务是正常结束的,即处理完了队列里面的所有任务
            completedAbruptly = false;
        } finally {
            // @4 处理退出逻辑
            processWorkerExit(w, completedAbruptly);
        }
    }

这个方法的主要功能是,用 Worker 去不断的取 workQueue 里面的任务来执行。

@1 处,为什么 unlock 就可以允许中断了呢,首先 unlock 保证这个 Worker 后续可以成功的 lock 然后执行任务,其次就是在 shutdown() 的时候,会去中断所有的 Worker 的时候会去尝试获取 Worker 的锁,如果这里释放了那么就能成功中断,在后面 shutdown() 会详细分析。

@2 处,关键就是所有的 Worker 都会通过 getTask() 去获取任务执行,如果获取到了那么就执行后续操作,如果任务没有呢?是不是就说明目前工作队列里面没有任务了,Worker 们好像有点闲啊,如果非核心的 Worker 达到了 keepAliveTime 这个时间还没有任务做,是不是该被释放了呢?这个操作就是 getTask() 和 processWorkerExit() 共同完成的,后面会详细分析。

@3 处,如果线程池状态 >= STOP 确保线程被正确中断,否则的话清除中断标记

@4 处 processWorkerExit() 的时候分析

getTask()

    private Runnable getTask() {
        // 判断是否 poll() 超时
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // @1
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 如果线程池中的 worker 数量大于 corePoolSize 
            // 则说明可能需要淘汰一些空闲的线程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // @2
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 在 keepAliveTime 时间内是否获取到了任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 获取到了成功返回
                if (r != null)
                    return r;
                // 如果没有获取到设置 timedOut 为 true
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

这个方法的主要功能是不断的从 workQueue 中取出任务,如果在指定的 keepAliveTime 时间还没有取到任务的话则返回 null 需要考虑是否将这个外层没有取到任务的 Worker 释放了,因为他暂时没有事做了。

@1 处,如果线程池状态为 SHUTDOWN 并且 workQueue 为空,则返回为 Null。如果线程池状态 > SHUTDOWN 则返回 Null。同时执行 decrementWorkerCount 减少 ctl 中低 29 位表示的 workers 数量

@2 处,如果线程池 worker 数量大于 maximumPoolSize 或者当前线程池 worker 数量大于了 corePoolSize 并且其它 worker 在 keepAliveTime 都没有获取到任务,那么返回 null 并且减少 workCount 数量

当 getTask() 返回为 null 的时候 completedAbruptly = false; 表示任务是正常结束的,最终执行 processWorkerExit() 方法

processWorkerExit()

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 如果 runWorker() 上述操作是正常完成的即 completedAbruptly = false
        // 那么在 getTask() 逻辑里面已经进行了线程池 worker 数量 -1 的操作了
        // 这里就不需要再次执行了
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 完成的任务数量 +1
            completedTaskCount += w.completedTasks;
            // 将空闲的 worker 从线程池中移除
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 尝试终止线程池
        tryTerminate();

        int c = ctl.get();
        // 如果线程池状态处于 RUNNING 和 SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 不是异常结束的 runWorker()
            if (!completedAbruptly) {
                // 默认返回 corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 如果线程池 worker 数量已经大于了 corePoolSize
                // 那么就直接返回
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 如果是异常结束的或者线程池 worker 数量没有大于 corePoolSize
            // 那么可以再添加一个 worker 去执行任务
            addWorker(null, false);
        }
    }

这个方法的主要功能是如果一个 Worker 没有事做了,那么就可以将其释放了(主要是在线程池数量 > 核心线程池数量,并且队列任务比较少执行 Worker 大都比较闲),在释放的同时看下是否需要终止线程池。

shutdown()

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 将线程池状态设置为 SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 然后去中断所有的 worker
            interruptIdleWorkers();
            // 留给子类使用的 hook 关闭后做一些事情
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 尝试终止
        tryTerminate();
    }

这个方法的主要功能是将线程池的状态标记为 SHUTDOWN,此时拒绝接收新的任务,但是如果发现工作队列中还有任务,是可以添加一个不携带任务的非核心的 Wroker 去将队列里面的任务执行完成的,然后去中断所有的 worker

shutdownNow()

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 将线程池状态设置为 STOP
            advanceRunState(STOP);
            // 中断所有的 worker
            interruptWorkers();
            // 抛弃 workQueue 里面所有的任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 尝试终止
        tryTerminate();
        return tasks;
    }

这个方法的主要功能是将线程池的状态标记为 STOP,此时拒绝接收新的任务,同时抛弃 workQueue 里面的所有任务,然后去中断所有的 worker

tryTerminate()

        for (;;) {
            int c = ctl.get();
            // 只有在线程池状态为 SHUTDOWN 并且队列不为空的时候才会继续执行
            // 或者线程池状态位 STOP 的时候才会继续执行
            // 否则的话就直接返回
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // 如果线程池 worker 数量不为 0 依次中断对应的 worker
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 将线程池的状态改为 TIDYING
                // 从这里也能看出为什么上面要判断只能线程池状态位 SHUTDOWN 和 STOP
                // 才能执行该方法 SHUTDOWN < STOP < TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 然后调用 terminated 
                        terminated();
                    } finally {
                        // 最终将线程池状态改为 TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // 否则的话 CAS 不断循环处理
            // else retry on failed CAS
        }

从代码可以看到调用了 shutdown() 后线程池状态从 RUNNING 变为 SHUTDOWN,然后调用 tryTerminate() 如果 workQueue 不为空的话表示还有任务没有执行完那么不能终止需要等待队列里面的任务执行完毕后才能终止。

如果调用了 shutdownNow() 后线程池状态从 RUNNING 变为 STOP,并且将 workQueue 里面的任务全部移除了,最终线程池 worker 数量为 0 了后调用 terminated() 后将线程池状态设置为 TERMINATED

总结下状态的转换主要为

调用 shutdown() 后线程池状态从 RUNNING -> SHUTDOWN,调用之后的队列和线程池的任务都执行完成后那么 SHUTDOWN -> TIDYING

调用 shuwdownNow() 后线程池状态从 (RUNNING or SHUTDOWN) -> STOP,调用之后会抛弃队列里面等待执行的任务,然后等待线程池里面的任务执行完成后 STOP -> TIDYING

terminated() 方法执行完毕后 TIDYING -> TERMINATED

但是如果说线程不能正常结束或者不能响应中断那么意味着 shutdown() 和 shutdownNow() 都无法将线程池正常结束,如下。 调用 shutdownNow() 后

public class Test {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(5,
                10,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(5));
        fillThreadPoll(threadPoolExecutor, 5);
        System.out.println("此时核心线程数已经满了");
        fillThreadPoll(threadPoolExecutor, 5);
        System.out.println("此时队列已经已经满了");
        fillThreadPoll(threadPoolExecutor, 5);
        System.out.println("此时最大线程数已经满了");
        threadPoolExecutor.shutdownNow();
    }

    private static void fillThreadPoll(ThreadPoolExecutor threadPoolExecutor, int size) {
        for (int i = 0; i < size; i++) {
            threadPoolExecutor.execute(() -> {
                while (true) {
                }
            });
        }
    }
}

调整下代码就能正常结束

    private static void fillThreadPoll(ThreadPoolExecutor threadPoolExecutor, int size) {
        for (int i = 0; i < size; i++) {
            threadPoolExecutor.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {

                }
            });
        }
    }

但是调用 shutdown() 发现还是无法正常结束,因为他会去调用 interruptIdleWorkers(false);

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 这里无法获得锁
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

因为 w.tryLock() 无法获得锁,所有的 worker 都处于忙碌状态,每一个 worker 执行对应的任务,那个任务都没有结束,锁都没有是释放,所以 shutdown() 无法结束。

而 shutdownNow() 可以结束的原因是它调用的是

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }

可以看到就算 Worker 正在执行中没有释放锁也可以直接对其进行中断

拒绝任务

当ThreadPoolExecutor关闭、队列和线程池饱和时,会拒绝新提交的任务,同时调用RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)方法。

ThreadPoolExecutor预置了以下四种策略:

  • ThreadPoolExecutor.AbortPolicy,默认策略,在拒绝任务时,会抛出RejectedExecutionException。
  • ThreadPoolExecutor.CallerRunsPolicy,由提交的线程自己来执行(execute)当前提交的任务。这种策略提供了简单的反馈控制机制,能够降低新的任务提交的速率。
  • ThreadPoolExecutor.DiscardPolicy,简单粗暴的抛弃不能执行的任务。
  • ThreadPoolExecutor.DiscardOldestPolicy,如果ThreadPoolExecutor没有被关闭,那么删除队列头部的任务,并且再次尝试提交任务,如果仍然被拒绝,那么再删除队列头部任务,如此反复。 可以自定义RejectedExecutionHandler拒绝策略,但是要小心处理好策略生效时需要满足的条件,例如队列和线程池大小等等。

文章永久链接:https://tech.souyunku.com/26076

未经允许不得转载:搜云库技术团队 » ThreadPoolExecutor 原理及源码详细分析

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们