百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

再不看就删了!超详细的Ribbon源码解析

zhezhongyun 2025-06-10 04:05 1 浏览

Ribbon简介

什么是Ribbon?

Ribbon是springcloud下的客户端负载均衡器,消费者在通过服务别名调用服务时,需要通过Ribbon做负载均衡获取实际的服务调用地址,然后通过httpclient的方式进行本地RPC远程调用。

Ribbon原理

Ribbon负载均衡算法主要是轮询算法,分为以下几步:

  1. 根据服务别名,从eureka获取服务提供者的列表
  2. 将列表缓存到本地
  3. 根据具体策略获取服务提供者

Ribbon的核心是负载均衡管理,另还有5个大功能点。如下图:

源码分析

事前准备

  1. 先搭建一个SpringCloud的项目,也可以从我的github上下载。地址:github.com/mmcLine/spr…
  2. 拷贝以下代码 @Configuration public class RestTemplateConfiguration { @Bean @LoadBalanced public RestTemplate getRestTemplate(){ return new RestTemplate(); } } @Autowired private RestTemplate restTemplate; @GetMapping("/testRibbon/{id}") public User getTodayStatistic(@PathVariable("id") Integer id){ String url ="http://STUDY-USER/user/getUserById?id="+id; return restTemplate.getForObject(url, User.class); } 复制代码

代码都准备好了,可以开始分析了。

  1. 执行调用

http://localhost:8005/trade/testRibbon/2

为什么这么就能调用到服务提供者的方法?

打断点,可以看到restTemplate里有两个拦截器,根据名字可以推断
RetryLoadBalancerInterceptor是关键。

跟踪到
RetryLoadBalancerInterceptor类

@Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
										final ClientHttpRequestExecution execution) throws IOException {
		final URI originalUri = request.getURI();
		//获取到service的name
		final String serviceName = originalUri.getHost();
		Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
		//根据serviceName和LoadBalancerClient,LoadBalancedRetryPolicy里面包含了RibbonLoadBalancerContext和ServiceInstanceChooser
		final LoadBalancedRetryPolicy retryPolicy = lbRetryFactory.createRetryPolicy(serviceName,
				loadBalancer);
		RetryTemplate template = createRetryTemplate(serviceName, request, retryPolicy);
		//执行方法会进入到doExecute方法
		return template.execute(context -> {
			ServiceInstance serviceInstance = null;
			if (context instanceof LoadBalancedRetryContext) {
				LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
				serviceInstance = lbContext.getServiceInstance();
			}
			if (serviceInstance == null) {
				serviceInstance = loadBalancer.choose(serviceName);
			}
			ClientHttpResponse response = RetryLoadBalancerInterceptor.this.loadBalancer.execute(
					serviceName, serviceInstance,
					requestFactory.createRequest(request, body, execution));
			int statusCode = response.getRawStatusCode();
			if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {
				byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());
				response.close();
				throw new ClientHttpResponseStatusCodeException(serviceName, response, bodyCopy);
			}
			return response;
		}, new LoadBalancedRecoveryCallback<ClientHttpResponse, ClientHttpResponse>() {
			//This is a special case, where both parameters to LoadBalancedRecoveryCallback are
			//the same.  In most cases they would be different.
			@Override
			protected ClientHttpResponse createResponse(ClientHttpResponse response, URI uri) {
				return response;
			}
		});
	}
复制代码

doExecute方法:

protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
			RecoveryCallback<T> recoveryCallback, RetryState state)
			throws E, ExhaustedRetryException {
        //省略部分代码

			/*
			 * We allow the whole loop to be skipped if the policy or context already
			 * forbid the first try. This is used in the case of external retry to allow a
			 * recovery in handleRetryExhausted without the callback processing (which
			 * would throw an exception).
			 */
			 //执行逻辑的关键方法
			while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {

				}
复制代码

继续跟踪canRetry方法

  @Override
    public boolean canRetry(RetryContext context) {
        LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext)context;
        if(lbContext.getRetryCount() == 0  && lbContext.getServiceInstance() == null) {
            //We haven't even tried to make the request yet so return true so we do
            //设置选中的服务提供者
            lbContext.setServiceInstance(serviceInstanceChooser.choose(serviceName));
            return true;
        }
        return policy.canRetryNextServer(lbContext);
    }
复制代码

