IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码

十九、RocketMQ 主从同步读写分离机制

IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码

文章永久连接:https://tech.souyunku.com/?p=6037

作者:唯有坚持不懈 | 出处:https://blog.csdn.net/prestigeding/article/details/78888290


关于主从同步最新理解:RocketMQ 主从同步若干问题答疑

RocketMQ在消息拉取时是如何根据消息消费队列MessageQueue来选择Broker的呢?消息消费队列如图所示:
img_0914_01_1.png
RocketMQ根据MessageQueue查找Broker地址的唯一依据便是brokerName,从RocketMQ的Broker组织实现来看,同一组Broker(M-S)服务器,其brokerName相同,主服务器的brokerId为0,从服务器的brokerId大于0,那RocketMQ根据brokerName如何定位到哪一台Broker上来呢?

PullAPIWrapper#pullKernelImpl

FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);

RocketMQ的MQClientInstance类提供了根据brokerName、brokerId查找Broker地址的方法,返回值如图:
img_0914_01_2.png
MQClientInstance#findBrokerAddressInSubscribe

public FindBrokerResult findBrokerAddressInSubscribe(
        final String brokerName,
        final long brokerId,
        final boolean onlyThisBroker
    ) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;

        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            brokerAddr = map.get(brokerId);
            slave = brokerId != MixAll.MASTER_ID;
            found = brokerAddr != null;

            if (!found && !onlyThisBroker) {
                Entry<Long, String> entry = map.entrySet().iterator().next();
                brokerAddr = entry.getValue();
                slave = entry.getKey() != MixAll.MASTER_ID;
                found = true;
            }
        }

        if (found) {
            return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
        }

        return null;
    }
  • brokerName:broker名称;brokerId:brokerId;onlyThisBroker:是否必须返回brokerId的broker对应的服务器信息。
  • brokerAddrTable地址缓存表中根据brokerName获取所有的broker信息。brokerAddrTable的存储格式如:brokerName:{brokerId:brokerAddress}。
  • 根据brokerId从broker主从缓存表中获取指定broker名称,如果根据brokerId未找到相关条目,此时如果onlyThisBroker为false,则随机返回broker中任意一个Broker,否则返回null。
  • 组装FindBrokerResult时,需要设置是否是slave这个属性。如果brokerId=0表示返回的broker是主节点,否则返回的是从节点。

上述方法,根据brokerName是如何获取brokerId的呢?

请看MQClientInstance#recalculatePullFromWhichNode:

public long recalculatePullFromWhichNode(final MessageQueue mq) {
        if (this.isConnectBrokerByUser()) {
            return this.defaultBrokerId;
        }

        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (suggest != null) {
            return suggest.get();
        }

        return MixAll.MASTER_ID;
    }

首先从pullFromWhichNodeTable缓存表中获取该消息消费队列的brokerId,如果找到,则返回,否则返回brokerName的主节点。由此可以看出pullFromWhichNodeTable中存放的是消息队列建议从从哪个Broker服务器拉取消息的缓存表,其存储结构:MessageQueue:AtomicLong,那该信息从何而来呢?

原来消息消费拉取线程PullMessageService根据PullRequest请求从主服务器拉取消息后会返回下一次建议拉取的brokerId,消息消费者线程在收到消息后,会根据主服务器的建议拉取brokerId来更新pullFromWhichNodeTable,消息消费者线程更新pullFromWhichNodeTable的代码如下:

PullAPIWrapper#processPullResult

this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (null == suggest) {
            this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
        } else {
            suggest.set(brokerId);
        }
    }

那服务端是如何计算下一次拉取建议从哪台Broker服务器拉取消息呢?

请看:DefaultMessageStore#getMessage

long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
  • maxOffsetPy:代表当前主服务器消息存储文件最大偏移量,maxPhyOffsetPulling:此次拉取消息最大偏移量。
  • diff:对于PullMessageService线程来说,当前未被拉取到消息消费端的消息长度。
  • TOTAL_PHYSICAL_MEMORY_SIZE:RocketMQ所在服务器总内存大小;accessMessageInMemoryMaxRatio:表示RocketMQ所能使用的最大内存比例,超过该内存,消息将被置换出内存;memory表示RocketMQ消息常驻内存的大小,超过该大小,RocketMQ会将旧的消息置换会磁盘。
  • 如果diff大于memory,表示当前需要拉取的消息已经超出了常驻内存的大小,表示主服务器繁忙,此时才建议从从服务器拉取。

PullMessageProcessor#processRequest

if (getMessageResult.isSuggestPullingFromSlave()) {
     responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
     responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}

当 GetResult 的 suggestPullingFromSlave 为真是,将会直接返回消息消费组的配置信息whichBrokerWhenConsumeSlowly,默认为1,可以通过客户端命令updateSubGroup配置当主服务器繁忙时,建议从哪个从服务器读取消息。

注意:RocketMQ 读写分离不按套路出牌,并不是主服务器只负责消息发送,消息从服务器主要负责消息拉取,而是只有当主服务器消息拉取出现堆积时才将拉取任务转向从服务器。


备注:本文是《RocketMQ技术内幕》的前期素材,建议关注笔者的书籍:《RocketMQ技术内幕》。

干货推荐

本站推荐:精选优质专栏

专栏汇总:RabbitMQ 源码解析

专栏汇总:Dubbo 源码分析

专栏汇总:Tomcat源码分析

附录:RocketMQ源码分析,系列文章


Warning: A non-numeric value encountered in /data/wangzhan/tech.souyunku.com.wp/wp-content/themes/dux/functions-theme.php on line 1154
赞(76) 打赏



未经允许不得转载:搜云库技术团队 » 十九、RocketMQ 主从同步读写分离机制

IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码
IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码

评论 抢沙发

大前端WP主题 更专业 更方便

联系我们联系我们

觉得文章有用就打赏一下文章作者

微信扫一扫打赏

微信扫一扫打赏


Fatal error: Uncaught Exception: Cache directory not writable. Comet Cache needs this directory please: `/data/wangzhan/tech.souyunku.com.wp/wp-content/cache/comet-cache/cache/https/tech-souyunku-com/index.q`. Set permissions to `755` or higher; `777` might be needed in some cases. in /data/wangzhan/tech.souyunku.com.wp/wp-content/plugins/comet-cache/src/includes/traits/Ac/ObUtils.php:367 Stack trace: #0 [internal function]: WebSharks\CometCache\Classes\AdvancedCache->outputBufferCallbackHandler() #1 /data/wangzhan/tech.souyunku.com.wp/wp-includes/functions.php(5109): ob_end_flush() #2 /data/wangzhan/tech.souyunku.com.wp/wp-includes/class-wp-hook.php(303): wp_ob_end_flush_all() #3 /data/wangzhan/tech.souyunku.com.wp/wp-includes/class-wp-hook.php(327): WP_Hook->apply_filters() #4 /data/wangzhan/tech.souyunku.com.wp/wp-includes/plugin.php(470): WP_Hook->do_action() #5 /data/wangzhan/tech.souyunku.com.wp/wp-includes/load.php(1097): do_action() #6 [internal function]: shutdown_action_hook() #7 {main} thrown in /data/wangzhan/tech.souyunku.com.wp/wp-content/plugins/comet-cache/src/includes/traits/Ac/ObUtils.php on line 367