专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

Reactor-Netty系列1-TcpClient源码分析-从示例程序开始

1、示例程序:

Reactor-Netty 版本:

<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty</artifactId>
    <version>0.8.10.RELEASE</version>
</dependency>

示例程序:

public class TcpServerApplication {
    public static void main(String[] args) {
        DisposableServer server = TcpServer
                .create()
                .host("127.0.0.1")
                .port(8080)
                .handle((inbound, outbound) ->
                        inbound.receive().asString().log().then()
                )
                .bindNow();

        server.onDispose()
                .block();
    }
}

public class TcpClientApplication {
    public static void main(String[] args) throws InterruptedException {
        TcpClient client = TcpClient.create()       // 1 TcpClientConnect
                .host("127.0.0.1")          // 2 TcpClientBootstrap
                .port(8080)         // 3 TcpClientBootstrap
                .handle((inbound, outbound) -> outbound.sendString(Mono.just("Hello World!")).then());              // 4 TcpClientDoOn
        client.connectNow();        // 5 Connection
        Thread.sleep(3000);
    }
}

TcpServerApplication 输出结果:

[ INFO] (reactor-tcp-nio-2) onSubscribe(FluxHandle.HandleSubscriber)
[ INFO] (reactor-tcp-nio-2) request(unbounded)
[ INFO] (reactor-tcp-nio-2) onNext(Hello World!)
[ INFO] (reactor-tcp-nio-2) cancel()

基本逻辑是:Server 端绑定 8080 端口并监听请求;Client 端连接上端口后发送字符串 Hello World!;Server 端口收到请求后打印出来。

下面进行具体源码分析。

2、TcpClient

TcpClient.create()

public static TcpClient create() {
   return create(TcpResources.get());
}

/**
 * 最终返回的是 TcpClientConnect
 * 从入参可知,TcpClientConnect 关注的是连接管理 ConnectionProvider
 */
public static TcpClient create(ConnectionProvider provider) {
     return new TcpClientConnect(provider);
}

public class TcpResources implements ConnectionProvider, LoopResources {
  final ConnectionProvider defaultProvider;
    final LoopResources      defaultLoops;

    protected TcpResources(LoopResources defaultLoops,
            ConnectionProvider defaultProvider) {
        this.defaultLoops = defaultLoops;
        this.defaultProvider = defaultProvider;
    }

  /**
   * 该静态方法最终返回的是 TcpResources,包括:
   *    ConnectionProvider: 管理连接
   *        LoopResources: 管理线程
   */
    public static TcpResources get() {
    // 如果不存在,那么创建 TcpResources;否则,直接返回 TcpResources
        return getOrCreate(tcpResources, null, null, ON_TCP_NEW,  "tcp");
    }

host()

/**
 * 1. 最终返回的是 TcpClientBootstrap
 * 2. TcpClientBootstrap 类有一个 bootstrapMapper, 是一个 Function: b -> TcpUtils.updateHost(b, host),关注两个地方:b 是一个 Bootstrap 对象,b 何时生成?Function 接口的 apply 方法什么时候被执行?可以看到 TcpClientBootstrap 类的 configure() 方法同时满足了上面 2 个地方,因此只需要关注该方法何时被调用即可。
 */
public final TcpClient host(String host) {
        Objects.requireNonNull(host, "host");
        return bootstrap(b -> TcpUtils.updateHost(b, host));
}

public final TcpClient bootstrap(Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper) {
        return new TcpClientBootstrap(this, bootstrapMapper);
}

final class TcpClientBootstrap extends TcpClientOperator {

    final Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper;

    TcpClientBootstrap(TcpClient client,
            Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper) {
        super(client);
        this.bootstrapMapper = Objects.requireNonNull(bootstrapMapper, "bootstrapMapper");
    }

    @Override
    public Bootstrap configure() {
        return Objects.requireNonNull(bootstrapMapper.apply(source.configure()), "bootstrapMapper");
    }
}

port()

/**
 * 和 host(String host) 方法类似
 */
public final TcpClient port(int port) {
        return bootstrap(b -> TcpUtils.updatePort(b, port));
}

handler()

/**
 * 最终返回的是 TcpClientDoOn;
 * handler 的入参是 BiFunction,并且在 doOnConnected 方法中直接调用了 apply 方法;
 * BiFunction 返回的 Publisher 也直接调用了 subscribe 方法;
 * 因此,只需要关注 doOnConnected 方法的入参 Consumer 何时被调用即可
 */
public final TcpClient handle(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler) {
        Objects.requireNonNull(handler, "handler");
        return doOnConnected(c -> {
            if (log.isDebugEnabled()) {
                log.debug(format(c.channel(), "Handler is being applied: {}"), handler);
            }

            Mono.fromDirect(handler.apply((NettyInbound) c, (NettyOutbound) c))
                .subscribe(c.disposeSubscriber());
        });
}

public final TcpClient doOnConnected(Consumer<? super Connection> doOnConnected) {
        Objects.requireNonNull(doOnConnected, "doOnConnected");
        return new TcpClientDoOn(this, null, doOnConnected, null);
}

final class TcpClientDoOn extends TcpClientOperator implements ConnectionObserver {

