接精细Zookeeper(一)基础应用后续…
四、应用场景
1. 数据发布/订阅
发布/订阅有2种设计模式,推Push & 拉Pull。在推模中,服务端将所有数据更新发给订阅的客户端,而拉是由客户端主动发起请求获取最新数据。通常采用轮寻。
zk采用推拉结合,客户端向服务端注册自己需要关注的节点,一旦该节点数据发生变更,服务器像客户端发送Watcher事件通知,收到消息主动向服务端获取最新数据。这种模式主要用于配置信息获取同步。
2. 命名服务
通常客户端能够根据指定名字获取资源实体、服务地址和提供者信息。来实现提供全局唯一ID的分配机制。其中比较常见的RPC就是使用这种机制。
由于zk可以创建顺序节点,保证了同一节点下子节点是唯一的,所以直接按照存放文件的方法,设置节点,比如一个路径下不可能存在两个相同的文件名,这种定义创建节点,就是全局唯一ID,如下
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
zkClient.createPersistentSequential("/home/zjn/wyid",null);
获取节点直接拼接所定义的路径,方可得到全局唯一ID
3. 集群管理
根据zk的watcher机制和会话结束临时节点自动删除两大特性可实现实时监控机器活动,实现集群管理。
4. Master 选举
在分布式环境中,相同的业务应用分布在不同的机器上,有些业务逻辑(例如一些耗时的计算,网络I/O处理),往往只需要让整个集群中的某一台机器进行执行,其余机器可以共享这个结果,这样可以大大减少重复劳动,提高性能。
利用ZooKeeper的强一致性,能够保证在分布式高并发情况下节点创建的全局唯一性,即:同时有多个客户端请求创建 /currentMaster 节点,最终一定只有一个客户端请求能够创建成功。利用这个特性,就能很轻易的在分布式环境中进行集群选取了。(其实只要实现数据唯一性就可以做到选举,关系型数据库也可以,但是性能不好,设计也复杂)
5. 分布式锁
5.1 实现排他锁
在java中,可以利用zk提供的api实现分布式锁,具体流程为:
- 定义锁:在zk中,通过数据节点表示一个锁,例如 “/exclusive_lock/lock” 可以被定义成一个锁,在使用中可定义个临时节点。
- 获取锁:创建节点成功就是抢锁成功,同时所有没获取到锁的客户需要到”/exclusive_lock” 注册一个子节点变更的Watcher监听,以便实时监听lock节点变更。
- 释放锁:由于定义临时节点,那在以下两种情况有可能释放锁:
- 客户端宕机
- 执行完业务逻辑后,主动删除
释放后,重新获取锁,整个流程如下:
5.2 实现共享锁(读锁)
其实就是与写锁区分开,单独创建一个读的节点 例如 :”/shared_lock/host1-R-0000000001″。
这里说明下,在同一节点下创建任何带顺序子节点,都会使序号递增。
下面摘抄下网络上的共享锁demo,方便配合解释理解:
package cn.itcast.bigdata.zklock;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ZookeeperShareLock {
// 会话超时
private static final int SESSION_TIMEOUT = 2000;
// zookeeper集群地址
private String hosts = "mini1:2181,mini2:2181,mini3:2181";
private String groupNode = "locks";
private String subNode = "sub";
private boolean haveLock = false;
private ZooKeeper zk;
// 记录自己创建的子节点路径
private volatile String thisPath;
/**
* 连接zookeeper
*/
public void connectZookeeper() throws Exception {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
try {
// 判断事件类型,此处只处理子节点变化事件
if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
//获取子节点,并对父节点进行监听
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
// 去比较是否自己是最小id
Collections.sort(childrenNodes);
if (childrenNodes.indexOf(thisNode) == 0) {
//访问共享资源处理业务,并且在处理完成之后删除锁
doSomething();
//重新注册一把新的锁
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 1、程序一进来就先注册一把锁到zk上
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一小会,便于观察
Thread.sleep(new Random().nextInt(1000));
// 从zk的锁父目录下,获取所有子节点,并且注册对父节点的监听
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
//如果争抢资源的程序就只有自己,则可以直接去访问共享资源
if (childrenNodes.size() == 1) {
doSomething();
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
/**
* 处理业务逻辑,并且在最后释放锁
*/
private void doSomething() throws Exception {
try {
System.out.println("gain lock: " + thisPath);
Thread.sleep(2000);
// do something
} finally {
System.out.println("finished: " + thisPath);
// 将thisPath删除, 监听thisPath的client将获得通知
// 相当于释放锁
zk.delete(this.thisPath, -1);
}
}
public static void main(String[] args) throws Exception {
ZookeeperShareLock dl = new ZookeeperShareLock();
dl.connectZookeeper();
Thread.sleep(Long.MAX_VALUE);
}
}
以上就是实现最简单的共享锁的逻辑。 如果升级为读写锁都有,其实实现读写锁核心就是判断读写顺序就可以 ,如下:
1、 创建完节点后,获取/shared_lock节点下所有子节点,并对该节点变更注册监听。
2、 确定自己的节点序号在所有子节点中的顺序。
3、 对于读请求:若没有比自己序号小的子节点或所有比自己序号小的子节点都是读请求,那么表 明自己已经成功获取到共享锁,同时开始执行读取逻辑,若有写请求,则需要等待。对于写请求:若自己不 是序号最小的子节点,那么需要等待。
4、 接收到Watcher通知后,重复步骤1
5.3 羊群效应
以上实现可以满足一般分布式集群竞争锁的需求(一般集群为10台机器以内),并且性能都还可以。但是机器规模扩大后,会出现问题,如下图:
以上会出现的问题,就是如果 host1完成读操作,并将节点删除,会通知所有节点,但是你如果理解上面讲的判断读写顺序,其实可以分析出,host1移除只影响host2,对其他机器并没有产生影响。如果同一时间有多个节点完成事务给大量节点进行通知,会对网络开销和性能造成巨大影响,这就是所谓的羊群效应。
问题根源就是没有找准客户端真正关注的点,其实分布式锁的竞争核心是只需要关注比自己序号小的那个节点的变更情况就可以,不需要关注所有节点。
改进:
核心:只对比自己小的节点注册Watcher机制
至于demo就不放上来了,这么实现还是比较复杂的,合理运用搜索引擎… 最后建议在实际生产环境中,尽量去缩小锁的范围。能简化开发则简化。毕竟一般公司达不到大规模集群…
6. 分布式队列
6.1 FIFO 先入先出
和创建共享锁差不多,判断自己序号是不是最小,是就等,不是就dosomeing…
6.2 Barrier: 分布式屏障
可以理解成分布式闭锁,特指分布式一个协调条件,规定了一个队列元素必须聚焦后才能统一进行安排,否则一直等待。一般应用在大规模分布式计算应用场景。
设计思想如下:
设置一个/queue_barrier 节点,其数据内容赋值为一个数字n来代表Barrier 值,步骤如下:
1、 创建并设置值
create /queue_barrier 10
get /queue_barrier // 得到10
2、 通过 getChildren 获取所有子节点,同时对注册子节点变更的Watcher监听。
3、 统计,数不足10个,需要等待。
4、 接受Watch通知,重复步骤2
五、深入进阶
1. ZAB协议
Zab协议是为分布式协调服务Zookeeper专门设计的一种支持崩溃恢复的原子广播协议,是Zookeeper保证数据一致性的核心算法。Zab借鉴了Paxos算法,但又不像Paxos那样,是一种通用的分布式一致性算法。它是特别为Zookeeper设计的支持崩溃恢复的原子广播协议。
ZAB核心是定义对于改变zk服务数据状态的事务请求的处理方式:
即:所有事务请求必须由一个全局唯一的服务器来协调处理,这样的服务器被称为Leader服务器,余下的 服务器则称为Follower服务器,Leader服务器负责将一个客户端事务请求转化成一个事务Proposal(提 议),并将该Proposal分发给集群中所有的Follower服务器,之后Leader服务器需要等待所有 Follower服务器的反馈,一旦超过半数的Follower服务器进行了正确的反馈后,那么Leader就会再次向 所有的Follower服务器分发Commit消息,要求其将前一个Proposal进行提交,类似2pc
协议内容
当整个集群启动过程中,或者当 Leader 服务器出现网络中弄断、崩溃退出或重启等异常时,Zab协议就会 进入崩溃恢复模式,选举产生新的Leader。当新的Leader出来了,同时,已有过半机器完成同步之后,ZAB协议将退出恢复模式。进入消息广播模式。这时,如果有一台遵守Zab协议的服务器加入集群,因为此时集群中已经存在一个Leader服务器在广播消息,那么该新加入的服务器自动进入恢复模式:找到Leader服务器,并且完成数据同步。同步完成后,作为新的Follower一起参与到消息广播流程中。如果集群中其他机器收到客户端事务请求后,那么会先转发Leader服务器,由Leader统一处理。
- 崩溃恢复: 一但出现崩溃,会导致数据不一致,ZAB的崩溃恢复开始起作用。有如下两个确保:
- ZAB协议需要确保已经在Leader提交的事务最终被所有服务器提交。
- ZAB协议需要确保丢弃只在Leader服务器上被提出的事务。
最终决定了Leader选举算法:针对上两个要求,如果Leader选举算法保证新选举出来的Leader服务器拥有集群中所有机器最高编号(ZXID最大)的事务Proposal,那么就能保证新的Leader 一定具有已提交的所有提案,更重要是,如果这么做,可以省去Leader服务器检查Proposal的提交和丢弃工作的这一步。
-
消息广播模式 :
- 在zookeeper集群中,数据副本的传递策略就是采用消息广播模式。zookeeper中农数据副本的同步方式与二段提交相似,但是却又不同。二段提交要求协调者必须等到所有的参与者全部反馈ACK确认消息后,再发送commit消息。要求所有的参与者要么全部成功,要么全部失败。二段提交会产生严重的阻塞问题。
- Zab协议中 Leader 等待 Follower 的ACK反馈消息是指“只要半数以上的Follower成功反馈即可,不需要收到全部Follower反馈”
- 整个过程中,Leader为每个事务请求生产对应的Proposal,在广播前,为这个事务分配一个全局唯一ID,为ZXID(事务ID),必须按照递增的事务顺序进行处理。
- 具体流程如上图。
运行时状态分析
- LOOKING:Leader选举阶段
- FOLLOWING:Follower 和Leader保持同步状态
- LEADING:Leader服务器作为主进程领导状态
所有进程初始状态都是LOOKING,之后选举Leader,有了Leader,会变成FOLLOWING,Leader变成 LEADING,当Leader崩溃后,所有进程变成LOOKING,重新选举。当 Leader与所有机器断开TCP连接,其他FOLLOWING会放弃这个Leader,同时转到 LOOKING,之后重新选举。
ZAB与Paxos联系&区别
两者设计目标不一样,ZAB主要用于构建高可用分布式系统,Paxos 算法用于构建一致性状态机器。所有会有细微差别。但是ZAB就是在Paxos保证一致性基础上设计出高可用的协议。
2. 服务器角色
Leader
zk的核心,主要工作如下:
1. 事务请求的唯一调度和处理者,保证集群的顺序性。
2. 集群各服务调度者。
请求处理链:
- PrepRequestProcessor:请求预处理器。如创建请求事务头、事务体、会话检查、ACL检查和版本检查
- ProposalRequestProcessor :事务日志记录处理器。事务处理发起者,非事务请求直接转发CommitProcessor,对于事务请求还需要创建Proposal提议,并发送给所有Follower,进行一次集群内事务投票。同时ProposalRequestProcessor还会将事务请求交给SyncRequestProcessor进行事务日志记录。
- SyncRequestProcessor: 事务日志记录处理器,记录日志,并触发zk进行快照。
- AckRequestProcessor:记录日志后向Proposal的投票收集器发送ACK反馈。告知当前服务器已完成日志记录。
- CommitProcessor : 事务提交处理器。非事务请求,直接交付下一阶段。
- ToBeCommitProcessor : 此处理器有toBeApplied队列,用来存储已经被CommitProcessor处理过的可被提交的Proposal,将这些完成的交给FinalRequestProcessor,处理完成后从队列移除。
- FinalRequestProcessor :用来进行对客户端请求的返回之前的操作,针对事务操作,直接应用到内存数据库。
Follower
跟随者,主要工作如下:
1. 处理非事务请求(读请求),转发事务请求给Leader
2. 参与事务请求Proposal投票
3. 参与Leader选举
Follower也采用了责任链模式处理客户端请求。
- FollowerRequestProcessor : 事务请求转发器
- SendAckRequestProcessor : 事务日志记录反馈,完成日志记录后,向Leader发送ACK消息表面完成工作。
Observer
zk3、3开始引入的角色,观察最新状态,并变更。与Follower不同只是不参与投票、选举,只提供非事务服务。 处理链如下:
SyncRequestProcessor 只是在初始化 记录日志、快照。运行中Leader不会将请求投票发给Observer
3. 服务启动
五个步骤:
1、 配置文件解析
2、 初始化数据管理器
3、 初始化网络I/O管理器
4、 数据恢复
5、 对外服务
- 单机服务器启动流程:
- 集群服务器启动流程:
Leader 选举
1、 初始化Leader选举。集群模式特有,Zookeeper首先会根据自身的服务器ID(SID)、最新的 ZXID(lastLoggedZxid)和当前的服务器epoch(currentEpoch)来生成一个初始化投票,在 初始化过程中,每个服务器都会给自己投票。然后,根据zoo.cfg的配置,创建相应Leader选举算法 实现,Zookeeper提供了三种默认算法(LeaderElection、AuthFastLeaderElection、 FastLeaderElection),可通过zoo.cfg中的electionAlg属性来指定,但现只支持 FastLeaderElection选举算法。在初始化阶段,Zookeeper会创建Leader选举所需的网络I/O层 QuorumCnxManager,同时启动对Leader选举端口的监听,等待集群中其他服务器创建连接。
2、 注册JMX服务。
3、 检测当前服务器状态 运行期间,QuorumPeer会不断检测当前服务器状态。在正常情况下,Zookeeper服务器的状态 在LOOKING、LEADING、FOLLOWING/OBSERVING之间进行切换。在启动阶段,QuorumPeer的初始 状态是LOOKING,因此开始进行Leader选举。
4、 Leader选举 ZooKeeper的Leader选举过程,简单地讲,就是一个集群中所有的机器相互之间进行一系列投 票,选举产生最合适的机器成为Leader,同时其余机器成为Follower或是Observer的集群机器⻆ 色初始化过程。关于Leader选举算法,简而言之,就是集群中哪个机器处理的数据越新(通常我们根 据每个服务器处理过的最大ZXID来比较确定其数据是否更新),其越有可能成为Leader。当然,如 果集群中的所有机器处理的ZXID一致的话,那么SID最大的服务器成为Leader,其余机器称为 Follower和Observer
Leader Follower 启动期交互过程
4. Leader选举
当zk集群一台服务器出现以下2种情况之一时候,需要进入Leader选举。
1. 服务器初始化启动
2. 服务器运行无法和Leader保持连接。
分析以上两种情况:
1、 启动时期选举:
* 每个Server发出一个投票
* 接受来自各个服务器的投票
* 处理投票(优先检查ZXID,相同就比较myid)
* 统计投票( 判断是否已经有过半的机器接收到相同的投票信息,所谓“过半”就是指大于集群机器数量的一半,即大于或等于 (n/2+1)。对于这里由3台机器构成的集群,大于等于2台即为达到“过半”要求。)
* 改变服务器状态:Leader->LEADING, Follower->FOLLOWING
2、 服务运行期间的Leader选举:
* 变更状态:Leader挂后,剩下的Follower都变成LOOKING,进入Leader选举
* 每个Server发出投票,第一轮都投自己,然后将自己投票发给所有机器
* 接收投票,与启动选举相同
* 处理投票,与启动选举相同
* 统计投票,与启动选举相同
* 改变服务器状态,与启动选举相同