IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码

聊聊kingbus的startMasterServer

IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码

本文主要研究一下kingbus的startMasterServer

startMasterServer

kingbus/server/server.go

func (s *KingbusServer) startMasterServer(args *config.BinlogServerConfig) error {
    master, err := NewBinlogServer(args, s, s.store, s.applyBroadcast)
    if err != nil {
        log.Log.Errorf("NewBinlogServer error,err:%s,args:%v", err, *args)
        return err
    }
    s.master = master
    s.master.Start()
    log.Log.Infof("startMasterServer success,args:%v", *args)
    return nil
}

  • startMasterServer方法先执行NewBinlogServer创建master,然后执行master的Start方法

NewBinlogServer

kingbus/server/binlog_server.go

//NewBinlogServer create a binlog server
func NewBinlogServer(cfg *config.BinlogServerConfig, ki KingbusInfo, store storage.Storage, broadcast *utils.Broadcast) (*BinlogServer, error) {
    var err error
    s := new(BinlogServer)

    s.started = atomic.NewBool(false)
    s.cfg = cfg
    s.listener, err = net.Listen("tcp", s.cfg.Addr)
    if err != nil {
        log.Log.Errorf("Listen error,err:%s,addr:%s", err, s.cfg.Addr)
        return nil, err
    }
    s.store = store
    s.broadcast = broadcast
    s.kingbusInfo = ki
    s.slaves = make(map[string]*mysql.Slave)
    s.errch = make(chan error, 1)

    return s, nil
}

  • NewBinlogServer方法通过new方法创建BinlogServer,之后设置其listener、store、broadcast等属性

BinlogServer

kingbus/server/binlog_server.go

//BinlogServer is a binlog server,send binlog event to slave.
//The generic process:
//1.authentication
//SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'
//SET @master_binlog_checksum='NONE'
//SET @master_heartbeat_period=%d
//2.COM_REGISTER_SLAVE
//3.semi-sync:
//SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled';
//SET @rpl_semi_sync_slave = 1
//4.COM_BINLOG_DUMP_GTID
type BinlogServer struct {
    started *atomic.Bool
    cfg     *config.BinlogServerConfig

    listener net.Listener
    errch    chan error

    l      sync.RWMutex
    slaves map[string]*mysql.Slave //key is uuid

    broadcast   *utils.Broadcast
    kingbusInfo KingbusInfo
    store       storage.Storage
}

  • BinlogServer定义了Bool、BinlogServerConfig、Listener、Slave、Broadcast、KingbusInfo、Storage属性

Start

kingbus/server/binlog_server.go

//Start implements binlog server start
func (s *BinlogServer) Start() {
    s.started.Store(true)
    go func() {
        for s.started.Load() {
            select {
            case err := <-s.errch:
                log.Log.Errorf("binlog server Run error,err:%s", err)
                s.Stop()
                return
            default:
                conn, err := s.listener.Accept()
                if err != nil {
                    log.Log.Infof("BinlogServer.Start:Accept error,err:%s", err)
                    continue
                }
                go s.onConn(conn)
            }
        }
    }()
}

  • Start方法先执行s.started.Store(true),然后通过select机制监听s.listener.Accept(),之后执行s.onConn(conn)

onConn

kingbus/server/binlog_server.go

func (s *BinlogServer) onConn(c net.Conn) {
    mysqlConn, err := mysql.NewConn(c, s, s.cfg.User, s.cfg.Password)
    if err != nil {
        log.Log.Errorf("onConn error,err:%s", err)
        return
    }
    mysqlConn.Run()
}

  • onConn方法通过mysql.NewConn创建mysqlConn,然后执行mysqlConn.Run方法

NewConn

kingbus/mysql/conn.go

//NewConn create a Conn
func NewConn(conn net.Conn, s BinlogServer, user string, password string) (*Conn, error) {
    c := new(Conn)

    c.user = user
    c.BaseConn = NewBaseConn(conn)
    c.connectionID = baseConnID.Add(1)
    c.salt, _ = gomysql.RandomBuf(20)
    c.closed = atomic.NewBool(false)
    masterInfo, err := s.GetMasterInfo()
    if err != nil {
        c.BaseConn.Close()
        log.Log.Errorf("NewConn:GetMasterInfo error,err:%s", err)
        return nil, err
    }

    err = c.handshake(masterInfo.Version, password)
    if err != nil {
        c.BaseConn.Close()
        log.Log.Errorf("NewConn:handshake error,err:%s", err)
        return nil, err
    }

    c.ctx, c.cancel = context.WithCancel(context.Background())
    c.userVariables = make(map[string]interface{})
    c.binlogServer = s

    return c, nil
}

  • NewConn方法通过s.GetMasterInfo()获取master信息,然后执行c.handshake(masterInfo.Version, password)

handshake

kingbus/mysql/conn.go