    final Consumer<? super Bootstrap>  onConnect;
        // onConnected 即 handle 方法中调用的 doOnConnected 的 Consumer
    final Consumer<? super Connection> onConnected;
    final Consumer<? super Connection> onDisconnected;

    TcpClientDoOn(TcpClient client,
            @Nullable Consumer<? super Bootstrap> onConnect,
            @Nullable Consumer<? super Connection> onConnected,
            @Nullable Consumer<? super Connection> onDisconnected) {
                // 继承上一个 TcpClient
        super(client);
        this.onConnect = onConnect;
        this.onConnected = onConnected;
        this.onDisconnected = onDisconnected;
    }

    @Override
    public Bootstrap configure() {
        Bootstrap b = source.configure();
        ConnectionObserver observer = BootstrapHandlers.connectionObserver(b);
                // 注意:这里设置了 ConnectionObserver,后面会讲到
        BootstrapHandlers.connectionObserver(b, observer.then(this));   
        return b;
    }

    @Override
    public Mono<? extends Connection> connect(Bootstrap b) {
        if (onConnect != null) {
            return source.connect(b)
                         .doOnSubscribe(s -> onConnect.accept(b));
        }
        return source.connect(b);
    }

    @Override
    public void onStateChange(Connection connection, State newState) {
                // onConnected 在这里被调用,即 connection 状态改变时
        if (onConnected != null && newState == State.CONFIGURED) {
            onConnected.accept(connection);
            return;
        }
        if (onDisconnected != null) {
            if (newState == State.DISCONNECTING) {
                connection.onDispose(() -> onDisconnected.accept(connection));
            }
            else if (newState == State.RELEASED) {
                onDisconnected.accept(connection);
            }
        }
    }
}

connectNow()

// 设置超时 45s
public final Connection connectNow() {
        return connectNow(Duration.ofSeconds(45));
}

public final Connection connectNow(Duration timeout) {
    Objects.requireNonNull(timeout, "timeout");
    try {
        // 这里 connect() 方法返回的是 Mono
        return Objects.requireNonNull(connect().block(timeout), "aborted");
    }
    catch (IllegalStateException e) {
        ...
    }
}

// 返回的是 Mono
public final Mono<? extends Connection> connect() {
    ...
    return connect(b);
}

// block 方法中直接开始订阅
public T block(Duration timeout) {
    BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
    onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
    return subscriber.blockingGet(timeout.toMillis(), TimeUnit.MILLISECONDS);
}

final T blockingGet(long timeout, TimeUnit unit) {
    ...
    if (getCount() != 0) {
        try {
            if (!await(timeout, unit)) {
                dispose();  // 超时取消订阅
                throw new IllegalStateException("Timeout on blocking read for " + timeout + " " + unit);
            }
        }
        catch (InterruptedException ex) {
            dispose();
            RuntimeException re = Exceptions.propagate(ex);
            //this is ok, as re is always a new non-singleton instance
            re.addSuppressed(new Exception("#block has been interrupted"));
            throw re;
        }
    }
    ...
}

由以上分析可知,在最后的 connectNow() 方法中,才开始真正的订阅执行。下面继续分析 connect 方法。

connect()

public final Mono<? extends Connection> connect() {
    Bootstrap b;
    try {
        // 1. 获取默认的 Bootstrap
        b = configure();
    }
    catch (Throwable t) {
        Exceptions.throwIfJvmFatal(t);
        return Mono.error(t);
    }
    // 2. connect(b)
    return connect(b);
}

public Bootstrap configure() {
    return DEFAULT_BOOTSTRAP.clone();
}

static final Bootstrap DEFAULT_BOOTSTRAP =
    new Bootstrap().option(ChannelOption.AUTO_READ, false)             .remoteAddress(InetSocketAddressUtil.createUnresolved(NetUtil.LOCALHOST.getHostAddress(), DEFAULT_PORT));

继续看 connect(Bootstrap b) 方法:

// 这是一个抽象方法,很多继承类都实现了该方法。根据之前的代码分析,首先调用的应该是 TcpClientDoOn 类
public abstract Mono<? extends Connection> connect(Bootstrap b);

// TcpClientDoOn 类
public Mono<? extends Connection> connect(Bootstrap b) {
    if (onConnect != null) {
        return source.connect(b)
                        .doOnSubscribe(s -> onConnect.accept(b));
    }
    // 往上传递,source 代表上一个 TcpClient;最终传递到初始的 TcpClientConnect
    return source.connect(b);
}

// TcpClientConnect 类
final ConnectionProvider provider;
public Mono<? extends Connection> connect(Bootstrap b) {
    // 填充 b 的属性
    if (b.config()
            .group() == null) {
        TcpClientRunOn.configure(b,
                LoopResources.DEFAULT_NATIVE,
                TcpResources.get(),
                maxConnections != -1);
    }
    // 最终调用这个方法
    return provider.acquire(b);
}

ConnectionProvider

上面讲到 connect 方法最终调用的是 ConnectionProvider 类中的方法。ConnectionProvider 在之前的分析中出现过,即TcpResources.get() 方法返回的 TcpResources 对象中包含这个属性。

// 创建默认的 TcpResources
static <T extends TcpResources> T create(@Nullable T previous,
            @Nullable LoopResources loops, @Nullable ConnectionProvider provider,
            String name,
            BiFunction<LoopResources, ConnectionProvider, T> onNew) {
        if (previous == null) {
            loops = loops == null ? LoopResources.create("reactor-" + name) : loops;
                        // 创建 ConnectionProvider
            provider = provider == null ? ConnectionProvider.elastic(name) : provider;
        }
        else {
            loops = loops == null ? previous.defaultLoops : loops;
            provider = provider == null ? previous.defaultProvider : provider;
        }
        return onNew.apply(loops, provider);
    }
}

