专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

springboot项目中zookeeper分布式锁的实现

创建项目
springboot-zookeeper-demo

导入开发包

​
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.3.0</version>
</dependency>​

添加配置项

spring.cloud.zookeeper.session-timeout-ms=6000
spring.cloud.zookeeper.connection-timeout-ms=6000
#spring.cloud.zookeeper.discovery.register=true
#spring.cloud.zookeeper.discovery.root=/
#spring.cloud.zookeeper.discovery.enabled=true
#spring.cloud.zookeeper.discovery.instanceHost=127.0.0.1
#spring.cloud.zookeeper.prefix=
spring.cloud.zookeeper.enabled=true
spring.cloud.zookeeper.connect-string=127.0.0.1:2181
spring.cloud.zookeeper.max-retries=3
spring.cloud.zookeeper.base-sleep-time-ms=1000
spring.cloud.zookeeper.max-sleep-ms=500

编写配置类

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.zookeeper.ZookeeperProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ZkConfiguration {

    @Value("${spring.cloud.zookeeper.connect-string}")
    private String zookeeperServer;
    @Value("${spring.cloud.zookeeper.session-timeout-ms}")
    private int sessionTimeoutMs;
    @Value("${spring.cloud.zookeeper.connection-timeout-ms}")
    private int connectionTimeoutMs;
    @Value("${spring.cloud.zookeeper.max-retries}")
    private int maxRetries;
    @Value("${spring.cloud.zookeeper.base-sleep-time-ms}")
    private int baseSleepTimeMs;

    @Bean(initMethod = "init", destroyMethod = "stop")
    public ZkClientUtil zkClient(ZookeeperProperties zookeeperProperties) {
        ZkClientUtil zkClient = new ZkClientUtil(zookeeperProperties);
        zkClient.setZookeeperServer(zookeeperProperties.getConnectString());
        zkClient.setSessionTimeoutMs(sessionTimeoutMs);
        zkClient.setConnectionTimeoutMs(connectionTimeoutMs);
        zkClient.setMaxRetries(zookeeperProperties.getMaxRetries());
        zkClient.setBaseSleepTimeMs(zookeeperProperties.getBaseSleepTimeMs());
        return zkClient;
    }
}

编写工具类

