专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

Dubbo服务引入

前言

简介

在dubbo中,我们可以使用两种方式引用远程服务,一种是使用服务直连的方式引用服务[仅适合测试或调试环境],还有一种是基于注册中心进行服务引用,一般使用zookeeper做为注册中心,像我们公司就是使用的zk做为注册中心来进行服务的注册。

引入时机

万事俱备,只欠东风,这里时间很重要,正所谓,谋事在人,成事在天,程序和生活比起来更加有人情味的一点就在于他的可预见性,对于服务的引入有两个固定的时机,一个是Spring容器中调用ReferenceBean 时,另外一个是ReferenceBean 被注入到其他类中引用时,他们两个的区别在于,第一个是饿汉式,另外一个是懒汉式,具体可以参考一下单例模式,下面,我们看一下对于服务引入这一块的源码

源码分析

服务引入的入口方法是ReferenceBean,dubbo使用了spring的自定义标签解析,实体类ReferenceBean对应标签 dubbo:reference,话不多说,我们先看一下ReferenceBean的类结构图,然后在正式进入分析环节

96_1.png
这里我是直接用dubbo源码自带的demo进行调试,服务引用的入口方法为 ReferenceBean 的 getObject 方法,这个方法定义在 Spring 的 FactoryBean 接口中,ReferenceBean 实现了这个方法

   #ReferenceBean.java
    @Override
    public Object getObject() throws Exception {
        return get();
    }

     //ReferenceConfig.java
     public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }  

这里官网有一段说明, 2.6.4 及以下版本的 getObject 方法进行调试到if (ref == null)时,ref!=null,可以修改 IDEA Enable ‘toString’ object view,具体请查看官方文档

由上面的代码可以看出,主要执行逻辑在init()


# ReferenceConfig.java private void init() { #避免重复初始化 if (initialized) { return; } initialized = true; #interfaceName 你所暴露的接口 验证接口是否合理 if (interfaceName == null || interfaceName.length() == 0) { throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!"); } # 检测 consumer #变量是否为空,为空则创建,appendPropertie会填充我们配置的标签的bean属性 checkDefault(); appendProperties(this); # 是否使用泛化接口,默认使用<dubbo:consumer>的Generic if (getGeneric() == null && getConsumer() != null) { setGeneric(getConsumer().getGeneric()); } # 检查是否为泛化类 if (ProtocolUtils.isGeneric(getGeneric())) { interfaceClass = GenericService.class; } else { try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } checkInterfaceAndMethods(interfaceClass, methods); } # 从系统变量中获取与接口名对应的属性值 String resolve = System.getProperty(interfaceName); String resolveFile = null; if (resolve == null || resolve.length() == 0) { # 直连提供者->服务多时可以使用属性dubbo.resolve.file指定映射文件 resolveFile = System.getProperty("dubbo.resolve.file"); if (resolveFile == null || resolveFile.length() == 0) { File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); if (userResolveFile.exists()) { resolveFile = userResolveFile.getAbsolutePath(); } } if (resolveFile != null && resolveFile.length() > 0) { Properties properties = new Properties(); FileInputStream fis = null; try { fis = new FileInputStream(new File(resolveFile)); properties.load(fis); } catch (IOException e) { throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e); } finally { try { if (null != fis) fis.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } resolve = properties.getProperty(interfaceName); } } if (resolve != null && resolve.length() > 0) { url = resolve; if (logger.isWarnEnabled()) { if (resolveFile != null) { logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service."); } else { logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service."); } } } if (consumer != null) { if (application == null) { application = consumer.getApplication(); } if (module == null) { module = consumer.getModule(); } if (registries == null) { registries = consumer.getRegistries(); } if (monitor == null) { monitor = consumer.getMonitor(); } } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } } checkApplication(); # 检测 Application 合法性 checkStubAndMock(interfaceClass); #检测 本地存根 合法性 Map<String, String> map = new HashMap<String, String>(); resolveAsyncInterface(interfaceClass, map); Map<Object, Object> attributes = new HashMap<Object, Object>(); map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE); map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); }# 非泛化服务 if (!isGeneric()) { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } #获取接口方法列表,并添加到 map 中 Wrapper.getWrapper是一个包装类 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("NO method found in service interface " + interfaceClass.getName()); map.put("methods", Constants.ANY_VALUE); } else { map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } map.put(Constants.INTERFACE_KEY, interfaceName); # 将 ApplicationConfig、ConsumerConfig、ReferenceConfig 等对象的字段信息添加到 map 中 appendParameters(map, application); appendParameters(map, module); appendParameters(map, consumer, Constants.DEFAULT_KEY); appendParameters(map, this); String prefix = StringUtils.getServiceKey(map); if (methods != null && !methods.isEmpty()) { for (MethodConfig method : methods) { appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { # 添加重试次数配置 methodName.retries map.put(method.getName() + ".retries", "0"); } } #添加 MethodConfig 中的“属性”字段到 attributes # 比如 onreturn、onthrow、oninvoke 等 appendAttributes(attributes, method, prefix + "." + method.getName()); checkAndConvertImplicitConfig(method, map, attributes); } } # 获取服务消费者 ip 地址 String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY); if (hostToRegistry == null || hostToRegistry.length() == 0) { hostToRegistry = NetUtils.getLocalHost(); // 默认ip 地址=本机 } else if (isInvalidLocalHost(hostToRegistry)) { throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry); } map.put(Constants.REGISTER_IP_KEY, hostToRegistry); #attributes are stored by system context. StaticContext.getSystemContext().putAll(attributes); # 存储 attributes 到系统上下文中 ref = createProxy(map); # 根据服务名,ReferenceConfig,代理类构建 ConsumerModel, # 并将 ConsumerModel 存入到 ApplicationModel 中 ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods()); ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel); }