static ConnectionProvider elastic(String name) {
    // 这里的第 2 个入参 PoolFactory 又是一个函数式接口,因此对象的生成时间点在于何时调用 PoolFactory.newPool 方法; 生成的 ChannelPool 类型为 SimpleChannelPool。
        return new PooledConnectionProvider(name,
                (bootstrap, handler, checker) -> new SimpleChannelPool(bootstrap,
                        handler,
                        checker,
                        true,
                        false));
}

final class PooledConnectionProvider implements ConnectionProvider {

    interface PoolFactory {

        ChannelPool newPool(Bootstrap b,
                ChannelPoolHandler handler,
                ChannelHealthChecker checker);
    }

    final ConcurrentMap<PoolKey, Pool> channelPools;
    final String                       name;
    final PoolFactory                  poolFactory;
    final int                          maxConnections;

    PooledConnectionProvider(String name, PoolFactory poolFactory) {
        this.name = name;
        this.poolFactory = poolFactory;
        this.channelPools = PlatformDependent.newConcurrentHashMap();
        this.maxConnections = -1;
    }
    ...
}

现在回到 provider.acquire(b) 方法,可以知道调用的是 PooledConnectionProvider 类中的方法,继续分析:

// Map 结构,每个 (remote address, handler) 组合都有一个连接池
final ConcurrentMap<PoolKey, Pool> channelPools;
final String                       name;
// 通过 poolFactory 生成 ChannelPool
final PoolFactory                  poolFactory;
final int                          maxConnections;

/**
 * 主要作用是从连接池中获取连接
 * 首先需要找到对应的连接池, 通过 channelPools.get(holder)
 * 如果不存在,那么创建新的连接池,并加入到 channelPools 中
 * 最后调用 disposableAcquire(sink, obs, pool, false);
 */
