SpringCloud Ribbon详解

  |   0 评论   |   0 浏览

1 什么是Ribbon

学习一项技术时,我们应当先了解这项技术有哪些功能,能帮我们实现什么样的需求。在微服务架构的项目中,服务之间的调用是必然存在的,同时为了避免单点故障,每一个服务又会部署多个节点,简单的示例如下所示:

上图中service1调用service2的服务时,需要从service2的服务列表中选择出一个节点,拿到其对应的ip和端口,然后再发起对应的请求,这便是所谓的负载均衡算法,Ribbon便是一个帮我们实现了这个功能的组件。

2 Ribbon的使用

知道了Ribbon的功能后,我们通过一个简单的示例来学习下Ribbon的使用,项目结构比较简单,有两个模块如下:

  • user-service 一个SpringBoot编写的web项目,里面定义了一个接口。
  • ribbon-sample 一个SpringBoot编写的web项目,引入了Ribbon,调用user-service中的接口

我们将user-service修改端口后启动两个用来模拟多节点,结果入下图所示:

示例代码调用关系

2.1 user-service模块

这个模块就是一个SpringBoot编写的web服务,里面提供了一个接口,如下:

@RestController
@RequestMapping(value = "/user")
public class UserController {
    @Value("${server.port}")
    private int port;
  
    @GetMapping(value = "/{name}")
    public String getByName(@PathVariable String name) {
        System.out.println(port);
        return name;
    }
}

2.2 ribbon-sample模块

引入如下Ribbon的依赖:

<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-netflix-ribbon -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
    <version>2.2.9.RELEASE</version>
</dependency>

之后也在这个项目中定义一个接口,并使用RestTemplate调用user-service中的接口,为了简便,我直接将接口定义在了启动类中,代码如下:

@RestController
@SpringBootApplication
public class RibbonServiceBootstrap {
    public static void main(String[] args) {
        SpringApplication.run(RibbonServiceBootstrap.class, args);
    }

    @Bean
    @LoadBalanced
    public RestTemplate restTemplate(RestTemplateBuilder restTemplateBuilder) {
        return restTemplateBuilder.build();
    }

    @Resource
    private RestTemplate restTemplate;

    @GetMapping(value = "/ribbon/user/{name}")
    public String getUserByName(@PathVariable String name) {
        return restTemplate.getForObject("http://user-service/user/" + name, String.class);
    }
}

application.yml配置文件中配置上user-service的地址,内容如下:

user-service:
ribbon:
listOfServers:localhost:8080,localhost:8081

注意上面的代码哈,在将RestTemplate注入Spring容器中的方法上添加了@LoadBalanced注解,我们访问user-service中的接口使用的是服务名,而不是ip:port的形式。

2.3 测试

代码编写完后,我们启动这两个服务,其中user-service需要启动两个,端口分别为80808081,然后在浏览器中输入ribbon-sample服务中接口http://localhost:8082/ribbon/user/zhangsan,发现可以正常访问:

可以多点几次,观察user-service两个服务控制台的输出,会发现这两个服务会交替的打印自己的端口号,大家应该很容易想到这是一种轮询的方式。

3 Ribbon的实现原理

在上面的示例中我们只需要在RestTemplate的创建方法上加一个@LoadBalanced注解便可以使用Ribbon相应的功能,这个注解的定义如下:

@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {

}

通过上面的代码我们能够看出在这个注解中有@Qualifier注解,这个注解的作用是给加入到Spring IOC容器中的对象增加一个标识,然后我们可以获取相应标识的对象。

3.1 加载逻辑

了解了@LoadBalanced注解后,我们看看服务启动阶段是如何加载相关类的,这个加载的入口为org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration,当项目启动时会自动加载这个类,这里涉及到SpringBoot的自动装配相关的内容,不了解的同学可以自行去了解下,这个类中会创建LoadBalancerClient对象并放入容器中,如下:

@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
    return new RibbonLoadBalancerClient(springClientFactory());
}

在这个类上被@AutoConfigureBefore注解进行了修饰,在这个注解中配置的类会先被加载,其源码如下:

@AutoConfigureBefore({ LoadBalancerAutoConfiguration.class,
        AsyncLoadBalancerAutoConfiguration.class })

通过上面的代码我们看到会加载org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration类。在这个类中我们会发现有如下一个变量:

@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();

这个变量中存储的是被@LoadBalanced注解修饰的变量。

这个类里都是创建对象并存入Spring IOC容器中,主要实例化的三个类如下:

  • LoadBalancerInterceptor
  • RestTemplateCustomizer
  • SmartInitializingSingleton

其逻辑如下:

@Bean
public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
                                                       LoadBalancerRequestFactory requestFactory) {
    return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}

@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
    return restTemplate -> {
        List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
        list.add(loadBalancerInterceptor);
        restTemplate.setInterceptors(list);
    };
}

@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
    final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
    return () -> restTemplateCustomizers.ifAvailable(customizers -> {
        for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
            for (RestTemplateCustomizer customizer : customizers) {
                customizer.customize(restTemplate);
            }
        }
    });
}

通过上述代码我们可以看到在这个过程中Spring CloudRestTemplate添加上了一个LoadBalancerInterceptor对象,这个对象是一个拦截器,这个类中只有一个intercept()方法,改方法的逻辑如下:

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                                    final ClientHttpRequestExecution execution) throws IOException {
    // 解析出请求的URI对象
    final URI originalUri = request.getURI();
    // 获取到请求中的host  示例中是user-service
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}

3.2 执行阶段

使用RestTemplate发送请求时会进入上述拦截器的方法中,我们顺着源码往下走会走到RibbonLoadBalancerClient.execute()方法中,其逻辑如下:

@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
    throws IOException {
    return execute(serviceId, request, null);
}

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
    throws IOException {
    // 获取LoadBalancer对象
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    // 获取Server对象
    Server server = getServer(loadBalancer, hint);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                                                 isSecure(server, serviceId),
                                                 serverIntrospector(serviceId).getMetadata(server));

    return execute(serviceId, ribbonServer, request);
}

在上面的代码中会分别获取ILoadBalancerServer对象。

getLoadBalancer的代码这里就不进行粘贴了,这里的逻辑是从Spring上下文中获取对象,最终返回的是一个ZoneAwareLoadBalancer对象。

getServer()方法的逻辑如下:

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
    if (loadBalancer == null) {
        return null;
    }
    // Use 'default' on a null hint, or just pass it on?
    return loadBalancer.chooseServer(hint != null ? hint : "default");
}

跟随源码发现会调用到BaseLoadBalancer.chooseServer()方法,这个方法的逻辑如下:

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

通过上面的代码我们发现,实现负载均衡的规则是在IRule中实现的,Ribbon中提供的规则如下:

  • RoundRobbinRule 轮询
  • RandomRule 随机
  • WeightedResponseTimeRule 根据响应权重选择
  • MyGrayBalancerRule
  • RetryRule 轮询并重试规则
  • BestAvailableRule 忽略连接失败的服务器
  • ZoneAvoidanceRule
  • AvailabilityFilteringRule 从可用的服务中进行轮询

3.3 服务列表维护

通过上面的代码,我们知道了负载均衡算法的实现是由IRule中实现的,大家可以自己跟着源码看看具体的实现逻辑。选择使用调用那一个服务,第一步肯定是需要获取到都有哪些服务列表,接下来我们就了解下Ribbon是如何维护服务列表的。

Ribbon中默认的负载均衡算法是轮询,我们继续根据这个规则的选择服务的方法,其源码如下:

@Override
public Server choose(Object key) {
    return choose(getLoadBalancer(), key);
}

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }

    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        // 仍然存活的服务列表
        List<Server> reachableServers = lb.getReachableServers();
        // 所有的服务列表
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();

        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }

        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);

        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }

        // Next.
        server = null;
    }

    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: "
                 + lb);
    }
    return server;
}

在上面的代码中我们重点关注下reachableServersallServers两个变量的获取,点进去源码,这两个服务列表对应的是BaseLoadBalancer类中的如下两个字段:

@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
    .synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
    .synchronizedList(new ArrayList<Server>());

这两个属性又是在那里设置上去的呢?大家还记得我们在上面的代码中获取到的ILoadBalancer实例是什么吗?是一个ZoneAwareLoadBalancer对象,这个对象是在RibbonClientConfiguration类中被添加到Spring容器中的,代码如下:

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                                        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                                        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                                       serverListFilter, serverListUpdater);
}

public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                             IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
                             ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                     ServerList<T> serverList, ServerListFilter<T> filter,
                                     ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping);
    this.serverListImpl = serverList;
    this.filter = filter;
    this.serverListUpdater = serverListUpdater;
    if (filter instanceof AbstractServerListFilter) {
        ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
    }
    restOfInit(clientConfig);
}

根据上面源码我们会看到会调用到DynamicServerListLoadBalancer.restOfInit方法中,在这个方法中会维护上述的两个字段,这个方法的源码如下:

void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    // 启动定时任务定时更新服务列表
    enableAndInitLearnNewServersFeature();
	// 获取服务列表
    updateListOfServers();
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
            .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}

在这个方法中主要的逻辑在enableAndInitLearnNewServersFeatureupdateListOfServers两个方法中,接下来我们先看看updateListOfServers方法,这个方法的逻辑如下:

public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
        // 从配置文件中获取  ConfigurationBasedServerList
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                     getIdentifier(), servers);

        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                         getIdentifier(), servers);
        }
    }
    updateAllServerList(servers);
}

protected void updateAllServerList(List<T> ls) {
    // other threads might be doing this - in which case, we pass
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {
            for (T s : ls) {
                s.setAlive(true); // set so that clients can start using these
                // servers right away instead
                // of having to wait out the ping cycle.
            }
            // 设置allServerList属性  如果可以不用Ping的情况,upServerList = allServerList
            setServersList(ls);
            // 通过Ping规则用来判断存活的服务
            super.forceQuickPing();
        } finally {
            serverListUpdateInProgress.set(false);
        }
    }
}

enableAndInitLearnNewServersFeature方法的逻辑如下:

public void enableAndInitLearnNewServersFeature() {
    LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
    serverListUpdater.start(updateAction);
}

在往下会调用到PollingServerListUpdater.start方法,这个方法中会启动个定时任务,定时调用updateListOfServers方法,截图如下:

关于服务是否存活的判断在IPing中,由于默认情况都是返回true,我们这里不贴出来了,大家自行查看源码就行。

4 总结
今天的文章到这里就结束了,在本文中主要介绍了下如何使用原生的Ribbon组件进行服务调用,在本篇文章中的示例我们并没有引入注册中心,是把服务列表写死到配置文件中的,之后介绍注册中心时我们再看看Ribbon是如何同注册中心配合使用的。

  • 原生Ribbon的使用
  • 查看源码,找到入口很关键,主要的配置类有RibbonClientConfigurationRibbonAutoConfigurationLoadBalancerAutoConfiguration
  • Ribbon中的核心组件
    • ILoadBalancer 负载均衡器
    • IRule 负载均衡规则
    • ServerList 获取服务列表
    • IPing 用于验证是否存活
  • 服务列表的更新规则

转自https://zhuanlan.zhihu.com/p/509733814?utm_id=0


标题:SpringCloud Ribbon详解
作者:michael
地址:https://blog.junxworks.cn/articles/2023/11/14/1699931314431.html