上文中ref主要是由createProxy创建的一个代理类,在init()方法中,createProxy接受了一个封装了我们各种配置信息的map键值对,我们主要看一下createProxy()方法解析服务消费者ip,以及怎么样调用 createProxy 创建代理对象

  private T createProxy(Map<String, String> map) {
      # 注意!!! 这里的URL是org.apache.dubbo.common包下的
      URL tmpUrl = new URL("temp", "localhost", 0, map);
      final boolean isJvmRefer;
      if (isInjvm() == null) {
          #如果指定了url,不要做本地引用
          if (url != null && url.length() > 0) { # if a url is specified, don't do local reference
              isJvmRefer = false;
              # 根据tepUrl判断是否需要本地引用
          } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
              # by default, reference local service if there is
              isJvmRefer = true;
          } else {
              isJvmRefer = false;
          }
      } else {
          isJvmRefer = isInjvm().booleanValue();
      }
      # 本地引用
      if (isJvmRefer) {
          # 创建服务引用 URL 对象
          URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
          # 调用 refer 方法构建 InjvmInvoker 实例
          invoker = refprotocol.refer(interfaceClass, url);
          if (logger.isInfoEnabled()) {
              logger.info("Using injvm service " + interfaceClass.getName());
          }
      } else {
          # url 不为空,表明用户可能想进行点对点调用
          if (url != null && url.length() > 0) { # user specified URL, could be peer-to-peer address, or register center's address.
              String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
              #   当需要配置多个 url 时,用;进行分割
              if (us != null && us.length > 0) {
                  for (String u : us) {
                      URL url = URL.valueOf(u);
                      if (url.getPath() == null || url.getPath().length() == 0) {
                          url = url.setPath(interfaceName);
                      }
                      if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                          urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                      } else {
                          urls.add(ClusterUtils.mergeUrl(url, map));
                      }
                  }
              }
          } else { #从注册中心中加载Url
              List<URL> us = loadRegistries(false);
              if (us != null && !us.isEmpty()) {
                  for (URL u : us) {
                      URL monitorUrl = loadMonitor(u);
                      if (monitorUrl != null) {
                          map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                      }
                      urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                  }
              }
              if (urls.isEmpty()) {
                  throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
              }
          }
          #  单个注册中心或服务提供者(服务直连,下同)
          if (urls.size() == 1) {
              invoker = refprotocol.refer(interfaceClass, urls.get(0));
          } else { # 多个注册中心或多个服务提供者,或者两者混合
              List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
              URL registryURL = null;
              for (URL url : urls) {
                  invokers.add(refprotocol.refer(interfaceClass, url));
                  if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                      registryURL = url; # use last registry url
                  }
              }
              if (registryURL != null) { # registry url is available
                  # use AvailableCluster only when register's cluster is available
                  URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                  invoker = cluster.join(new StaticDirectory(u, invokers));
              } else { # not a registry url
                  invoker = cluster.join(new StaticDirectory(invokers));
              }
          }
      }

      Boolean c = check;
      if (c == null && consumer != null) {
          c = consumer.isCheck();
      }
      if (c == null) {
          c = true; # default true
      }
      if (c && !invoker.isAvailable()) {
          # make it possible for consumer to retry later if provider is temporarily unavailable
          initialized = false;
          throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
      }
      if (logger.isInfoEnabled()) {
          logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
      }
      # create service proxy    生成代理类 调用的是AbstractProxyFactory.getProxy
      return (T) proxyFactory.getProxy(invoker);
  }

