上一篇我们了解了一下Netty三个核心类的主要作用,而且还大概勾画了一下Netty的运行机制。那么这一篇我们就从Netty服务端代码来剖析一下。
public static void main(String[] args) throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup(8);
ServerBootstrap b = new ServerBootstrap();
try {
b.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//这两个handle为netty自带的
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
//这是实现我们自定义逻辑的handle
ch.pipeline().addLast(new ServerExampleHandler());
}
});
b.bind(8080).channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
1.首先创建EventLoopGroup
首先我们创建了两个EventLoopGroup,上一篇我们说过这个类的主要作用是持有EventLoop,并把获取到的任务均匀的分配给他们。所以我们来看一下他的构造函数:
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
//这边引入了选择器
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
//...
children = new EventExecutor[nThreads];
//线程数量决定了内部持有几个EventLoop
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// ...
} finally {
//...
}
}
//根据选择工厂,创建了选择器。
chooser = chooserFactory.newChooser(children);
//...
}
构造函数的代码跟踪比较简单,现阶段我们主要需要关注两个参数,一个是我们传进来的线程数量,还有一个就是倒数第二层引入的选择器工厂。为了看着更清晰,我保留了主要的逻辑代码。
下面我们看看newChild方法和选择器的创建。
//newChild方法的具体实现在NioeventLoopGroup中
//可以清晰的看到NioeventLoopGroup内部持有的child就是NioEventLoop
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
//chooser的创建也比较简单,首先判断child的数量是否是2的指数
//如2,4,8,16等如果是则生成指数对应的选择器,如果不是则是普通的
//这两个选择器的next方法大家可以自行查看。
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
2.设置ServerBootstrap的参数。
ServerBootstrap是服务端的启动引导器。我们来看看启动前我们配置了啥。
//首先将第一个参数传递给父类,第二个参数自己保留
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
public B group(EventLoopGroup group) {
ObjectUtil.checkNotNull(group, "group");
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return self();
}
//其次设置服务端的channel,这个地方用了反射工厂,主要是保证channel必须有无参构造函数
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
//最后设置了childHandler
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
现阶段还是以剖析流程为主,所以不会配置太多,但是这3个设置一定要记住。
3.查看bind方法逻辑
在jdk的编程中,我们也会调用ServerSocketChannel的bind方法,那么Netty的bind方法和jdk的有啥区别呢?
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
//...
promise.setFailure(cause);
} else {
//...
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
前面的方法主要是验证参数是否设置和解析地址,真正的逻辑从doBind方法开始。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
//...
}
ChannelFuture regFuture = config().group().register(channel);
//...
return regFuture;
}
首先我们进入initAndRegister方法,该方法主要做了三件事
1、 创建Channel
2、 初始化Channel
3、 注册Channel
创建Channel
还记得之前我们的三个设置么,这个地方的channel就是我们的第二个设置。 所以我们去到NioServerSocketChannel的无参构造函数中。
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
这个ServerSocketChannel就是我们在jdk中使用的那个。这也验证了我们上一篇的猜测,Netty的channel会持有jdk的Socket。
继续往下
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
在AbstractChannel中我们发现了创建UnSafe类和pipeline的方法。那么还是一样,这个地方先记住即可,不必太多纠缠。
初始化Channel
创建channel的分析就先到这里了,下一步来看看初始化channel,回到ServerBootstrap的init方法。
void init(Channel channel) {
//...
ChannelPipeline p = channel.pipeline();
//...
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//注意这个地方的handel不是我们设置的childHandle
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
其他代码是一些参数的设置,我们先不管。这个init方法的主要作用就是往我们刚刚创建的channel的pipline中,放入一个初始化的handel。这里我们简单的看下pipline和ChannelInitializer:
//pipline是在刚刚的AbstractChannel类中创建的
//根据newChannelPipeline()方法,进去之后创建的就是DefaultChannelPipeline类
//可以看出初始化阶段pipline中只有头和尾两个Context
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
在加上我们刚刚添加进pipline的ChannelInitializer,现在pipline中应该有三个context。后面会详细分析pipline的作用,这边记住pipline里面有几个组件即可。
现在在看一下ChannelInitializer
//这是我们刚刚复写的方法
protected abstract void initChannel(C ch) throws Exception;
//在handle添加的时候会触发initChannel方法
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
//...
if (initChannel(ctx)) {
//...
removeState(ctx);
}
}
}
//该方法会执行我们复写的initChannel方法,并在执行完成后删除pipline中的ChannelInitializer
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
//...
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}
在熟悉了这几个类的作用之后,我们知道,handle添加完成后,在NioServerSocketChannel的pipline中,此时有三个context。头,尾,和ServerBootstrapAcceptor。
注册Channel
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//这个地方我们启动线程肯定不是eventLoop线程,所以进入execute方法
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
//...
}
}
}
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
//...
}
}
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
//...
try {
SingleThreadEventExecutor.this.run();
success = true;
} //...
}
在doStartThread方法中,我们看到了调用了SingleThreadEventExecutor.this.run()方法,而这个方法,正是我们上一篇分析NioEventLoop的run方法。
所以经过注册之后,NioEventLoop已经在不停的检测Selector的状态,而NioServerSocketChannel的pipline也已经准备就绪。我们去看一下NioEventLoop的run方法。
protected void run() {
int selectCnt = 0;
for (; ; ) {
if (ioRatio == 100) {
try {
if (strategy > 0) {
//查看Selector的连接状态
processSelectedKeys();
}
} finally {
//运行任务队列的任务
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
}
}
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
//处理检测到事件的连接
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
//...
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
}//...
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//...
//ServerSocketChannel关注的是OP_ACCEPT事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//查看NioServerSocketChannel的该方法
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//触发读事件,将该channel分配给worker线程处理
pipeline.fireChannelRead(readBuf.get(i));
}
//...
}
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
//...
}
return 0;
}
到这儿服务端接收连接的逻辑就完成了。下一篇在分析一下Netty对连接的处理流程。