Netty框架的主要线程就是I/O线程,这一篇来学习Netty是如何设计优秀的线程模型的。Netty支持Reactor单线程模型、多线程模型和主从Reactor多线程模型。
本文分析的代码基于最新的master分支版本。
最常用的主从多线程模型有两个reactor线程池,Acceptor线程池用于处理客户端的TCP连接请求,I/O线程池用于处理读写编码等工作。
当服务端接收到一个新的请求时,处理流程如下:
1、 Acceptor线程池(图中Accept Pool)接收到客户端TCP连接,处理(接入认证等)。
2、 Acceptor将新创建的SocketChannel注册到I/O线程池的某个I/O线程上。
3、 I/O线程池(图中IO Pool)负责SocketChannel的I/O相关的读写和编解码工作。
一个常规的demo服务端启动代码如下:
//Acceptor线程池
EventLoopGroup bossGroup = new MultithreadEventLoopGroup(1, NioHandler.newFactory());
//I/O线程池
EventLoopGroup workerGroup = new MultithreadEventLoopGroup(NioHandler.newFactory());
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
//设置两个线程池
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
EventLoopGroup即线程池,如果要使用单线程模型,只需要把实现类换成SingleThreadEventLoop
即可。
一个Channel只能注册到一个EventLoop
上,所以这个Channel的所有I/O事件全都是一个线程串行化的处理,例如调用用户Handler处理业务逻辑等。
单线程模型
我们对Netty中比较简单易懂的单线程模型进行分析:
EventLoop
ScheduledExecutorService
是来自JDK的,可以提交定时任务的提交器,就不赘述了。
EventExecutorGroup
是多个EventExecutor
集合,由于集成了Iterable
接口,他可以迭代遍历EventExecutor
。最主要的方法是next()
,返回可用的EventExecutor
供用户使用。
EventExecutor
是一个特殊的EventExecutorGroup
,集合里只有一个EventExecutor
,调用next()
方法是返回自己。
EventLoopGroup
是EventLoop
集合,提供了将管道Channel注册到EventExecutorGroup
的方法,主要方法next()
和上面原理类似。
public interface EventLoopGroup extends EventExecutorGroup {
/**
* 返回下一个可用的{@link EventLoop}
*/
@Override
EventLoop next();
/**
* 把一个{@link Channel}注册到当前{@link EventLoop}上
*/
ChannelFuture register(Channel channel);
ChannelFuture register(ChannelPromise promise);
@Deprecated
ChannelFuture register(Channel channel, ChannelPromise promise);
}
EventLoop就是reactor线程,parent()方法会返回它所在的线程池。它的主要功能就是处理被注册的Channel的所有I/O操作。
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
/**
* 返回它所在的EventLoopGroup
*
* @return
*/
@Override
EventLoopGroup parent();
}
EventExecutor
通过以上继承关系图,可以看出EventExecutorGroup
家族被分为了两类,左边EventExecutor
及其子类是单线程模型,右边EventExecutorGroup
及其子类是多线程模型。我们重点分析SingleThreadEventExecutor
的实现。
首先看EventExecutor
的主要方法:
public interface EventExecutor extends EventExecutorGroup {
/**
* 返回可用的EventExecutor,即自己
*/
@Override
EventExecutor next();
/**
* 执行器是用来提交任务的,背后一定会有一个或多个线程实现
* 判断线程是否是EventExecutor内部绑定的线程,是则返回true
*/
boolean inEventLoop(Thread thread);
/**
* 创建一个新的{@link Promise},并且这个Promise与当前EventExecutor绑定在一起
*/
<V> Promise<V> newPromise();
一般来说,Abstract类就是接口的骨架实现,AbstractScheduledEventExecutor
简单地实现了调度定时任务。
我们分析里面的几个重要方法:
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
/**
* 类的初始化时间
*/
static final long START_TIME = System.nanoTime();
/**
* 空任务,用于唤醒阻塞队列
*/
static final Runnable WAKEUP_TASK = new Runnable() {
@Override
public void run() {
} // Do nothing
};
/**
* 存放定时任务的优先队列,下一次执行时间越早排在越前面
* 不是线程安全的
*/
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
/**
* 返回第一个在nanoTime时间内 应该执行的定时任务
*/
protected final RunnableScheduledFuture<?> pollScheduledTask(long nanoTime) {
//只有内部绑定线程才能操作
assert inEventLoop();
Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
//空队列
if (scheduledTask == null) {
return null;
}
//在nanoTime时间内会执行,从队列中取出返回
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
return null;
}
/**
* 增加一个定时任务
*/
protected final <V> ScheduledFuture<V> schedule(final RunnableScheduledFuture<V> task) {
//操作的线程是绑定线程
if (inEventLoop()) {
//直接添加
add0(task);
} else {
//封装成task扔到executor里面
execute(() -> add0(task));
}
return task;
}
private <V> void add0(RunnableScheduledFuture<V> task) {
final RunnableScheduledFutureNode node;
if (task instanceof RunnableScheduledFutureNode) {
node = (RunnableScheduledFutureNode) task;
} else {
node = new DefaultRunnableScheduledFutureNode<V>(task);
}
//添加到定时任务队列
scheduledTaskQueue().add(node);
}
/**
* 删除一个定时任务
*/
final void removeScheduled(final RunnableScheduledFutureNode<?> task) {
//操作的线程是绑定线程
if (inEventLoop()) {
//直接从队列里删除
scheduledTaskQueue().removeTyped(task);
} else {
//封装成task扔到executor里面
execute(() -> removeScheduled(task));
}
}
scheduledTaskQueue
不是线程安全的,添加任务删除任务等所有操作如果不是来自EventExecutor
内部的线程,都会被封装成task统一由内部线程串行地执行。
SingleThreadEventExecutor
继承了AbstractScheduledEventExecutor
,可以调度非定时任务和定时任务。
首先来看SingleThreadEventExecutor
的使用:
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
//内部需要一个executor运行主要逻辑
SingleThreadEventExecutor eventExecutor = new SingleThreadEventExecutor(executor);
CountDownLatch latch = new CountDownLatch(1);
//提交一个任务
eventExecutor.submit(() -> {
System.out.println("run task");
latch.countDown();
});
latch.await();
//关闭
Future<?> future = eventExecutor.shutdownGracefully();
future.addListener(future1 -> {
System.out.println("shuwdown over");
});
}
分析内部重要属性:
/**
* 线程的各种状态
*/
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
/**
* 线程的当前状态
*/
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
private volatile int state = ST_NOT_STARTED;
/**
* 存放待执行任务的队列
*/
private final Queue<Runnable> taskQueue;
/**
* 与当前EventExecuor绑定的线程
*/
private volatile Thread thread;
/**
* 与当前EventExecuor绑定的executor
* 由于执行主要逻辑
*/
private final Executor executor;
/**
* 上一次执行任务的时间
*/
private long lastExecutionTime;
SingleThreadEventExecutor
内部总共有两个队列,scheduledTaskQueue用于存放待处理的定时任务,taskQueue存放待执行的任务。一旦提交一个定时任务,都会放入scheduledTaskQueue,提交普通任务会放入taskQueue。然后内部线程不断轮询这两个队列,把时间到了的定时任务从scheduledTaskQueue中取出放入taskQueue,不断执行taskQueue里面的task。
接下来我们重点分析两个主要方法:
NIO线程的核心逻辑
核心逻辑很简单,是一个死循环,不断地取出任务,执行。
protected void run() {
assert inEventLoop();
do {
//取出一个任务
Runnable task = takeTask();
if (task != null) {
//执行
task.run();
//更新最新执行时间
updateLastExecutionTime();
}
//判断state状态,是否要shutdown
} while (!confirmShutdown());
}
@Override
public final boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
取出任务的逻辑:
protected final Runnable takeTask() {
assert inEventLoop();
if (!(taskQueue instanceof BlockingQueue)) {
throw new UnsupportedOperationException();
}
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
for (;;) {
//查看有没有定时任务(不取出)
RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
//没有待处理的定时任务
Runnable task = null;
try {
//从taskQueue中获取待执行任务,为空则阻塞
task = taskQueue.take();
//遇到WAKEUP_TASK则返回null
if (task == WAKEUP_TASK) {
task = null;
}
} catch (InterruptedException e) {
// Ignore
}
return task;
} else {
//有待处理定时任务
//获取定时任务下一次执行的时间
long delayNanos = scheduledTask.delayNanos();
Runnable task = null;
if (delayNanos > 0) {
//还没到执行的时候,获取待执行任务(超过最长等待时间,就进入下一次循环,去处理定时任务)
try {
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
// Waken up.
return null;
}
}
// 运行到此处有两种情况:
// 1. 定时任务delayNanos=0,需要立刻执行
// 2. 定时任务delayNanos>0,
// 但是taskQueue.poll已经等待了delayNanos也没有需要执行的任务,定时任务需要立刻执行
if (task == null) {
//处理定时任务,从ScheduledTaskQueue中取出,放入taskQueue
fetchFromScheduledTaskQueue();
//取出待执行任务
task = taskQueue.poll();
}
if (task != null) {
return task;
}
}
}
}
NIO线程的启动
当我们提交一个任务,即调用executor()
,就会自动启动这个EventExecutor。主要逻辑是将任务放入taskQueue,如果内部线程没启动,就启动它。
@Override
public void execute(Runnable task) {
requireNonNull(task, "task");
//判断是由内部线程还是用户线程调用的
boolean inEventLoop = inEventLoop();
//将task加入taskQueue
addTask(task);
//如果是用户线程调用,说明executor可能还没启动
if (!inEventLoop) {
//启动线程
startThread();
//判断状态是否是shutdown,如果是就要拒绝任务
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
private void startThread() {
//如果状态是未启动
if (state == ST_NOT_STARTED) {
//修改状态
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
//启动线程
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
内部线程所做的逻辑就是上文分析的nio线程的核心逻辑,run()
是一个死循环,只有用户执行了shutdownGracefully()
,将state改成ST_SHUTTING_DOWN
才会退出循环。一旦退出运行,就会执行优雅关闭的逻辑。
查看关闭方法参数:
public final Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)
quietPerod
代表超过这个时间没有新任务,则关闭。
timeout
代表等待关闭的最长时间,就算一直有新任务提交,一旦超过timeout,也会强制关闭。
优雅关闭EventExecutor
总共有三个阶段:
1、 state=ST_SHUTTING_DOWN
在这个阶段,线程池还可以接收新任务,`EventExecutor`会试图跑完所有任务,直到`quietPerod`的时间没有新任务提交,或者`timeout`。
2、 state=ST_SHUTDOWN
这个阶段会拒绝新任务,并将最后一波任务跑完。
3、 state=ST_TERMINATED
完成关闭,通知`terminationFuture`。
优雅关闭的源码:
private void doStartThread() {
assert thread == null;
executor.execute(() -> {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//开始nio线程的主要逻辑,这是一个死循环,state >= ST_SHUTTING_DOWN才会退出循环
SingleThreadEventExecutor.this.run(); success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
//退出循环后开始优雅关闭
//把state修改为ST_SHUTTING_DOWN
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " + "be called before run() implementation terminates.");
}
}
try {
// 取消所有定时任务,跑完所有任务和hooks
// 此时可以接受新任务
// 一旦在这个循环里运行的时间 > gracefulShutdownTimeout 或 >= quietTime没有新任务
// 则进入下一阶段
for (;;) {
if (confirmShutdown()) {
break;
}
}
//把state修改为ST_SHUTDOWN
//此时不能接受新任务了
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
break;
}
}
//把最后一波任务跑完,因为不能接受新任务,没必要循环了
confirmShutdown();
} finally {
try {
cleanup();
} finally {
FastThreadLocal.removeAll();
//把state修改为ST_TERMINATED
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown();
int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " + "non-empty task queue (" + numUserTasks + ')');
}
terminationFuture.setSuccess(null);
}
}
}
});
}
总结
SingleThreadEventExecutor
内部有两个队列,分别存放待处理定时任务和待执行任务。内部有一个线程不断取出I/O任务执行。因此如果用户的业务逻辑复杂,不要在NIO线程上完成,可以将解码后的消息封装成Task,派发到业务线程池中由业务线程执行,以保证NIO线程尽快释放,处理其他的I/O操作。