专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

从netty源码解读优秀的线程模型

Netty框架的主要线程就是I/O线程,这一篇来学习Netty是如何设计优秀的线程模型的。Netty支持Reactor单线程模型、多线程模型和主从Reactor多线程模型。

本文分析的代码基于最新的master分支版本。

最常用的主从多线程模型有两个reactor线程池,Acceptor线程池用于处理客户端的TCP连接请求,I/O线程池用于处理读写编码等工作。

当服务端接收到一个新的请求时,处理流程如下:

1、 Acceptor线程池(图中Accept Pool)接收到客户端TCP连接,处理(接入认证等)。
2、 Acceptor将新创建的SocketChannel注册到I/O线程池的某个I/O线程上。
3、 I/O线程池(图中IO Pool)负责SocketChannel的I/O相关的读写和编解码工作

87_1.png

一个常规的demo服务端启动代码如下:

//Acceptor线程池
EventLoopGroup bossGroup = new MultithreadEventLoopGroup(1, NioHandler.newFactory());
//I/O线程池
EventLoopGroup workerGroup = new MultithreadEventLoopGroup(NioHandler.newFactory());
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
    ServerBootstrap b = new ServerBootstrap();
  //设置两个线程池
    b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 100)
    .handler(new LoggingHandler(LogLevel.INFO))
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {

EventLoopGroup即线程池,如果要使用单线程模型,只需要把实现类换成SingleThreadEventLoop即可。

一个Channel只能注册到一个EventLoop上,所以这个Channel的所有I/O事件全都是一个线程串行化的处理,例如调用用户Handler处理业务逻辑等。

单线程模型

我们对Netty中比较简单易懂的单线程模型进行分析:

EventLoop

87_2.png

ScheduledExecutorService是来自JDK的,可以提交定时任务的提交器,就不赘述了。

EventExecutorGroup是多个EventExecutor集合,由于集成了Iterable接口,他可以迭代遍历EventExecutor。最主要的方法是next(),返回可用的EventExecutor供用户使用。

EventExecutor是一个特殊的EventExecutorGroup,集合里只有一个EventExecutor,调用next()方法是返回自己。

EventLoopGroupEventLoop集合,提供了将管道Channel注册到EventExecutorGroup的方法,主要方法next()和上面原理类似。

public interface EventLoopGroup extends EventExecutorGroup {
    /**
     * 返回下一个可用的{@link EventLoop}
     */
    @Override
    EventLoop next();

    /**
     * 把一个{@link Channel}注册到当前{@link EventLoop}上
     */
    ChannelFuture register(Channel channel);

    ChannelFuture register(ChannelPromise promise);

    @Deprecated
    ChannelFuture register(Channel channel, ChannelPromise promise);
}

EventLoop就是reactor线程,parent()方法会返回它所在的线程池。它的主要功能就是处理被注册的Channel的所有I/O操作。

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    /**
     * 返回它所在的EventLoopGroup
     *
     * @return
     */
    @Override
    EventLoopGroup parent();
}

EventExecutor

87_3.png

通过以上继承关系图,可以看出EventExecutorGroup家族被分为了两类,左边EventExecutor及其子类是单线程模型,右边EventExecutorGroup及其子类是多线程模型。我们重点分析SingleThreadEventExecutor的实现。

首先看EventExecutor的主要方法:

public interface EventExecutor extends EventExecutorGroup {

    /**
     * 返回可用的EventExecutor,即自己
     */
    @Override
    EventExecutor next();

    /**
     * 执行器是用来提交任务的,背后一定会有一个或多个线程实现
     * 判断线程是否是EventExecutor内部绑定的线程,是则返回true
     */
    boolean inEventLoop(Thread thread);

