专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

Dubbo从实战->源码分析

一、 项目架构演变过程

单体架构->垂直架构(业务拆分)->分布式架构(SOA 松耦合)->微服务架构

二、 Dubbo架构与实战

1. Dubbo 架构概述

1.1 概述

Apache Dubbo是一款高性能的Java RPC框架。其前身是阿里巴巴公司开源的一个高性能、轻量级的开 源Java RPC框架,可以和Spring框架无缝集成。

1.2 特性

面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现。

1.3 服务治理

包括最佳实践、架构原则、治理规程、规律以及其他决定性的因素。服务治理指的是用来管理SOA的采用和实现的过程。

2. 处理流程

86_1.png

3. 服务注册中心Zookeeper

Dubbo官方推荐使用Zookeeper作为服务注册中心。Zookeeper 是 Apache Hadoop 的子项目,作为 Dubbo 服 务的注册中心,工业强度较高,可用于生产环境,并推荐使用 。

4. Dubbo开发实战

使用方面直接官方文档吧,个人写的再好,也没有原创的好… 官方地址-> dubbo.apache.org/zh-cn/index…

总结下来小demo就是配置三个模块:客户端、服务端、公用api接口,剩下的配置好zk,可以完美和Spring结合,使用及其方便。

5. Dubbo管理控制台 dubbo-admin

1. 作用

主要包含:服务管理 、 路由规则、动态配置、服务降级、访问控制、权重调整、负载均衡等管理功能,2.6以后可以是jar包可以直接通过java命令运行。不需要配置tomcat。

2. 控制台安装步骤

官方配置走起-> dubbo.apache.org/zh-cn/docs/…

6. Dubbo配置项说明

6.1. dubbo:application

对应 org.apache.dubbo.config.ApplicationConfig, 代表当前应用的信息

1、 name: 当前应用程序的名称,在dubbo-admin中我们也可以看到。请求时也会根据name来进行请求。
2、 owner: 当前应用程序的负责人,可以通过这个负责人找到其相关的应用列表,用于快速定位到责 任人。
3、 qosEnable : 是否启动QoS 默认true
4、 qosPort : 启动QoS绑定的端口 默认22222
5、 qosAcceptForeignIp: 是否允许远程访问 默认是false

6.2. dubbo:registry

org.apache.dubbo.config.RegistryConfig, 代表该模块所使用的注册中心。可注册一个也可注册多个。

1、 id : 当服务中provider或者consumer中存在多个注册中心时,则使用需要增加该配置。在一 些公司,会通过业务线的不同选择不同的注册中心
2、 address : 当前注册中心的访问地址。
3、 protocol : 当前注册中心所使用的协议是什么。也可以直接在address 中写入,比如使用 zookeeper,就可以写成zookeeper://xx.xx.xx.xx:2181
4、 timeout : 当与注册中心不再同一个机房时,大多会把该参数延长。

6.3. dubbo:protocol

org.apache.dubbo.config.ProtocolConfig, 指定服务在进行数据传输所使用的协议。

1、 id : 在大公司,可能因为各个部门技术栈不同,所以可能会选择使用不同的协议进行交互。这里 在多个协议使用时,需要指定。
2、 name : 指定协议名称。默认使用dubbo 。

6.4. dubbo:service

org.apache.dubbo.config.ServiceConfig, 用于指定当前需要对外暴露的服务信息

1、 interface : 指定当前需要进行对外暴露的接口是什么。
2、 ref : 具体实现对象的引用,一般我们在生产级别都是使用Spring去进行Bean托管的,所以这里面 一般也指的是Spring中的BeanId。
3、 version : 对外暴露的版本号。不同的版本号,消费者在消费的时候只会根据固定的版本号进行消 费。

6.5. dubbo:reference

org.apache.dubbo.config.ReferenceConfig, 消费者的配置

1、 id : 指定该Bean在注册到Spring中的id。
2、 interface: 服务接口名
3、 version : 指定当前服务版本,与服务提供者的版本一致。
4、 registry : 指定所具体使用的注册中心地址。这里面也就是使用上面在dubbo:registry 中所声 明的id。

6.6. dubbo:method

org.apache.dubbo.config.MethodConfig, 用于在制定的dubbo:service 或者dubbo:reference 中的 更具体一个层级,指定具体方法级别在进行RPC操作时候的配置,可以理解为对这上面层级中的配置针 对于具体方法的特殊处理。

1、 name : 指定方法名称,用于对这个方法名称的RPC调用进行特殊配置。
2、 async: 是否异步 默认false

6.7. dubbo:service和dubbo:reference详解

1、 mock: 用于在方法调用出现错误时,当做服务降级来统一对外返回结果。
2、 timeout: 用于指定当前方法或者接口中所有方法的超时时间。比如我们在进行第三方服务依赖时可能会对接口的时长做放宽,防止第三方服务不稳定。
3、 check: 用于在启动时,检查生产者是否有该服务。我们一般都会将这个值设置为false,不让其进 行检查。因为如果出现模块之间循环引用的话,那么则可能会出现相互依赖,都进行check的话,那么这两个服务永远也启动不起来。
4、 retries: 用于指定当前服务在执行时出现错误或者超时时的重试机制。

1.  **提供者必须幂等**,否则可能出现数据一致性问题
2.  **提供者是否有类似缓存机制**,如出现大面积错误时,可能因为不停重试导致雪崩

5、 executes: 用于在提供者做配置,来确保最大的并行度。

1.  可能导致集群功能无法充分利用或者堵塞
2.  但是也可以启动部分对应用的保护功能
3.  可以不做配置,结合后面的熔断限流使用