我们跟踪
serviceInstanceChooser.choose(serviceName)看看怎么通过serviceName选服务提供者的。

@Override
	public ServiceInstance choose(String serviceId) {
	    //选择server
		Server server = getServer(serviceId);
		if (server == null) {
			return null;
		}
		return new RibbonServer(serviceId, server, isSecure(server, serviceId),
				serverIntrospector(serviceId).getMetadata(server));
	}
复制代码

跟踪getServer方法

protected Server getServer(ILoadBalancer loadBalancer) {
		if (loadBalancer == null) {
			return null;
		}
		//可以看出是loadBalancer在选择
		return loadBalancer.chooseServer("default"); // TODO: better handling of key
	}
复制代码

继续深入

 public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        //有一个调用次数在+1
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                //委托给了IRule,所以Irule是负载均衡的关键,最后来总结
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }
复制代码

查看Irule的实现

 public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        //lb.getAllServers里面是所有的服务提供者列表
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }
复制代码

跟踪
chooseRoundRobinAfterFiltering方法

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        //拿到筛选后的servers
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
        //incrementAndGetModulo方法拿到下标,然后根据list.get取到一个服务
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }
复制代码

至此就拿到了具体的服务提供者。

但是到这里还有个问题?

  1. 怎么根据服务名拿到server的?

有一个ServerList接口是用于拿到服务列表的。我们使用的loadBalancer(ZoneAwareLoadBalancer)的父类
DynamicServerListLoadBalancer类的构造方法里,有一个restOfinit方法

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }
复制代码

跟踪restOfInit方法

void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();
        
        //用于获取所有的serverList
        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }
复制代码

继续跟踪updateListOfServers方法

 public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            //查询serverList
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }
复制代码

继续跟踪源码到obtainServersViaDiscovery方法,

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
    //eurekaClientProvider.get()会去获取EurekaClient
        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
        }

        EurekaClient eurekaClient = eurekaClientProvider.get();
        //vipAddresses就是serviceName
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                //此处获取到服务的信息
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {

                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }

                            // copy is necessary since the InstanceInfo builder just uses the original reference,
                            // and we don't want to corrupt the global eureka copy of the object which may be
                            // used by other clients in our system
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                        des.setZone(DiscoveryClient.getZone(ii));
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                }
            }
        }
        return serverList;
    }
复制代码

综合上面可以看出,最终是通过eurekaClient去拿到服务列表的。

那么如果服务列表发生变化怎么刷新呢?

是通过CacheRefreshThread在定时线程池里面执行,核心拉取方法是fetchRegistry

Iping

Iping是用于探测服务列表中的服务是否正常,如果不正常,则从eureka拉取服务列表并更新。

在BaseLoadBalancer里面有一个setupPingTask方法,启动定时任务,30秒一次定时向EurekaClient发送“ping”

public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
            IPing ping, IPingStrategy pingStrategy) {
	
        logger.debug("LoadBalancer [{}]:  initialized", name);
        
        this.name = name;
        this.ping = ping;
        this.pingStrategy = pingStrategy;
        setRule(rule);
        setupPingTask();
        lbStats = stats;
        init();
    }
复制代码

Iping的具体逻辑在PingTask类里。

Irule总结:

Irule是负载均衡的规则:

我这里默认是使用的是ZoneAvoidanceRule,还有很多种策略:

  • RandomRule: 随机
  • RoundRobinRule: 轮询
  • RetryRule: 先按照RoundRobinRule的策略获取服务,如果获取服务失败则在指定时间内会进行重试,获取可用的服务
  • WeightedResponseTimeRule: 对RoundRobinRule的扩展,响应速度越快的实例选择权重越大,越容易被选择
  • BestAvailableRule:会先过滤掉由于多次访问故障而处于断路器跳闸状态的服务,然后选择一个并发量最小的服务
  • AvailabilityFilteringRule:先过滤掉故障实例,再选择并发较小的实例
  • ZoneAvoidanceRule:默认规则,复合判断server所在区域的性能和server的可用性选择服务器

properties配置方式如下:
STUDY-USER是服务名

STUDY-USER.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.RoundRobinRule


作者:别掉头发了
链接:
https://juejin.cn/post/7020403359270043685

来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


相关推荐

屏幕属性详解:DCI-P3、对比度、色域、Nit

