永久链接: https://tech.souyunku.com/?p=6306
作者:zkp_java | 出处:https://blog.csdn.net/zkp_java/article/category/8044591
zookeeper原生api的不足
zookeeper原生api存在以下不足之处:
- 连接的创建是异步的,需要开发人员自行编码实现等待;
- 连接没有自动的超时重连机制;
- Zk本身不提供序列化机制,需要开发人员自行指定,从而实现数据的序列化和反序列化;
- Watcher注册一次只会生效一次,需要不断的重复注册;
- Watcher本身的使用方式不符合java本身的术语,如果采用监听器的方式,更容易理解;
- 不支持递归创建树形节点;
zookeeper第三方开源客户端
zookeeper的第三方开源客户端主要有zkClient和Curator。其中zkClient解决了session会话超时重连、Watcher反复注册等问题,提供了更加简洁的api,但zkClient社区不活跃,文档不够完善。而Curator是Apache基金会的顶级项目之一,它解决了session会话超时重连、Watcher反复注册、NodeExitsException异常等问题,Curator具有更加完善的文档,因此我们这里只学习Curator的使用。
Curator客户端api介绍
Curator包含了如下几个包:
- curator-framework:对zookeeper底层api的一些封装;
- curator-client:提供一些客户端的操作,如重试策略等;
- curator-recipes:封装了一些高级特性,如Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
首先我们在gradle中引入curator:
dependencies {
compile ('org.apache.zookeeper:zookeeper:3.4.13')
compile ('org.apache.curator:curator-framework:4.0.1') {
exclude group: 'org.apache.zookeeper', module: 'zookeeper'
}
compile ('org.apache.curator:curator-recipes:4.0.1') {
exclude group: 'org.apache.zookeeper', module: 'zookeeper'
}
compile ('org.apache.curator:curator-client:4.0.1') {
exclude group: 'org.apache.zookeeper', module: 'zookeeper'
}
}
Note: 为什么要exclude zookeeper模块请参考文档ZooKeeper Version Compatibility
curator提供了一种类似jdk8中stream一样的流式操作。
创建zookeeper会话
Curator
中org.apache.curator.framework.CuratorFrameworkFactory
类提供了如下两个创建zookeeper会话的方法:
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
该方法返回一个org.apache.curator.framework.CuratorFramework
类型的对象,参数说明如下:
- connectString:逗号分开的
ip:port
对; - sessionTimeoutMs:会话超时时间,单位为毫秒,默认是60000ms,指连接建立完后多久没有收到心跳检测,超过该时间即为会话超时;
- connectionTimeoutMs:连接创建超时时间,单位为毫秒,默认是15000ms,指客户端与服务端建立连接时多长时间没连接上就算超时;
- retryPolicy:重试策略,
retryPolicy
的类型定义如下
/**
* Abstracts the policy to use when retrying connections
*/
public interface RetryPolicy
{
/**
* Called when an operation has failed for some reason. This method should return
* true to make another attempt.
*
*
* @param retryCount the number of times retried so far (0 the first time),第几次重试
* @param elapsedTimeMs the elapsed time in ms since the operation was attempted,到当前重试时刻总的重试时间
* @param sleeper use this to sleep - DO NOT call Thread.sleep,重试策略
* @return true/false
*/
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
}
allowRetry
返回true继续重试,返回false不再重试
可以通过实现该接口来自定义策略,curator已经为我们提供了若干重试策略:
- ExponentialBackoffRetry:该重试策略随着重试次数的增加,sleep的时间呈指数增长,该提供了两个构造方法
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
第retryCount
次重试的sleep时间计算方式为:baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
,如果该值大于maxSleepMs
,则sleep时间为maxSleepMs
,如果重试次数大于maxRetries
,则不再重试;
- RetryNTimes:该重试策略重试指定次数,每次sleep固定的时间,构造方法如下
public RetryNTimes(int n, int sleepMsBetweenRetries)
n
是重试的次数,sleepMsBetweenRetries
是sleep的时间;
- RetryOneTime:该重试策略只重试一次
- RetryUntilElapsed:该重试策略对重试次数不做限制,但对总的重试时间做限制,构造方法如下
public RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
maxElapsedTimeMs
是最大的重试时间,sleepMsBetweenRetries
是sleep的时间间隔;
通过newClient
获得CuratorFramework
对象后我们就可以进行各种操作了。
除了newClient
,CuratorFrameworkFactory
还提供了一种Builder
的方式来创建CuratorFramework
对象,如下的示例所示:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 5);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.0.102:2181,192.168.0.102:2182,192.168.0.102:2183")
.sessionTimeoutMs(30000).connectionTimeoutMs(15000)
.retryPolicy(retryPolicy)
.namespace("curatorTest")
.build();
创建zookeeper节点
在curator中无论执行何种操作都必须先获得一个构建该操作的包装类(Builder对象),创建zookeeper节点需要先获得一个org.apache.curator.framework.api.CreateBuilder
(实际上是CreateBuilder
的实现类CreateBuilderImpl
)对象,然后用这个对象来构建创建节点的操作,CreateBuilderImpl
中常见的操作如下:
// 递归创建(持久)父目录
public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentsIfNeeded()
// 设置创建节点的属性
public ACLBackgroundPathAndBytesable<String> withMode(CreateMode mode)
// 设置节点的acl属性
public ACLBackgroundPathAndBytesable<String> withACL(List<ACL> aclList, boolean applyToParents)
// 指定创建节点的路径和节点上的数据
public String forPath(final String givenPath, byte[] data) throws Exception
如下所示为创建一个节点的示例:
String test1Data = client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/curatorTest/test1", "test1".getBytes());
删除zookeeper节点
同理先调用CuratorFramework
的delete()
获取构建删除操作的DeleteBuilder
(实际上为DeleteBuilderImpl
),DeleteBuilderImpl
提供了如下方法来构建删除操作:
// 指定要删除数据的版本号
public BackgroundPathable<Void> withVersion(int version)
// 确保数据被删除,本质上就是重试,当删除失败时重新发起删除操作
public ChildrenDeletable guaranteed()
// 指定删除的节点
public Void forPath(String path) throws Exception
// 递归删除子节点
public BackgroundVersionable deletingChildrenIfNeeded()
读取zookeeper节点数据
同理先调用CuratorFramework
的getData()
获取构建获取数据操作的GetDataBuilder
(实际上为GetDataBuilderImpl
),GetDataBuilderImpl
提供了如下方法来构建读取操作:
// 将节点状态信息保存到stat
public WatchPathable<byte[]> storingStatIn(Stat stat)
// 指定节点路径
public byte[] forPath(String path) throws Exception
如下示例为获取节点数据:
Stat test1Stat = new Stat();
byte[] test1DataBytes = client.getData().storingStatIn(test1Stat).forPath("/curatorTest/test1");
System.out.println("test1 data: " + new String(test1DataBytes));
更新zookeeper节点数据
同理先调用CuratorFramework
的setData()
获取构建获取数据操作的SetDataBuilder
(实际上为SetDataBuilderImpl
),SetDataBuilderImpl
提供了如下方法来构建更新操作:
// 指定版本号
public BackgroundPathAndBytesable<Stat> withVersion(int version)
// 指定节点路径和要更新的数据
public Stat forPath(String path, byte[] data) throws Exception
示例程序:
test1Stat = client.setData()
.withVersion(-1)
.forPath("/curatorTest/test1", "test1DataV2".getBytes());
读取zookeeper子节点
同理先调用CuratorFramework
的getChildren()
获取构建获取子节点数据操作的GetChildrenBuilder
(实际上为GetChildrenBuilderImpl
),GetChildrenBuilderImpl
提供了如下方法来构建更新操作:
// 把服务器端获取到的状态数据存储到stat对象中
public WatchPathable<List<String>> storingStatIn(Stat stat)
// 指定获取子节点数据的节点路径
public List<String> forPath(String path) throws Exception
// 设置watcher,类似于zookeeper本身的api,也只能使用一次
public BackgroundPathable<List<String>> usingWatcher(Watcher watcher)
public BackgroundPathable<List<String>> usingWatcher(CuratorWatcher watcher)
示例程序:
Stat childStat = new Stat();
List<String> childs = client.getChildren().storingStatIn(childStat).forPath("/curatorTest");
curator中关于异步操作
curator为所有操作都提供了异步执行的版本,只需要在构建操作的方法链中添加如下操作之一即可:
public ErrorListenerPathable<List<String>> inBackground()
public ErrorListenerPathable<List<String>> inBackground(Object context)
public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback)
public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Object context)
public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Executor executor)
public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Object context, Executor executor)
如下示例程序为使用异步执行删除操作:
client.delete()
.guaranteed()
.withVersion(-1)
.inBackground(((client1, event) -> {
System.out.println(event.getPath() + ", data=" + event.getData());
System.out.println("event type=" + event.getType());
System.out.println("event code=" + event.getResultCode());
}))
.forPath("/curatorTest/test1");
curator中的NodeCache
NodeCache
会将某一路径的节点(节点本身
)在本地缓存一份,当zookeeper中相应路径的节点发生更新、创建或者删除操作时,NodeCache
将会得到响应,并且会将最新的数据拉到本地缓存中,NodeCache只会监听路径本身的变化,并不会监听子节点的变化
。我们可以通过NodeCache
注册一个监听器来获取发生变化的通知。NodeCache
提供了如下构造函数:
public NodeCache(CuratorFramework client, String path)
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
参数说明:
- client: curator客户端;
- path: 需要缓存的节点路径;
- dataIsCompressed:是否压缩节点下的数据;
NodeCache
提供了一个如下类型的监听器容器,只要往容器中添加监听器,当节点发生变更时,容器中的监听器都将得到通知。
private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
NodeCache
缓存数据及添加Listener的示例代码如下:
NodeCache nodeCache = new NodeCache(client, "/curatorTest/test1");
// 是否立即拉取/curatorTest/test1节点下的数据缓存到本地
nodeCache.start(true);
// 添加listener
nodeCache.getListenable().addListener(() -> {
ChildData childData = nodeCache.getCurrentData();
if (null != childData) {
System.out.println("path=" + childData.getPath() + ", data=" + childData.getData() + ";");
}
});
Note:
NodeCache只会缓存节点本身的数据和状态,并不会缓存节点下的子节点信息,所以如果我们在节点下创建子节点,NodeCache中的Listener是不会得到通知的*
curator中的PathChildrenCache
PathChildrenCache
会将指定路径节点下的所有子节点缓存在本地,但不会缓存节点本身的信息,当执行新增(CHILD_ADDED)、删除(CHILD_REMOVED)、更新(CHILD_UPDATED)指定节点下的子节点等操作时,PathChildrenCache
中的Listener将会得到通知,PathChildrenCache
提供了如下几个构造函数:
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)
参数说明:
- client:curator客户端;
- path:缓存的节点路径;
- cacheData:除了缓存节点状态外是否缓存节点数据,如果为true,那么客户端在接收到节点列表变更的同时,也能够获取到节点的数据内容,如果为false,则无法获取到数据内容;
- threadFactory:线程池工厂,当内部需要开启新的线程执行时,使用该线程池工厂来创建线程;
- dataIsCompressed:是否压缩节点数据;
- executorService:线程池;
PathChildrenCache
通过start
方法可以传入三种启动模式,这三种启动模式定义在org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode
中:
- NORMAL:异步初始化cache;
- BUILD_INITIAL_CACHE:同步初始化cache,以及创建cache后,就从服务器拉取对应的数据;
- POST_INITIALIZED_EVENT:异步初始化cache,初始化完成触发
PathChildrenCacheEvent.Type#INITIALIZED
事件,cache中Listener会收到该事件的通知;
PathChildrenCache
示例代码如下:
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/curatorTest", true);
// startMode为BUILD_INITIAL_CACHE,cache是初始化完成会发送INITIALIZED事件
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
System.out.println(pathChildrenCache.getCurrentData().size());
pathChildrenCache.getListenable().addListener(((client1, event) -> {
ChildData data = event.getData();
switch (event.getType()) {
case INITIALIZED:
System.out.println("子节点cache初始化完成(StartMode为POST_INITIALIZED_EVENT的情况)");
System.out.println("INITIALIZED: " + pathChildrenCache.getCurrentData().size());
break;
case CHILD_ADDED:
System.out.println("添加子节点,path=" + data.getPath() + ", data=" + new String(data.getData()));
break;
case CHILD_UPDATED:
System.out.println("更新子节点,path=" + data.getPath() + ", data=" + new String(data.getData()));
break;
case CHILD_REMOVED:
System.out.println("删除子节点,path=" + data.getPath());
break;
default:
System.out.println(event.getType());
}
}));
curator完整示例代码
如下所示为演示curator使用的完整示例代码:
package com.ctrip.flight.test.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import java.util.List;
public class CuratorTest {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 5);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.0.104:2181,192.168.0.104:2182,192.168.0.104:2183")
.sessionTimeoutMs(30000).connectionTimeoutMs(15000)
.retryPolicy(retryPolicy)
//.namespace("curatorTest")
.build();
client.start();
// 判断节点是否存在,存在则先删除节点
Stat test1Stat = client.checkExists().forPath("/curatorTest/test1");
if (null != test1Stat) {
client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath("/curatorTest/test1");
}
// 创建节点
String test1Data = client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/curatorTest/test1", "test1DataV1".getBytes());
// 获取节点信息
test1Stat = new Stat();
byte[] test1DataBytes = client.getData().storingStatIn(test1Stat).forPath("/curatorTest/test1");
System.out.println("test1 stat: " + test1Stat);
System.out.println("test1 data: " + new String(test1DataBytes));
// 更新节点数据
test1Stat = client.setData()
.withVersion(-1)
.forPath("/curatorTest/test1", "test1DataV2".getBytes());
System.out.println("test1 stat: " + test1Stat);
// 获取所有子节点
Stat childStat = new Stat();
List<String> childs = client.getChildren().storingStatIn(childStat).forPath("/curatorTest");
System.out.println("curatorTest childs: " + childs);
// client.delete()
// .guaranteed()
// .withVersion(-1)
// .inBackground(((client1, event) -> {
// System.out.println(event.getPath() + ", data=" + event.getData());
// System.out.println("event type=" + event.getType());
// System.out.println("event code=" + event.getResultCode());
// }))
// .forPath("/curatorTest/test1");
// 缓存节点
NodeCache nodeCache = new NodeCache(client, "/curatorTest/test1");
nodeCache.start(true);
nodeCache.getListenable().addListener(() -> {
System.out.println("NodeCache:");
ChildData childData = nodeCache.getCurrentData();
if (null != childData) {
System.out.println("path=" + childData.getPath() + ", data=" + new String(childData.getData()) + ";");
}
});
// 缓存子节点
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/curatorTest", true);
// startMode为BUILD_INITIAL_CACHE,cache是初始化完成会发送INITIALIZED事件
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
System.out.println(pathChildrenCache.getCurrentData().size());
pathChildrenCache.getListenable().addListener(((client1, event) -> {
ChildData data = event.getData();
switch (event.getType()) {
case INITIALIZED:
System.out.println("子节点cache初始化完成(StartMode为POST_INITIALIZED_EVENT的情况)");
System.out.println("INITIALIZED: " + pathChildrenCache.getCurrentData().size());
break;
case CHILD_ADDED:
System.out.println("添加子节点,path=" + data.getPath() + ", data=" + new String(data.getData()));
break;
case CHILD_UPDATED:
System.out.println("更新子节点,path=" + data.getPath() + ", data=" + new String(data.getData()));
break;
case CHILD_REMOVED:
System.out.println("删除子节点,path=" + data.getPath());
break;
default:
System.out.println(event.getType());
}
}));
Thread.sleep(20000000);
}
}