专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

一、RabbitMQ-客户端源码之ConnectionFactory

作者:朱小厮 | 出自:https://hiddenpps.blog.csdn.net/column/info/14800

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(ip);
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String message = "RabbitMQ Demo Test:" + System.currentTimeMillis();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.close();
connection.close();

相信使用rabbitmq java客户端的同学来说,这段代码并不陌生,主要的作用是发送一条消息至broker然后关闭。通过wireshark抓包工具可以看到整个AMQP协议的流程,如下图:
(xx.xx.48.240是client的ip,xx.xx.197.73是broker的ip)

img_2.png

下面通过源码来分析下Connection有关的整个流程,对于上面AMQP流程中的Protocol-Header到Connection.Open-Ok的部分。

首先是ConnectionFactory类(文章开篇的demo中),这里主要包含一些与broker连接的配置参数等,比如:username, password, virtualHost, host,port, requestedChannelMax, requestedFrameMax, requestedHeartbeat, connectionTimeout, shutdownTimeout(只列出部分)。

这个类中其余都是些Getter和Setter方法,但是有个newConnection方法是关键,文中开篇的demo代码下面列出详细内容:

/**
 * Create a new broker connection, picking the first available address from
 * the list.
 *
 * If automatic connection recovery
 * is enabled, the connection returned by this method will be {@link Recoverable}. Future
 * reconnection attempts will pick a random accessible address from the provided list.
 *
 * @param executor thread execution service for consumers on the connection
 * @param addrs an array of known broker addresses (hostname/port pairs) to try in order
 * @return an interface to the connection
 * @throws java.io.IOException if it encounters a problem
 * @see Automatic Recovery
 */
public Connection newConnection(ExecutorService executor, Address[] addrs)
        throws IOException, TimeoutException {
    FrameHandlerFactory fhFactory = createFrameHandlerFactory();
    ConnectionParams params = params(executor);

    if (isAutomaticRecoveryEnabled()) {
        // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
        AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs);
        conn.init();
        return conn;
    } else {
        IOException lastException = null;
        for (Address addr : addrs) {
            try {
                FrameHandler handler = fhFactory.create(addr);
                AMQConnection conn = new AMQConnection(params, handler);
                conn.start();
                return conn;
            } catch (IOException e) {
                lastException = e;
            }
        }
        throw (lastException != null) ? lastException : new IOException("failed to connect");
    }
}

方法中首先是FrameHandlerFactory fhFactory = createFrameHandlerFactory();这个是用来处理client与broker之间的通信帧(Frame)的,包括建立通信链路(java的原生socket,注意这里没有NIO也没有netty)。

protected FrameHandlerFactory createFrameHandlerFactory() throws IOException {
    return new FrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL());
}

调用createFrameHandlerFactory()方法得到FrameHandlerFactory对象之后再:“ FrameHandler handler = fhFactory.create(addr);”返回的是SocketFrameHandler对象,这个对象是对Socket的一个封装,完全可以看成是一个Socket对象。
注意这里的Socket的TCP_NODELAY参数默认设置为true,而不是默认的false。当然你也可以调用ConnectionFactory的setSocketConfigurator自行设置。

//这个方法是DefaultSocketConfigurator的唯一的方法
public void configure(Socket socket) throws IOException {
    // disable Nagle's algorithm, for more consistently low latency
    socket.setTcpNoDelay(true);
}

有关Socket的TCP_NODELAY参数:默认情况下发送数据是采用Negale算法。Negale算法是指发送方数据不会立刻发送出去,而是先放在缓冲区内,等待缓冲区满了,在发出去。Negale算法适用于需要发送大量数据的应用场景。这种算法减少传输的次数增加性能。但是如果对于需要即使响应的,小批量数据的应用场景,例如网络游戏就不能采用Negale算法了。默认是false,表示采用Negale算法。

ConnectionParams 主要用来配置与broker连接相关的参数,比如username,password,vhost等。这个与前面Socket的参数不同,需要注意区分。

之后if(isAutomaticRecoveryEnabled()){}之内的方法是建立可自动恢复连接的,这个可以忽略,直接看else里面的代码,因为if和else大体功能上一致,else里的更通用一些,也是默认的。上面提到 FrameHandler handler = fhFactory.create(addr);这段代码返回的是SocketFrameHandler对象,之后: AMQConnection conn = new AMQConnection(params, handler);这句通过参数和Socket与broker建立连接。之后初始化:conn.start();完成之后客户端就已经和broker建立了正常的连接了.

有关AMQConnection的详细内容将在下一篇文章[[二]RabbitMQ-客户端源码之AMQConnection]中讲述。

附录:RabbitMQ-客户端源码系列文章

未经允许不得转载:搜云库技术团队 » 一、RabbitMQ-客户端源码之ConnectionFactory

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们