96_2.png

Invoker 的构建过程以及代理类的过程

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    # 序列化
    optimizeSerialization(url);
    # 创建 DubboInvoker
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

上述代码中的getClients()方法用于获取客户端的实例,实例类型为 ExchangeClient,ExchangeClient默认会调用NettyClient

# DubboProtocol.java
private ExchangeClient[] getClients(URL url) {
    # 是否共享连接
    boolean service_share_connect = false;
    # 获取连接数,默认为0,表示未配置
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    # 如果未配置 connections,则共享连接
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            # 获取共享客户端
            clients[i] = getSharedClient(url);
        } else {
            # 初始化新的客户端
            clients[i] = initClient(url);
        }
    }
    return clients;
}

默认情况下,使用共享客户端实例。getSharedClient()也会调用 initClient 方法

# DubboProtocol.java
  private ExchangeClient getSharedClient(URL url) {
        # key = 调用地址
        String key = url.getAddress(); 
        ReferenceCountExchangeClient client = referenceClientMap.get(key);
        if (client != null) {
            if (!client.isClosed()) {
                # 增加引用计数
                client.incrementAndGetCount();
                return client;
            } else {
                referenceClientMap.remove(key);
            }
        } 
        # putIfAbsent 如果传入key对应的value已经存在,就返回存在的value,不进行替换。如果不存在,就添加key和value,返回null 
        locks.putIfAbsent(key, new Object());
        synchronized (locks.get(key)) {
            if (referenceClientMap.containsKey(key)) {
                return referenceClientMap.get(key);
            }
            # 创建 ExchangeClient 客户端
            ExchangeClient exchangeClient = initClient(url);
            # 装饰模式
            client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
            referenceClientMap.put(key, client);
            ghostClientMap.remove(key);
            locks.remove(key);
            return client;
        }
    }

先访问缓存,若缓存未命中,则通过 initClient 方法创建新的 ExchangeClient 实例,并将该实例传给 ReferenceCountExchangeClient 构造方法创建一个带有引用计数功能的 ExchangeClient 实例,我们看一下initClient(url)

# DubboProtocol.java
private ExchangeClient initClient(URL url) {

    // 获取客户端类型,默认为 netty
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

    // 添加编解码和心跳包参数到 url 中
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // 检测客户端类型是否存在,不存在则抛出异常
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: ...");
    }

    ExchangeClient client;
    try {
        // 获取 lazy 配置,并根据配置值决定创建的客户端类型
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            // 创建懒加载 ExchangeClient 实例
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // 创建普通 ExchangeClient 实例
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service...");
    }
    return client;
}

# Exchangers
 public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        # 获取 Exchanger 实例,默认为 HeaderExchangeClient
        return getExchanger(url).connect(url, handler);
    }

主要方法getExchanger(url).connect(url, handler);

# HeaderExchanger.java
@Override
    #  1. 创建 HeaderExchangeHandler 对象 -> new HeaderExchangeHandler(handler)
    #  2. 创建 DecodeHandler 对象 ->  new DecodeHandler()
    #  3. 通过 Transporters 构建 Client 实例 -> Transporters.connect()
    #  4. 创建 HeaderExchangeClient 对象 ->  new HeaderExchangeClient()
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

PS:这里实现了Exchanger

# Exchanger.java
 /**
     * connect.
     *
     * @param url
     * @param handler
     * @return message channel
     */
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;

Dubbo提供了一种SPI的机制用于动态的加载扩展类,Adaptive机制在运行时动态的选用扩展类来提供服务,Adaptive注解可以用于接口的某个子类上,也可以用于接口方法上。如果用在接口的子类上,则表示Adaptive机制的实现会按照该子类的方式进行自定义实现;如果用在方法上,则表示Dubbo会为该接口自动生成一个子类,并且按照一定的格式重写该方法,而其余没有标注@Adaptive注解的方法将会默认抛出异常

# Transporters 的 connect 方法
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 如果 handler 数量大于1,则创建一个 ChannelHandler 分发器
        handler = new ChannelHandlerDispatcher(handlers);
    }

    // 获取 Transporter 自适应拓展类 默认加载 NettyTransporter,并调用 connect 方法生成 Client 实例
    return getTransporter().connect(url, handler);
}

# getTransporter().connect(url, handler);
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    // 创建 NettyClient 对象
    return new NettyClient(url, listener);
}

文章永久链接:https://tech.souyunku.com/19684

未经允许不得转载:搜云库技术团队 » Dubbo服务引入

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们