专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

Zookeeper实战

服务注册与发现

定义:微服务时代,多个相同的jar包在不同的服务器上开启相同的服务,可以通过nginx在服务端进行负载均衡的配置。也可以通过ZooKeeper在客户端进行负载均衡配置。

1、 多个服务注册
2、 客户端从 ZooKeeper(中间件)获取服务地址集合
3、 从集合中随机选一个服务执行任务

19_1.png 19_2.png 19_3.png

服务端代码

用SpringBoot完成一个最简单的web服务,并且连接zk服务器,实现注册功能。 19_4.png ProductController

/**
 * REST controller of the product service, mounted under {@code /product}.
 */
@RestController
@RequestMapping("/product")
public class ProductController {

    /**
     * Builds a {@link Product} for the requested id.
     *
     * <p>The product name carries the local port of the instance that served the
     * request, which makes it visible which instance answered.
     *
     * @param request the current HTTP request, used only to read the local port
     * @param id      the product id taken from the URL path
     * @return a new {@code Product} with the given id and a name of the form {@code name:<port>}
     */
    @RequestMapping("/getProduct/{id}")
    public Object getProduct(HttpServletRequest request, @PathVariable("id") String id) {
        String servedBy = "name:" + request.getLocalPort();
        return new Product(id, servedBy);
    }
}

InitListener


/**
 * Servlet context listener that registers this service instance through
 * {@code ServiceRegister} when the web application starts.
 */
public class InitListener implements ServletContextListener {

    /** Port this instance listens on, injected from {@code server.port}. */
    @Value("${server.port}")
    private int port;

    /**
     * Autowires this listener, then registers the local host address and port.
     * Failures while resolving the address or registering are printed, not rethrown.
     *
     * @param sce the servlet context event supplied by the container
     */
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        // The listener is instantiated with "new", so Spring is asked explicitly
        // to process its annotations before the port is used.
        WebApplicationContextUtils
                .getRequiredWebApplicationContext(sce.getServletContext())
                .getAutowireCapableBeanFactory()
                .autowireBean(this);

        try {
            String localAddress = InetAddress.getLocalHost().getHostAddress();
            ServiceRegister.register(localAddress, port);
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }

    /** Nothing to do on shutdown. */
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
    }
}

Product

/**
 * Plain data holder for a product: an id and a display name.
 */
public class Product {

    private String id;
    private String name;

    /** Creates an empty product; both fields stay {@code null}. */
    public Product() {
    }

    /**
     * Creates a product with the given values.
     *
     * @param id   the product id
     * @param name the product name
     */
    public Product(String id, String name) {
        this.id = id;
        this.name = name;
    }

    /** @return the product id */
    public String getId() {
        return this.id;
    }

    /** @param id the new product id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the product name */
    public String getName() {
        return this.name;
    }

    /** @param name the new product name */
    public void setName(String name) {
        this.name = name;
    }
}

ServiceRegister

/**
 * Registers a service instance in ZooKeeper under {@code /services/products}.
 */
public class ServiceRegister {

    private static final String BASE_SERVICES = "/services";
    private static final String SERVICE_NAME = "/products";

