A look at the transactionProducer in rocketmq-client-go

This article examines the transactionProducer in rocketmq-client-go.

transactionProducer

rocketmq-client-go-v2.0.0/producer/producer.go

type transactionProducer struct {
    producer *defaultProducer
    listener primitive.TransactionListener
}

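  • transactionProducer has two fields: producer, the *defaultProducer that does the actual sending, and listener, a primitive.TransactionListener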

NewTransactionProducer

rocketmq-client-go-v2.0.0/producer/producer.go

func NewTransactionProducer(listener primitive.TransactionListener, opts ...Option) (*transactionProducer, error) {
    producer, err := NewDefaultProducer(opts...)
    if err != nil {
        return nil, errors.Wrap(err, "NewDefaultProducer failed.")
    }
    return &transactionProducer{
        producer: producer,
        listener: listener,
    }, nil
}

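  • NewTransactionProducer creates a defaultProducer with NewDefaultProducer(opts...) and, if that succeeds, wraps it together with the listener in a new transactionProducer; on failure it returns the wrapped error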
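To make the role of the listener argument concrete, here is a minimal sketch of a listener implementation. It does not import rocketmq-client-go: LocalTransactionState, Message, MessageExt and TransactionListener are local stand-ins that only mirror the shape of the primitive package, and orderListener with its in-memory map is a hypothetical example, not part of the library.

```go
package main

import (
	"fmt"
	"sync"
)

// Stand-ins for types in the primitive package, declared locally so the
// sketch compiles without the rocketmq-client-go dependency.
type LocalTransactionState int

const (
	CommitMessageState LocalTransactionState = iota + 1
	RollbackMessageState
	UnknowState
)

type Message struct {
	Topic         string
	Body          []byte
	TransactionId string
}

// MessageExt stands in for the broker-side view of a message.
type MessageExt struct {
	Message
}

// TransactionListener mirrors the two callbacks transactionProducer invokes.
type TransactionListener interface {
	ExecuteLocalTransaction(msg *Message) LocalTransactionState
	CheckLocalTransaction(msg *MessageExt) LocalTransactionState
}

// orderListener (hypothetical) remembers the outcome of each local
// transaction so that a later check-back from the broker can be answered.
type orderListener struct {
	mu      sync.Mutex
	results map[string]LocalTransactionState
}

func newOrderListener() *orderListener {
	return &orderListener{results: make(map[string]LocalTransactionState)}
}

// ExecuteLocalTransaction runs the "business" step; here an empty body is
// treated as a failed transaction purely for illustration.
func (l *orderListener) ExecuteLocalTransaction(msg *Message) LocalTransactionState {
	state := CommitMessageState
	if len(msg.Body) == 0 {
		state = RollbackMessageState
	}
	l.mu.Lock()
	l.results[msg.TransactionId] = state
	l.mu.Unlock()
	return state
}

// CheckLocalTransaction answers a check-back; unknown ids stay undecided.
func (l *orderListener) CheckLocalTransaction(msg *MessageExt) LocalTransactionState {
	l.mu.Lock()
	defer l.mu.Unlock()
	if state, ok := l.results[msg.TransactionId]; ok {
		return state
	}
	return UnknowState
}

func main() {
	var listener TransactionListener = newOrderListener()
	msg := &Message{Topic: "orders", Body: []byte("order-1"), TransactionId: "tx-1"}
	fmt.Println("execute:", listener.ExecuteLocalTransaction(msg))
	fmt.Println("check:", listener.CheckLocalTransaction(&MessageExt{Message: *msg}))
}
```

The mutex is there because, as the Start and checkTransactionState sections show, check-backs are handled on a different goroutine from the one that calls SendMessageInTransaction.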

Start

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) Start() error {
    go primitive.WithRecover(func() {
        tp.checkTransactionState()
    })
    return tp.producer.Start()
}

  • Start first launches tp.checkTransactionState() in a separate goroutine, wrapped in primitive.WithRecover, and then calls tp.producer.Start()
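The pattern Start relies on, a background goroutine whose panics are recovered instead of crashing the process, can be sketched as follows. withRecover is a stand-in written for this example and takes an explicit onPanic callback; it is not the library's primitive.WithRecover, and runGuarded is a hypothetical helper added only to make the behaviour observable.

```go
package main

import "fmt"

// withRecover runs fn and, if fn panics, hands the recovered value to
// onPanic instead of letting the panic propagate.
func withRecover(fn func(), onPanic func(v interface{})) {
	defer func() {
		if v := recover(); v != nil {
			onPanic(v)
		}
	}()
	fn()
}

// runGuarded starts fn on its own goroutine. The returned channel yields the
// recovered panic value, if any, and is closed once fn has finished.
func runGuarded(fn func()) <-chan interface{} {
	out := make(chan interface{}, 1)
	go func() {
		defer close(out)
		withRecover(fn, func(v interface{}) { out <- v })
	}()
	return out
}

func main() {
	if v, ok := <-runGuarded(func() { panic("listener failed") }); ok {
		fmt.Println("recovered:", v)
	}
	if _, ok := <-runGuarded(func() {}); !ok {
		fmt.Println("finished without panic")
	}
}
```

Note that recovering the panic stops the guarded function: in this sketch, as in any loop wrapped this way, the work is not restarted after a panic unless the caller does so explicitly.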

checkTransactionState

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) checkTransactionState() {
    for ch := range tp.producer.callbackCh {
        switch callback := ch.(type) {
        case *internal.CheckTransactionStateCallback:
            localTransactionState := tp.listener.CheckLocalTransaction(callback.Msg)
            uniqueKey := callback.Msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
            if uniqueKey == "" {
                uniqueKey = callback.Msg.MsgId
            }
            header := &internal.EndTransactionRequestHeader{
                CommitLogOffset:      callback.Header.CommitLogOffset,
                ProducerGroup:        tp.producer.group,
                TranStateTableOffset: callback.Header.TranStateTableOffset,
                FromTransactionCheck: true,
                MsgID:                uniqueKey,
                TransactionId:        callback.Header.TransactionId,
                CommitOrRollback:     tp.transactionState(localTransactionState),
            }

            req := remote.NewRemotingCommand(internal.ReqENDTransaction, header, nil)
            req.Remark = tp.errRemark(nil)

            err := tp.producer.client.InvokeOneWay(context.Background(), callback.Addr.String(), req,
                tp.producer.options.SendMsgTimeout)
            if err != nil {
                rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{
                    "callback":               callback.Addr.String(),
                    "request":                req.String(),
                    rlog.LogKeyUnderlayError: err,
                })
            }
        default:
            rlog.Error(fmt.Sprintf("unknown type %v", ch), nil)
        }
    }
}

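  • checkTransactionState ranges over tp.producer.callbackCh and dispatches on the concrete type of each item. The only type currently handled is *internal.CheckTransactionStateCallback: it calls tp.listener.CheckLocalTransaction, builds an EndTransactionRequestHeader from the result (with FromTransactionCheck set to true, and MsgID falling back to callback.Msg.MsgId when the unique key property is empty), and sends it to the broker with tp.producer.client.InvokeOneWay. A send failure is only logged, and any other type is logged as an error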
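The control flow described above, draining a channel of interface{} values and dispatching with a type switch, can be sketched without the library. checkCallback, endTxRequest and drain are hypothetical names for this example; the check and send parameters stand in for the listener call and the one-way request to the broker.

```go
package main

import "fmt"

// checkCallback stands in for internal.CheckTransactionStateCallback.
type checkCallback struct {
	MsgID         string
	UniqueKey     string // may be empty, as the unique-key property can be
	TransactionId string
}

// endTxRequest stands in for internal.EndTransactionRequestHeader.
type endTxRequest struct {
	MsgID                string
	TransactionId        string
	FromTransactionCheck bool
	CommitOrRollback     int
}

// drain consumes callbackCh until it is closed. For each checkCallback it
// asks check for the local state and passes the resulting request to send.
// It returns how many items of an unexpected type were seen.
func drain(callbackCh <-chan interface{}, check func(*checkCallback) int, send func(endTxRequest)) int {
	unknown := 0
	for item := range callbackCh {
		switch cb := item.(type) {
		case *checkCallback:
			key := cb.UniqueKey
			if key == "" {
				key = cb.MsgID // fall back to the message id
			}
			send(endTxRequest{
				MsgID:                key,
				TransactionId:        cb.TransactionId,
				FromTransactionCheck: true,
				CommitOrRollback:     check(cb),
			})
		default:
			unknown++
		}
	}
	return unknown
}

func main() {
	ch := make(chan interface{}, 2)
	ch <- &checkCallback{MsgID: "m1", TransactionId: "t1"}
	ch <- "not a callback"
	close(ch)

	unknown := drain(ch,
		func(*checkCallback) int { return 1 },
		func(r endTxRequest) { fmt.Printf("send %+v\n", r) })
	fmt.Println("unknown items:", unknown)
}
```

A range loop over a channel ends only when the channel is closed, so a goroutine running such a loop lives until then.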

Shutdown

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) Shutdown() error {
    return tp.producer.Shutdown()
}

  • Shutdown delegates to tp.producer.Shutdown()

SendMessageInTransaction

rocketmq-client-go-v2.0.0/producer/producer.go

func (tp *transactionProducer) SendMessageInTransaction(ctx context.Context, msg *primitive.Message) (*primitive.TransactionSendResult, error) {
    msg.WithProperty(primitive.PropertyTransactionPrepared, "true")
    msg.WithProperty(primitive.PropertyProducerGroup, tp.producer.options.GroupName)

    rsp, err := tp.producer.SendSync(ctx, msg)
    if err != nil {
        return nil, err
    }
    localTransactionState := primitive.UnknowState
    switch rsp.Status {
    case primitive.SendOK:
        if len(rsp.TransactionID) > 0 {
            msg.WithProperty("__transactionId__", rsp.TransactionID)
        }
        transactionId := msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
        if len(transactionId) > 0 {
            msg.TransactionId = transactionId
        }
        localTransactionState = tp.listener.ExecuteLocalTransaction(msg)
        if localTransactionState != primitive.CommitMessageState {
            rlog.Error("executeLocalTransaction but state unexpected", map[string]interface{}{
                "localState": localTransactionState,
                "message":    msg,
            })
        }

    case primitive.SendFlushDiskTimeout, primitive.SendFlushSlaveTimeout, primitive.SendSlaveNotAvailable:
        localTransactionState = primitive.RollbackMessageState
    default:
    }

    tp.endTransaction(*rsp, err, localTransactionState)

    transactionSendResult := &primitive.TransactionSendResult{
        SendResult: rsp,
        State:      localTransactionState,
    }

    return transactionSendResult, nil
}

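  • SendMessageInTransaction sets the PropertyTransactionPrepared and PropertyProducerGroup properties on the message, calls tp.producer.SendSync(ctx, msg), and returns early if that fails. Otherwise it branches on rsp.Status: for primitive.SendOK it runs tp.listener.ExecuteLocalTransaction and uses the result as localTransactionState (logging an error if the result is not CommitMessageState); for primitive.SendFlushDiskTimeout, primitive.SendFlushSlaveTimeout and primitive.SendSlaveNotAvailable it sets localTransactionState to primitive.RollbackMessageState; any other status leaves it at primitive.UnknowState. Finally it calls tp.endTransaction and returns a TransactionSendResult holding the send result and the state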
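The decision made in the switch above can be isolated into a small function. This is a sketch under assumptions: SendStatus, LocalTransactionState and their constants are local stand-ins named after the primitive package's identifiers, and resolveState is a hypothetical helper that does not exist in the library.

```go
package main

import "fmt"

// Local stand-ins named after the primitive package's identifiers.
type SendStatus int

const (
	SendOK SendStatus = iota
	SendFlushDiskTimeout
	SendFlushSlaveTimeout
	SendSlaveNotAvailable
	SendUnknownError
)

type LocalTransactionState int

const (
	CommitMessageState LocalTransactionState = iota + 1
	RollbackMessageState
	UnknowState
)

// resolveState mirrors the switch in SendMessageInTransaction: the local
// transaction runs only when the prepared message was stored successfully.
func resolveState(status SendStatus, executeLocal func() LocalTransactionState) LocalTransactionState {
	switch status {
	case SendOK:
		return executeLocal()
	case SendFlushDiskTimeout, SendFlushSlaveTimeout, SendSlaveNotAvailable:
		return RollbackMessageState
	default:
		return UnknowState
	}
}

func main() {
	commit := func() LocalTransactionState { return CommitMessageState }
	fmt.Println(resolveState(SendOK, commit) == CommitMessageState)
	fmt.Println(resolveState(SendFlushDiskTimeout, commit) == RollbackMessageState)
	fmt.Println(resolveState(SendUnknownError, commit) == UnknowState)
}
```

Keeping the local transaction out of every branch except SendOK means business side effects never happen for a message the broker did not accept cleanly.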

Summary

transactionProducer has producer and listener fields and provides the NewTransactionProducer, Start, Shutdown and SendMessageInTransaction methods.

doc

  • producer

Permalink: https://tech.souyunku.com/?p=25988





Do not reproduce without permission: 搜云库技术团队 (Souyunku Tech Team) » A look at the transactionProducer in rocketmq-client-go
