zookeeper用来做分布式锁,主要原理是同一路径下的节点名称不能重复,zookeeper是怎么保证节点唯一性的呢?大致看了源码,发现创建节点的方法,是用synchronized修饰的。
com.apache.zookeeper.server.DataTree这个类是zk节点的结构体,里面有createNode方法用来创建节点
/**
* Add a new node to the DataTree.
* @param path
* Path for the new node.
* @param data
* Data to store in the node.
* @param acl
* Node acls
* @param ephemeralOwner
* the session id that owns this node. -1 indicates this is not
* an ephemeral node.
* @param zxid
* Transaction ID
* @param time
* @param outputStat
* A Stat object to store Stat output results into.
* @throws NodeExistsException
* @throws NoNodeException
* @throws KeeperException
*/
public void createNode(final String path, byte data[], List<ACL> acl,
long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat)
throws KeeperException.NoNodeException,
KeeperException.NodeExistsException {
int lastSlash = path.lastIndexOf('/');
String parentName = path.substring(0, lastSlash);
String childName = path.substring(lastSlash + 1);
StatPersisted stat = new StatPersisted();
stat.setCtime(time);
stat.setMtime(time);
stat.setCzxid(zxid);
stat.setMzxid(zxid);
stat.setPzxid(zxid);
stat.setVersion(0);
stat.setAversion(0);
stat.setEphemeralOwner(ephemeralOwner);
DataNode parent = nodes.get(parentName);
if (parent == null) {
throw new KeeperException.NoNodeException();
}
synchronized (parent) {
Set<String> children = parent.getChildren();
if (children.contains(childName)) {
throw new KeeperException.NodeExistsException();
}
if (parentCVersion == -1) {
parentCVersion = parent.stat.getCversion();
parentCVersion++;
}
parent.stat.setCversion(parentCVersion);
parent.stat.setPzxid(zxid);
Long longval = aclCache.convertAcls(acl);
DataNode child = new DataNode(data, longval, stat);
parent.addChild(childName);
nodes.put(path, child);
EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
if (ephemeralType == EphemeralType.CONTAINER) {
containers.add(path);
} else if (ephemeralType == EphemeralType.TTL) {
ttls.add(path);
} else if (ephemeralOwner != 0) {
HashSet<String> list = ephemerals.get(ephemeralOwner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(ephemeralOwner, list);
}
synchronized (list) {
list.add(path);
}
}
if (outputStat != null) {
child.copyStat(outputStat);
}
}
// now check if its one of the zookeeper node child
if (parentName.startsWith(quotaZookeeper)) {
// now check if its the limit node
if (Quotas.limitNode.equals(childName)) {
// this is the limit node
// get the parent and add it to the trie
pTrie.addPath(parentName.substring(quotaZookeeper.length()));
}
if (Quotas.statNode.equals(childName)) {
updateQuotaForPath(parentName
.substring(quotaZookeeper.length()));
}
}
// also check to update the quotas for this node
String lastPrefix = getMaxPrefixWithQuota(path);
if(lastPrefix != null) {
// ok we have some match and need to update
updateCount(lastPrefix, 1);
updateBytes(lastPrefix, data == null ? 0 : data.length);
}
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
Event.EventType.NodeChildrenChanged);
}
可以看到参数path就是节点的路径,该方法首先会截取节点的父路径(parentName),还有当前节点的名称(childName),通过parentName去nodes中获取一个DataNode对象,nodes就是一个ConcurrentHashMap,key是节点路径,value是节点的DataNode对象。
int lastSlash = path.lastIndexOf('/');
String parentName = path.substring(0, lastSlash);
String childName = path.substring(lastSlash + 1);
如果parentName取出的DataNode为空,说明路径不存在,就抛出NoNodeException异常。
DataNode parent = nodes.get(parentName);
if (parent == null) {
throw new KeeperException.NoNodeException();
}
如果parentName取出的DataNode不为空,就对该节点加锁 synchronized(parent)
synchronized (parent)
接着获取父节点下的所有子节点列表,这里是个Set,然后判断要添加的节点名称是否存在,如果存在就抛出NodeExistsException异常
Set<String> children = parent.getChildren();
if (children.contains(childName)) {
throw new KeeperException.NodeExistsException();
}
接着就是设置一些节点信息,然后创建一个子节点对应的DataNode,将子节点对应的DataNode加到父类子节点的列表中,然后将节点路径 和 对应的DataNode加入到nodes的ConcurrentHashmap中。
DataNode child = new DataNode(data, longval, stat);
parent.addChild(childName);
nodes.put(path, child);
这里看到有个addChild方法,就是将节点加到父类的子节点列表中。
/**
* Method that inserts a child into the children set
*
* @param child
* to be inserted
* @return true if this set did not already contain the specified element
*/
public synchronized boolean addChild(String child) {
if (children == null) {
// let's be conservative on the typical number of children
children = new HashSet<String>(8);
}
return children.add(child);
}
可以看到这个方法也是一个同步方法,所以创建节点使用了同步方法来保证节点的唯一性。
在集群环境下,如果follower节点接收到了添加/修改/删除等操作指令,会将这些指令发给leader节点来统一执行,在通过上述代码,做到节点唯一。