屏幕属性详解:DCI-P3、对比度、色域、Nit---一、DCI-P3(色域标准)1.定义DCI-P3是由美国电影工业制定的广色域标准,覆盖CIE1931色彩空间的约96%,尤其强化红色和绿...

Qt属性系统(Qt Property System)(qt的pro文件属性说明)

Qt提供了巧妙的属性系统,它与某些编译器支持的属性系统相似。然而,作为平台和编译器无关的库,Qt不能够依赖于那些非标准的编译器特性,比如__property或者[property]。Qt的解决方案...

用 Cursor 开发 10 +项目后,汇总了40 多条提示词

每次跟新手讲解Cursor的使用技巧时,他们总会说:"哎呀,这工具好是好,就是不知道该怎么跟它对话..."是的,不少小伙伴都在为这个困扰:手握着强大的AI编程工具,却像拿着一把...

Excel常用技能分享与探讨(5-宏与VBA简介 VBA与数据库)

在VBA(VisualBasicforApplications)中使用数据库(如Access、SQLServer、MySQL等)具有以下优点,适用于需要高效数据管理和复杂业务逻辑的场景:1....

汽车诊断协议J1850-VPW 测试(汽车诊断协议工程师干啥的)

硬件说明:MCU:GD32C103120M,128K,32kRAM.输入:USB5V.OBD功能口定义:OBD(2,10)VPWM、OBD7(K线)、OBD6(CANH)、OBD14(...

ssl证书有哪些类型?有什么区别?(ssl证书的区别)

以下是**Windows服务器常用快捷命令大全(100条)**,涵盖系统管理、网络诊断、安全维护、文件操作等场景,适合管理员快速操作和故障排查:---###**一、系统信息与配置**1.**`s...

嵌入式工程师竟然看不懂这些专业语句,那真别怪人说你菜

本文出自公众号:硬件笔记本,原创文章,转载请注明出处AASIC(专用集成电路)Application-SpecificIntegratedCircuit.Apieceofcustom-de...

同星提供SecOC信息安全解决方案(上海同星)

需求背景在传统的汽车电子结构中,车内的电控单元(ECU)数量和复杂性受到限制,通信带宽也受到限制。因此,人们普遍认为车内各个ECU之间的通信是可靠的。只要ECU节点接收到相应的消息,就会对其进行处理。...

H3C MSR系列路由器EAA监控策略配置举例

1简介本文档介绍使用EAA监控策略进行网络监控的典型配置举例。2配置前提本文档适用于使用ComwareV7软件版本的MSR系列路由器,如果使用过程中与产品实际情况有差异,请参考相关产品手册,或以...

网卡DM9000裸机驱动开发详解(网卡驱动9462)

一、网卡1.概念网卡是一块被设计用来允许计算机在计算机网络上进行通讯的计算机硬件。由于其拥有MAC地址,因此属于OSI模型的第2层。它使得用户可以通过电缆或无线相互连接。每一个网卡都有一个被称为MA...

如何检验自己的手机有没有问题,实操干货!包学会

此贴供已购买二手苹果或正打算购买的朋友参考,已入新机的朋友也可以验一下你的手机有没有问题。无废话,直接上干货以小编随机挑的一台12为例1.外观:拿到手机先看整体外观,是否与商家描述的外观一致,磕碰和划...

再不看就删了!超详细的Ribbon源码解析

Ribbon简介什么是Ribbon?Ribbon是springcloud下的客户端负载均衡器,消费者在通过服务别名调用服务时,需要通过Ribbon做负载均衡获取实际的服务调用地址,然后通过httpcl...

细数ThreadLocal三大坑,内存泄露仅是小儿科

我在参加CodeReview的时候不止一次听到有同学说:我写的这个上下文工具没问题,在线上跑了好久了。其实这种想法是有问题的,ThreadLocal写错难,但是用错就很容易,本文将会详细总结Thre...

微服务架构下的Java服务监控:让程序“健康自检”不再难

微服务架构下的Java服务监控:让程序“健康自检”不再难引言:为什么需要服务监控?在微服务架构的世界里,我们的系统由众多小型且独立的服务组成。想象一下,这些服务就像一群跳舞的小精灵,在各自的舞台上表演...

6. 并发编程(并发编程模型)

本章深入解析Go语言并发编程核心机制,涵盖调度原理、通信模式及工程实践,结合性能优化与陷阱规避策略。6.1Goroutine基础6.1.1创建与调度//启动goroutinegofunc()...