文章永久连接:https://tech.souyunku.com/?p=6010
作者:唯有坚持不懈 | 出处:https://blog.csdn.net/prestigeding/article/details/78888290
1、消息消费需要解决的问题
首先再次重复啰嗦一下 RocketMQ 消息消费的一些基本元素的关系
主题 —》 消息队列(MessageQueue) 1 对多。
主题 —》 消息生产者,一般主题会由多个生产者组成,生产者组。
主题 —》 消息消费者,一般一个主题也会被多个消费者消费。
那消息消费至少需要解决如下问题:
1、一个消费组中多个消费者是如何对消息队列(1个主题多个消息队列)进行负载消费的。
2、一个消费者中多个线程又是如何协作(并发)的消费分配给该消费者的消息队列中的消息呢?
3、消息消费进度如何保存,包括MQ是如何知道消息是否正常被消费了。
4、RocketMQ 推拉模式实现机制。
再提一个业界关于消费者与消息队列的消费规则。
1个消费者可以消费多个消息队列,但一个消息队列同一时间只能被一个消费者消费,这又是如何实现的呢?
本文紧接着上文:消息消费概述 。
继续探讨消息分发与消费端负载均衡。
我们从上文知道,PullMessageService 线程主要是负责 pullRequestQueue 中的 PullResult,那问题来了,pullRequestQueue 中的数据从哪来,在什么时候由谁来填充呢。
那我们就先沿着这条线索分析下去,看一下 PullMessageService 的 pullReqestQueue 添加元素的方法的调用链条如下:
也就是调用链:
RebalanceService. run()
MQClientInstance.doRebalance()
DefaultMQPulConsumerImpl.doRebalance()
RebalanceImpl.doRebalance()
RebalanceImpl.rebalanceByTopic
RebalanceImpl.updateProcessQueueTableInRebalance
RebalanceImpl.dispatchPullRequest
DefaultMQPushConsumerImpl.executePullRequestImmediately
从上面可以直观的看出,向 PullMesssageService 的 LinkedBlockingQueue
那么RebalanceService是何许人也,让我们一起来揭开其神秘面纱。
2、消息消费负载机制分析
2.1 RebalanceService 线程
从上面可以看出,MQClientInstance 持有一个 RebalanceService 线程并启动它。RebalanceService 线程的 run 方法比较简单,就是直接调用 mqClientFactory.doRebalance。
下面重点分步骤来详细探究 MQClientInstance.doRebalance 方法的执行流程。
2.1.1 MQClientInstance.doRebalance
循环遍历每个消费组获取 MQConsumeInner 对象(其实就是 DefaultMQPushConsumerImpl 或 DefaultMQPullConsumerImpl 对象),并执行其 doRebalance 方法。
2.1.2 DefaultMQPushConsumerImpl.doRebalance
RebalanceImpl doRebalance
到这里,经过层层对象委托,终于进入到实现消息负载分发的核心地带了,RebalanceImpl 类,我们应该停下脚步,先重点认识一下RebalanceImpl类。
3、RebalanceImpl 类初探
我们先来看看其核心属性:
- ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable
消息处理队列。 - ConcurrentMap<String, Set
topicSubscribeInfoTable
topic 的队列信息。 - ConcurrentMap<String, SubscriptionData> subscriptionInner
订阅信息。 - String consumerGroup
消费组名称。 - MessageModel messageModel
消费模式。 - AllocateMessageQueueStrategy allocateMessageQueueStrategy
队列分配算法。 - MQClientInstance mqClientFactory
MQ 客户端实例。
下面还是从doRebalance方法入手:
1、根据 topic 来进行负载。
2、移除 MessageQueue,如果 MesageQueue 的 topic 不在订阅的主题中,接下来重点关注 rebalanceByTopic 方法。
RebalanceImpl rebalanceByTopic详解:
part1:根据消息消费模式(集群还是广播)我们先重点看集群模式。
part2: 获取主题的消息消费队列、主题与该消费组的消费者id列表,任意一个为空,则退出方法的执行。
part3: 主要是对主题的消息队列排序、消费者ID进行排序,然后利用分配算法,计算当前消费者ID(mqClient.clientId) 分配出需要拉取的消息队列。
具体的消息消费队列分配算法参考:AllocateMessageQueueStrategy的实现类,具体算法实现就不细化研究了。
在这里举一个最简单的队列分配机制,,比如一个topic 有8个消息队列(q1,q2,q3,q4,q5,q6,q7,q8) ,比如有三个消费者 c1,c2,c3
一种队列负载算法: q1,q4,q7 分给c1,,q2,q5,q8 c2,,q3,q5 给 c3。下文会专题研究一下负载算法。
part4: 更新主题的消息消费处理队列,并返回消息队列负载是否改变。
遍历消息队列-处理队列缓存,只处理 mq 的主题与该主题相关的 ProcessQueue, 如果 mq 不在当期主题的处理范围内(由于消息队列数量变化等原因,消费者的消费队列发生了变化,该消息队列已经分配给别的消费者去消费了),首先设置该消息队列为丢弃 (dropped 为 voliate 修饰),可以及时的阻止继续向 ProceeQueue 中拉取数据,然后执行removeUnecessaryMessageQueue(mq,pq) 来判断是否需要移除。
既然我们都是从Push进入的,本文以Push模式展开(同时我们也可以先思考思考push,pull差别),移步到RebalancePushImpl。
目前只看非顺序消息,逻辑就比较简单了,丢弃之前,先将 MessageQueue 消息消费进度 持久化,然后丢弃,重新被其他消费者加载。顺序消息将会本系列的后续文章中详细介绍。
接下来处理 MessageQueue 的 ProcessQueue,也就是在 ProcessQueueTable 中没有 mq 的处理队列(因为重新负载后,可能会分配一些新的队列)。
主要就是在内存中移除 MessageQueue 的 offerset, 然后计算下一个拉取偏移量,然后每一个MessageQueue创建一个拉取任务(PullRequest)。
RebalancePushImpl
PullMessageService
往PullServiceMessage中的 pullRequestQueue中放入PullRequest,则PullMessageService线程 的run方法就不会阻塞
part5:如果消息负载发生变化,需处理
主要是调整主题小各个队列的拉取阔值。
这里,主要看出来当消费者挂断后,或主题消息队列动态变化后,消息负载会发生变化的重新分布情况。
总结:
本文主要阐述了消息消费端负载机制,这里消息非顺序消息机制就梳理到这里了,大概再总结一下:
1、首先RebalanceService线程启动,为消费者分配消息队列,其实每一个MessageQueue 会构建一个 PullRequest 对象,然后通过 RebalanceImpl 将 PullRequest放入到 PullMessageService 线程的 LinkedBlockingQueue, 进而唤醒 queue.take()方法,然后执行 DefaultMQPushConsumerImpl 的 pullMessage,通过网络从broker端拉取消息,一次最多拉取的消息条数可配置,默认为32条,然后然后将拉取的消息,执行过滤等,然后封装成任务(ConsumeRequest),提交到消费者的线程池去执行,每次消费消息后,又将该 PullRequest 放入到 PullMessageService中(DefaultMQPushConsumerImpl 的机制就是pullInterval 为 0;
下文预告:
CommitLog写入与ConsumeQueue队列的持久化机制
消息消费进度存储机制,再谈RocketMQ消息存储
RocketMQ顺序消息
RocketMQ主从机制
备注:本文是《RocketMQ技术内幕》的原始素材,建议关注笔者的书籍:《RocketMQ技术内幕》。
干货推荐
附录:RocketMQ源码分析,系列文章
- 一、RocketMQ源码分析之NameServer
- 二、RocketMQ源码分析之Broker概述与同步消息发送原理与高可用设计及思考
- 三、RocketMQ源码分析之CommitLog消息存储机制
- 四、RocketMQ源码分析之消息消费概述
- 五、RocketMQ源码分析消息消费机制—-消费者拉取消息机制
- 【当前读到】六、RocketMQ源码分析消息消费机制—-消费端消息负载均衡机制与重新分布
- 七、RocketMQ源码分析之消息消费重试机制
- 八、RocketMQ源码分析之消息ACK机制(消费进度)
- 九、RocketMQ源码分析之消费队列、Index索引文件存储结构与存储机制-上篇
- 十、RocketMQ源码分析之消费队列、Index索引文件存储结构与存储机制-下篇
- 十一、RocketMQ源码分析刷盘机制
- 十二、RocketMQ源码分析消息过滤机制上篇—–消息消费服务端过滤与TAG模式过滤实现
- 十三、RocketMQ源码分析消息过滤机制下篇-FilterServer、ClassFilter模式详解
- 十四、RocketMQ源码分析消息拉取拉模式PULL
- 十五、RocketMQ源码分析消息PULL-长轮询模式
- 十六、RocketMQ源码分析顺序消息消费实现原理
- 十七、RocketMQ源码分析文件清除机制
- 十八、源码研究RocketMQ主从同步机制(HA)
- 十九、RocketMQ 主从同步读写分离机制
- 二十、RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想
- 二十一、RocketMQ源码分析之RocketMQ事务消息实现原理上篇
- 二十二、RocketMQ源码分析之RocketMQ事务消息实现原理中篇—-事务消息状态回查
- 二十三、RocketMQ源码分析之事务消息实现原理下篇-消息服务器Broker提交回滚事务实现原理
- 二十四、RocketMQ事务消息实战
- 二十五、RocketMQ实战:生产环境中,autoCreateTopicEnable为什么不能设置为true
- 二十六、RocketMQ 消息发送system busy、broker busy原因分析与解决方案
- 二十七、RocketMQ HA机制(主从同步)
- 二十八、RocketMQ ACL使用指南
- 二十九、RocketMQ源码分析 ACL实现机制
- 三十、RocketMQ消息轨迹-设计篇
- 三十一、RocketMQ源码分析消息轨迹
- 三十二、RocketMQ一个新的消费组初次启动时从何处开始消费呢?
- 三十三、RocketMQ 多副本前置篇:初探raft协议
- 三十四、源码分析 RocketMQ DLedger 多副本之 Leader 选主
- 三十五、源码分析 RocketMQ DLedger 多副本存储实现
- 三十六、RocketMQ 主题扩分片后遇到的坑