作者:朱小厮 | 出自:https://hiddenpps.blog.csdn.net/column/info/14800
关于ChannelManager,官方注解:Manages a set of channels, indexed by channel number (1… _channelMax)。
ChannelManager类的代码量不是很多,主要用来管理Channel的,channelNumber=0的除外,应为channelNumber=0是留给Connection的特殊的channelNumber。
下面是ChannelManager的成员变量:
/** Monitor for _channelMap
and channelNumberAllocator
*/
private final Object monitor = new Object();
/** Mapping from 1.._channelMax
to {@link ChannelN} instance */
private final Map _channelMap = new HashMap();
private final IntAllocator channelNumberAllocator;
private final ConsumerWorkService workService;
private final Set shutdownSet = new HashSet();
/** Maximum channel number available on this connection. */
private final int _channelMax;
private final ThreadFactory threadFactory;
这上面的成员变量下面会有涉及。
对于ChannelManager的使用,是AMQConnection中的成员变量:
/** Object that manages a set of channels */
private volatile ChannelManager _channelManager;
AMQConnection中start()的_channelManager中对其初始化:
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
return new ChannelManager(this._workService, channelMax, threadFactory);
}
再调用其构造函数:
public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {
if (channelMax == 0) {
// The framing encoding only allows for unsigned 16-bit integers
// for the channel number
channelMax = (1 << 16) - 1;
}
_channelMax = channelMax;
channelNumberAllocator = new IntAllocator(1, channelMax);
this.workService = workService;
this.threadFactory = threadFactory;
}
这里的ConsumerWorkService也在AMQConnection的start()方法中初始化——initializeConsumerWorkService():
private void initializeConsumerWorkService() {
this._workService = new ConsumerWorkService(executor, threadFactory, shutdownTimeout);
}
再回到构造函数。
channelMax参数是在client接收到broker的Connection.Tune帧中的“Channel-Max”参数之后设置的,如果为0则表示没有限制,这里就会设置为默认的最大值:2的16次方-1。
threadFactory参数是指:Executors.defaultThreadFactory();
关于ConsumerWorkService请参考文章末尾处。
使用过RabbitMQ的同学知道要生产或者消费消息之前必须要初始化Channel,如下:
Channel channel = connection.createChannel();
这个createChannel()是AMQConnection中的方法:
public Channel createChannel(int channelNumber) throws IOException {
ensureIsOpen();
ChannelManager cm = _channelManager;
if (cm == null) return null;
return cm.createChannel(this, channelNumber);
}
public Channel createChannel() throws IOException {
ensureIsOpen();
ChannelManager cm = _channelManager;
if (cm == null) return null;
return cm.createChannel(this);
}
这里就是调用了ChannelManager的createChannel方法。
下面是ChannelManager中关于创建Channel的代码:
public ChannelN createChannel(AMQConnection connection) throws IOException {
ChannelN ch;
synchronized (this.monitor) {
int channelNumber = channelNumberAllocator.allocate();
if (channelNumber == -1) {
return null;
} else {
ch = addNewChannel(connection, channelNumber);
}
}
ch.open(); // now that it's been safely added
return ch;
}
public ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException {
ChannelN ch;
synchronized (this.monitor) {
if (channelNumberAllocator.reserve(channelNumber)) {
ch = addNewChannel(connection, channelNumber);
} else {
return null;
}
}
ch.open(); // now that it's been safely added
return ch;
}
private ChannelN addNewChannel(AMQConnection connection, int channelNumber) throws IOException {
if (_channelMap.containsKey(channelNumber)) {
// That number's already allocated! Can't do it
// This should never happen unless something has gone
// badly wrong with our implementation.
throw new IllegalStateException("We have attempted to "
+ "create a channel with a number that is already in "
+ "use. This should never happen. "
+ "Please report this as a bug.");
}
ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
_channelMap.put(ch.getChannelNumber(), ch);
return ch;
}
protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
return new ChannelN(connection, channelNumber, workService);
}
上面有两个createChannel方法,一个是带了channelNumber的,一个是自动分片channelNumber的,分别对应AMQConnection中的两个方法。最后都调用addNewChannel方法。
注意两个createChannel方法中都有这样一句代码:
ch.open();
这个是什么呢?其实是调用ChannelN的open方法:
/**
* Package method: open the channel.
* This is only called from {@link ChannelManager}.
* @throws IOException if any problem is encountered
*/
public void open() throws IOException {
// wait for the Channel.OpenOk response, and ignore it
exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND));
}
这样就调用了AMQChannel的rpc方法,向broker发送了一个Channel.Open帧。
addNewChannel方法实际上是创建了一个ChannelN对象,然后置其于ChannelManager中的_channelMap中,方便管理。
channelNumberAllocator是channelNumber的分配器,其原理是采用BitSet来实现channelNumber的分配,有兴趣的同学可以深究进去看看。
关于ChannelN类会有专门一篇博文来讲述,其实整个RabbitMQ-client的代码最关键的就是ChannelN这个类,需要着重讲述。
细心的朋友可能会发现关于ConsumerWorkService这个,我并没有做什么阐述。这个主要牵涉到Channel层面的处理,涉及到的类有AMQConnection, ChannelN, ConsumerDispatcher等。ConsumerWorkService是在AMQConnection中初始化,在ChannelManager中引用。至于这里怎么理解,在ChannelN中这么解释:
service for managing this channel’s consumer callbacks。意思是管理消费回调的服务。
综述,ChannelManager主要用来管理Channel, 包括channelNumber与Channel之间的映射关系。
附录:RabbitMQ-客户端源码系列文章
- 一、RabbitMQ-客户端源码之ConnectionFactory
- 二、RabbitMQ-客户端源码之AMQConnection
- 三、RabbitMQ-客户端源码之ChannelManager
- 四、RabbitMQ-客户端源码之Frame
- 五、RabbitMQ-客户端源码之AMQChannel
- 六、RabbitMQ-客户端源码之AMQCommand
- 七、RabbitMQ-客户端源码之AMQPImpl+Method
- 八、RabbitMQ-客户端源码之ChannelN
- 九、RabbitMQ-客户端源码之Consumer