Ribbon Source Code Analysis

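Spring Cloud Gateway ships with several built-in load-balancing strategies, such as round robin, random, and response-time-based weighting. Our business needed a custom strategy, so I first spent some time studying the source code. The conclusions come first: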

I. Conclusions

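1. The LoadBalancerClient interface defines the method ServiceInstance choose(String serviceId), which returns a concrete service instance for a given service name.
2. SpringClientFactory holds a Map<String, AnnotationConfigApplicationContext> contexts through its parent class NamedContextFactory. The key is the serviceId and the value is an AnnotationConfigApplicationContext, so each service can be given its own load-balancing strategy. Each AnnotationConfigApplicationContext mainly holds an ILoadBalancer bean, which implements the load balancing. The ILoadBalancer in turn depends on: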

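 *  IClientConfig: the client configuration, used to initialize the client and the load-balancing settings;
 *  IRule: the load-balancing strategy, e.g. round robin;
 *  IPing: defines how to determine whether a service instance is healthy;
 *  ServerList: defines how to obtain the list of servers;
 *  ServerListFilter: selects a subset of servers based on configuration or filtering rules;
 *  ServerListUpdater: defines the strategy for dynamically refreshing the server list.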

The relationships are shown below:

47_1.png

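3. Within an AnnotationConfigApplicationContext, the ILoadBalancer (and the beans it depends on) is resolved in this order: externally imported configuration first, then the default configuration class.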
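A minimal plain-Java sketch illustrates the precedence in item 3. No Spring is involved. Configuration sources are consulted in registration order, and the built-in default is used only when no earlier source supplies the bean. That is roughly what @ConditionalOnMissingBean achieves in RibbonClientConfiguration. The class BeanResolutionSketch and its map-based sources are hypothetical simplifications, not Spring APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model: each "configuration source" maps a bean type name to a factory.
class BeanResolutionSketch {
    private final List<Map<String, Supplier<Object>>> sources = new ArrayList<>();

    // Sources are consulted in the order they are added, like context.register(...)
    void addSource(Map<String, Supplier<Object>> source) {
        sources.add(source);
    }

    // Returns the bean from the first source that defines it; the built-in default
    // applies only when no earlier source did (the @ConditionalOnMissingBean idea).
    Object resolve(String beanType, Supplier<Object> builtInDefault) {
        for (Map<String, Supplier<Object>> source : sources) {
            Supplier<Object> factory = source.get(beanType);
            if (factory != null) {
                return factory.get();
            }
        }
        return builtInDefault.get();
    }
}
```

A real Spring context works with bean definitions and per-bean conditions, so the mechanics differ. Treat this only as a mental model of the ordering.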

II. Source Code Analysis

1. LoadBalancerClient

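LoadBalancerClient is a load-balancing client. It defines ServiceInstance choose(String serviceId), which selects a service instance by service name according to a given strategy.

The interfaces are defined as follows: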

public interface LoadBalancerClient extends ServiceInstanceChooser {
    ...
}

public interface ServiceInstanceChooser {
    /**
     * Chooses a ServiceInstance from the LoadBalancer for the specified service.
     * @param serviceId The service ID to look up the LoadBalancer.
     * @return A ServiceInstance that matches the serviceId.
     */
    ServiceInstance choose(String serviceId);
}

public interface ServiceInstance {

    default String getInstanceId() {
        return null;
    }

    String getServiceId();

    String getHost();

    int getPort();

    boolean isSecure();

    URI getUri();

    Map<String, String> getMetadata();

    default String getScheme() {
        return null;
    }
}

The default implementation is RibbonLoadBalancerClient:

public class RibbonLoadBalancerClient implements LoadBalancerClient {

    @Override
    public ServiceInstance choose(String serviceId) {
        return choose(serviceId, null);
    }

    public ServiceInstance choose(String serviceId, Object hint) {
        Server server = getServer(getLoadBalancer(serviceId), hint);
        if (server == null) {
            return null;
        }
        return new RibbonServer(serviceId, server, isSecure(server, serviceId),
                serverIntrospector(serviceId).getMetadata(server));
    }

    protected ILoadBalancer getLoadBalancer(String serviceId) {
        return this.clientFactory.getLoadBalancer(serviceId);
    }

}

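In choose(String serviceId, Object hint), getLoadBalancer(serviceId) first obtains the load balancer for the service name. getServer(getLoadBalancer(serviceId), hint) then uses it to select a concrete server. If no server is found, null is returned.

getLoadBalancer(serviceId) delegates the lookup by service name to SpringClientFactory, via this.clientFactory.getLoadBalancer(serviceId).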
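The following minimal plain-Java sketch makes the call chain concrete. SimpleLoadBalancer stands in for ILoadBalancer, and servers are plain strings. The serviceId + "@" + server result stands in for RibbonServer. All of these names are hypothetical. Replacing a null hint with "default" mirrors RibbonLoadBalancerClient.getServer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ILoadBalancer: picks a server for a key.
interface SimpleLoadBalancer {
    String chooseServer(Object key);
}

// Hypothetical sketch of RibbonLoadBalancerClient.choose(serviceId, hint):
// look up the per-service balancer, ask it for a server, then wrap it or return null.
class ChooseFlowSketch {
    private final Map<String, SimpleLoadBalancer> balancers = new HashMap<>();

    void register(String serviceId, SimpleLoadBalancer lb) {
        balancers.put(serviceId, lb);
    }

    String choose(String serviceId) {
        return choose(serviceId, null);
    }

    String choose(String serviceId, Object hint) {
        SimpleLoadBalancer lb = balancers.get(serviceId);   // getLoadBalancer(serviceId)
        if (lb == null) {
            return null;                                     // getServer(null, ...) returns null
        }
        // As in getServer: a null hint is replaced with "default"
        String server = lb.chooseServer(hint != null ? hint : "default");
        if (server == null) {
            return null;
        }
        return serviceId + "@" + server;                     // stands in for new RibbonServer(...)
    }
}
```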

2. SpringClientFactory

SpringClientFactory holds the complete load-balancing setup for each service.

public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> {
    public ILoadBalancer getLoadBalancer(String name) {
        return getInstance(name, ILoadBalancer.class);
    }

    @Override
    public <C> C getInstance(String name, Class<C> type) {
        C instance = super.getInstance(name, type);
        if (instance != null) {
            return instance;
        }
        IClientConfig config = getInstance(name, IClientConfig.class);
        return instantiateWithConfig(getContext(name), type, config);
    }
}

public abstract class NamedContextFactory<C extends NamedContextFactory.Specification>
        implements DisposableBean, ApplicationContextAware {
    // contexts holds all of the configuration for each serviceId
    private Map<String, AnnotationConfigApplicationContext> contexts = new ConcurrentHashMap<>();

    public <T> T getInstance(String name, Class<T> type) {
        AnnotationConfigApplicationContext context = getContext(name);
        if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,
                type).length > 0) {
            return context.getBean(type);
        }
        return null;
    }

    // Returns the AnnotationConfigApplicationContext for the serviceId
    // If it does not exist yet, createContext(String name) is called to create it
    protected AnnotationConfigApplicationContext getContext(String name) {
        if (!this.contexts.containsKey(name)) {
            synchronized (this.contexts) {
                if (!this.contexts.containsKey(name)) {
                    this.contexts.put(name, createContext(name));
                }
            }
        }
        return this.contexts.get(name);
    }

    // Creates the AnnotationConfigApplicationContext for service name
    protected AnnotationConfigApplicationContext createContext(String name) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        // 1. If the externally imported configurations contain an entry for this serviceId, register it
        if (this.configurations.containsKey(name)) {
            for (Class<?> configuration : this.configurations.get(name)
                    .getConfiguration()) {
                context.register(configuration);
            }
        }
        // 2. If the externally imported configurations contain default configurations (keys starting with "default."), register them
        for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
            if (entry.getKey().startsWith("default.")) {
                for (Class<?> configuration : entry.getValue().getConfiguration()) {
                    context.register(configuration);
                }
            }
        }
        // 3. Register PropertyPlaceholderAutoConfiguration and the defaultConfigType class (when SpringClientFactory is used with its default bean, this.defaultConfigType is RibbonClientConfiguration)
        context.register(PropertyPlaceholderAutoConfiguration.class,
                this.defaultConfigType);
        context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
                this.propertySourceName,
                Collections.<String, Object> singletonMap(this.propertyName, name)));
        if (this.parent != null) {
            // Uses Environment from parent as well as beans
            context.setParent(this.parent);
        }
        context.setDisplayName(generateDisplayName(name));
        context.refresh();
        return context;
    }
}
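The lazy, per-name cache in getContext can be reduced to a few lines. The sketch below uses ConcurrentHashMap.computeIfAbsent instead of the original's containsKey check plus synchronized block. This is a swapped-in technique, but it gives the same guarantee that each name's context is created only once. NamedContextCacheSketch and its string "contexts" are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch of NamedContextFactory.getContext(name): one lazily created,
// cached "context" per service name.
class NamedContextCacheSketch<C> {
    private final Map<String, C> contexts = new ConcurrentHashMap<>();
    private final Function<String, C> createContext;
    final AtomicInteger created = new AtomicInteger();   // counts createContext calls

    NamedContextCacheSketch(Function<String, C> createContext) {
        this.createContext = createContext;
    }

    C getContext(String name) {
        // computeIfAbsent applies the function at most once per key
        return contexts.computeIfAbsent(name, n -> {
            created.incrementAndGet();
            return createContext.apply(n);
        });
    }
}
```

The two approaches differ in what they lock while a context is being built. computeIfAbsent only blocks other updates that map to the same internal bin. The original's synchronized (this.contexts) serializes every creation. In both cases, later lookups of an existing name take no lock.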

3. ILoadBalancer

The getServer method of RibbonLoadBalancerClient ultimately calls the chooseServer method of the ILoadBalancer interface.

public class RibbonLoadBalancerClient implements LoadBalancerClient {
    protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
        if (loadBalancer == null) {
            return null;
        }
        // Use 'default' on a null hint, or just pass it on?
        return loadBalancer.chooseServer(hint != null ? hint : "default");
    }
    ...
}

public interface ILoadBalancer {
    /**
     * Choose a server from load balancer.
     */
    public Server chooseServer(Object key);
    ...
}

The service instance is selected by ILoadBalancer's chooseServer(Object key) method.

The implementations of ILoadBalancer are shown in the figure below; the default bean is ZoneAwareLoadBalancer.

47_2.png

Let's look at the basic building blocks of an ILoadBalancer, using the DynamicServerListLoadBalancer constructor:

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                     ServerList<T> serverList, ServerListFilter<T> filter,
                                     ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping);
    this.serverListImpl = serverList;
    this.filter = filter;
    this.serverListUpdater = serverListUpdater;
    if (filter instanceof AbstractServerListFilter) {
        ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
    }
    restOfInit(clientConfig);
}

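  • IClientConfig: the client configuration, used to initialize the client and the load-balancing settings.
  • IRule: the load-balancing rule (strategy), e.g. round robin.
  • IPing: defines how to determine whether a service instance is healthy.
  • ServerList: defines how to obtain the list of servers.
  • ServerListUpdater: defines the strategy for dynamically refreshing the server list.
  • ServerListFilter: selects a subset of servers based on configuration or filtering rules; with Consul as the registry, the default implementation is HealthServiceServerListFilter.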

The default implementations are defined in the RibbonClientConfiguration class:

@Configuration
@EnableConfigurationProperties
@Import({HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class})
public class RibbonClientConfiguration {

    public static final int DEFAULT_CONNECT_TIMEOUT = 1000;
    public static final int DEFAULT_READ_TIMEOUT = 1000;
    public static final boolean DEFAULT_GZIP_PAYLOAD = true;

    @RibbonClientName
    private String name = "client";

    @Autowired
    private PropertiesFactory propertiesFactory;

    @Bean
    @ConditionalOnMissingBean
    public IClientConfig ribbonClientConfig() {
        DefaultClientConfigImpl config = new DefaultClientConfigImpl();
        config.loadProperties(this.name);
        config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
        config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
        config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
        return config;
    }

    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }

    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
        if (this.propertiesFactory.isSet(IPing.class, name)) {
            return this.propertiesFactory.get(IPing.class, config, name);
        }
        return new DummyPing();
    }

    @Bean
    @ConditionalOnMissingBean
    @SuppressWarnings("unchecked")
    public ServerList<Server> ribbonServerList(IClientConfig config) {
        if (this.propertiesFactory.isSet(ServerList.class, name)) {
            return this.propertiesFactory.get(ServerList.class, config, name);
        }
        ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
        serverList.initWithNiwsConfig(config);
        return serverList;
    }

    @Bean
    @ConditionalOnMissingBean
    public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
        return new PollingServerListUpdater(config);
    }

    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }

    @Bean
    @ConditionalOnMissingBean
    @SuppressWarnings("unchecked")
    public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
        if (this.propertiesFactory.isSet(ServerListFilter.class, name)) {
            return this.propertiesFactory.get(ServerListFilter.class, config, name);
        }
        ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter();
        filter.initWithNiwsConfig(config);
        return filter;
    }

    @Bean
    @ConditionalOnMissingBean
    public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
                                                               IClientConfig config, RetryHandler retryHandler) {
        return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
    }

    @Bean
    @ConditionalOnMissingBean
    public RetryHandler retryHandler(IClientConfig config) {
        return new DefaultLoadBalancerRetryHandler(config);
    }

    @Bean
    @ConditionalOnMissingBean
    public ServerIntrospector serverIntrospector() {
        return new DefaultServerIntrospector();
    }

    @PostConstruct
    public void preprocess() {
        setRibbonProperty(name, DeploymentContextBasedVipAddresses.key(), name);
    }

    static class OverrideRestClient extends RestClient {

        private IClientConfig config;
        private ServerIntrospector serverIntrospector;

        protected OverrideRestClient(IClientConfig config,
                ServerIntrospector serverIntrospector) {
            super();
            this.config = config;
            this.serverIntrospector = serverIntrospector;
            initWithNiwsConfig(this.config);
        }

        @Override
        public URI reconstructURIWithServer(Server server, URI original) {
            URI uri = updateToSecureConnectionIfNeeded(original, this.config,
                    this.serverIntrospector, server);
            return super.reconstructURIWithServer(server, uri);
        }

        @Override
        protected Client apacheHttpClientSpecificInitialization() {
            ApacheHttpClient4 apache = (ApacheHttpClient4) super.apacheHttpClientSpecificInitialization();
            apache.getClientHandler().getHttpClient().getParams().setParameter(
                    ClientPNames.COOKIE_POLICY, CookiePolicy.IGNORE_COOKIES);
            return apache;
        }

    }

}
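Most beans above first check propertiesFactory.isSet(...) and propertiesFactory.get(...), so a single client can swap out a default through configuration. The rough plain-Java sketch below models that lookup. It reads "<client>.ribbon.<key>" from a property map and instantiates the named class reflectively; otherwise it builds the default. NFLoadBalancerRuleClassName is the key Spring Cloud Netflix documents for IRule. PropertyOverrideSketch, RuleSketch and the two rule classes are hypothetical. The real implementation also hands the IClientConfig to the new instance, which this sketch omits.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for IRule.
interface RuleSketch {
    String name();
}

class ZoneAvoidanceRuleSketch implements RuleSketch {
    public String name() { return "ZoneAvoidance"; }
}

class RandomRuleSketch implements RuleSketch {
    public String name() { return "Random"; }
}

// Hypothetical sketch of the PropertiesFactory check: if "<client>.ribbon.<key>" names
// a class, instantiate it reflectively; otherwise build the default.
class PropertyOverrideSketch {
    private final Map<String, String> properties;

    PropertyOverrideSketch(Map<String, String> properties) {
        this.properties = properties;
    }

    <T> T get(Class<T> type, String clientName, String key, Supplier<T> defaultFactory) {
        String className = properties.get(clientName + ".ribbon." + key);
        if (className == null) {
            return defaultFactory.get();   // like the "new ZoneAvoidanceRule()" branch
        }
        try {
            Constructor<?> ctor = Class.forName(className).getDeclaredConstructor();
            ctor.setAccessible(true);
            return type.cast(ctor.newInstance());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }
}
```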

When Consul is used as the registry, ConsulRibbonClientConfiguration provides the following defaults:

@Configuration
public class ConsulRibbonClientConfiguration {
    @Autowired
    private ConsulClient client;

    @Value("${ribbon.client.name}")
    private String serviceId = "client";

    protected static final String VALUE_NOT_SET = "__not__set__";

    protected static final String DEFAULT_NAMESPACE = "ribbon";

    public ConsulRibbonClientConfiguration() {
    }

    public ConsulRibbonClientConfiguration(String serviceId) {
        this.serviceId = serviceId;
    }

    @Bean
    @ConditionalOnMissingBean
    public ServerList<?> ribbonServerList(IClientConfig config, ConsulDiscoveryProperties properties) {
        ConsulServerList serverList = new ConsulServerList(client, properties);
        serverList.initWithNiwsConfig(config);
        return serverList;
    }

    @Bean
    @ConditionalOnMissingBean
    public ServerListFilter<Server> ribbonServerListFilter() {
        return new HealthServiceServerListFilter();
    }

    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing() {
        return new ConsulPing();
    }

    @Bean
    @ConditionalOnMissingBean
    public ConsulServerIntrospector serverIntrospector() {
        return new ConsulServerIntrospector();
    }

    @PostConstruct
    public void preprocess() {
        setProp(this.serviceId, DeploymentContextBasedVipAddresses.key(), this.serviceId);
        setProp(this.serviceId, EnableZoneAffinity.key(), "true");
    }

    protected void setProp(String serviceId, String suffix, String value) {
        // how to set the namespace properly?
        String key = getKey(serviceId, suffix);
        DynamicStringProperty property = getProperty(key);
        if (property.get().equals(VALUE_NOT_SET)) {
            ConfigurationManager.getConfigInstance().setProperty(key, value);
        }
    }

    protected DynamicStringProperty getProperty(String key) {
        return DynamicPropertyFactory.getInstance().getStringProperty(key, VALUE_NOT_SET);
    }

    protected String getKey(String serviceId, String suffix) {
        return serviceId + "." + DEFAULT_NAMESPACE + "." + suffix;
    }

}

As you can see, it overrides the default implementations of several beans:

  • ServerList: ConsulServerList;
  • ServerListFilter: HealthServiceServerListFilter;
  • IPing: ConsulPing.

Reading further into the source, server selection ultimately comes down to the choose(Object key) method of the IRule interface.

public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
    @Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        ...
    }
    ...
}

public class BaseLoadBalancer extends AbstractLoadBalancer implements
        PrimeConnections.PrimeConnectionListener, IClientConfigAware {
    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }
}

public interface IRule {
    public Server choose(Object key);  
}
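A minimal sketch captures the delegation in BaseLoadBalancer.chooseServer: count the call, hand the key to the rule, and turn both "no rule" and a rule failure into null. ChooseRule and DelegatingBalancerSketch are hypothetical stand-ins for IRule and BaseLoadBalancer. The AtomicLong replaces the original's monitoring counter.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for IRule.choose(Object key).
interface ChooseRule {
    String choose(Object key);
}

// Hypothetical sketch of BaseLoadBalancer.chooseServer.
class DelegatingBalancerSketch {
    private final ChooseRule rule;
    final AtomicLong counter = new AtomicLong();

    DelegatingBalancerSketch(ChooseRule rule) {
        this.rule = rule;
    }

    String chooseServer(Object key) {
        counter.incrementAndGet();   // the original increments a monitoring counter
        if (rule == null) {
            return null;
        }
        try {
            return rule.choose(key);
        } catch (Exception e) {
            // The original logs a warning here and returns null
            return null;
        }
    }
}
```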

Here are the implementations of the IRule interface:

47_3.png

The default ZoneAvoidanceRule selects servers based on zone and availability.

ZoneAvoidanceRule extends the PredicateBasedRule class:

public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        // lb.getAllServers(): get all service instances
        // then filter them with the rule's predicate and pick one of the survivors round-robin
        Optional<Server> server =   getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }
}

// All service instances are obtained as follows
public class BaseLoadBalancer extends AbstractLoadBalancer implements
        PrimeConnections.PrimeConnectionListener, IClientConfigAware {
    @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List<Server> allServerList = Collections
            .synchronizedList(new ArrayList<Server>());

    // allServerList is returned directly, which means it was already populated when the load balancer was constructed
    @Override
    public List<Server> getAllServers() {
        return Collections.unmodifiableList(allServerList);
    }
}
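The "filter, then round-robin" step behind chooseRoundRobinAfterFiltering can be sketched in plain Java. Servers the predicate rejects are dropped, and a lock-free counter picks the next survivor. The compare-and-set loop keeps the counter below the list size. FilterThenRoundRobinSketch is hypothetical and ignores the zone and availability statistics that ZoneAvoidanceRule's real predicate uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical sketch: keep only servers the predicate accepts, then pick the next one in turn.
class FilterThenRoundRobinSketch<S> {
    private final Predicate<S> predicate;
    private final AtomicInteger nextIndex = new AtomicInteger();

    FilterThenRoundRobinSketch(Predicate<S> predicate) {
        this.predicate = predicate;
    }

    Optional<S> choose(List<S> allServers) {
        List<S> eligible = new ArrayList<>();
        for (S s : allServers) {
            if (predicate.test(s)) {
                eligible.add(s);
            }
        }
        if (eligible.isEmpty()) {
            return Optional.empty();
        }
        int size = eligible.size();
        while (true) {
            int current = nextIndex.get();
            int next = (current + 1) % size;
            // Only the thread that wins the CAS uses "current"; losers retry
            if (nextIndex.compareAndSet(current, next)) {
                return Optional.of(eligible.get(current % size));
            }
        }
    }
}
```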

Let's see how the ZoneAwareLoadBalancer bean is created:

public class RibbonClientConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }
}

public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
    public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                                 IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
                                 ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
    }
}

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        // Initialization
        restOfInit(clientConfig);
    }
}

This leads to the restOfInit(clientConfig) method; let's analyze it:

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        this.setEnablePrimingConnections(false);
        // enableAndInitLearnNewServersFeature() sets up a scheduled task that runs updateListOfServers() periodically
        enableAndInitLearnNewServersFeature();

        // Run once immediately
        updateListOfServers();

        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }

    public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }

    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList();
        if (this.serverListImpl != null) {
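            // 1. Fetch the full server list; with Consul this calls ConsulServerList
            // ConsulServerList calls the Consul HTTP API /health/service/:service with the passing parameter (whether to return only instances whose health checks all pass) set to false, so all instances are returned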
            servers = this.serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
            if (this.filter != null) {
                // 2. Get the filtered server list (the instances that pass their health checks)
                // This calls HealthServiceServerListFilter
                servers = this.filter.getFilteredListOfServers((List)servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
            }
        }

        // 3. Update the available service instances
        this.updateAllServerList((List)servers);
    }

    protected void updateAllServerList(List<T> ls) {
        // other threads might be doing this - in which case, we pass
        if (serverListUpdateInProgress.compareAndSet(false, true)) {
            try {
                for (T s : ls) {
                    s.setAlive(true); // set so that clients can start using these
                                      // servers right away instead
                                      // of having to wait out the ping cycle.
                }
                setServersList(ls);
                super.forceQuickPing();
            } finally {
                serverListUpdateInProgress.set(false);
            }
        }
    }
}
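A plain-Java sketch of updateListOfServers and updateAllServerList shows the refresh pipeline in isolation. It fetches (ServerList), filters (ServerListFilter), marks every survivor alive, and publishes the list. The compare-and-set guard lets a concurrent refresh simply skip its round. ServerSketch and ServerListRefreshSketch are hypothetical. The Supplier and UnaryOperator play the roles of ConsulServerList and HealthServiceServerListFilter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for com.netflix.loadbalancer.Server with its alive flag.
class ServerSketch {
    final String id;
    volatile boolean alive;

    ServerSketch(String id) {
        this.id = id;
    }
}

// Hypothetical sketch of updateListOfServers() + updateAllServerList().
class ServerListRefreshSketch {
    private final Supplier<List<ServerSketch>> serverList;      // plays ConsulServerList
    private final UnaryOperator<List<ServerSketch>> filter;     // plays HealthServiceServerListFilter
    private final AtomicBoolean updateInProgress = new AtomicBoolean(false);
    volatile List<ServerSketch> allServers = Collections.emptyList();

    ServerListRefreshSketch(Supplier<List<ServerSketch>> serverList,
                            UnaryOperator<List<ServerSketch>> filter) {
        this.serverList = serverList;
        this.filter = filter;
    }

    void updateListOfServers() {
        List<ServerSketch> servers = new ArrayList<>();
        if (serverList != null) {
            servers = serverList.get();
            if (filter != null) {
                servers = filter.apply(servers);
            }
        }
        updateAllServerList(servers);
    }

    void updateAllServerList(List<ServerSketch> ls) {
        // If another thread is already updating, skip this round, as the original does
        if (updateInProgress.compareAndSet(false, true)) {
            try {
                for (ServerSketch s : ls) {
                    s.alive = true;   // usable right away, without waiting for a ping cycle
                }
                allServers = Collections.unmodifiableList(new ArrayList<>(ls));
            } finally {
                updateInProgress.set(false);
            }
        }
    }
}
```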

public class PollingServerListUpdater implements ServerListUpdater {
    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };
            // Schedule the task on a thread pool whose threads are named PollingServerListUpdater-%d
            // The first run is delayed by 1s; after that it runs every 30s by default
            // The task's main work is updateAction.doUpdate() above
            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }

    // The refresh interval is read from clientConfig first; if it is not set, the default is 30s
    private static long getRefreshIntervalMs(IClientConfig clientConfig) {
        return clientConfig.get(CommonClientConfigKey.ServerListRefreshInterval, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
    }
}

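The logic above works as follows. restOfInit initializes the full list of service instances, the list of available instances, and related state. PollingServerListUpdater then runs a scheduled task on a thread pool, every 30s by default, to refresh both the full instance list and the list of instances that pass health checks.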
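The scheduling half of PollingServerListUpdater.start can be reproduced with a plain ScheduledExecutorService. Starting twice is a no-op. A stopped updater cancels its future. Each run is wrapped in a try/catch, because an exception escaping a scheduleWithFixedDelay task suppresses all later runs. PollingUpdaterSketch is hypothetical, and the executor and intervals are passed in rather than read from IClientConfig.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the start/stop logic in PollingServerListUpdater.
class PollingUpdaterSketch {
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final ScheduledExecutorService executor;
    private final long initialDelayMs;      // 1000 ms by default in the original
    private final long refreshIntervalMs;   // 30000 ms by default in the original
    private volatile ScheduledFuture<?> scheduledFuture;
    volatile long lastUpdated;

    PollingUpdaterSketch(ScheduledExecutorService executor, long initialDelayMs, long refreshIntervalMs) {
        this.executor = executor;
        this.initialDelayMs = initialDelayMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    // Returns false if already started (the original only logs "Already active, no-op").
    synchronized boolean start(Runnable updateAction) {
        if (!isActive.compareAndSet(false, true)) {
            return false;
        }
        Runnable wrapper = () -> {
            if (!isActive.get()) {
                return;
            }
            try {
                updateAction.run();
                lastUpdated = System.currentTimeMillis();
            } catch (RuntimeException e) {
                // Swallow: one failed cycle must not cancel all future refreshes
            }
        };
        scheduledFuture = executor.scheduleWithFixedDelay(
                wrapper, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    synchronized void stop() {
        if (isActive.compareAndSet(true, false) && scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
    }
}
```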

The code analyzed so far relies on IRule, ServerList, ServerListFilter and other interfaces. So what role does IPing play?

Let's keep reading the source:

public class BaseLoadBalancer extends AbstractLoadBalancer implements
        PrimeConnections.PrimeConnectionListener, IClientConfigAware {
    // 1. The constructor calls setupPingTask()
    public BaseLoadBalancer() {
        this.name = DEFAULT_NAME;
        this.ping = null;
        setRule(DEFAULT_RULE);
        setupPingTask();
        lbStats = new LoadBalancerStats(DEFAULT_NAME);
    }

    // 2. This method starts a dedicated timer thread per load balancer (i.e. per service) that runs PingTask periodically
    void setupPingTask() {
        // Skip when pinging is not needed (e.g. when the IPing is DummyPing)
        if (canSkipPing()) {
            return;
        }
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                true);
        lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000); // every 10s by default
        forceQuickPing();
    }

    class PingTask extends TimerTask {
        public void run() {
            try {
                new Pinger(pingStrategy).runPinger();
            } catch (Exception e) {
                logger.error("LoadBalancer [{}]: Error pinging", name, e);
            }
        }
    }

    class Pinger {
        // 3. What PingTask actually does
        public void runPinger() throws Exception {
            if (!pingInProgress.compareAndSet(false, true)) { 
                return; // Ping in progress - nothing to do
            }

            Server[] allServers = null;
            boolean[] results = null;

            Lock allLock = null;
            Lock upLock = null;

            try {
                allLock = allServerLock.readLock();
                allLock.lock();
                // 3.1 With a service registry, allServerList is the server list fetched from the registry
                allServers = allServerList.toArray(new Server[allServerList.size()]);
                allLock.unlock();

                int numCandidates = allServers.length;
                // 3.2 Check the health of each service instance; true means healthy
                // updateListOfServers in DynamicServerListLoadBalancer has already set allServers to the instances that passed health checks
                results = pingerStrategy.pingServers(ping, allServers);

                final List<Server> newUpList = new ArrayList<Server>();
                final List<Server> changedServers = new ArrayList<Server>();

                for (int i = 0; i < numCandidates; i++) {
                    boolean isAlive = results[i];
                    Server svr = allServers[i];
                    boolean oldIsAlive = svr.isAlive();

                    svr.setAlive(isAlive);
                    // 3.3 Check whether the status differs from the previous one
                    if (oldIsAlive != isAlive) {
                        changedServers.add(svr);
                        logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                            name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                    }

                    // 3.4 Update the local variable newUpList
                    if (isAlive) {
                        newUpList.add(svr);
                    }
                }
                upLock = upServerLock.writeLock();
                upLock.lock();
                // 3.5 Update the list of available service instances
                upServerList = newUpList;
                upLock.unlock();

                notifyServerStatusChangeListener(changedServers);
            } finally {
                pingInProgress.set(false);
            }
        }
    }
}

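Putting the code above together, the flow is roughly:

  • PollingServerListUpdater and DynamicServerListLoadBalancer fetch all service instances from the registry every 30s, filter out unhealthy ones, and put the healthy instances into allServerList and upServerList;
  • setupPingTask() in BaseLoadBalancer "pings" the servers in allServerList every 10s and removes any unhealthy one from upServerList.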
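One pass of Pinger.runPinger reduces to a small loop once the read/write locks and the pingInProgress guard are left out. The loop pings each server, records whose state flipped, and swaps in a freshly built "up" list. PingServer and PingPassSketch are hypothetical. The Predicate plays the role of IPing.isAlive, called serially here, while the original goes through an IPingStrategy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a Server with its alive flag.
class PingServer {
    final String id;
    volatile boolean alive;

    PingServer(String id, boolean alive) {
        this.id = id;
        this.alive = alive;
    }
}

// Hypothetical sketch of one Pinger.runPinger() pass (locks and in-progress guard omitted).
class PingPassSketch {
    volatile List<PingServer> upServerList = new ArrayList<>();

    // Pings every server, rebuilds the "up" list, and returns the servers whose state flipped.
    List<PingServer> runPinger(List<PingServer> allServers, Predicate<PingServer> ping) {
        List<PingServer> newUpList = new ArrayList<>();
        List<PingServer> changedServers = new ArrayList<>();
        for (PingServer svr : allServers) {
            boolean isAlive = ping.test(svr);   // plays IPing.isAlive
            boolean oldIsAlive = svr.alive;
            svr.alive = isAlive;
            if (oldIsAlive != isAlive) {
                changedServers.add(svr);        // would go to the status-change listeners
            }
            if (isAlive) {
                newUpList.add(svr);
            }
        }
        upServerList = newUpList;               // publish the new list in one assignment
        return changedServers;
    }
}
```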

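If a service instance goes down, it is tempting to conclude that some requests may fail for at most about 10s. Unfortunately, that is not the case: the window can be as long as about 30s.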

Drilling into step 3.2, results = pingerStrategy.pingServers(ping, allServers), the call ultimately ends up here (when ConsulPing is used):

public class ConsulPing implements IPing {
    @Override
    public boolean isAlive(Server server) {
        boolean isAlive = true;

        if (server != null && server instanceof ConsulServer) {
            ConsulServer consulServer = (ConsulServer) server;
            return consulServer.isPassingChecks();
        }

        return isAlive;
    }
}

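The input here is the list of healthy servers returned by DynamicServerListLoadBalancer, so ConsulPing's isAlive is bound to return true for each of them. ConsulPing.isAlive does not perform a real "ping", such as calling the Consul API again to confirm that the instance is still healthy. It simply returns the health-check status stored in the ConsulServer object. As a result, a failed instance is only dropped at the next server-list refresh, which can be up to 30s later by default.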
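A tiny model shows why the cached check cannot catch a failure between refreshes. The server object carries a health flag captured when the list was fetched, and the "ping" only reads that flag. CachedHealthServer, FakeRegistry and CachedPingSketch are hypothetical simplifications of ConsulServer, the Consul registry and ConsulPing.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of a ConsulServer: the health flag is a snapshot taken when the
// server list was last fetched from the registry, not a live value.
class CachedHealthServer {
    final String id;
    final boolean passingChecks;

    CachedHealthServer(String id, boolean passingChecks) {
        this.id = id;
        this.passingChecks = passingChecks;
    }
}

// Hypothetical stand-in for the registry's current view of an instance.
class FakeRegistry {
    final AtomicBoolean healthy = new AtomicBoolean(true);

    // Like a server-list refresh: builds a new snapshot from the current health state.
    CachedHealthServer fetch(String id) {
        return new CachedHealthServer(id, healthy.get());
    }
}

class CachedPingSketch {
    // Same shape as ConsulPing.isAlive: read the snapshot, no network call.
    static boolean isAlive(CachedHealthServer server) {
        return server.passingChecks;
    }
}
```

Until the next refresh replaces the snapshot, every ping of a failed instance still reports it alive.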

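III. Issues That Are Not Quite Problems

1. BaseLoadBalancer starts a dedicated thread per service to run PingTask periodically. For a gateway fronting hundreds of microservices, that means hundreds of threads that sit idle most of the time. A small shared thread pool could be used instead.
2. With Consul as the registry, ConsulPing does not do what one would expect. Consider skipping the ping check altogether, for example by making DummyPing the default IPing. You can then shorten the interval of the scheduled task in PollingServerListUpdater to reduce the impact of an individual instance going offline.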
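A minimal sketch shows what suggestion 1 could look like. All services' ping tasks share one small ScheduledExecutorService instead of one Timer thread per load balancer. SharedPingScheduler is hypothetical, and wiring it into BaseLoadBalancer is not shown.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one small scheduler shared by every service's ping task.
class SharedPingScheduler {
    private final ScheduledExecutorService scheduler;

    SharedPingScheduler(int threads) {
        AtomicInteger seq = new AtomicInteger();
        this.scheduler = Executors.newScheduledThreadPool(threads, r -> {
            Thread t = new Thread(r, "shared-ping-" + seq.incrementAndGet());
            t.setDaemon(true);   // do not keep the JVM alive just for pinging
            return t;
        });
    }

    // Runs pingTask now and then every intervalMs milliseconds on the shared pool.
    ScheduledFuture<?> schedulePing(Runnable pingTask, long intervalMs) {
        return scheduler.scheduleWithFixedDelay(() -> {
            try {
                pingTask.run();
            } catch (RuntimeException e) {
                // Keep the schedule alive; a real implementation would log this
            }
        }, 0, intervalMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

The trade-off is that a slow ping delays the other tasks waiting for the same threads, so the pool size should reflect how long a single ping can take.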

Permanent link: https://tech.souyunku.com/25834

Do not reproduce without permission: Souyunku Tech Team (搜云库技术团队) » Ribbon Source Code Analysis