public Mono<Connection> acquire(Bootstrap b) {
    return Mono.create(sink -> {
        Bootstrap bootstrap = b.clone();
    // TODO:
        ChannelOperations.OnSetup opsFactory =
                BootstrapHandlers.channelOperationFactory(bootstrap);
    // TODO: 连接生命周期的监听器
        ConnectionObserver obs = BootstrapHandlers.connectionObserver(bootstrap);
        // 懒加载,这里需要设置 bootstrap 的 remote address(ip:port)
        NewConnectionProvider.convertLazyRemoteAddress(bootstrap);
        // 每个 (remote address, handler) 都有一个 Pool
        ChannelHandler handler = bootstrap.config().handler();
        PoolKey holder = new PoolKey(bootstrap.config().remoteAddress(),
                handler != null ? handler.hashCode() : -1);

        Pool pool;
        for (; ; ) {
            // 直接获取
            pool = channelPools.get(holder);
            if (pool != null) {
                break;
            }
            // 不存在则创建新的连接池
            pool = new Pool(bootstrap, poolFactory, opsFactory);
            if (channelPools.putIfAbsent(holder, pool) == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Creating new client pool [{}] for {}",
                            name,
                            bootstrap.config()
                                    .remoteAddress());
                }
                break;
            }
            // 关闭多创建的 pool
            pool.close();
        }
        disposableAcquire(sink, obs, pool, false);
    });
}

Pool(Bootstrap bootstrap,
                PoolFactory provider,
                ChannelOperations.OnSetup opsFactory) {
            this.bootstrap = bootstrap;
            this.opsFactory = opsFactory;
                // 创建新的连接池
            this.pool = provider.newPool(bootstrap, this, this);
            this.defaultGroup = bootstrap.config()
                                         .group();
            HEALTHY = defaultGroup.next()
                                  .newSucceededFuture(true);
            UNHEALTHY = defaultGroup.next()
                                    .newSucceededFuture(false);
}

继续 disposableAcquire 方法,

static void disposableAcquire(MonoSink<Connection> sink, ConnectionObserver obs, Pool pool, boolean retried) {
            // 获取 Channel
        Future<Channel> f = pool.acquire();
        DisposableAcquire disposableAcquire =
                new DisposableAcquire(sink, f, pool, obs, retried);
            // 设置监听器, 该方法最终会调用 disposableAcquire.operationComplete() 方法,operationComplete() 方法会调用 disposableAcquire.run()
        f.addListener(disposableAcquire);
        sink.onCancel(disposableAcquire);
    }

final static class DisposableAcquire
            implements Disposable, GenericFutureListener<Future<Channel>>,
                       ConnectionObserver , Runnable {

    final Future<Channel>      f;
    final MonoSink<Connection> sink;
    final Pool                 pool;
    final ConnectionObserver   obs;
    final boolean              retried;

    DisposableAcquire(MonoSink<Connection> sink,
            Future<Channel> future,
            Pool pool,
            ConnectionObserver obs,
            boolean retried) {
        this.f = future;
        this.pool = pool;
        this.sink = sink;
        this.obs = obs;
        this.retried = retried;
    }

    // 当连接的状态改变时,调用 obs.onStateChange;而这里的 obs 就是我们在 TcpClientDoOn.configure() 方法中设置的;所以一旦连接状态改变,就会调用 TcpClient.handle 中的方法
    @Override
    public void onStateChange(Connection connection, State newState) {
        if (newState == State.CONFIGURED) {
            sink.success(connection);
        }
        obs.onStateChange(connection, newState);
    }
    ...
}

DisposableAcquire 是一个监听器,监听的是连接,即上面代码中的 Future f = pool.acquire()。那么这个 f 是什么类型呢?之前的代码分析中已经知道 pool 为 SimpleChannelPool 类型。

public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
                             boolean releaseHealthCheck, boolean lastRecentUsed) {
        this.handler = checkNotNull(handler, "handler");
        this.healthCheck = checkNotNull(healthCheck, "healthCheck");
        this.releaseHealthCheck = releaseHealthCheck;
        // Clone the original Bootstrap as we want to set our own handler
        this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
        this.bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                assert ch.eventLoop().inEventLoop();
                // 当新建连接时,会调用该方法
                handler.channelCreated(ch);
            }
        });
        this.lastRecentUsed = lastRecentUsed;
    }
}

public void channelCreated(Channel ch) {
            inactiveConnections.incrementAndGet();
            ...
                        // 这里把 ch 包装成了一下, PooledConnection 这个类同时实现了 Connection 以及 ConnectionObserver 接口,也就是说既是一个 channel,又是一个 listener。后续如果 channel 的状态发生改变,会调用 PooledConnection 的 onStateChange 方法。
            PooledConnection pooledConnection = new PooledConnection(ch, this);
            pooledConnection.bind();
            Bootstrap bootstrap = this.bootstrap.clone();
            BootstrapHandlers.finalizeHandler(bootstrap, opsFactory, pooledConnection);
            ch.pipeline()
              .addFirst(bootstrap.config()
                                 .handler());
}

下面继续看 PooledConnection 的 onStateChange 方法。