//handshake implements the handshake protocol in mysql
func (c *Conn) handshake(serverVersion, password string) error {
    if err := c.writeInitialHandshake(serverVersion); err != nil {
        return err
    }

    if err := c.readHandshakeResponse(password); err != nil {
        c.writeError(err)
        return err
    }

    if err := c.writeOK(nil); err != nil {
        return err
    }

    c.ResetSequence()

    return nil
}

  • handshake方法实现的是mysql的handshake协议

Run

kingbus/mysql/conn.go

//Run implements handle client request in Conn
func (c *Conn) Run() {
    defer func() {
        r := recover()
        if err, ok := r.(error); ok {
            const size = 4096
            buf := make([]byte, size)
            buf = buf[:runtime.Stack(buf, false)]

            log.Log.Errorf("Conn Run error,err:%s,stack:%s", err, string(buf))
        }
        c.Close()
        log.Log.Debugf("close client connection")
    }()

    for {
        select {
        case <-c.ctx.Done():
            log.Log.Debugf("BinlogServer closed, close connection")
            return
        default:
            data, err := c.ReadPacket()
            if err != nil {
                log.Log.Errorf("ReadPacket error,err:%s", err)
                return
            }

            if err := c.dispatch(c.ctx, data); err != nil {
                log.Log.Errorf("dispatch error,err:%s,data:%v", err.Error(), data)
                //if the error is canceled, means the connection was killed by cmd
                //don't need send error message
                if err != context.Canceled && c.closed.Load() == false {
                    c.writeError(err)
                }
                return
            }

            //if the connection is closed, return from loop
            if c.closed.Load() == true {
                log.Log.Infof("connection status is closed,need return")
                return
            }
            c.Sequence = 0
        }
    }
}

-Run方法接收请求,然后通过c.dispatch(c.ctx, data)进行分发

dispatch

kingbus/mysql/command.go

func (c *Conn) dispatch(ctx context.Context, data []byte) error {
    cmd := data[0]
    data = data[1:]

    switch cmd {
    case gomysql.COM_QUIT:
        c.Close()
        log.Log.Debugf("close client connection")
        return nil
    case gomysql.COM_QUERY:
        return c.handleQuery(utils.BytesToString(data))
    case gomysql.COM_PING:
        return c.writeOK(nil)
    case gomysql.COM_BINLOG_DUMP_GTID:
        return c.handleBinlogDumpGtid(ctx, data)
    case gomysql.COM_REGISTER_SLAVE:
        return c.handleRegisterSlave(data)
    default:
        log.Log.Errorf("master not support this cmd:%v", data)
        return c.writeError(ErrSQLNotSupport)
    }

    return nil
}

  • dispatch根据不同的cmd来执行conn的不同方法

小结

startMasterServer方法先执行NewBinlogServer创建master,然后执行master的Start方法;master的Start方法先执行s.started.Store(true),然后通过select机制监听s.listener.Accept(),之后执行s.onConn(conn);onConn方法通过mysql.NewConn创建mysqlConn,然后执行mysqlConn.Run方法

doc

来源:https://juejin.im/post/5eecc219518825658c1ad024

文章永久链接:https://tech.souyunku.com/?p=16744


Warning: A non-numeric value encountered in /data/wangzhan/tech.souyunku.com.wp/wp-content/themes/dux/functions-theme.php on line 1154
赞(79) 打赏



未经允许不得转载:搜云库技术团队 » 聊聊kingbus的startMasterServer

IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码
IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码

评论 抢沙发

大前端WP主题 更专业 更方便

联系我们联系我们

觉得文章有用就打赏一下文章作者

微信扫一扫打赏

微信扫一扫打赏


Fatal error: Uncaught Exception: Cache directory not writable. Comet Cache needs this directory please: `/data/wangzhan/tech.souyunku.com.wp/wp-content/cache/comet-cache/cache/https/tech-souyunku-com/index.q`. Set permissions to `755` or higher; `777` might be needed in some cases. in /data/wangzhan/tech.souyunku.com.wp/wp-content/plugins/comet-cache/src/includes/traits/Ac/ObUtils.php:367 Stack trace: #0 [internal function]: WebSharks\CometCache\Classes\AdvancedCache->outputBufferCallbackHandler() #1 /data/wangzhan/tech.souyunku.com.wp/wp-includes/functions.php(5109): ob_end_flush() #2 /data/wangzhan/tech.souyunku.com.wp/wp-includes/class-wp-hook.php(303): wp_ob_end_flush_all() #3 /data/wangzhan/tech.souyunku.com.wp/wp-includes/class-wp-hook.php(327): WP_Hook->apply_filters() #4 /data/wangzhan/tech.souyunku.com.wp/wp-includes/plugin.php(470): WP_Hook->do_action() #5 /data/wangzhan/tech.souyunku.com.wp/wp-includes/load.php(1097): do_action() #6 [internal function]: shutdown_action_hook() #7 {main} thrown in /data/wangzhan/tech.souyunku.com.wp/wp-content/plugins/comet-cache/src/includes/traits/Ac/ObUtils.php on line 367