所有配置最终都将转换为 URL(protocol://username:password@host:port/path?key=value&key=value)表示,并由服务提供方生成,经注册中心传递给消费方。

三、Dubbo高级实战

1. SPI(面试常客)

1.1 SPI简介

SPI 全称为 (Service Provider Interface) ,是JDK内置的一种服务提供发现机制。 目前有不少框架用它 来做服务的扩展发现(springboot),简单来说,它就是一种动态替换发现的机制。使用SPI机制的优势是实现解耦, 使得第三方服务模块的装配控制逻辑与调用者的业务代码分离。

1.2 JDK中的SPI

86_2.png

SPI遵循如下约定:

  • 当服务提供者提供了接口的一种具体实现后,在META-INF/services目录下创建一个以“接口全 限定名”为命名的文件,内容为实现类的全限定名;
  • 接口实现类所在的jar包放在主程序的classpath中;
  • 主程序通过java.util.ServiceLoader动态装载实现模块,它通过扫描META-INF/services目录下 的配置文件找到实现类的全限定名,把类加载到JVM;
  • SPI的实现类必须携带一个无参构造方法;

1.3 Dubbo中的SPI

dubbo中大量的使用了SPI来作为扩展点,通过实现同一接口的前提下,可以进行定制自己的实现类。 比如比较常见的协议,负载均衡,都可以通过SPI的方式进行定制化,自己扩展。Dubbo中已经存在的 所有已经实现好的扩展点。详见dubbo-2.7.3.jar!/META-INF/dubbo/internal

1.4 Dubbo中扩展点使用方式

我们使用三个项目来演示Dubbo中扩展点的使用方式,一个主项目main,一个服务接口项目api,一个服务实现项目impl。

  • 创建接口
@SPI("human")
public interface HelloService {
    String  sayHello();
    @Adaptive
    String  sayHello(URL url);
}

  • 创建实现类
public class DogHelloService implements HelloService{
    @Override
    public String sayHello() {
        return "wang wang";
    }

    @Override
    public String sayHello(URL url) {
        return "wang url";
    }
}

  • 创建META-INF(key=value(实现类))
human=com.zjn.service.impl.HumanHelloService
dog=com.zjn.service.impl.DogHelloService

  • Main方法走起来
public class DubboSpiMain {
    public static void main(String[] args) {
        // 获取扩展加载器
        ExtensionLoader<HelloService>  extensionLoader  = ExtensionLoader.getExtensionLoader(HelloService.class);
        // 遍历所有的支持的扩展点 META-INF.META-INF
        Set<String> extensions = extensionLoader.getSupportedExtensions();
        for (String extension : extensions){
            String result = extensionLoader.getExtension(extension).sayHello();
            System.out.println(result);
        }

    }
}

  • dubbo自己做SPI的目的
      1. JDK 标准的SPI会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源
      2. 如果有扩展点加载失败,则所有扩展点无法使用
      3. 提供了对扩展点包装的功能(Adaptive),并且还支持通过set的方式对其他的扩展点进行注入

1.5 Dubbo SPI中的Adaptive功能

Dubbo中的Adaptive功能,主要解决的问题是如何动态的选择具体的扩展点。通过URL的方式对扩展点来进行动态选择。(dubbo中所有的注册信息都是通过URL的形式进行处理的。)

  • 接口在需要扩展的方法上加@Adaptive,案例见上面创建接口
  • 创建Main方法
public class DubboAdaptiveMain {
    public static void main(String[] args) {
        URL url = URL.valueOf("test://localhost/hello?hello.service=dog");
        final HelloService adaptiveExtension=ExtensionLoader.getExtensionLoader(HelloService.class).getAdaptiveExtension();
        adaptiveExtension.sayHello(url);
    }
}

  • 通过getAdaptiveExtension来提供一个统一的类来对所有的扩展点提供支持(底层对所有的扩展点进行封装)。
  • 调用时通过参数中增加URL 对象来实现动态的扩展点使用。
  • 如果URL没有提供该参数,则该方法会使用默认在SPI 注解中声明的实现。

1.6 Dubbo调用时拦截操作

提供了非常方便的扩展性,比如为dubbo接口实现ip白名单功能、监控功能、日志记录等。demo代码如下:

  • 实现org.apache.dubbo.rpc.Filter 接口
  • 使用org.apache.dubbo.common.extension.Activate 接口进行对类进行注册 通过group 可以指定生产端 消费端
@Activate(group = {CommonConstants.CONSUMER)

  • 在META-INF.dubbo 中新建org.apache.dubbo.rpc.Filter 文件,并将当前类的全名写入
timerFilter=包名.过滤器的名字

2 负载均衡策略

直接官方吧,很详细: dubbo.apache.org/zh-cn/docs/…

2.1 负载均衡基本配置

配置负载均衡策略,既可以在服务提供者一方配置,也可以在服务消费者一方配置,如下:

//在服务消费者一方配置负载均衡策略
@Reference(check = false,loadbalance = "random")

//在服务提供者一方配置负载均衡
@Service(loadbalance = "random")
public class HelloServiceImpl implements HelloService {
    public String sayHello(String name) {
        return "hello " + name;
    }
}

2.2 自定义负载均衡器

负载均衡器在Dubbo中的SPI接口是org.apache.dubbo.rpc.cluster.LoadBalance , 可以通过实现这个接口来实现自定义的负载均衡规则。

  • 创建负载均衡器OnlyFirstLoadbalancer
  • 配置负载均衡器:在dubbo-spi-loadbalance工程的META-INF/dubbo 目录下新建 org.apache.dubbo.rpc.cluster.LoadBalance 文件,并将当前类的全名写入
onlyFirst=包名.负载均衡器

保证注册相同的注册中心。其他配置与过滤器配置相同

3 异步调用

主要应用于提供者接口响应耗时明显,消费者端可以利用调用接口的时间去做一些其他的接口调用,利用Future模式来异步等待和获取结果即可。这种方式可以大大的提升消费者端的利用率。 目前这种方式可以通过XML的方式进行引入。

3.1 异步调用实现

  • 修改接口
String sayHello(String name, int timeToWait);

  • 服务端接口实现sleep 为了模拟调用耗时 可以让线程等待一段时间
  • 在消费者端,配置异步调用,配置如果提供端耗时大于1000毫秒超时
<dubbo:reference id="helloService" interface="com.zjn.service.HelloService">
<dubbo:method name="sayHello" async="true"/></dubbo:reference>

  • 消费者端通过RpcContext.getContext().getFuture()来进行获取Future对象来进行后续的结果等待操作。

3.2 异步调用特殊说明

注意:该方式的使用,请确保dubbo的版本在2.5.4及以后的版本使用。 原因在于在2.5.3及之前的版本使用的时候,会出现异步状态传递问题。 比如我们的服务调用关系是A->B->C,这时候如果A向B发起了异步请求,在错误的版本时,B向C发起的请求也会连带的产生异步请求。这是因为在底层实现层面,他是通过RPCContext 中的attachment实现的。在A向B发起异步请求时,会在attachment中增加一个异步标示字段来表明异步等待结果。B在接受到A中的请求时,会通过该字段来判断是否是异步处理。但是由于值传递问题,B向C发起时同样会将该值进行传递,导致C误以为需要异步结果,导致返回空。这个问题在2.5.4及以后的版本进行了修正。

4 线程池

4.1 Dubbo已有线程池

dubbo在使用时,都是通过创建真实的业务线程池进行操作的。

  • fix: 表示创建固定大小的线程池。也是Dubbo默认的使用方式,默认创建的执行线程数为200,并且是没有任何等待队列的。所以再极端的情况下可能会存在问题,比如某个操作大量执行时,可能存在堵塞的情况。
  • cache: 创建非固定大小的线程池,当线程不足时,会自动创建新的线程。但是使用这种的时候需要注意,如果突然有高TPS的请求过来,方法没有及时完成,则会造成大量的线程创建,对系统的CPU和负载都是压力,执行越多反而会拖慢整个系统。

4.2 自定义线程池

特殊业务使用fix模式,可能会因为线程不足产生错误。这是无感知的,所以要做一个线程监控,这样就可以进行及时的扩缩容机器或者告警。如下可以自定义线程池:

1、 线程池实现, 这里主要是基于对FixedThreadPool 中的实现做扩展出线程监控的部分,注意要写构造方法,上代码

package com.zjn.threadpool;

import org.apache.dubbo.common.URL;

import org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.*;

/**
 * @author: 190coder <190coder.cn>
 * @description:
 * @create: 2020-07-13 16:31
 */
public class WachingThreadPool extends FixedThreadPool implements Runnable {
    private  static  final Logger LOGGER = LoggerFactory.getLogger(WachingThreadPool.class);
    // 定义线程池使用的阀值
    private  static  final  double  ALARM_PERCENT = 0.90;
    private  final Map<URL, ThreadPoolExecutor> THREAD_POOLS = new ConcurrentHashMap<>();
    public  WachingThreadPool(){
        // 每隔3秒打印线程使用情况
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this,1,3, TimeUnit.SECONDS);
    }

    @Override
    public Executor getExecutor(URL url) {
        final  Executor executor = super.getExecutor(url);
        if(executor instanceof ThreadPoolExecutor){
            THREAD_POOLS.put(url,(ThreadPoolExecutor)executor);
        }
        return  executor;
    }

    @Override
    public void run() {

        // 遍历线程池
        for (Map.Entry<URL,ThreadPoolExecutor> entry: THREAD_POOLS.entrySet()){
            final   URL  url = entry.getKey();
            final   ThreadPoolExecutor  executor = entry.getValue();
            // 计算相关指标
            final  int  activeCount  = executor.getActiveCount();
            final  int  poolSize = executor.getCorePoolSize();
            double  usedPercent = activeCount / (poolSize*1.0);
            LOGGER.info("线程池执行状态:[{}/{}:{}%]",activeCount,poolSize,usedPercent*100);
            if (usedPercent > ALARM_PERCENT){
                LOGGER.error("超出警戒线! host:{} 当前使用率是:{},URL:{}",url.getIp(),usedPercent*100,url);
            }

        }

    }
}

1、 SPI声明,创建文件META-INF/dubbo/org.apache.dubbo.common.threadpool.ThreadPool

watching=com.zjn.threadpool.WachingThreadPool // 包名.线程池名

1、 服务提供方项目引入该依赖,设置使用该线程池生成器

dubbo.provider.threadpool=watching

5. 路由规则

路由是决定一次请求中需要发往目标机器的重要判断,通过对其控制可以决定请求的目标机器。我们可以通过创建这样的规则来决定一个请求会交给哪些服务器去处理。

5.1 快速入门

(1)提供两个提供者(一台本机作为提供者,一台为其他的服务器),每个提供者会在调用时可以返回不同的信息 以区分提供者。

(2)针对于消费者,我们这里通过一个死循环,每次等待用户输入,再进行调用,来模拟真实的请求情况。通过调用的返回值 确认具体的提供者。

(3)我们通过ipconfig来查询到我们的IP地址,并且单独启动一个客户端,来进行如下配置(这里假设我们希望隔离掉本机的请求,都发送到另外一台机器上)。

如果无集群,可只启一个客户端和服务端作为测试,执行一次,可根据规则禁止同一zk内的服务。设置规则如下:

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.RegistryFactory;

/**
 * @author: 190coder <190coder.cn>
 * @description:
 * @create: 2020-07-13 18:00
 */
public class DubboRouterMain {
    public static void main(String[] args) {
        RegistryFactory registryFactory =
                ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();

        Registry registry =
                registryFactory.getRegistry(URL.valueOf("zookeeper://127.0.0.1:2181"));
        registry.register(URL.valueOf("condition://0.0.0.0/com.zjn.service.HelloService?category=routers&force=true&dynamic=true&rule=" + URL.encode("=> host != 你的机器ip不能是127.0.0.1")));
    }
}

5.2 路由规则详解

通过上面的程序,我们实际本质上就是通过在zookeeper中保存一个节点数据,来记录路由规则。消费 者会通过监听这个服务的路径,来感知整个服务的路由规则配置,然后进行适配。 参数详解官方文档走起 -> dubbo.apache.org/zh-cn/docs/…

5.3 路由与上线系统结合

实战场景:一个dubbo的提供者要准备进行上线,一般都提供多台提供者来同时在线上提供服务。这时候一个请求刚到达一个提供者,提供者却进行了关闭操作。那么此次请求就应该认定为失败了。所以基于这样的场景,我们可以通过路由的规则,把预发布(灰度)的机器进行从机器列表中移除。并且等待一定的时间,让其把现有的请求处理完成之后再进行关闭服务。同时,在启动时,同样需要等待一定的时间,以免因为尚未重启结束,就已经注册上去。等启动到达一定时间之 后,再进行开启流量操作。

实现主体思路 :

1.利用zookeeper的路径感知能力,在服务准备进行重启之前将当前机器的IP地址和应用名写入zookeeper。
2.服务消费者监听该目录,读取其中需要进行关闭的应用名和机器IP列表并且保存到内存中。
3.当前请求过来时,判断是否是请求该应用,如果是请求重启应用,则将该提供者从服务列表中移除。

具体操作流程:

1、 引入Curator 框架,操作Zookeeper
2、 编写需要进行预发布的路径管理器,用于缓存和监听所有的待灰度机器信息列表。
3、 编写路由类(实现org.apache.dubbo.rpc.cluster.Router ),主要目的在于对 ReadyRestartInstances中的数据进行处理,并且移除路由调用列表中正在重启中的服务。
4、 由于Router 机制比较特殊,所以需要利用一个专门的RouterFactory来生成,原因在于并不是所有的都需要添加路由,所以需要利用@Activate 来锁定具体哪些服务才需要生成使用。

    @Activate
    public class RestartingInstanceRouterFactory implements RouterFactory {
        @Override
            public Router getRouter(URL url) {
            return new RestartingInstanceRouter(url); // 自定义router实现类
        }
    }

1、 对RouterFactory 进行注册,同样放入到 META-INF/dubbo/org.apache.dubbo.rpc.cluster.RouterFactory

restartInstances=com.lagou.router.RestartingInstanceRouterFactory

1、 完成对zk节点的操作,使得节点列表变更

6. 服务动态降级

6.1 什么是服务降级

当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务有策略的降低服务级别,以释放服务器资源,保证核心任务的正常运行。

6.2 为什么要服务降级

防止分布式服务发生雪崩效应,什么是雪崩?就是蝴蝶效应,当一个请求发生超时,一直等待着服务响应,那么在高并发情况下,很多请求都是因为这样一直等着响应,直到 服务资源耗尽产生宕机,而宕机之后会导致分布式其他服务调用该宕机的服务也会出现资源耗尽宕机,这样下去将导致整个分布式服务都瘫痪,这就是雪崩。

6.3 dubbo 服务降级实现

  • 在 dubbo 管理控制台配置服务降级
    • mock=force:return+null 表示消费方对该服务的方法调用都直接返回 null 值,不发起远程调用。用来屏蔽不重要服务不可用时对调用方的影响。
    • mock=fail:return+null 表示消费方对该服务的方法调用在失败后,再返回 null值,不抛异常。用来容忍不重要服务不稳定时对调用方的影响。

86_3.png

  • 配置xml 指定返回简单值或者null
    <dubbo:reference id="xxService" check="false" interface="com.xx.XxService" timeout="3000" mock="return null" />
    <dubbo:reference id="xxService2" check="false" interface="com.xx.XxService2" timeout="3000" mock="return 1234" />

  • 注解 :@Reference(mock=”return null”) @Reference(mock=”return 简单值”) 也支持 @Reference(mock=”force:return null”)
  • 嵌入代码:
    RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
    Registry registry=registryFactory.getRegistry(URL.valueOf("zookeeper://IP:端口"));
    registry.register(URL.valueOf("override://0.0.0.0/com.foo.BarService?
    category=configurators&dynamic=false&application=foo&mock=force:return+null"));

四、 源码剖析

1. 源码下载&编译

下载地址-> github.com/apache/dubb… ,防止master不问题,可以切换 分支到 release 版本,下载本地执行 mvn install -DskipTests

2. 架构整体设计

2.1 调用关系说明

86_4.png

  • provider : 服务提供方
    • Protocol : 协议交互
    • Service :真实的服务接口实现
    • Container : Dubbo的运行环境
  • Consumer: 服务消费方
    • Protocol : 协议交互
    • Cluster :感知提供者端的列表信息
    • Proxy :理解成提供者的服务调用代理类由它接管Consumer中的接口调用逻辑
  • Registry: 注册中心,用的zk
  • Monitor: 提供者和消费者中的数据统计,比如调用频次,成功失败次数等信息。

启动和执行流程说明 :

  • provider启动,容器负责把Service信息加载,通过Protocol注册到 Registry
  • Consumer启动,通过监听提供者列表来感知提供者信息 并在提供者发生改变时 通过注册中心及时通知消费端
  • 消费方发起 请求 通过Proxy模块
  • 利用Cluster模块 来选择真实的要发送给的提供者信息
  • 交由Consumer中的Protocol 把信息发送给提供者
  • 提供者同样需要通过 Protocol 模块来处理消费者的信息
  • 最后由真正的服务提供者 Service 来进行处理

####2.2 整体的调用链路

86_5.png

  • 淡绿色 代表了服务生产者的范围
  • 淡蓝色 代表了服务消费者的范围
  • 红色箭头 代表了调用的方向

总结下简单流程:

1、 Consumer通过Interface进行方法调用交给消费端Proxy,使用jdk/javassist代理技术
2、 交给filter 使用SPI机制统一过滤请求
3、 invoker调用,去配置中心读取信息,获取所有Invoker

1.  通过Cluster,根据选择路由规则选取Invoker列表
2.  通过LoadBalance 负载均衡选择一个Invoker处理请求
3.  如果出错,开始重试机制

4、 继续通过filter功能选择具体执行协议
5、 客户端进行编码和序列化,发送数据
6、 数据到达Service,进行反编码和反序列化接收数据
7、 使用Exporter选择执行器(dubbo,rmi…)
8、 交给filter 进行一个提供者过滤,到达Invoker执行器
9、 通过Invoker 调用接口的具体实现 然后返回

2.3 源码整体设计

86_6.png

码整体设计与调用链路十分相似。只不过这里可以看到接口的一些具体实现以及左侧也有更为详细的层次划分, 如下:。

  • service 业务层 包括我们的业务代码 比如 接口 实现类 直接面向开发者 RPC层 远程过程调用层
  • config 配置层 对外提供配置 以ServiceConfig ReferenceConfig 为核心 可以直接初始化配置类 也可以解析配置文件生成
  • proxy 服务代理层 无论是生产者 还是消费者 框架都会产生一个代理类 整个过程对上层透明 就是业务层对远程调用无感
  • registry 注册中心层 封装服务地址的注册与发现 以服务的URL为中心
  • cluster 路由层 (集群容错层)提供了多个提供者的路由和负载均衡并且它桥接注册中心 以Invoker为核心
  • monitor 监控层 RPC调用相关的信息 如 调用次数 成功失败的情况 调用时间等 在这一层完成
  • protocol 远程调用层 封装RPC调用 无论是服务的暴露 还是 服务的引用 都是在Protocol中作为主功能入口负责Invoker的整个生命周期Dubbo中所有的模型都向Invoker靠拢
  • Remoting层 远程数据传输层
  • exchange 信息交换层 封装请求和响应的模式 如把请求由同步 转换成异步
  • transport 网络传输层 统一网络传输的接口 比如 netty 和 mina统一为一个网络传输接口
  • serialize 数据序列化层 负责管理整个框架中的数据传输的序列化 和反序列化

3. 服务注册与消费源码剖析

3.1 注册中心Zookeeper剖析

我们可以选择Redis、Nacos、Zookeeper作为Dubbo的注册中心,Dubbo推荐用户使用Zookeeper作为注册中心。

注册中心Zookeeper目录结构

假设有个最基本的demo为我们提供服务

public interface HelloService {
    String sayHello(String name);
}

则zk的目录结果如下:

+- dubbo
| +- com.lagou.service.HelloService
| | +- consumers
| | | +- consumer://192.168.1.102/com.lagou.service.HelloService?
application=dubbo-demo-annotationconsumer&
category=consumers&check=false&dubbo=2.0.2&init=false&interface=com.lag
ou.service.HelloService&methods=sayHello,sayHelloWithPrint,sayHelloWithTransmiss
ion,sayHelloWithException&pid=25923&release=2.7.5&side=consumer&sticky=false&tim
estamp=1583896043650
| | +- providers
| | | +- dubbo://192.168.1.102:20880/com.lagou.service.HelloService?
anyhost=true&application=dubbo-demo-annotationprovider&
deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.l
agou.service.HelloService&methods=sayHello,sayHelloWithPrint,sayHelloWithTransmi
ssion,sayHelloWithException&pid=25917&release=2.7.5&side=provider&telnet=clear,e
xit,help,status,log,ls,ps,cd,pwd,invoke,trace,count,select,shutdown&timestamp=15
83896023597
| | +- configuration
| | +- routers

结构很明显,都在dubbo节点下,全限定接口名作为服务,底下有4个配置

  • consumers:的消费者列表(URL)
  • providers:提供者列表(URL)
  • configuration:配置信息,provider或者consumer会通过读取这里的配 置信息来获取配置
  • routers:当消费者在进行获取提供者的时,会通过这里配置好的路由来进行适配匹配规则。

dubbo基本上很多时候都是通过URL的形式来进行交互获取数据的,在URL中也会保存 很多的信息。

86_7.png

如图所示:

  • 提供者会在providers 目录下进行自身的进行注册。
  • 消费者会在consumers 目录下进行自身注册,并且监听provider 目录,以此通过监听提供者增加或者减少,实现服务发现。
  • Monitor模块会对整个服务级别做监听,用来得知整体的服务情况。以此就能更多的对整体情况做 监控。

3.2 服务的注册过程分析

服务注册(暴露)过程

86_8.png

debug调试分析下具体流程:

1、 启动源码的provider-demo
2、 进入到org/apache/dubbo/config/ServiceConfig.java,服务配置
3、 通过ProxyFactory 调用接口实现类中的 getInvoker 方法使用 ref 生成一个 AbstractProxyInvoker 实例,到这一步就完成具体服务到 Invoker 的转化。接下来就是 Invoker 转换到 Exporter 的过程。
4、 最终调用 org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol 把Invoker 转换成 Exporter
5、 服务配置组装完成后,通过org.apache.dubbo.registry.integration.RegistryProtocol#export 将我们需要执行的信息注册并且导出。
6、 调用AbstractRegistry的继承类org.apache.dubbo.registry.support.FailbackRegistry#register,像zk 完成create注册

详细说下Registry中的类目录结构 :

+- RegistryService
| +- Registry
| | +- AbstractRegistry
| | | +- FailbackRegistry
| | | | +- ZookeeperRegistry
| | | | +- NacosRegistry
| | | | +- ...

注册就是使用的FailbackRegistry#register,从名字中可以看出来,失败自动恢复,后台记录失败请求,调用addFailedRegistered内部通过 Timer定时器去定时重发功能

3.3 URL规则详解 和 服务本地缓存

3.3.1 URL规则详解

URL demo :

protocol://host:port/path?key=value&key=value // 模版

provider://192.168.20.1:20883/com.zjn.service.HelloService?
anyhost=true&application=serviceprovider2&
bind.ip=192.168.20.1&bind.port=20883&category=configurators&check=fals
e&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.zjn.service

详解URL组成:

  • protocol: 协议,一般像我们的provider 或者consumer 在这里都是人为具体的协议
  • host: 当前provider 或者其他协议所具体针对的地址,比较特殊的像override 协议所指定的
  • host就是0.0.0.0 代表所有的机器都生效
  • port: 和上面相同,代表所处理的端口号
  • path: 服务路径,在provider 或者consumer 等其他中代表着我们真实的业务接口
  • key=value: 这些则代表具体的参数,这里我们可以理解为对这个地址的配置。比如我们provider中需要具体机器的服务应用名,就可以是一个配置的方式设置上去。

注意:Dubbo中的URL与java中的URL是有一些区别的,如下:

  • 这里提供了针对于参数的parameter 的增加和减少(支持动态更改)
  • 提供缓存功能,对一些基础的数据做缓存.
3.3.2 服务本地缓存

上面说了会对基础数据进行缓存,dubbo调用者需要通过注册中心(例如:ZK)注册信息,获取提供者,但是如果频繁往从ZK获取信息,肯定会存在单点故障问题,所以dubbo提供了将提供者信息缓存在本地的方法。

  • Dubbo在订阅注册中心的回调处理逻辑当中会保存服务提供者信息到本地缓存文件当中(同步/异步两种方式),以URL纬度进行全量保存。
  • Dubbo在服务引用过程中会创建registry对象并加载本地缓存文件,会优先订阅注册中心,订阅注册中 心失败后会访问本地缓存文件内容获取服务提供信息。

保存缓存流程如下:

  • 通过 org.apache.dubbo.registry.support.AbstractRegistry#AbstractRegistry#loadProperties加载已存在配置文件
  • 在通过 notify 方法 调用saveProperties
  • doSaveProperties 进行保存文件,加锁 将配置的文件信息保存到文件中。执行出现错误时,则交给专门的线程去进行重试

3.4 Dubbo 消费过程分析

86_9.png

首先 ReferenceConfig 类的 init 方法调用 createProxy() ,期间 使用Protocol 调用 refer 方法生成 Invoker实例(如上图中的红色部分),这是服务消费的关键。接下来使用ProxyFactory把 Invoker转换为客户端需要的接口(如:HelloService)。

4. Dubbo扩展SPI源码剖析

SPI在之前都有使用过,其中最重要的类就是ExtensionLoader,它是所有Dubbo中SPI的入口。

getExtensionLoader 获取扩展点加载器 并加载所对应的所有的扩展点实现

getExtension 根据name 获取扩展的指定实现

4.1 getExtensionLoader 加载过程

  • org/apache/dubbo/common/extension/ExtensionLoader.java这里的构造函数 获取ExtensionFactory 对象:
private ExtensionLoader(Class<?> type) {
    this.type = type;
    // 这里需要对对象的工厂做额外的创建,可以看到扩展的工厂也是一个扩展点
    objectFactory = (type == ExtensionFactory.class ? null :
    ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}

  • ExtensionFactory 通过传入扩展点类型和真正的名称来获取扩展的。这里就和我们SPI中的具体名称实现相挂钩。
@SPI
public interface ExtensionFactory {
    /**
    * Get extension.
    *
    * @param type object type.
    * @param name object name.
    * @return object instance.
    */
    <T> T getExtension(Class<T> type, String name);
}

  • dubbo-common/src/main/resources/METAINF/ dubbo/internal/org.apache.dubbo.common.extension.ExtensionFactory 中看到,他默认有三个实现的提供
spring=org.apache.dubbo.config.spring.extension.SpringExtensionFactory
adaptive=org.apache.dubbo.common.extension.factory.AdaptiveExtensionFactory
spi=org.apache.dubbo.common.extension.factory.SpiExtensionFactory

  • AdaptiveExtensionFactory实现类上有@Adaptive,优先最高的默认实现类。作用是代理其他的ExtensionFactory。其中比较重要的方法在于 getSupportedExtensions 方法,获取所有支持的扩展信息实现。
  • org.apache.dubbo.common.extension.ExtensionLoader#getSupportedExtensions 获取所有的扩展类信息,返回所有的扩展点名称
  • getExtensionClasses() 双重检查为空从进行加载信息 加载扩展类
private Map<String, Class<?>> getExtensionClasses() {
    // 从缓存中获取已加载的扩展类
    Map<String, Class<?>> classes = cachedClasses.get();
        // 双重检查
        if (classes == null) {
            // 为空的话,则锁住,标识只会被执行一次
            synchronized (cachedClasses) {
            classes = cachedClasses.get();
            if (classes == null) {
            // 进行加载信息 加载扩展类
            classes = loadExtensionClasses();
            cachedClasses.set(classes);
            }
        }
    }
    return classes;
}

  • 进入到loadExtensionClasses,主要做了两件事情。1: 加载当前SPI的默认实现。2: 加载这个类的所有扩展点实现,并且按照name和Class对象的形式存储
  • 之后就是各种load…从指定META-INF下几个子目录找并加载,最终到达loadClass类的实现,可以看到这里是最终进行完成类映射的地方。

当执行完这几个方法之后,会对一下几个字段进行更新:

  • cachedAdaptiveClass: 当前Extension类型对应的AdaptiveExtension类型(只能一个)
  • cachedWrapperClasses: 当前Extension类型对应的所有Wrapper实现类型(无顺序)
  • cachedActivates: 当前Extension实现自动激活实现缓存(map,无序)
  • cachedNames: 扩展点实现类对应的名称(如配置多个名称则值为第一个)

4.2 根据name获取扩展点的方法 getExtension

  • getExtension 方法实现。这里面同样主要作用是根据name对扩展点进行处理和进行加锁来创建真实的引用,其中都是有使用缓存来处理。可见真正的实例对象在getExtension
public T getExtension(String name, boolean wrap) {
        if (StringUtils.isEmpty(name)) {
            throw new IllegalArgumentException("Extension name == null");
        }
        if ("true".equals(name)) {
            return getDefaultExtension();
        }
        // 获取当前类的holder,实现原理和cachedClasses的方式相同,都是建立同一个引用后再进行加锁
        final Holder<Object> holder = getOrCreateHolder(name);
        Object instance = holder.get();
        // 双重检查
        if (instance == null) {
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    instance = createExtension(name, wrap);
                    holder.set(instance);
                }
            }
        }
        return (T) instance;
    }

  • getOrCreateHolder 保证缓存
private Holder<Object> getOrCreateHolder(String name) {
    // 获取当前名称的和对象Holder的映射关系
    Holder<Object> holder = cachedInstances.get(name);
    if (holder == null) {
    // 如果不存在的话,则使用putIfAbsent的原子操作来设置值,这个值可以保证多线程的额情
    况下有值的时候不处理,没有值进行保存
    cachedInstances.putIfAbsent(name, new Holder<>());
    // 获取真实的holder处理器
    holder = cachedInstances.get(name);
    }
    return holder;
}

  • 进入到createExtension根据扩展的class名称来进行创建实例的类。这里也是创建扩展点类的主要实现。
private T createExtension(String name) {
    // 从配置文件中加载所有的扩展类 可以得到配置项名称 到配置类的映射关系
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {
         throw findException(name);
    }
    try {
        // 获取是否已经有实例了
        T instance = (T) EXTENSION_INSTANCES.get(clazz);
        if (instance == null) {
        // 没有的话,同样适用putIfAbsent的方式来保证只会创建一个对象并且保存
        EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
        instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
        // 注入其他扩展点的实体,用于扩展点和其他的扩展点相互打通
        injectExtension(instance);
        // 进行遍历所有的包装类信息,分别对包装的类进行包装实例化,并且返回自身引用
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (CollectionUtils.isNotEmpty(wrapperClasses)) {
            for (Class<?> wrapperClass : wrapperClasses) {
            // 同样进行注册其他扩展点的功能
            (4) injectExtension 方法观察
            instance = injectExtension((T)
            wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        // 对扩展点进行初始化操作
        initExtension(instance);
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance (name: " + name + ",
        class: " +
        type + ") couldn't be instantiated: " + t.getMessage(), t);
    }
}

  • injectExtension 方法观察,检查类中的方法扩展点…
private T injectExtension(T instance) {
    if (objectFactory null) {
        return instance;
    }
    try {
        // 遍历其中的所有方法
        for (Method method : instance.getClass().getMethods()) {
            // 是否是set方法
            // 1. 以"set"开头
            // 2. 参数长度为1
            // 3. 是公开的方法
            if (!isSetter(method)) {
                continue;
            }
            /**
            * Check {@link DisableInject} to see if we need auto injection for
            this property
            */
            // 如果设置了取消注册,则不进行处理
            if (method.getAnnotation(DisableInject.class) != null) {
                continue;
            }
            // 获取参数类型,并且非基础类型(String, Integer等类型)
            Class<?> pt = method.getParameterTypes()[0];
            if (ReflectUtils.isPrimitives(pt)) {
                continue;
            }
            try {
                // 获取需要set的扩展点名称
                String property = getSetterProperty(method);
                // 从ExtensionLoader中加载指定的扩展点
                // 比如有一个方法为setRandom(LoadBalance loadBalance),那么则以为着需
                要加载负载均衡中名为random的扩展点
                Object object = objectFactory.getExtension(pt, property);
                if (object != null) {
                    4.3 Adaptive功能实现原理
                    Adaptive的主要功能是对所有的扩展点进行封装为一个类,通过URL传入参数的时动态选择需要使用的
                    扩展点。其底层的实现原理就是动态代理,这里我们会通过源码的形式告诉大家,他是如何通过动态代
                    理进行加载的。
                    (1)这里我们getAdaptiveExtension 方法讲起,这个里面就是真正获取该类。这里可以看到,
                    ExtentionLoader 中大量的使用了Holder和加锁的方式去进行唯一创建。
                    method.invoke(instance, object);
                }
            } catch (Exception e) {
                logger.error("Failed to inject via method " + method.getName()
                + " of interface " + type.getName() + ": " +
                e.getMessage(), e);
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return instance;
}

  • 最后完成实例化类

4.3 Adaptive功能实现原理

Adaptive的主要功能是对所有的扩展点进行封装为一个类,通过URL传入参数的时动态选择需要使用的扩展点。其底层的实现原理就是动态代理,这里我们会通过源码的形式告诉大家,他是如何通过动态代理进行加载的。

开始追源码:

  • org.apache.dubbo.common.extension.ExtensionLoader#getAdaptiveExtension这个里面就是真正获取该类。这里可以看到,ExtentionLoader中大量的使用了Holder和加锁的方式去进行唯一创建。
public T getAdaptiveExtension() {
    // 和原先是用相同的方式,进行Holder和加锁的方式来保证只会被创建一次
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {
        // 如果直接已经有创建并且错误的情况,则直接返回错误信息,防止重复没必要的创建
        if (createAdaptiveInstanceError != null) {
            throw new IllegalStateException("Failed to create adaptive instance:
            " +
            createAdaptiveInstanceError.toString(),
            createAdaptiveInstanceError);
        }
        synchronized (cachedAdaptiveInstance) {
            instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                try {
                    // 这里真实的进行创建操作
                    instance = createAdaptiveExtension();
                    cachedAdaptiveInstance.set(instance);
                } catch (Throwable t) {
                    createAdaptiveInstanceError = t;
                    throw new IllegalStateException("Failed to create adaptive
                    instance: " + t.toString(), t);
                }
            }
        }
    }
    return (T) instance;
}

  • 进入createAdaptiveExtension,主要是进行了一些方法封装。
  • 进入createAdaptiveExtensionClass,主要是进行生成Adaptive的代码,并且 进行编译生成class。
private Class<?> createAdaptiveExtensionClass() {
    // 实例化一个新的Adaptive的代码生成器,并且进行代码生成
    String code = new AdaptiveClassCodeGenerator(type,cachedDefaultName).generate();
    // 获取类加载器
    ClassLoader classLoader = findClassLoader();
    // 通过扩展点,寻找编译器, 目前有Java自带的编译器和Javassist的编译器,这里不做细展开
    org.apache.dubbo.common.compiler.Compiler compiler =
    ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.cla
    ss).getAdaptiveExtension();
    // 编译并且生成class
    return compiler.compile(code, classLoader);
}

  • 体通过AdaptiveClassLoaderCodeGenerator.generate方法来进行实现真正的代码生成。
public String generate() {
    // 如果没有任何方法标记为Adaptive,则不做处理
    if (!hasAdaptiveMethod()) {
        throw new IllegalStateException("No adaptive method exist on extension "
        + type.getName() + ", refuse to create the adaptive class!");
    }
    // 进行编写代码
    StringBuilder code = new StringBuilder();
    // 生成包信息
    code.append(generatePackageInfo());
    // 生成引用信息
    code.append(generateImports());
    // 生成类声明
    (5)这里主要对其中的每一个方法来做处理。具体主要观看generateMethod 方法。这里的很多方法
    主要是依赖反射机制去进行方法封装,最终拼接为一个最终字符串。其中最关键的方法在于
    generateMethodContent 方法来生成代理功能。
    (6) generateMethodContent 方法解读。这块儿更推荐通过debug的形式走进来, 看代码也更直接了
    当(就可以直接按照常用功能中的SPI章节来debug)。这部分也是整个Adaptive中最为核心的代码,包括
    获取扩展点名称并且执行。
    code.append(generateClassDeclaration());
    // 生成每一个方法
    Method[] methods = type.getMethods();
    for (Method method : methods) {
        code.append(generateMethod(method));
    }
    // 输出最后的一个"}"来结束当前类
    code.append("}");
    if (logger.isDebugEnabled()) {
        logger.debug(code.toString());
    }
    return code.toString();
}

  • 具体主要观看generateMethod 方法。这里的很多方法 主要是依赖反射机制去进行方法封装,最终拼接为一个最终字符串。其中最关键的方法在于generateMethodContent 方法来生成代理功能。
  • generateMethodContent 整个Adaptive中最为核心的代码,拼接需要的部分类字符串,如下
if (arg0 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg0;
String extName = url.getParameter("hello.service", "human");
if(extName == null) throw new IllegalStateException("Failed to get extension (com.lagou.service.HelloService) name from url (" + url.toString() + ") use keys([hello.service])");
com.lagou.service.HelloService extension = (com.lagou.service.HelloService)ExtensionLoader.getExtensionLoader(com.lagou.service.HelloService.class).getExtension(extName);
return extension.sayHello(arg0);

总结:Adaptive功能实现原理就是把带有@Adaptive请求的接口转化字符串重新生产dubbo所需要的类字符串,使用ExtensionLoader#compile去编译生成最终目的是去extension执行 url所需要的实现类。

5. 集群容错源码剖析

集群容错的所有组件。包含 Cluster、Cluster Invoker、Directory、Router 和 LoadBalance 等。

86_10.png

集群工作过程可分为两个阶段:

  • 第一个阶段是在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。
  • 第二个阶段是在服务消费者进行远程调用 时。以 FailoverClusterInvoker 为例,
    • 该类型 Cluster Invoker 首先会调用 Directory 的 list 方法列举 Invoker 列表可感知注册中心配置的变化。每次变化后,RegistryDirectory 会动态增删Invoker。
    • 调用 Router 的 route 方法进行路由,过滤掉不符合路由规则的 Invoker
    • FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表后,它会通过 LoadBalance 从 Invoker 列 表中选择一个 Invoker。
    • 最后 FailoverClusterInvoker 会将参数传给 LoadBalance 选择出的 Invoker 实例的 invoke 方法,进行真正的远程调用。

Dubbo 主要提供了这样几种容错方式:

  • Failover Cluster – 失败自动切换 失败时会重试其它服务器
  • Failfast Cluster – 快速失败 请求失败后快速返回异常结果 不重试
  • Failsafe Cluster – 失败安全 出现异常 直接忽略 会对请求做负载均衡
  • Failback Cluster – 失败自动恢复 请求失败后 会自动记录请求到失败队列中
  • Forking Cluster – 并行调用多个服务提供者 其中有一个返回 则立即返回结果

5.1 信息缓存接口Directory

Directory是Dubbo中的一个接口,主要用于缓存当前可以被调用的提供者列表信息。我们在消费者进 行调用时都会通过这个接口来获取所有的提供者列表,再进行后续处理。

源码追踪:

  • Directory 接口,通过Directory 来找到指定服务中的提 供者信息列表。
public interface Directory<T> extends Node {
    // 获取服务的类型,也就是我们demo中所使用的HelloService
    Class<T> getInterface();
    // 根据本次调用的信息来获取所有可以被执行的提供者信息
    List<Invoker<T>> list(Invocation invocation) throws RpcException;
    // 获取所有的提供者信息
    List<Invoker<T>> getAllInvokers();
}

  • Directory实现类 -> AbstractDirectory中的list#doList,依靠routerChain去决定真实返回的提供者列表。
  • 到org/apache/dubbo/rpc/cluster/RouterChain.java#route,追到这里,发现invokers,我们先要去知道invoker 列表是从哪里生成获取的
  • 根据开始调用链路图,发现 RegistryProtocol#refer#doRefer是Invoker 生成的部分关键代码。
  • url 绑定directory,cluster.join(directory) 加入到集群,生产Invoker,如下:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 实例化Directory
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        // 设置注册中心和所使用的协议
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        //生成监听路径URL
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            // 在Directory中设置监听的consumerurl地址
            directory.setRegisteredConsumerUrl(subscribeUrl);
            // 在注册中心中注册消费者URL
            // 也就是我们之前的Zookeeper的node中看到的consumer://
            registry.register(directory.getRegisteredConsumerUrl());
        }
        // 构建路由链
        directory.buildRouterChain(subscribeUrl);
        // 进行监听所有的的provider
        directory.subscribe(toSubscribeUrl(subscribeUrl));
        // 加入到集群中
        Invoker<T> invoker = cluster.join(directory);
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }

        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);

        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }

  • 回到RouterChain#route方法。这里所做的就是依次遍历所有的路由,然后分别执行并返回。这也就是整体的路由规则的实现。

5.2 路由规则实现原理

  • 通过 org.apache.dubbo.rpc.cluster.router.condition.ConditionRouter#route 去map中判断是url是否在其中,如果不存在任何invoker则直接返回,有则加入到list
  • 生成整个路由规则,ConditionRouter#init,解析根据”=>”来判断when或者then条件,符合条件的加入到values列表中

5.3 Cluster组件

主要用于代理真正的Invoker执行时做处理,提供了多种容错方案。源码追:

  • 接口
// 默认使用failover作为实现
@SPI(FailoverCluster.NAME)
public interface Cluster {
    // 生成一个新的invoker
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;
}

  • failover 只是进行new了一个新的Invoker。
  • AbstractClusterInvoker.invoke
public Result invoke(final Invocation invocation) throws RpcException {
    // 检查是否已经关闭了
    checkWhetherDestroyed();
    // 拷贝当前RPCContext中的附加信息到当前的invocation中
    Map<String, String> contextAttachments =
    RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }
    // 找寻出所有支持的invoker,已经路由过的
    List<Invoker<T>> invokers = list(invocation);
    // 初始化负载均衡器
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    // 用于适配异步请求使用
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // 交给子类进行真正处理请求
    return doInvoke(invocation, invokers, loadbalance);
}

  • FailoverClusterInvoker#doInvoke,获取这个方法最大的重试次数,通过for循环的形式表示可以重试的次数,选择具体的invoker(交给负载均衡),最终实现 invoke代理执行。
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        // 如果没有任何的invoker则抛出异常
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);

        // 获取这个方法最大的重试次数
        String methodName = RpcUtils.getMethodName(invocation);
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }

        // 通过for循环的形式表示可以重试的次数
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            if (i > 0) {
                // 每次都执行一次是否关闭当前consumer的判断
                checkWhetherDestroyed();
                // 重新获取一遍invoker列表
                copyInvokers = list(invocation);
                // 再次进行一次存在invoker的检查
                checkInvokers(copyInvokers, invocation);
            }

            // 选择具体的invoker(交给负载均衡)
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);

            // 增加到已经执行过得invoker列表中
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 让其真正的去进行执行操作
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                // 如果是业务异常则直接抛出
                if (e.isBiz()) {
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }

        // 如果重试了指定次数后依旧失败,则直接认定为失败
        throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }

5.4 负载均衡实现原理

过程比较简单就是选择具体负载均衡策略实现类,可以关注下策略算法:

  • LoadBalance接口定义。这里默认选择了随机算法。
// 默认使用随机算法
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
    // 进行选择真正的invoker
    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation
    invocation) throws RpcException;
}

  • LoadBalance 依旧选择了AbstractLoadBalance 作为基础的实现类。select 最终交给子类默认实现doSelect方法去执行。
  • 默认算法走起:
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // 总计的invoker列表数量
        int length = invokers.size();
        // 默认每个invoker的权重都是相同的
        boolean sameWeight = true;
        // 所有的权重列表
        int[] weights = new int[length];
        // 首个invoker的权重信息
        int firstWeight = getWeight(invokers.get(0), invocation);
        weights[0] = firstWeight;
        // 计算总共的权重,并且吧每一个invoker的权重进行设置到列表中
        int totalWeight = firstWeight;
        for (int i = 1; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            // save for later use
            weights[i] = weight;
            // Sum
            totalWeight += weight;
            if (sameWeight && weight != firstWeight) {
                sameWeight = false;
            }
        }
        // 如果权重不相同    10   30   50   100
        if (totalWeight > 0 && !sameWeight) {
            // 通过总共的权重来随机分配
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // 看看最终落到哪一个机器上去
            for (int i = 0; i < length; i++) {
                offset -= weights[i];
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // 如果权重都是相同的话,则随机选取一个即可
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }

5.5 Invoker执行逻辑

Invoker就是我们真实执行请求的组件。这里也会衍生出我们真正的Dubbo 或者Grpc 等其他协议的请求。

  • 找到Invoker接口
  • 查看实现类org.apache.dubbo.rpc.protocol.AbstractInvoker#invoke主要做的 是基础信息封装,并且将请求真正的子类。
  • 来到 org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
  • 来到 org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#send
  • 最终选择Netty 的channel 实现

6. 网络通信原理剖析

dubbo协议采用固定长度的消息头(16字节)和不定长度的消息体来进行数据传输,消息头定义了底层框架(netty)在IO线程处理时需要的信息,协议的报文格式如下:

86_11.png

6.1 数据包结构

协议详情:

  • Magic – Magic High & Magic Low (16 bits) 标识协议版本号,Dubbo 协议:0xdabb
  • Serialization ID (5 bit) 标识序列化类型:比如 fastjson 的值为6。
  • Event (1 bit) 标识是否是事件消息,例如,心跳事件。如果这是一个事件,则设置为1。
  • Two Way (1 bit) 仅在 Req/Res 为1(请求)时才有用,标记是否期望从服务器返回值。如果需要来自服务器的返回值,则设置为1。
  • Req/Res (1 bit) 标识是请求或响应。请求: 1; 响应: 0。
  • Status (8 bits) 仅在 Req/Res 为0(响应)时有用,用于标识响应的状态。
    • 20 – OK
    • 30 – CLIENT_TIMEOUT
    • 31 – SERVER_TIMEOUT
    • 40 – BAD_REQUEST
    • 50 – BAD_RESPONSE
    • 60 – SERVICE_NOT_FOUND
    • 70 – SERVICE_ERROR
    • 80 – SERVER_ERROR
    • 90 – CLIENT_ERROR
    • 100 – SERVER_THREADPOOL_EXHAUSTED_ERROR
  • Request ID (64 bits) 标识唯一请求。类型为long。
  • Data Length (32 bits) 序列化后的内容长度(可变部分),按字节计数。int类型。
  • Variable Part 被特定的序列化类型(由序列化ID标识)序列化后,每个部分都是一个 byte [] 或者 byte
    • 如果是请求包 ( Req/Res = 1),则每个部分依次为:
      • Dubbo version
      • Service name
      • Service version
      • Method name
      • Method parameter types
      • Method arguments
      • Attachments
    • 如果是响应包(Req/Res = 0)则每个部分依次为:
      • 返回值类型(byte),标识从服务器端返回的值类型:
      • 返回空值:RESPONSE_NULL_VALUE 2
      • 正常响应值: RESPONSE_VALUE 1
      • 异常:RESPONSE_WITH_EXCEPTION 0
      • 返回值:从服务端返回的响应bytes

注意:对于(Variable Part)变长部分,当前版本的Dubbo 框架使用json序列化时,在每部分内容间 额外增加了换行符作为分隔,请在Variable Part的每个part后额外增加换行符, 如 : Dubbo version bytes (换行符) Service name bytes (换行符)

优点:

  • 协议设计上很紧凑,能用 1 个 bit 表示的,不会用一个 byte 来表示,比如 boolean 类型的标识。
  • 请求、响应的 header一致,通过序列化器对content组装特定的内容,代码实现起来简单。

可以改进的点 :

  • 类似于 http 请求,通过 header 就可以确定要访问的资源,而 Dubbo 需要涉及到用特定序列化协议才可以将服务名、方法、方法签名解析出来,并且这些资源定位符是 string 类型或者 string数组,很容易转成 bytes,因此可以组装到 header 中。类似于 http2的header压缩,对于rpc调用的资源也可以协商出来一个int来标识,从而提升性能,如果在header 上组装资源定位符的话,该功能则更易实现。
  • 通过 req/res 是否是请求后,可以精细定制协议,去掉一些不需要的标识和添加一些特定的标识。 比如status , twoWay 标识可以严格定制,去掉冗余标识。还有超时时间是作为 Dubbo 的attachment 进行传输的,理论上应该放到请求协议的header中,因为超时是网络请求中必不可少的。提到 attachment ,通过实现可以看到 attachment 中有一些是跟协议 content 中已有的字段是重复的,比如 path 和version等字段,这些会增大协议尺寸。
  • Dubbo 会将服务名 com.alibaba.middleware.hsf.guide.api.param.ModifyOrderPriceParam ,转换为Lcom/alibaba/middleware/hsf/guide/api/param/ModifyOrderPriceParam; ,理论上是不必要的,最后追加一个; 即可。
  • Dubbo 协议没有预留扩展字段,没法新增标识,扩展性不太好,比如新增响应上下文的功能,只有改协议版本号的方式,但是这样要求客户端和服务端的版本都进行升级,对于分布式场景很不友好。

6.2 数据协议ExchangeCodec详解

ExchangeCodec 类,这个也是Dubbo在进行数据传输中的数据协议类。

  • 常量定义
    // header length.
    // 请求头的长度
    protected static final int HEADER_LENGTH = 16;
    // magic header.
    // 标示为0-15位  魔数
    protected static final short MAGIC = (short) 0xdabb;
    protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
    protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
    // message flag.
    // 消息中的标示
    protected static final byte FLAG_REQUEST = (byte) 0x80;
    protected static final byte FLAG_TWOWAY = (byte) 0x40;
    protected static final byte FLAG_EVENT = (byte) 0x20;
    protected static final int SERIALIZATION_MASK = 0x1f;

  • 这个类中encode 和decode 分别用于将数据发送到ByteBuffer 中,还有就是将其反向的转换为对象。encode中的Request就是我们之前所讲的Request对象。
     public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        // 处理请求对象
        if (msg instanceof Request) {
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            // 处理响应
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            // 其他的交给上级处理,用于telnet模式
            super.encode(channel, buffer, msg);
        }
    }

    @Override
     public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        // 可读字节数
        int readable = buffer.readableBytes();
        // 选取可读字节数 和  HEADER_LENGTH 中小的
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        buffer.readBytes(header);
        return decode(channel, buffer, readable, header);
    }

  • encodeRequest 方法。这里也验证了我们之前所讲的header内容
     protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        // 请求的序列化类型
        Serialization serialization = getSerialization(channel);
        // header.   // 写入header信息
        byte[] header = new byte[HEADER_LENGTH];
        // set magic number. 设置魔数
        Bytes.short2bytes(MAGIC, header);

        // set request and serialization flag.   // 标记为请求
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
        // 是否是单向还是双向的(异步)
        if (req.isTwoWay()) {
            header[2] |= FLAG_TWOWAY;
        }
        // 是否为事件(心跳)
        if (req.isEvent()) {
            header[2] |= FLAG_EVENT;
        }

        // set request id.
        // 写入当前的请求ID
        Bytes.long2bytes(req.getId(), header, 4);

        // encode request data.
        // 保存当前写入的位置,将其写入的位置往后面偏移,保留出写入内容大小的位置,先进行写入body内容
        int savedWriteIndex = buffer.writerIndex();
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        // 按照数据内容的不同,来写入不同的内容
        if (req.isEvent()) {
            encodeEventData(channel, out, req.getData());
        } else {
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
        // 记录body中写入的长度
        int len = bos.writtenBytes();
        checkPayload(channel, len);
        // 将其写入到header中的位置中
        Bytes.int2bytes(len, header, 12);

        // write   // 发送到buffer中
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header); // write header.
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }

  • org.apache.dubbo.remoting.exchange.codec.ExchangeCodec#encodeRequestData,在子类DubboCodec 中
     protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        RpcInvocation inv = (RpcInvocation) data;

        // 写入版本
        out.writeUTF(version);
        // 接口全名称
        out.writeUTF(inv.getAttachment(PATH_KEY));
        // 接口版本号
        out.writeUTF(inv.getAttachment(VERSION_KEY));
        // 写入方法名称
        out.writeUTF(inv.getMethodName());
        // 调用参数描述信息
        out.writeUTF(inv.getParameterTypesDesc());
        // 所有的请求参数写入
        Object[] args = inv.getArguments();
        if (args != null) {
            for (int i = 0; i < args.length; i++) {
                out.writeObject(encodeInvocationArgument(channel, inv, i));
            }
        }
        // 写入所有的附加信息
        out.writeAttachments(inv.getObjectAttachments());
    }

  • 再看encodeResponse 方法实现。一样的,这里可以看到和写入request相似。
     protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
        int savedWriteIndex = buffer.writerIndex();
        try {
            Serialization serialization = getSerialization(channel);

            // 和之前的参数一致
            byte[] header = new byte[HEADER_LENGTH];
            Bytes.short2bytes(MAGIC, header);
            header[2] = serialization.getContentTypeId();
            if (res.isHeartbeat()) {
                header[2] |= FLAG_EVENT;
            }

            // 写入状态码
            byte status = res.getStatus();
            header[3] = status;

            // 写入内容
            Bytes.long2bytes(res.getId(), header, 4);

            // 和Request一样的内容写入方式,先写入内容,再写入长度
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
            ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
            // encode response data or error message.
            if (status  == Response.OK) {
                if (res.isHeartbeat()) {
                    encodeEventData(channel, out, res.getResult());
                } else {
                    encodeResponseData(channel, out, res.getResult(), res.getVersion());
                }
            } else {
                // 这里不太一样的地方在于,如果错误的时候,则直接将错误信息写入,不需要再交由序列化
                out.writeUTF(res.getErrorMessage());
            }
            out.flushBuffer();
            if (out instanceof Cleanable) {
                ((Cleanable) out).cleanup();
            }
            bos.flush();
            bos.close();

            // 一样的写入模式
            int len = bos.writtenBytes();
            checkPayload(channel, len);
            Bytes.int2bytes(len, header, 12);
            buffer.writerIndex(savedWriteIndex);
            buffer.writeBytes(header); // write header.
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
        } catch (Throwable t) {
            // 写入出现异常
            buffer.writerIndex(savedWriteIndex);
            // send error message to Consumer, otherwise, Consumer will wait till timeout.
            if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
                Response r = new Response(res.getId(), res.getVersion());
                r.setStatus(Response.BAD_RESPONSE);

                // 如果是超过内容长度则重新设置内容大小并写入
                if (t instanceof ExceedPayloadLimitException) {
                    logger.warn(t.getMessage(), t);
                    try {
                        r.setErrorMessage(t.getMessage());
                        channel.send(r);
                        return;
                    } catch (RemotingException e) {
                        logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e);
                    }
                }
            }
            // Rethrow exception
            if (t instanceof IOException) {
                throw (IOException) t;
            } else if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else if (t instanceof Error) {
                throw (Error) t;
            } else {
                throw new RuntimeException(t.getMessage(), t);
            }
        }
    }

  • 解码在同样的类中,不做详细介绍了…

