专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

八、ZooKeeper 进阶: Leader选举源码分析

永久链接: https://tech.souyunku.com/6310

作者:zkp_java | 出处:https://blog.csdn.net/zkp_java/article/category/8044591

Zookeeper启动的main方法是org.apache.zookeeper.server.quorum.QuorumPeerMain类的main方法:

public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        } ...
        ...
    }

进入到initializeAndRun方法:

protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
    {
         // args[0]是传入的配置文件
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            // 解析配置文件
            config.parse(args[0]);
        }

        // Start and schedule the the purge task
        // 启动一个定时清除日志的任务
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
         // 我们是集群启动Zookeeper,必须使用到配置文件,因此接下来运行到runFromConfig
        if (args.length == 1 && config.servers.size() > 0) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
        }
    }

这里传入的配置文件就是Zookeeper根目录下的conf/zoo.cfg文件(可参考启动脚本zkServer.sh),该配置文件可以配置哪些配置项可以参考QuorumPeerConfig的解析配置文件过程。接下来进入runFromConfig方法:

public void runFromConfig(QuorumPeerConfig config) throws IOException {
      try {
          ManagedUtil.registerLog4jMBeans();
      } catch (JMException e) {
          LOG.warn("Unable to register log4j JMX control", e);
      }

      LOG.info("Starting quorum peer");
      try {
          // cnxnFactory用于处理本节点上的一些IO操作,默认情况下是NIOServerCnxnFactory的对象
          ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
          // 配置本节点监听的客户端端口和连接数
          cnxnFactory.configure(config.getClientPortAddress(),
                                config.getMaxClientCnxns());
            // QuorumPeer是zk的重要类之一
          quorumPeer = getQuorumPeer();

            // 设置从配置文件读取到的所有配置节点(zoo.cfg中的server.1=localhost:2888:3888配置项)
          quorumPeer.setQuorumPeers(config.getServers());
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                  new File(config.getDataLogDir()),
                  new File(config.getDataDir())));
          // 设置使用的Leader选举算法
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
          quorumPeer.setClientPortAddress(config.getClientPortAddress());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setLearnerType(config.getPeerType());
          quorumPeer.setSyncEnabled(config.getSyncEnabled());

          // sets quorum sasl authentication configurations
          quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
          if(quorumPeer.isQuorumSaslAuthEnabled()){
              quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
              quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
              quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
              quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
              quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
          }

          quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
          quorumPeer.initialize();

          quorumPeer.start();
          quorumPeer.join();
      } catch (InterruptedException e) {
          // warn, but generally this is ok
          LOG.warn("Quorum Peer interrupted", e);
      }
    }

runFromConfig方法主要将从zoo.cfg中读取到的配置设置到QuorumPeer的对象属性中区,QuorumPeer继承了ZooKeeperThread类,而ZooKeeperThread继承了Thread类,因此QuorumPeer是一个线程类,稍后我们将分析其run方法。进入到quorumPeer.start()方法:

    @Override
    public synchronized void start() {
         // 从快照文件(dataDir目录下snapshot文件)中恢复会话和zk节点信息
        loadDataBase();
        // 处理IO事件(暂不了解这个的作用,之后的文章会对这个进行分析,现在跳过不影响我们分析Leader的选举)
        cnxnFactory.start();
        // 开始进行Leader的选举        
        startLeaderElection();
        // 开始监听本节点的状态,根据不同的状态(LOOKING、OBSERVING、FOLLOWING、LEADING)执行相应操作
        super.start();
    }

专栏推荐

专栏汇总:2000+ 道 Java互联网大厂面试题

专栏汇总:RabbitMQ 源码解析

专栏汇总:Dubbo 源码分析

专栏汇总:Tomcat源码分析

专栏汇总:RocketMQ 源码分析

附录 ZooKeeper 进阶 ,系列文章

一、ZooKeeper 进阶:基本介绍

二、ZooKeeper 进阶:持久节点、临时节点及ACL

三、ZooKeeper 进阶:watcher的使用及原理

四、ZooKeeper 进阶:分布式系统的理解

五、ZooKeeper 进阶:自带客户端原生api的使用

六、ZooKeeper 进阶:开源客户端curator

七、ZooKeeper 进阶:选举及数据一致性

八、ZooKeeper 进阶: Leader选举源码分析

未经允许不得转载:搜云库技术团队 » 八、ZooKeeper 进阶: Leader选举源码分析

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们