IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码

聊聊rocketmq-client-go的remoteBrokerOffsetStore

IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码

本文主要研究一下rocketmq-client-go的remoteBrokerOffsetStore

remoteBrokerOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// remoteBrokerOffsetStore keeps per-queue consume offsets in an in-memory
// table and exchanges them with the broker through client.
type remoteBrokerOffsetStore struct {
    // group is sent as the ConsumerGroup in offset query/update requests.
    group       string
    // OffsetTable maps a message queue to its locally known offset.
    // The methods of this type access it while holding mutex.
    OffsetTable map[primitive.MessageQueue]int64 `json:"OffsetTable"`
    // client issues the InvokeSync / InvokeOneWay calls to the broker.
    client      internal.RMQClient
    // namesrv resolves a broker name to an address and refreshes topic
    // route info when the address is not yet known.
    namesrv     internal.Namesrvs
    // mutex guards OffsetTable.
    mutex       sync.RWMutex
}

  • remoteBrokerOffsetStore定义了group、OffsetTable、client、namesrv、mutex属性

NewRemoteOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// NewRemoteOffsetStore returns an OffsetStore backed by a
// remoteBrokerOffsetStore for the given consumer group. The store starts
// with an empty, non-nil offset table and uses client and namesrv to reach
// the broker.
func NewRemoteOffsetStore(group string, client internal.RMQClient, namesrv internal.Namesrvs) OffsetStore {
    store := new(remoteBrokerOffsetStore)
    store.group = group
    store.client = client
    store.namesrv = namesrv
    store.OffsetTable = map[primitive.MessageQueue]int64{}
    return store
}

  • NewRemoteOffsetStore方法实例化remoteBrokerOffsetStore

persist

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// persist pushes the locally stored offsets of the given queues to the broker.
//
// Entries of OffsetTable whose queue is not listed in mqs are dropped from the
// table; every remaining entry is sent via updateConsumeOffsetToBroker and the
// outcome is logged. An empty mqs leaves the table untouched. The write lock
// is held for the whole call.
func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    if len(mqs) < 1 {
        return
    }

    // Set of queues the caller still wants tracked.
    keep := map[primitive.MessageQueue]struct{}{}
    for i := range mqs {
        keep[*mqs[i]] = struct{}{}
    }

    for queue, offset := range r.OffsetTable {
        if _, wanted := keep[queue]; !wanted {
            // Deleting the current key while ranging over a map is safe in Go.
            delete(r.OffsetTable, queue)
            continue
        }

        if err := r.updateConsumeOffsetToBroker(r.group, queue, offset); err != nil {
            rlog.Warning("update offset to broker error", map[string]interface{}{
                rlog.LogKeyConsumerGroup: r.group,
                rlog.LogKeyMessageQueue:  queue.String(),
                rlog.LogKeyUnderlayError: err.Error(),
                "offset":                 offset,
            })
            continue
        }

        rlog.Info("update offset to broker success", map[string]interface{}{
            rlog.LogKeyConsumerGroup: r.group,
            rlog.LogKeyMessageQueue:  queue.String(),
            "offset":                 offset,
        })
    }
}

  • persist方法在mqs为空时直接返回;否则先删除OffsetTable中不在mqs里的MessageQueue,再对其余的MessageQueue逐个执行r.updateConsumeOffsetToBroker,并记录成功或失败日志

remove

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// remove drops mq's entry from OffsetTable under the write lock and logs the
// removal. Removing a queue that has no entry is a no-op apart from the log.
func (r *remoteBrokerOffsetStore) remove(mq *primitive.MessageQueue) {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    key := *mq
    delete(r.OffsetTable, key)

    fields := map[string]interface{}{
        rlog.LogKeyMessageQueue: mq,
    }
    rlog.Info("delete mq from offset table", fields)
}

  • remove方法执行delete(r.OffsetTable, *mq)

read

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// read returns the consume offset of mq according to the read strategy t,
// or -1 when no offset is available.
//
//   - _ReadFromMemory: consult OffsetTable only; -1 if mq has no entry.
//   - _ReadMemoryThenStore: consult OffsetTable first and fall through to the
//     broker when mq has no entry.
//   - _ReadFromStore: query the broker. On success the value is merged into
//     OffsetTable through update (increase-only) and returned; on failure the
//     error is logged and -1 is returned.
//   - any other readType: -1.
//
// The read lock is held only around the OffsetTable lookup. Previously the
// lock was acquired up front and the path for an unrecognised readType
// returned without releasing it, which would block every later writer; it
// was also held across the broker round trip. Scoping the lock to the lookup
// guarantees it is released on every return path.
func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
    switch t {
    case _ReadFromMemory, _ReadMemoryThenStore:
        // Dereference before locking so a nil mq cannot panic with the lock held.
        key := *mq

        r.mutex.RLock()
        off, exist := r.OffsetTable[key]
        r.mutex.RUnlock()

        if exist {
            return off
        }
        if t == _ReadFromMemory {
            return -1
        }
        fallthrough
    case _ReadFromStore:
        off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)
        if err != nil {
            rlog.Error("fecth offset of mq error", map[string]interface{}{
                rlog.LogKeyMessageQueue:  mq.String(),
                rlog.LogKeyUnderlayError: err,
            })
            return -1
        }
        // update takes the write lock itself, so no lock may be held here.
        r.update(mq, off, true)
        return off
    }

    return -1
}

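  • read方法对于_ReadFromMemory和_ReadMemoryThenStore会先从OffsetTable读取;当类型为_ReadFromStore,或类型为_ReadMemoryThenStore且内存中没有对应offset时,会执行r.fetchConsumeOffsetFromBroker,成功后再通过r.update(mq, off, true)更新本地的OffsetTable;获取失败或类型未知时返回-1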

update

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// update records offset for mq in OffsetTable under the write lock.
//
// A queue with no entry always gets the new offset. For an existing entry,
// increaseOnly=true keeps the stored value unless offset is strictly larger;
// increaseOnly=false overwrites unconditionally.
func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
    r.mutex.Lock()
    defer r.mutex.Unlock()

    key := *mq
    current, known := r.OffsetTable[key]

    // The only case that must not write: an existing entry in increase-only
    // mode that the new offset does not advance.
    if known && increaseOnly && current >= offset {
        return
    }
    r.OffsetTable[key] = offset
}

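  • update方法更新的是r.OffsetTable[*mq];如果本地还没有该MessageQueue的offset则直接写入,否则当increaseOnly为true时仅在新offset大于本地offset时才更新,increaseOnly为false时直接覆盖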

fetchConsumeOffsetFromBroker

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// fetchConsumeOffsetFromBroker asks the broker that owns mq for the consume
// offset stored for group.
//
// The broker address is looked up by mq.BrokerName; if it is unknown, the
// topic route info is refreshed once and the lookup retried. The query is a
// synchronous call with a 3 second timeout, and the offset is parsed from the
// "offset" entry of the response's ExtFields.
//
// On error the returned offset is -1, except for a non-success response code,
// where it is -2.
func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) {
    addr := r.namesrv.FindBrokerAddrByName(mq.BrokerName)
    if addr == "" {
        // Unknown broker: refresh the route once and look again.
        r.namesrv.UpdateTopicRouteInfo(mq.Topic)
        if addr = r.namesrv.FindBrokerAddrByName(mq.BrokerName); addr == "" {
            return -1, fmt.Errorf("broker: %s address not found", mq.BrokerName)
        }
    }

    header := &internal.QueryConsumerOffsetRequestHeader{
        ConsumerGroup: group,
        Topic:         mq.Topic,
        QueueId:       mq.QueueId,
    }
    request := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, header, nil)

    response, err := r.client.InvokeSync(context.Background(), addr, request, 3*time.Second)
    switch {
    case err != nil:
        return -1, err
    case response.Code != internal.ResSuccess:
        return -2, fmt.Errorf("broker response code: %d, remarks: %s", response.Code, response.Remark)
    }

    offset, err := strconv.ParseInt(response.ExtFields["offset"], 10, 64)
    if err != nil {
        return -1, err
    }
    return offset, nil
}

  • fetchConsumeOffsetFromBroker方法构建QueryConsumerOffsetRequestHeader请求,然后通过r.client.InvokeSync发起请求

updateConsumeOffsetToBroker

rocketmq-client-go-v2.0.0/consumer/offset_store.go

// updateConsumeOffsetToBroker sends off as the commit offset of mq for group
// to the broker that owns the queue.
//
// The broker address is looked up by mq.BrokerName; if it is unknown, the
// topic route info is refreshed once and the lookup retried. The request is
// issued through InvokeOneWay with a 5 second timeout, and that call's error
// is returned as-is. An error is also returned when no broker address can be
// found.
func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq primitive.MessageQueue, off int64) error {
    addr := r.namesrv.FindBrokerAddrByName(mq.BrokerName)
    if addr == "" {
        // Unknown broker: refresh the route once and look again.
        r.namesrv.UpdateTopicRouteInfo(mq.Topic)
        if addr = r.namesrv.FindBrokerAddrByName(mq.BrokerName); addr == "" {
            return fmt.Errorf("broker: %s address not found", mq.BrokerName)
        }
    }

    header := &internal.UpdateConsumerOffsetRequestHeader{
        ConsumerGroup: group,
        Topic:         mq.Topic,
        QueueId:       mq.QueueId,
        CommitOffset:  off,
    }
    request := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, header, nil)

    return r.client.InvokeOneWay(context.Background(), addr, request, 5*time.Second)
}

  • updateConsumeOffsetToBroker方法构建UpdateConsumerOffsetRequestHeader请求,然后通过r.client.InvokeOneWay发起请求

小结

remoteBrokerOffsetStore定义了group、OffsetTable、client、namesrv、mutex属性;它提供了NewRemoteOffsetStore、persist、remove、read、update、fetchConsumeOffsetFromBroker、updateConsumeOffsetToBroker方法

doc

  • offset_store

文章永久链接:https://tech.souyunku.com/?p=25976


Warning: A non-numeric value encountered in /data/wangzhan/tech.souyunku.com.wp/wp-content/themes/dux/functions-theme.php on line 1154
赞(62) 打赏



未经允许不得转载:搜云库技术团队 » 聊聊rocketmq-client-go的remoteBrokerOffsetStore

IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码
IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码

评论 抢沙发

大前端WP主题 更专业 更方便

联系我们联系我们

觉得文章有用就打赏一下文章作者

微信扫一扫打赏

微信扫一扫打赏


Fatal error: Uncaught Exception: Cache directory not writable. Comet Cache needs this directory please: `/data/wangzhan/tech.souyunku.com.wp/wp-content/cache/comet-cache/cache/https/tech-souyunku-com/index.q`. Set permissions to `755` or higher; `777` might be needed in some cases. in /data/wangzhan/tech.souyunku.com.wp/wp-content/plugins/comet-cache/src/includes/traits/Ac/ObUtils.php:367 Stack trace: #0 [internal function]: WebSharks\CometCache\Classes\AdvancedCache->outputBufferCallbackHandler() #1 /data/wangzhan/tech.souyunku.com.wp/wp-includes/functions.php(5109): ob_end_flush() #2 /data/wangzhan/tech.souyunku.com.wp/wp-includes/class-wp-hook.php(303): wp_ob_end_flush_all() #3 /data/wangzhan/tech.souyunku.com.wp/wp-includes/class-wp-hook.php(327): WP_Hook->apply_filters() #4 /data/wangzhan/tech.souyunku.com.wp/wp-includes/plugin.php(470): WP_Hook->do_action() #5 /data/wangzhan/tech.souyunku.com.wp/wp-includes/load.php(1097): do_action() #6 [internal function]: shutdown_action_hook() #7 {main} thrown in /data/wangzhan/tech.souyunku.com.wp/wp-content/plugins/comet-cache/src/includes/traits/Ac/ObUtils.php on line 367