    /**
     * 创建一个新的{@link Promise},并且这个Promise与当前EventExecutor绑定在一起
     */
    <V> Promise<V> newPromise();

一般来说,Abstract类就是接口的骨架实现,AbstractScheduledEventExecutor简单地实现了调度定时任务。

我们分析里面的几个重要方法:

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {

    /**
     * 类的初始化时间
     */
    static final long START_TIME = System.nanoTime();

     /**
     * 空任务,用于唤醒阻塞队列
     */
    static final Runnable WAKEUP_TASK = new Runnable() {
        @Override
        public void run() {
        } // Do nothing
    };

    /**
     * 存放定时任务的优先队列,下一次执行时间越早排在越前面
     * 不是线程安全的
     */
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

/**
 * 返回第一个在nanoTime时间内 应该执行的定时任务
 */
protected final RunnableScheduledFuture<?> pollScheduledTask(long nanoTime) {
    //只有内部绑定线程才能操作
    assert inEventLoop();

    Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
    //空队列
    if (scheduledTask == null) {
        return null;
    }

    //在nanoTime时间内会执行,从队列中取出返回
    if (scheduledTask.deadlineNanos() <= nanoTime) {
        scheduledTaskQueue.remove();
        return scheduledTask;
    }
    return null;
}

/**
 * 增加一个定时任务
 */
protected final <V> ScheduledFuture<V> schedule(final RunnableScheduledFuture<V> task) {
    //操作的线程是绑定线程
    if (inEventLoop()) {
        //直接添加
        add0(task);
    } else {
        //封装成task扔到executor里面
        execute(() -> add0(task));
    }
    return task;
}

private <V> void add0(RunnableScheduledFuture<V> task) {
    final RunnableScheduledFutureNode node;
    if (task instanceof RunnableScheduledFutureNode) {
        node = (RunnableScheduledFutureNode) task;
    } else {
        node = new DefaultRunnableScheduledFutureNode<V>(task);
    }
    //添加到定时任务队列
    scheduledTaskQueue().add(node);
}

/**
 * 删除一个定时任务
 */
final void removeScheduled(final RunnableScheduledFutureNode<?> task) {
    //操作的线程是绑定线程
    if (inEventLoop()) {
        //直接从队列里删除
        scheduledTaskQueue().removeTyped(task);
    } else {
        //封装成task扔到executor里面
        execute(() -> removeScheduled(task));
    }
}

scheduledTaskQueue不是线程安全的,添加任务删除任务等所有操作如果不是来自EventExecutor内部的线程,都会被封装成task统一由内部线程串行地执行。

SingleThreadEventExecutor继承了AbstractScheduledEventExecutor,可以调度非定时任务和定时任务。

首先来看SingleThreadEventExecutor的使用:

 public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    //内部需要一个executor运行主要逻辑
    SingleThreadEventExecutor eventExecutor = new SingleThreadEventExecutor(executor);

    CountDownLatch latch = new CountDownLatch(1);

    //提交一个任务
    eventExecutor.submit(() -> {
        System.out.println("run task");
        latch.countDown();
    });

    latch.await();

    //关闭
    Future<?> future = eventExecutor.shutdownGracefully();

    future.addListener(future1 -> {
        System.out.println("shuwdown over");
    });
}

分析内部重要属性:

/**
 * 线程的各种状态
 */
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;

/**
 * 线程的当前状态
 */
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
         AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
private volatile int state = ST_NOT_STARTED;

/**
 * 存放待执行任务的队列
 */
private final Queue<Runnable> taskQueue;

/**
 * 与当前EventExecuor绑定的线程
 */
private volatile Thread thread;

/**
 * 与当前EventExecuor绑定的executor
 * 由于执行主要逻辑
 */
private final Executor executor;

/**
 * 上一次执行任务的时间
 */
private long lastExecutionTime;

SingleThreadEventExecutor内部总共有两个队列,scheduledTaskQueue用于存放待处理的定时任务,taskQueue存放待执行的任务。一旦提交一个定时任务,都会放入scheduledTaskQueue,提交普通任务会放入taskQueue。然后内部线程不断轮询这两个队列,把时间到了的定时任务从scheduledTaskQueue中取出放入taskQueue,不断执行taskQueue里面的task。

87_4.png

接下来我们重点分析两个主要方法:

NIO线程的核心逻辑

核心逻辑很简单,是一个死循环,不断地取出任务,执行。

protected void run() {
    assert inEventLoop();
    do {
        //取出一个任务
        Runnable task = takeTask();
        if (task != null) {
        //执行
        task.run();
        //更新最新执行时间
        updateLastExecutionTime();
        }
        //判断state状态,是否要shutdown
    } while (!confirmShutdown());
}

@Override
public final boolean inEventLoop(Thread thread) {
    return thread == this.thread;   
}

取出任务的逻辑:

protected final Runnable takeTask() {
    assert inEventLoop();
    if (!(taskQueue instanceof BlockingQueue)) {
        throw new UnsupportedOperationException();
    }

    BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
        for (;;) {
            //查看有没有定时任务(不取出)
            RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
            if (scheduledTask == null) {
                //没有待处理的定时任务
                Runnable task = null;
                try {
                    //从taskQueue中获取待执行任务,为空则阻塞
                    task = taskQueue.take();
                    //遇到WAKEUP_TASK则返回null
                    if (task == WAKEUP_TASK) {
                        task = null;
                    }
                } catch (InterruptedException e) {
                    // Ignore
                }
                return task;
            } else {
                //有待处理定时任务
                //获取定时任务下一次执行的时间
                long delayNanos = scheduledTask.delayNanos();
                Runnable task = null;
                    if (delayNanos > 0) {
                    //还没到执行的时候,获取待执行任务(超过最长等待时间,就进入下一次循环,去处理定时任务)
                        try {
                            task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                        } catch (InterruptedException e) {
                            // Waken up.
                            return null;
                        }
                    }

            // 运行到此处有两种情况:
            // 1. 定时任务delayNanos=0,需要立刻执行
            // 2. 定时任务delayNanos>0,
            // 但是taskQueue.poll已经等待了delayNanos也没有需要执行的任务,定时任务需要立刻执行
            if (task == null) {
                //处理定时任务,从ScheduledTaskQueue中取出,放入taskQueue
                fetchFromScheduledTaskQueue();
                //取出待执行任务
                task = taskQueue.poll();
            }

            if (task != null) {
                return task;
            }
        }
    }
}

NIO线程的启动

当我们提交一个任务,即调用executor(),就会自动启动这个EventExecutor。主要逻辑是将任务放入taskQueue,如果内部线程没启动,就启动它。

@Override
public void execute(Runnable task) {
    requireNonNull(task, "task");

    //判断是由内部线程还是用户线程调用的
    boolean inEventLoop = inEventLoop();

    //将task加入taskQueue
    addTask(task);

    //如果是用户线程调用,说明executor可能还没启动
    if (!inEventLoop) {
        //启动线程
        startThread();
        //判断状态是否是shutdown,如果是就要拒绝任务
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

private void startThread() {
    //如果状态是未启动
    if (state == ST_NOT_STARTED) {
        //修改状态
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                //启动线程
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

内部线程所做的逻辑就是上文分析的nio线程的核心逻辑,run()是一个死循环,只有用户执行了shutdownGracefully(),将state改成ST_SHUTTING_DOWN才会退出循环。一旦退出运行,就会执行优雅关闭的逻辑。

查看关闭方法参数:

public final Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) 

quietPerod代表超过这个时间没有新任务,则关闭。

timeout代表等待关闭的最长时间,就算一直有新任务提交,一旦超过timeout,也会强制关闭。

优雅关闭EventExecutor总共有三个阶段:

1、 state=ST_SHUTTING_DOWN

在这个阶段,线程池还可以接收新任务,`EventExecutor`会试图跑完所有任务,直到`quietPerod`的时间没有新任务提交,或者`timeout`。

2、 state=ST_SHUTDOWN

这个阶段会拒绝新任务,并将最后一波任务跑完。

3、 state=ST_TERMINATED

完成关闭,通知`terminationFuture`。

87_5.png

优雅关闭的源码:

private void doStartThread() {
    assert thread == null;
    executor.execute(() -> {
        thread = Thread.currentThread();
        if (interrupted) {
        thread.interrupt();
        }

        boolean success = false;
        updateLastExecutionTime();
        try {
            //开始nio线程的主要逻辑,这是一个死循环,state >= ST_SHUTTING_DOWN才会退出循环
            SingleThreadEventExecutor.this.run();                                    success = true;
        } catch (Throwable t) {
            logger.warn("Unexpected exception from an event executor: ", t);
        } finally {
            //退出循环后开始优雅关闭

            //把state修改为ST_SHUTTING_DOWN
            for (;;) {
                int oldState = state;
                if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                    break;
                }
            }

            // Check if confirmShutdown() was called at the end of the loop.
            if (success && gracefulShutdownStartTime == 0) {
                if (logger.isErrorEnabled()) {
                    logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " + "be called before run() implementation terminates.");
                }
            }

            try {
                // 取消所有定时任务,跑完所有任务和hooks
                // 此时可以接受新任务
                // 一旦在这个循环里运行的时间 > gracefulShutdownTimeout 或 >= quietTime没有新任务
                // 则进入下一阶段
                for (;;) {
                    if (confirmShutdown()) {
                    break;
                    }
                }

                //把state修改为ST_SHUTDOWN
                //此时不能接受新任务了
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                        SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                            break;
                        }
                    }

                //把最后一波任务跑完,因为不能接受新任务,没必要循环了
                confirmShutdown();
            } finally {
                try {
                    cleanup();
                } finally {
                    FastThreadLocal.removeAll();

                    //把state修改为ST_TERMINATED
                    STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);

                    threadLock.countDown();

                    int numUserTasks = drainTasks();
                    if (numUserTasks > 0 && logger.isWarnEnabled()) {
                        logger.warn("An event executor terminated with " + "non-empty task queue (" + numUserTasks + ')');
                    }
                    terminationFuture.setSuccess(null);
                }
            }
        }
    });
}

总结

SingleThreadEventExecutor内部有两个队列,分别存放待处理定时任务和待执行任务。内部有一个线程不断取出I/O任务执行。因此如果用户的业务逻辑复杂,不要在NIO线程上完成,可以将解码后的消息封装成Task,派发到业务线程池中由业务线程执行,以保证NIO线程尽快释放,处理其他的I/O操作。

文章永久链接:https://tech.souyunku.com/47156

未经允许不得转载:搜云库技术团队 » 从netty源码解读优秀的线程模型

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们