    /**
     * Publishes {@code address:port} as an ephemeral sequential child of the
     * service node.
     *
     * <p>The ZooKeeper handle is deliberately left open: the registration node is
     * ephemeral, so it only exists for as long as this session does.
     * Any failure is printed and otherwise ignored, as before.
     *
     * @param address host address of this instance
     * @param port    port of this instance
     */
    public static void register(String address, int port) {
        try {
            ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, (watchedEvent) -> { });

            // create() is not recursive, so every level of the path has to exist
            // before the instance node can be created.
            ensurePersistentNode(zooKeeper, BASE_SERVICES);
            ensurePersistentNode(zooKeeper, BASE_SERVICES + SERVICE_NAME);

            String server_path = address + ":" + port;
            // Encode explicitly as UTF-8; the consumer decodes the data as UTF-8.
            zooKeeper.create(BASE_SERVICES + SERVICE_NAME + "/child",
                    server_path.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates {@code path} as a persistent node unless it already exists.
     * A create that fails because another instance created the node in the
     * meantime is treated as success.
     */
    private static void ensurePersistentNode(ZooKeeper zooKeeper, String path) throws Exception {
        if (zooKeeper.exists(path, false) != null) {
            return;
        }
        try {
            zooKeeper.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (Exception e) {
            // Lost the race against another registrant: fine as long as the node is there now.
            if (zooKeeper.exists(path, false) == null) {
                throw e;
            }
        }
    }
}

ProductApp

@SpringBootApplication
public class ProductApp
{
 public static void main(String[] args)
 {
 SpringApplication.run(ProductApp.class, args); } @Bean // 随着服务自动启动 public ServletListenerRegistrationBean servletListenerRegistrationBean() { ServletListenerRegistrationBean servletListenerRegistrationBean = new ServletListenerRegistrationBean(); servletListenerRegistrationBean.setListener(new InitListener()); return servletListenerRegistrationBean; } } 

application.properties

server.port=8080

可以改变 server.port 端口,比如使用 8080、8081 两个端口,来实现同时启动并注册两个服务。

业务端代码

19_5.png OrderController

/**
 * REST controller of the order service; it calls a product service instance
 * chosen by the client-side load balancer.
 */
@RequestMapping("/order")
@RestController
public class OrderController {

    @Resource
    private RestTemplate restTemplate;

    private LoadBalance loadBalance = new RamdomLoadBalance();

    /**
     * Builds an order that embeds a product fetched from a product service instance.
     *
     * @param id the order id taken from the URL path
     * @return a new {@code Order} named {@code orderName} carrying the fetched product
     * @throws IllegalStateException if the load balancer has no instance to offer
     */
    @RequestMapping("/getOrder/{id}")
    public Object getOrder(@PathVariable("id") String id) {
        // Choose once: a second call could return a different instance, so the
        // logged host would not be the one actually called.
        String host = loadBalance.choseServiceHost();
        System.out.println("sowhat1412:" + host);
        if (host == null || host.isEmpty()) {
            // Without this guard the request would go to the malformed URL "http:///product/...".
            throw new IllegalStateException("no product service instance is currently registered");
        }
        Product product = restTemplate.getForObject(
                "http://" + host + "/product/getProduct/1", Product.class);
        return new Order(id, "orderName", product);
    }
}

InitListener

/**
 * Servlet context listener that keeps {@code LoadBalance.SERVICE_LIST} in sync
 * with the children of {@code /services/products} in ZooKeeper.
 */
public class InitListener implements ServletContextListener {

    private static final String BASE_SERVICES = "/services";
    private static final String SERVICE_NAME = "/products";

    private ZooKeeper zooKeeper;

    /**
     * Connects to ZooKeeper, loads the initial service list and refreshes it
     * whenever the children of the service node change. Failures are printed.
     *
     * @param sce the servlet context event supplied by the container
     */
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        try {
            zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, (watchedEvent) -> {
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged
                        && (BASE_SERVICES + SERVICE_NAME).equals(watchedEvent.getPath())) {
                    updateServiceList();
                }
            });
            updateServiceList();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads every child of the service node and publishes the collected hosts.
     * A child that cannot be read (for example because it disappeared after the
     * listing) is skipped instead of aborting the whole refresh.
     */
    private void updateServiceList() {
        try {
            // watch=true re-arms the children watch on every refresh.
            List<String> children = zooKeeper.getChildren(BASE_SERVICES + SERVICE_NAME, true);
            List<String> newServerList = new ArrayList<String>();
            for (String subNode : children) {
                try {
                    byte[] data = zooKeeper.getData(BASE_SERVICES + SERVICE_NAME + "/" + subNode, false, null);
                    String host = new String(data, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("host:" + host);
                    newServerList.add(host);
                } catch (InterruptedException e) {
                    // Preserve the interrupt and keep the previously published list.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            LoadBalance.SERVICE_LIST = newServerList;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the ZooKeeper session opened at startup, if any.
     *
     * @param sce the servlet context event supplied by the container
     */
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        if (zooKeeper == null) {
            return;
        }
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Order

/**
 * Plain data holder for an order: an id, a name and the ordered product.
 */
public class Order {

    private String id;
    private String name;
    private Product product;

    /** Creates an empty order; all fields stay {@code null}. */
    public Order() {
    }

    /**
     * Creates an order with the given values.
     *
     * @param id      the order id
     * @param name    the order name
     * @param product the product attached to the order
     */
    public Order(String id, String name, Product product) {
        this.id = id;
        this.name = name;
        this.product = product;
    }

    /** @return the order id */
    public String getId() {
        return this.id;
    }

    /** @param id the new order id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the order name */
    public String getName() {
        return this.name;
    }

    /** @param name the new order name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the product attached to the order */
    public Product getProduct() {
        return this.product;
    }

    /** @param product the new product attached to the order */
    public void setProduct(Product product) {
        this.product = product;
    }
}

Product

/**
 * Plain data holder for a product as seen by the order service: an id and a name.
 */
public class Product {

    private String id;
    private String name;

    /** Creates an empty product; both fields stay {@code null}. */
    public Product() {
    }

    /**
     * Creates a product with the given values.
     *
     * @param id   the product id
     * @param name the product name
     */
    public Product(String id, String name) {
        this.id = id;
        this.name = name;
    }

    /** @return the product id */
    public String getId() {
        return this.id;
    }

    /** @param id the new product id */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the product name */
    public String getName() {
        return this.name;
    }

    /** @param name the new product name */
    public void setName(String name) {
        this.name = name;
    }
}

LoadBalance

/**
 * Base class for client-side load-balancing strategies.
 */
public abstract class LoadBalance {

    /**
     * Pool of known service addresses. Note that it is declared volatile;
     * it is replaced as a whole by whoever refreshes the pool.
     */
    public static volatile List<String> SERVICE_LIST;

    /**
     * Picks the address of one service instance.
     *
     * @return the chosen address, as decided by the concrete strategy
     */
    public abstract String choseServiceHost();
}

/**
 * Load-balancing strategy that picks a uniformly random entry of
 * {@code SERVICE_LIST}.
 */
public class RamdomLoadBalance extends LoadBalance {

    /**
     * Picks one address at random.
     *
     * @return a random entry of the current service list, or the empty string
     *         when the list is {@code null} or empty
     */
    @Override
    public String choseServiceHost() {
        // Read the volatile field exactly once: it can be replaced at any moment,
        // and taking size() from one list and get() from another could go out of bounds.
        List<String> snapshot = SERVICE_LIST;
        if (snapshot == null || snapshot.isEmpty()) {
            return "";
        }
        int index = new Random().nextInt(snapshot.size());
        return snapshot.get(index);
    }
}

分布式锁

任务通过竞争获取锁才能对该资源进行操作(①竞争锁);当有一个任务在对资源进行更新时(②占有锁),其他任务都不可以对这个资源进行操作(③任务阻塞),直到该任务完成更新(④释放锁);

1、 多任务环境中才需要
2、 任务都需要对同一共享资源进行写操作;
3、 对资源的访问是互斥的

以前的多线程编程都运行在单个 JVM 上,可以通过 synchronized 以及 JUC 中的 Lock、AtomicInteger 等来实现同步。但是在集群环境中这些手段就不可用了,因为存在多个 JVM。

老方法

/**
 * Single-JVM demo: 50 threads each generate one order number and the sorted
 * results are printed.
 *
 * @author sowhat
 * @create 2020-06-10 14:37
 */
public class OrderService implements Runnable {

    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
    private static CountDownLatch countDownLatch = new CountDownLatch(50);
    private static List<String> result = new Vector<>();

    /** Generates one order number, stores it, then signals completion. */
    @Override
    public void run() {
        try {
            result.add(orderNumGenerator.getNumber_byLock());
        } finally {
            // Count down only after the result is stored, so that await() in main
            // really means "all results are present".
            countDownLatch.countDown();
        }
    }

    /**
     * Starts 50 workers, waits for all of them and prints the sorted numbers.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for the workers
     */
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 50; i++) {
            new Thread(new OrderService()).start();
        }
        countDownLatch.await();
        Collections.sort(result);
        result.forEach(s -> System.out.println(s));
    }
}

/**
 * Generates order numbers of the form {@code <timestamp>-<counter>} from a
 * counter shared by all instances.
 */
class OrderNumGenerator {

    /** Counter shared by every generator instance. */
    public static int count = 0;

    /** Monitor used by {@link #getNumber_syn()}. */
    public static Object obj = new Object();

    /**
     * Lock used by {@link #getNumber_byLock()}. It is static because {@link #count}
     * is static: a per-instance lock would not exclude threads that use different
     * generator instances.
     */
    private static final Lock LOCK = new ReentrantLock();

    /**
     * Unsynchronized variant; concurrent callers may observe duplicate counters.
     *
     * @return the next order number
     */
    public String getNumber() {
        SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
        return sim.format(new Date()) + "-" + (++count);
    }

    /**
     * Variant guarded by a {@code synchronized} block on {@link #obj}.
     *
     * @return the next order number
     */
    public String getNumber_syn() {
        synchronized (obj) {
            SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
            return sim.format(new Date()) + "-" + (++count);
        }
    }

    /**
     * Variant guarded by a {@link ReentrantLock}.
     *
     * @return the next order number
     */
    public String getNumber_byLock() {
        // Acquire outside the try block: if lock() failed, finally must not unlock.
        LOCK.lock();
        try {
            SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
            return sim.format(new Date()) + "-" + (++count);
        } finally {
            LOCK.unlock();
        }
    }
}

zookeeper 基于同名节点的分布式锁

核心思想:在zk中有一个唯一的临时节点,只有拿到节点的才可以操作数据,没拿到就需要等待。 缺点:可能引发羊群效应,第一个用完后瞬间有999个同时并发的向zk请求获得锁。 19_6.png

package cn.sowhat;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
 import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CountDownLatch; class OrderNumGenerator { public static int count = 0; public String getNumber() { SimpleDateFormat sim = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss"); return sim.format(new Date()) + "-" + (++count); } } public class ZkLock implements Runnable { private OrderNumGenerator orderNumGenerator = new OrderNumGenerator(); private Lock lock = new ZookeeperDistrbuteLock(); @Override public void run() { getNumber(); } public void getNumber() { try { lock.getLock(); String number = orderNumGenerator.getNumber(); System.out.println(Thread.currentThread().getName() + " ,生成ID: " + number); } catch (Exception e) { e.printStackTrace(); } finally { lock.unLock(); } } public static void main(String[] args) { for (int i = 0; i < 50; i++) { new Thread(new ZkLock()).start(); } } } interface Lock { public void getLock(); public void unLock(); } abstract class AbstrackLock implements Lock { @Override public void getLock() { if (tryLock()) { System.out.println("获取lock锁资源"); } else { waitLock(); getLock(); } } public abstract boolean tryLock(); public abstract void waitLock(); } abstract class ZookeeperAbstractLock extends AbstrackLock { private static final String CONN = "127.0.0.1:2181"; protected ZkClient zkClient = new ZkClient(CONN); protected static final String PATH = "/lock"; protected static final String PATH2 = "/lock2"; } class ZookeeperDistrbuteLock extends ZookeeperAbstractLock { private CountDownLatch countDownLatch = null; @Override public boolean tryLock() { try { zkClient.createEphemeral(PATH); // 能成功创建则说明获得锁成功 return true; } catch (Exception e) { return false; } } @Override public void waitLock() { IZkDataListener iZkDataListener = new IZkDataListener() { @Override public void handleDataDeleted(String path) throws Exception {// 唤醒被等待的线程 if (countDownLatch != null) { countDownLatch.countDown(); } } @Override public void handleDataChange(String s, Object data) throws 
Exception { } }; // 注册事件 监控数据 数据删除了会有反应 zkClient.subscribeDataChanges(PATH, iZkDataListener); if (zkClient.exists(PATH)) { // 如果已经有人用了 当先线程就等着吧,什么时候 出现了删除就会调用 iZkDataListener countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(PATH, iZkDataListener); // 拿到锁需要的了 删除监听 } @Override public void getLock() { super.getLock(); } @Override public void unLock() { if (zkClient != null) { zkClient.delete(PATH); zkClient.close(); } } } 

高性能分布式锁

思想:很简单,每个要操作的不要乱抢了,在zk中进行排队,轮到了再操作数据。这样可以有效避免并发。 19_7.png 19_8.png

package cn.sowhat;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
 import java.text.SimpleDateFormat; import java.util.Collections; import java.util.Date; import java.util.List; import java.util.concurrent.CountDownLatch; class OrderNumGenerator { public static int count = 0; public String getNumber() { return ++count + ""; } } public class ZkLock implements Runnable { private OrderNumGenerator orderNumGenerator = new OrderNumGenerator(); private Lock lock = new ZookeeperDistrbuteLock2(); @Override public void run() { getNumber(); } public void getNumber() { try { lock.getLock(); String number = orderNumGenerator.getNumber(); System.out.println(Thread.currentThread().getName() + " ,生成ID: " + number); } catch (Exception e) { e.printStackTrace(); } finally { lock.unLock(); } } public static void main(String[] args) { for (int i = 0; i < 50; i++) { new Thread(new ZkLock()).start(); } } } interface Lock { public void getLock(); public void unLock(); } abstract class AbstrackLock implements Lock { @Override public void getLock() { if (tryLock()) { System.out.println("获取lock锁资源"); } else { waitLock(); getLock(); } } public abstract boolean tryLock(); public abstract void waitLock(); } abstract class ZookeeperAbstractLock extends AbstrackLock { private static final String CONN = "127.0.0.1:2181"; protected ZkClient zkClient = new ZkClient(CONN); protected static final String PATH = "/lock"; protected static final String PATH2 = "/lock2"; } class ZookeeperDistrbuteLock2 extends ZookeeperAbstractLock { private CountDownLatch countDownLatch = null; private String beforePath; private String currentPath; public ZookeeperDistrbuteLock2() { if (!this.zkClient.exists(PATH2)) { this.zkClient.createPersistent(PATH2); // 创建持久性节点 } } @Override public boolean tryLock() { // 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentpath if (currentPath == null || currentPath.length() <= 0) { // 创建一个临时 顺序节点 currentPath = this.zkClient.createEphemeralSequential(PATH2 + "/", "lock"); } // 获取所有 临时节点并且排序,临时节点名称为自增的字符串 如: 0000000000400 List<String> children = 
this.zkClient.getChildren(PATH2); Collections.sort(children); if (currentPath.equals(PATH2 + '/' + children.get(0))) { return true; // 如果当前节点 在所有节点中排名第一则获得锁了 } else { int wz = Collections.binarySearch(children, currentPath.substring(7)); beforePath = PATH2 + '/' + children.get(wz - 1); } return false; } @Override public void waitLock() { IZkDataListener iZkDataListener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { } @Override public void handleDataDeleted(String s) throws Exception { if(countDownLatch != null){ countDownLatch.countDown(); } } }; // 给排在前面的节点增加数据删除的watcher, 本质上是开启另外一个线程 监听前置节点 this.zkClient.subscribeDataChanges(beforePath,iZkDataListener); if(this.zkClient.exists(beforePath)){ countDownLatch = new CountDownLatch(1); try{countDownLatch.await();} catch(Exception e){ e.printStackTrace(); } } this.zkClient.unsubscribeDataChanges(beforePath,iZkDataListener); } @Override public void getLock() { super.getLock(); } @Override public void unLock() { zkClient.delete(currentPath); zkClient.close(); } } 

集群选举

19_9.png 19_10.png 思想: 其实跟分布式锁一样,就是分布式锁的简化版本,

package cn.sowhat.order;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.ServletListenerRegistrationBean; import org.springframework.context.annotation.Bean; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Timer; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; /** * @author sowhat * @create 2020-06-10 18:55 */ class IsMaster { public static boolean isSurvival; } @RestController class IndexController { @RequestMapping("/getserverinfo") public String getServerInfo() { return IsMaster.isSurvival ? "is Master" : "is slave"; } } class InitListener implements ServletContextListener { ZkClient zkClient = new ZkClient("127.0.0.1:2181"); private String path = "/election"; @Value("${server.port}") private String serverPort; private void init() { System.out.println("项目启动完成"); createEphemeral(); zkClient.subscribeDataChanges(path, new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { } @Override public void handleDataDeleted(String s) throws Exception { System.out.println("主节点挂了,重新选举"); Thread.sleep(5000); createEphemeral(); } }); } private void createEphemeral() { try { zkClient.createEphemeral(path, serverPort); IsMaster.isSurvival = true; } catch (Exception e) { IsMaster.isSurvival = false; } } @Override public void contextInitialized(ServletContextEvent servletContextEvent) { init(); } @Override public void contextDestroyed(ServletContextEvent servletContextEvent) { } } @SpringBootApplication public class selectMaster { public static void main(String[] args) { SpringApplication.run(selectMaster.class,args); } @Bean public ServletListenerRegistrationBean servletListenerRegistrationBean(){ ServletListenerRegistrationBean servletListenerRegistrationBean = new ServletListenerRegistrationBean(); 
servletListenerRegistrationBean.setListener(new InitListener()); return servletListenerRegistrationBean; } } 

配置中心

比如mybatis通过URL访问mysql,我们通过动态的修改URL实现从访问A数据库到访问B数据库的切换。核心就监听 dataChange。 19_11.png

ZK注意事项

1、 Zk数据与日志清理

dataDir、dataLogDir 两个目录会随着时间推移变得庞大,容易造成硬盘写满。清理办法:① 自己编写 shell 脚本,保留最新的 n 个文件;② 使用 zk 自带的 zkCleanup.sh 保留最新的 n 个文件,例如 zkCleanup.sh -n 15;③ 配置 autopurge.snapRetainCount 和 autopurge.purgeInterval 两个参数配合使用。

2、 Too many connections

maxClientCnxns 参数用于配置单个客户端机器可以创建的最大连接数,默认值为 60;出现该错误时可适当调大该参数。

3、 磁盘管理

磁盘的I/O性能直接制约zookeeper更新操作速度,为了提高zk的写性能建议:使用单独的磁盘,Jvm堆内存设置要小心。

4、 集群数量

集群中机器的数量并不是越多越好:一个写操作需要半数以上的节点 ack,所以集群节点数越多,整个集群可以容忍挂掉的节点数越多(越可靠),但是写吞吐量越差。集群的节点数量建议为奇数;

5、 节点数据大小

zk是基于内存进行读写操作的,有时候会进行消息广播,因此不建议在节点存取容量比较大的数据;

本文使用 tech.souyunku.com 排版

未经允许不得转载:搜云库技术团队 » Zookeeper实战

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们