public void onStateChange(Connection connection, State newState) {
        if (newState == State.DISCONNECTING) {
        ...
            }
            // 其他状态走这里
        owner().onStateChange(connection, newState);
}

ConnectionObserver owner() {
            ConnectionObserver obs;
            for (;;) {
                obs = channel.attr(OWNER)
                             .get();
                if (obs == null) {
                    obs = new PendingConnectionObserver();
                }
                else {
                    return obs;
                }
                                // 设置 channel.attr(OWNER) 为新创建的 PendingConnectionObserver
                                // 之后再次调用 own() 方法时直接返回该 PendingConnectionObserver
                if (channel.attr(OWNER)
                           .compareAndSet(null, obs)) {
                    return obs;
                }
            }
}

final static class PendingConnectionObserver implements ConnectionObserver {

        final Queue<Pending> pendingQueue = Queues.<Pending>unbounded(4).get();

        @Override
        public void onUncaughtException(Connection connection, Throwable error) {
            pendingQueue.add(new Pending(connection, error, null));
        }

        @Override
        public void onStateChange(Connection connection, State newState) {
                        // 把状态变更放入了等待队列,其他什么都不做
            pendingQueue.add(new Pending(connection, null, newState));
        }

        static class Pending {
            final Connection connection;
            final Throwable error;
            final State state;

            Pending(Connection connection, @Nullable Throwable error, @Nullable State state) {
                this.connection = connection;
                this.error = error;
                this.state = state;
            }
        }
    }

从上面代码可知,Channel 的状态变更最终放入了一个等待队列,缺少了通知各个监听器的调用。继续回到 DisposableAcquire 类,发现同时实现了 Runnable 接口。

final static class DisposableAcquire
            implements Disposable, GenericFutureListener<Future<Channel>>,
                       ConnectionObserver , Runnable {

    final Future<Channel>      f;
    final MonoSink<Connection> sink;
    final Pool                 pool;
    final ConnectionObserver   obs;
    final boolean              retried;

    @Override
    public void onStateChange(Connection connection, State newState) {
        if (newState == State.CONFIGURED) {
            sink.success(connection);
        }
        obs.onStateChange(connection, newState);
    }

    @Override
    public void run() {
        Channel c = f.getNow();
        pool.activeConnections.incrementAndGet();
        pool.inactiveConnections.decrementAndGet();
    // 之前 owner() 方法设置了 PendingConnectionObserver
        ConnectionObserver current = c.attr(OWNER)
                                        .getAndSet(this);

        if (current instanceof PendingConnectionObserver) {
            PendingConnectionObserver pending = (PendingConnectionObserver)current;
            PendingConnectionObserver.Pending p;
            current = null;
            // 监听连接关闭
            registerClose(c, pool);     
        // 依次处理等待队列中的事件(连接状态变更)
            while((p = pending.pendingQueue.poll()) != null) {
                if (p.error != null) {
                    onUncaughtException(p.connection, p.error);
                }
                else if (p.state != null) {
                    // 通知各个监听器
                    onStateChange(p.connection, p.state);
                }
            }
        }
        else if (current == null) {
            registerClose(c, pool);
        }
    // TODO: 什么情况会走这边?
        if (current != null) {
            Connection conn = Connection.from(c);
            if (log.isDebugEnabled()) {
                log.debug(format(c, "Channel acquired, now {} active connections and {} inactive connections"),
                        pool.activeConnections, pool.inactiveConnections);
            }
            obs.onStateChange(conn, State.ACQUIRED);

            PooledConnection con = conn.as(PooledConnection.class);
            if (con != null) {
                ChannelOperations<?, ?> ops = pool.opsFactory.create(con, con, null);
                if (ops != null) {
                    ops.bind();
                    obs.onStateChange(ops, State.CONFIGURED);
                    sink.success(ops);
                }
                else {
                    //already configured, just forward the connection
                    sink.success(con);
                }
            }
            else {
                //already bound, just forward the connection
                sink.success(conn);
            }
            return;
        }
        //Connected, leave onStateChange forward the event if factory
                ...
        if (pool.opsFactory == ChannelOperations.OnSetup.empty()) {
            sink.success(Connection.from(c));
        }
    }
}

至此,TcpClient 示例程序中的几行代码差不多就算是分析完了。

文章永久链接:https://tech.souyunku.com/25830

未经允许不得转载:搜云库技术团队 » Reactor-Netty系列1-TcpClient源码分析-从示例程序开始

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们