import org.apache.commons.lang.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.zookeeper.ZookeeperProperties;
import java.io.File;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ZkClientUtil {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private CuratorFramework client;
    private ZookeeperProperties zookeeperProperties;
    public TreeCache cache;
    private String zookeeperServer;
    private int sessionTimeoutMs;
    private int connectionTimeoutMs;
    private int baseSleepTimeMs;
    private int maxRetries;

    public ZkClientUtil(ZookeeperProperties zookeeperProperties) {
        this.zookeeperProperties = zookeeperProperties;
    }

    public void setZookeeperServer(String zookeeperServer) {
        this.zookeeperServer = zookeeperServer;
    }

    public String getZookeeperServer() {
        return zookeeperServer;
    }

    public void setSessionTimeoutMs(int sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    public int getSessionTimeoutMs() {
        return sessionTimeoutMs;
    }

    public void setConnectionTimeoutMs(int connectionTimeoutMs) {
        this.connectionTimeoutMs = connectionTimeoutMs;
    }

    public int getConnectionTimeoutMs() {
        return connectionTimeoutMs;
    }

    public void setBaseSleepTimeMs(int baseSleepTimeMs) {
        this.baseSleepTimeMs = baseSleepTimeMs;
    }

    public int getBaseSleepTimeMs() {
        return baseSleepTimeMs;
    }

    public void setMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    public int getMaxRetries() {
        return maxRetries;
    }

    public void init() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        client = CuratorFrameworkFactory.builder().connectString(zookeeperServer).retryPolicy(retryPolicy)
                .sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).build();
        client.start();
    }

    public void stop() {
        client.close();
    }

    public CuratorFramework getClient() {
        return client;
    }

    public String register() {
        String forPath = "";
        try {
            String rootPath = "/" + "services";
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            String serviceInstance = "prometheus" + "-" + hostAddress + "-";
            forPath = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(rootPath + "/" + serviceInstance);
        } catch (Exception e) {
            logger.error("注册出错", e);
        }
        return forPath;
    }

    public List<String> getChildren(String path) {
        List<String> childrenList = new ArrayList<>();
        try {
            childrenList = client.getChildren().forPath(path);
        } catch (Exception e) {
            logger.error("获取子节点出错", e);
        }
        return childrenList;
    }

    public int getChildrenCount(String path) {
        return getChildren(path).size();
    }

    public List<String> getInstances() {
        return getChildren("/services");
    }

    public int getInstancesCount() {
        return getInstances().size();
    }

    /**
     * 初始化本地缓存
     *
     * @param watchRootPath
     * @throws Exception
     */
    private void initLocalCache(String watchRootPath) throws Exception {
        cache = new TreeCache(client, watchRootPath);
        TreeCacheListener listener = (client1, event) -> {
            logger.info("event:" + event.getType() +
                    " |path:" + (null != event.getData() ? event.getData().getPath() : null));

            if (event.getData() != null && event.getData().getData() != null) {
                logger.info("发生变化的节点内容为:" + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
    }

    /**
     * 创建节点
     *
     * @param mode     节点类型
     *                 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。
     *                 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失
     *                 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除
     *                 4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。
     * @param path     节点名称
     * @param nodeData 节点数据
     */
    public String createNode(CreateMode mode, String path, String nodeData) {
        String forPath = "";
        try {
            if (StringUtils.isBlank(path)){
                path = File.pathSeparatorChar + "";
            } else {
                if (!(path.startsWith("/") || path.startsWith("\\"))) {
                    path = File.pathSeparatorChar + path;
                }
            }
            //使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
            forPath = client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, nodeData.getBytes("UTF-8"));
        } catch (Exception e) {
            logger.error("注册出错", e);
        }
        return forPath;
    }

    /**
     * 创建节点
     *
     * @param mode 节点类型
     *             1、PERSISTENT 持久化目录节点,存储的数据不会丢失。
     *             2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失
     *             3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除
     *             4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。
     * @param path 节点名称
     */
    public String createNode(CreateMode mode, String path) {
        String forPath = "";
        try {
            if (StringUtils.isBlank(path)){
                path = File.pathSeparatorChar + "";
            } else {
                if (!(path.startsWith("/") || path.startsWith("\\"))) {
                    path = File.pathSeparatorChar + path;
                }
            }
            //使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
            forPath = client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
        } catch (Exception e) {
            logger.error("注册出错", e);
        }
        return forPath;
    }

    /**
     * 删除节点数据
     *
     * @param path
     */
    public void deleteNode(final String path) {
        try {
            deleteNode(path, true);
        } catch (Exception ex) {
            logger.error("", ex);
        }
    }

    /**
     * 删除节点数据
     *
     * @param path
     * @param deleteChildre 是否删除子节点
     */
    public void deleteNode(final String path, Boolean deleteChildre) {
        try {
            if (deleteChildre) {
                //guaranteed()删除一个节点,强制保证删除,
                // 只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功
                client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
            } else {
                client.delete().guaranteed().forPath(path);
            }
        } catch (Exception e) {
            logger.error("", e);
        }
    }

    /**
     * 设置指定节点的数据
     *
     * @param path
     * @param datas
     */
    public void setNodeData(String path, byte[] datas) {
        try {
            client.setData().forPath(path, datas);
        } catch (Exception ex) {
            logger.error("", ex);
        }
    }

    /**
     * 获取指定节点的数据
     *
     * @param path
     * @return
     */
    public byte[] getNodeData(String path) {
        Byte[] bytes = null;
        try {
            if (cache != null) {
                ChildData data = cache.getCurrentData(path);
                if (data != null) {
                    return data.getData();
                }
            }
            client.getData().forPath(path);
            return client.getData().forPath(path);
        } catch (Exception ex) {
            logger.error("", ex);
        }
        return null;
    }

    /**
     * 获取数据时先同步
     *
     * @param path
     * @return
     */
    public byte[] synNodeData(String path) {
        client.sync();
        return getNodeData(path);
    }

    /**
     * 判断路径是否存在
     *
     * @param path
     * @return
     */
    public boolean isExistNode(final String path) {
        client.sync();
        try {
            return null != client.checkExists().forPath(path);
        } catch (Exception ex) {
            logger.error("", ex);
            return false;
        }
    }

    /**
     * 随机读取一个path子路径, "/"为根节点对应该namespace
     * 先从cache中读取,如果没有,再从zookeeper中查询
     *
     * @param path
     * @return
     */
    public String getRandomData(String path) {
        try {
            Map<String, ChildData> cacheMap = cache.getCurrentChildren(path);
            if (cacheMap != null && cacheMap.size() > 0) {
                logger.debug("get random value from cache,path=" + path);
                Collection<ChildData> values = cacheMap.values();
                List<ChildData> list = new ArrayList<>(values);
                Random rand = new Random();
                byte[] b = list.get(rand.nextInt(list.size())).getData();
                return new String(b, "utf-8");
            }
            if (isExistNode(path)) {
                logger.debug("path [{}] is not exists,return null", path);
                return null;
            } else {
                logger.debug("read random from zookeeper,path=" + path);
                List<String> list = client.getChildren().forPath(path);
                if (list == null || list.size() == 0) {
                    logger.debug("path [{}] has no children return null", path);
                    return null;
                }
                Random rand = new Random();
                String child = list.get(rand.nextInt(list.size()));
                path = path + "/" + child;
                byte[] b = client.getData().forPath(path);
                return new String(b, "utf-8");
            }
        } catch (Exception e) {
            logger.error("", e);
        }
        return null;
    }

    /**
     * 可重入共享锁  -- Shared Reentrant Lock
     * @param lockPath
     * @param time
     * @param dealWork 获取
     * @return
     */
    /*public Object getSRLock(String lockPath,long time, SRLockDealCallback<?> dealWork){
        InterProcessMutex lock = new InterProcessMutex(client, lockPath);
        try {
            if (!lock.acquire(time, TimeUnit.SECONDS)) {
                logger.error("get lock fail:{}", " could not acquire the lock");
                return null;
            }
            logger.debug("{} get the lock",lockPath);
            Object b = dealWork.deal();
            return b;
        }catch(Exception e){
            logger.error("", e);
        }finally{
            try {
                lock.release();
            } catch (Exception e) {
                //log.error("",e);
            }
        }
        return null;
    }*/

    /**
     * 获取读写锁
     *
     * @param path
     * @return
     */
    public InterProcessReadWriteLock getReadWriteLock(String path) {
        return new InterProcessReadWriteLock(client, path);
    }

    /**
     * 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理
     */
    private ExecutorService pool = Executors.newFixedThreadPool(2);

    /**
     * 监听数据节点的变化情况
     *
     * @param watchPath
     * @param listener
     */
    public void watchPath(String watchPath, TreeCacheListener listener) {
        // NodeCache nodeCache = new NodeCache(client, watchPath, false);
        TreeCache cache = new TreeCache(client, watchPath);
        cache.getListenable().addListener(listener, pool);
        try {
            cache.start();
        } catch (Exception e) {
            logger.error("", e);
        }
    }
}

测试

    @Test
    public void lockTest1() {
        final CuratorFramework client = zkClient.getClient();
        //client.start();
        final InterProcessMutex lock = new InterProcessMutex(client, "/curator/lock");
        final CountDownLatch down = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        down.await();
                        lock.acquire();
                    } catch (Exception e) {
                    }
                    SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss.SSS");
                    String orderNo = format.format(new Date());
                    System.out.println(">>>>>>生成的订单号是: " + orderNo);
                    try {
                        lock.release();
                    } catch (Exception e) {
                    }
                }
            }).start();
        }
        down.countDown();
        zkClient.stop();
    }

111_1.png

InterProcessMutex类通过CuratorFramework和path构造,当执行了acquire方法的时候会在/curator/lock节点下生成尾号带顺序的节点.
zookeeper服务会获取/curator/lock节点下所有的等待锁的临时节点,最小的节点获得锁,并非最小的节点会等待上一个节点释放锁,释放锁的时候会删除临时节点,所以并非最小的节点会在小1的节点注册事件,一旦监听到事件,会立马唤醒所有等待的线程

文章永久链接:https://tech.souyunku.com/24886

未经允许不得转载:搜云库技术团队 » springboot项目中zookeeper分布式锁的实现

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们