专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

线程池相关原理解析

一、 线程池作用

  • 方便管理,复用线程
  • 避免重复的创建和销毁线程,减少开销
  • 避免无限创建线程引起的OutOfMemoryError【简称OOM】

二、创建

JDK中Executors提供了四个静态方法快速创建线程池。

  • newFixedThreadPool
  • newSingleThreadExecutor
  • newCachedThreadPool
  • newSingleThreadScheduledExecutor

同步分析可以得知,以上几种线程池都是创建的ThreadPoolExecutor对象,过程如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

ThreadPoolExecutor类构造函数参数含义:

1、 corePoolSize:核心线程池数量

  • 当前线程数少于核心数量时,有新任务进来就新建一个线程
  • 当前线程数大于核心数量时,新任务会加到任务队列中或者创建非核心线程

1、 maximumPoolSize:最大线程数量

  • 包括核心线程池数量 + 核心以外的数量
  • 如果任务队列满了,并且池中线程数小于最大线程数,会再创建新的线程执行任务

1、 keepAliveTime:非核心线程的最大空闲时间

  • 如果给线程池设置 allowCoreThreadTimeOut(true),这个参数也可以作用于核心线程

1、 workQueue:保存待执行任务的阻塞队列,JDK提供了几种实现,后面讨论
2、 threadFactory:创建线程的方法,默认使用DefaultThreadFactory
3、 handler:任务拒绝策略

  • CallerRunsPolicy:只要线程池没关闭,就直接用调用者所在线程来运行任务
  • AbortPolicy:直接抛出 RejectedExecutionException 异常
  • DiscardPolicy:没有任何提示,放弃任务
  • DiscardOldestPolicy:丢弃队列中最旧的任务
  • 我们也可以实现自己的 RejectedExecutionHandler 接口自定义策略

现在,我们着重分析一下上面提到的四种创建线程的方式:

  • newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

可以看到,核心线程和最大线程相同,即线程数量是固定的。keepAliveTime=0,空闲的线程会立即终止。阻塞队列使用的是 LinkedBlockingQueue,使用的是其无参构造方法,没有指定队列的容量,那么它的容量就是Integer.MAX_VALUE,基本相当于没有上限。

  • newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

核心线程和最大线程相同,只有一个线程。。keepAliveTime=0,空闲的线程会立即终止。阻塞队列和newFixedThreadPool一样。

  • newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

没有核心线程,最大线程数为Integer.MAX_VALUE,队列是SynchronousQueue,不储存元素。有新任务时,没有空闲线程则创建线程。

  • newSingleThreadScheduledExecutor(以后解析)

    使用的是子类ScheduledThreadPoolExecutor,另外提供了两种执行任务的方式:

    1. scheduleAtFixedRate(Runnable command,long initialDelay, long 1, TimeUnit.MINUTES), 以固定的速率执行,比如每隔一分钟执行一次。
    2. scheduleWithFixedDelay(Runnable command,long initialDelay, long 1,TimeUnit.MINUTES), 上次任务结束后1分钟,下次任务再执行。

至此,线程池创建完毕,接下来是添加任务到线程池中。

ExecutorService 提供了两种提交任务的方法:

1、 execute():提交不需要返回值的任务

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

其步骤可分为:

1、 当前线程数小于核心线程数,创建一个线程
2、 如果超过核心线程且任务队列未满,则加入队列中等待执行
3、 如果加入队列失败,则创建非核心线程执行任务(不得超过最大线程数量)

33_1.png 这里把添加任务 addWorker() 的代码贴出来:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker() 前面一部分是关于线程池数量限制的运行状态的判断,这里不做分析,需要注意的是,添加任务的时候用到了 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); 这是一个同步的操作。

1、 submit():提交需要返回值的任务 在AbstractExecutorService中定义了 submit() 的重载方法:

Future<?> submit(Runnable task)
Future<T> submit(Runnable task, T result);
Future<T> submit(Callable<T> task)

同时它会返回一个 FutureTask 对象,通过它我们可以判断任务是否执行成功。

获得执行结果调用 FutureTask.get() 方法(也可以使用支持超时的方法),这个方法会阻塞当前线程直到任务完成,阻塞的过程如下:

 public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

awaitDone() 方法内通过不断的循环,形成阻塞,直到返回结果(超时)。


三、关闭线程池

  • shutdown()
  • shutdownNow()

四、其他

Excetors提供的方法容易造成OOM分析:

1、 FixedThreadPool和SingleThreadExecutor => 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而引起OOM异常
2、 CachedThreadPool => 允许创建的线程数为 Integer.MAX_VALUE,可能会创建大量的线程,从而引起OOM异常

本文使用 tech.souyunku.com 排版

文章永久链接:https://tech.souyunku.com/37459

未经允许不得转载:搜云库技术团队 » 线程池相关原理解析

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们