6.3 处理粘包和拆包问题

拆包:

当发生TCP拆包问题时候 这里假设之前还没有发生过任何数据交互,系统刚刚初始化好,那么这个时候在
InternalDecoder里面的buffer属性会是EMPTY_BUFFER。当发生第一次inbound数据的时候,第一次
在InternalDecoder里面接收的肯定是dubbo消息头的部分(这个由TCP协议保证),由于发生了拆包情
况,那么此时接收的inbound消息可能存在一下几种情况
1、当前inbound消息只包含dubbo协议头的一部分
2、当前inbound消息只包含dubbo的协议头
3、当前inbound消息只包含dubbo消息头和部分payload消息

通过上面的讨论,我们知道发生上面三种情况,都会触发ExchangeCodec返回NEED_MORE_INPUT,由于
在DubboCountCodec对于返回NEED_MORE_INPUT会回滚读索引,所以此时的buffer里面的数据可以当作
并没有发生过读取操作,并且DubboCountCodec的decode也会返回NEED_MORE_INPUT,在
InternalDecoder对于当判断返回NEED_MORE_INPUT,也会进行读索引回滚,并且退出循环,最后会执
行finally内容,这里会判断inbound消息是否还有可读的,由于在DubboCountCodec里面进行了读索引
回滚,所以此时的buffer里面不是完整的inbound消息,等待第二次的inbound消息的到来,当第二次
inbound消息过来的时候,再次经过上面的判断。

     protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // 检查魔数
        if (readable > 0 && header[0] != MAGIC_HIGH
                || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            for (int i = 1; i < header.length - 1; i++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            return super.decode(channel, buffer, readable, header);
        }
        // check length. 不完整的包
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // 获取数据长度
        int len = Bytes.bytes2int(header, 12);
        checkPayload(channel, len);

        int tt = len + HEADER_LENGTH;
        // 需要继续读取
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // limit input stream.
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            // 解码数据
            return decodeBody(channel, is, header);
        } finally {
            if (is.available() > 0) {
                try {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Skip input stream " + is.available());
                    }
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }

说明:如果解码返回DecodeResult.NEED_MORE_INPUT,说明信息不完整,需要读索引回滚,并且退出循环,最后会执 行finally内容,这里会判断inbound消息是否还有可读的,由于在DubboCountCodec里面进行了读索引 回滚,所以此时的buffer里面不是完整的inbound消息,等待第二次的inbound消息的到来,当第二次inbound消息过来的时候,再次经过上面的判断,如下代码:

    try {
        // decode object.
        do {
            saveReaderIndex = message.readerIndex();
            try {
                msg = codec.decode(channel, message);
            } catch (IOException e) {
                buffer = org.apache.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                throw e;
            }
            if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {

                // 回滚重读
                message.readerIndex(saveReaderIndex);
                break;
            } else {
                if (saveReaderIndex == message.readerIndex()) {
                    buffer = org.apache.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                    throw new IOException("Decode without read data.");
                }
                if (msg != null) {
                    Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                }
            }
        } while (message.readable());
    } finally {
        // 判断消息是否可读
        if (message.readable()) {
            message.discardReadBytes();
            buffer = message;
        } else {
            buffer = org.apache.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
        }
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }

messageReceived的核心就是message.readerIndex(saveReaderIndex) ,回滚重读。最后判断消息是否可读。

粘包:

当发生TCP粘包的时候 是tcp将一个dubbo协议栈放在一个tcp包中,那么有可能发生下面几种情况
1、当前inbound消息只包含一个dubbo协议栈
2、当前inbound消息包含一个dubbo协议栈,同时包含部分另一个或者多个dubbo协议栈内容
如果发生只包含一个协议栈,那么当前buffer通过ExchangeCodec解析协议之后,当前的buffer的
readeIndex位置应该是buffer尾部,那么在返回到InternalDecoder中message的方法readable返回
的是false,那么就会对buffer重新赋予EMPTY_BUFFER实体,而针对包含一个以上的dubbo协议栈,当然
也会解析出其中一个dubbo协议栈,但是经过ExchangeCodec解析之后,message的readIndex不在
message尾部,所以message的readable方法返回的是true。那么则会继续遍历message,读取下面的
信息。最终要么message刚好整数倍包含完整的dubbo协议栈,要不ExchangeCodec返回
NEED_MORE_INPUT,最后将未读完的数据缓存到buffer中,等待下次inbound事件,将buffer中的消息合
并到下次的inbound消息中,种类又回到了拆包的问题上。
dubbo在处理tcp的粘包和拆包时是借助InternalDecoder的buffer缓存对象来缓存不完整的dubbo协议
栈数据,等待下次inbound事件,合并进去。

总结 :

在dubbo中解决TCP拆包和粘包的时候是通过buffer 变量来解决的。

  • 发生拆包肯定是接收消息不完整的,通过检查信息完整度返回NEED_MORE_INPUT消息,知道整合完整。
  • 发生粘包也是消息包含一个完整或又同时包含多个dubbo协议内容,循环读取。如果还有不完整的最后解决不完整协议内容会又回到拆包问题上,等待下次inbound事件进行合并。

TCP拆包粘包问题:

  • 接收到的是一个报文,它是由发送的两个报文组成的,这样对于应用程序来说就很难处理了(粘包)

86_12.png

  • 还有可能出现上面这样的虽然收到了两个包,但是里面的内容却是互相包含,对于应用来说依然无法解析(拆包)。

86_13.png

文章永久链接:https://tech.souyunku.com/29270

未经允许不得转载:搜云库技术团队 » Dubbo从实战->源码分析

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们