再不看就删了!超详细的Ribbon源码解析
zhezhongyun 2025-06-10 04:05 40 浏览
Ribbon简介
什么是Ribbon?
Ribbon是springcloud下的客户端负载均衡器,消费者在通过服务别名调用服务时,需要通过Ribbon做负载均衡获取实际的服务调用地址,然后通过httpclient的方式进行本地RPC远程调用。
Ribbon原理
Ribbon负载均衡算法主要是轮询算法,分为以下几步:
- 根据服务别名,从eureka获取服务提供者的列表
- 将列表缓存到本地
- 根据具体策略获取服务提供者
Ribbon的核心是负载均衡管理,另还有5个大功能点。如下图:
源码分析
事前准备
- 先搭建一个SpringCloud的项目,也可以从我的github上下载。地址:github.com/mmcLine/spr…
- 拷贝以下代码 @Configuration public class RestTemplateConfiguration { @Bean @LoadBalanced public RestTemplate getRestTemplate(){ return new RestTemplate(); } } @Autowired private RestTemplate restTemplate; @GetMapping("/testRibbon/{id}") public User getTodayStatistic(@PathVariable("id") Integer id){ String url ="http://STUDY-USER/user/getUserById?id="+id; return restTemplate.getForObject(url, User.class); } 复制代码
代码都准备好了,可以开始分析了。
- 执行调用
http://localhost:8005/trade/testRibbon/2
为什么这么就能调用到服务提供者的方法?
打断点,可以看到restTemplate里有两个拦截器,根据名字可以推断
RetryLoadBalancerInterceptor是关键。
跟踪到
RetryLoadBalancerInterceptor类
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
//获取到service的name
final String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
//根据serviceName和LoadBalancerClient,LoadBalancedRetryPolicy里面包含了RibbonLoadBalancerContext和ServiceInstanceChooser
final LoadBalancedRetryPolicy retryPolicy = lbRetryFactory.createRetryPolicy(serviceName,
loadBalancer);
RetryTemplate template = createRetryTemplate(serviceName, request, retryPolicy);
//执行方法会进入到doExecute方法
return template.execute(context -> {
ServiceInstance serviceInstance = null;
if (context instanceof LoadBalancedRetryContext) {
LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
serviceInstance = lbContext.getServiceInstance();
}
if (serviceInstance == null) {
serviceInstance = loadBalancer.choose(serviceName);
}
ClientHttpResponse response = RetryLoadBalancerInterceptor.this.loadBalancer.execute(
serviceName, serviceInstance,
requestFactory.createRequest(request, body, execution));
int statusCode = response.getRawStatusCode();
if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {
byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());
response.close();
throw new ClientHttpResponseStatusCodeException(serviceName, response, bodyCopy);
}
return response;
}, new LoadBalancedRecoveryCallback<ClientHttpResponse, ClientHttpResponse>() {
//This is a special case, where both parameters to LoadBalancedRecoveryCallback are
//the same. In most cases they would be different.
@Override
protected ClientHttpResponse createResponse(ClientHttpResponse response, URI uri) {
return response;
}
});
}
复制代码doExecute方法:
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
RecoveryCallback<T> recoveryCallback, RetryState state)
throws E, ExhaustedRetryException {
//省略部分代码
/*
* We allow the whole loop to be skipped if the policy or context already
* forbid the first try. This is used in the case of external retry to allow a
* recovery in handleRetryExhausted without the callback processing (which
* would throw an exception).
*/
//执行逻辑的关键方法
while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
}
复制代码继续跟踪canRetry方法
@Override
public boolean canRetry(RetryContext context) {
LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext)context;
if(lbContext.getRetryCount() == 0 && lbContext.getServiceInstance() == null) {
//We haven't even tried to make the request yet so return true so we do
//设置选中的服务提供者
lbContext.setServiceInstance(serviceInstanceChooser.choose(serviceName));
return true;
}
return policy.canRetryNextServer(lbContext);
}
复制代码我们跟踪
serviceInstanceChooser.choose(serviceName)看看怎么通过serviceName选服务提供者的。
@Override
public ServiceInstance choose(String serviceId) {
//选择server
Server server = getServer(serviceId);
if (server == null) {
return null;
}
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}
复制代码跟踪getServer方法
protected Server getServer(ILoadBalancer loadBalancer) {
if (loadBalancer == null) {
return null;
}
//可以看出是loadBalancer在选择
return loadBalancer.chooseServer("default"); // TODO: better handling of key
}
复制代码继续深入
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
//有一个调用次数在+1
counter.increment();
if (rule == null) {
return null;
} else {
try {
//委托给了IRule,所以Irule是负载均衡的关键,最后来总结
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
复制代码查看Irule的实现
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
//lb.getAllServers里面是所有的服务提供者列表
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
复制代码跟踪
chooseRoundRobinAfterFiltering方法
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
//拿到筛选后的servers
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
//incrementAndGetModulo方法拿到下标,然后根据list.get取到一个服务
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
复制代码至此就拿到了具体的服务提供者。
但是到这里还有个问题?
- 怎么根据服务名拿到server的?
有一个ServerList接口是用于拿到服务列表的。我们使用的loadBalancer(ZoneAwareLoadBalancer)的父类
DynamicServerListLoadBalancer类的构造方法里,有一个restOfinit方法
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
复制代码跟踪restOfInit方法
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
//用于获取所有的serverList
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
复制代码继续跟踪updateListOfServers方法
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
//查询serverList
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
复制代码继续跟踪源码到obtainServersViaDiscovery方法,
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
//eurekaClientProvider.get()会去获取EurekaClient
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
//vipAddresses就是serviceName
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
//此处获取到服务的信息
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
des.setZone(DiscoveryClient.getZone(ii));
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
复制代码综合上面可以看出,最终是通过eurekaClient去拿到服务列表的。
那么如果服务列表发生变化怎么刷新呢?
是通过CacheRefreshThread在定时线程池里面执行,核心拉取方法是fetchRegistry
Iping
Iping是用于探测服务列表中的服务是否正常,如果不正常,则从eureka拉取服务列表并更新。
在BaseLoadBalancer里面有一个setupPingTask方法,启动定时任务,30秒一次定时向EurekaClient发送“ping”
public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
IPing ping, IPingStrategy pingStrategy) {
logger.debug("LoadBalancer [{}]: initialized", name);
this.name = name;
this.ping = ping;
this.pingStrategy = pingStrategy;
setRule(rule);
setupPingTask();
lbStats = stats;
init();
}
复制代码Iping的具体逻辑在PingTask类里。
Irule总结:
Irule是负载均衡的规则:
我这里默认是使用的是ZoneAvoidanceRule,还有很多种策略:
- RandomRule: 随机
- RoundRobinRule: 轮询
- RetryRule: 先按照RoundRobinRule的策略获取服务,如果获取服务失败则在指定时间内会进行重试,获取可用的服务
- WeightedResponseTimeRule: 对RoundRobinRule的扩展,响应速度越快的实例选择权重越大,越容易被选择
- BestAvailableRule:会先过滤掉由于多次访问故障而处于断路器跳闸状态的服务,然后选择一个并发量最小的服务
- AvailabilityFilteringRule:先过滤掉故障实例,再选择并发较小的实例
- ZoneAvoidanceRule:默认规则,复合判断server所在区域的性能和server的可用性选择服务器
properties配置方式如下:
STUDY-USER是服务名
STUDY-USER.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.RoundRobinRule
作者:别掉头发了
链接:
https://juejin.cn/post/7020403359270043685
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
相关推荐
- Python入门学习记录之一:变量_python怎么用变量
-
写这个,主要是对自己学习python知识的一个总结,也是加深自己的印象。变量(英文:variable),也叫标识符。在python中,变量的命名规则有以下三点:>变量名只能包含字母、数字和下划线...
- python变量命名规则——来自小白的总结
-
python是一个动态编译类编程语言,所以程序在运行前不需要如C语言的先行编译动作,因此也只有在程序运行过程中才能发现程序的问题。基于此,python的变量就有一定的命名规范。python作为当前热门...
- Python入门学习教程:第 2 章 变量与数据类型
-
2.1什么是变量?在编程中,变量就像一个存放数据的容器,它可以存储各种信息,并且这些信息可以被读取和修改。想象一下,变量就如同我们生活中的盒子,你可以把东西放进去,也可以随时拿出来看看,甚至可以换成...
- 绘制学术论文中的“三线表”具体指导
-
在科研过程中,大家用到最多的可能就是“三线表”。“三线表”,一般主要由三条横线构成,当然在变量名栏里也可以拆分单元格,出现更多的线。更重要的是,“三线表”也是一种数据记录规范,以“三线表”形式记录的数...
- Python基础语法知识--变量和数据类型
-
学习Python中的变量和数据类型至关重要,因为它们构成了Python编程的基石。以下是帮助您了解Python中的变量和数据类型的分步指南:1.变量:变量在Python中用于存储数据值。它们充...
- 一文搞懂 Python 中的所有标点符号
-
反引号`无任何作用。传说Python3中它被移除是因为和单引号字符'太相似。波浪号~(按位取反符号)~被称为取反或补码运算符。它放在我们想要取反的对象前面。如果放在一个整数n...
- Python变量类型和运算符_python中变量的含义
-
别再被小名词坑哭了:Python新手常犯的那些隐蔽错误,我用同事的真实bug拆给你看我记得有一次和同事张姐一起追查一个看似随机崩溃的脚本,最后发现罪魁祸首竟然是她把变量命名成了list。说实话...
- 从零开始:深入剖析 Spring Boot3 中配置文件的加载顺序
-
在当今的互联网软件开发领域,SpringBoot无疑是最为热门和广泛应用的框架之一。它以其强大的功能、便捷的开发体验,极大地提升了开发效率,成为众多开发者构建Web应用程序的首选。而在Spr...
- Python中下划线 ‘_’ 的用法,你知道几种
-
Python中下划线()是一个有特殊含义和用途的符号,它可以用来表示以下几种情况:1在解释器中,下划线(_)表示上一个表达式的值,可以用来进行快速计算或测试。例如:>>>2+...
- 解锁Shell编程:变量_shell $变量
-
引言:开启Shell编程大门Shell作为用户与Linux内核之间的桥梁,为我们提供了强大的命令行交互方式。它不仅能执行简单的文件操作、进程管理,还能通过编写脚本实现复杂的自动化任务。无论是...
- 一文学会Python的变量命名规则!_python的变量命名有哪些要求
-
目录1.变量的命名原则3.内置函数尽量不要做变量4.删除变量和垃圾回收机制5.结语1.变量的命名原则①由英文字母、_(下划线)、或中文开头②变量名称只能由英文字母、数字、下画线或中文字所组成。③英文字...
- 更可靠的Rust-语法篇-区分语句/表达式,略览if/loop/while/for
-
src/main.rs://函数定义fnadd(a:i32,b:i32)->i32{a+b//末尾表达式}fnmain(){leta:i3...
- C++第五课:变量的命名规则_c++中变量的命名规则
-
变量的命名不是想怎么起就怎么起的,而是有一套固定的规则的。具体规则:1.名字要合法:变量名必须是由字母、数字或下划线组成。例如:a,a1,a_1。2.开头不能是数字。例如:可以a1,但不能起1a。3....
- Rust编程-核心篇-不安全编程_rust安全性
-
Unsafe的必要性Rust的所有权系统和类型系统为我们提供了强大的安全保障,但在某些情况下,我们需要突破这些限制来:与C代码交互实现底层系统编程优化性能关键代码实现某些编译器无法验证的安全操作Rus...
- 探秘 Python 内存管理:背后的神奇机制
-
在编程的世界里,内存管理就如同幕后的精密操控者,确保程序的高效运行。Python作为一种广泛使用的编程语言,其内存管理机制既巧妙又复杂,为开发者们提供了便利的同时,也展现了强大的底层控制能力。一、P...
- 一周热门
- 最近发表
- 标签列表
-
- HTML 教程 (33)
- HTML 简介 (35)
- HTML 实例/测验 (32)
- HTML 测验 (32)
- JavaScript 和 HTML DOM 参考手册 (32)
- HTML 拓展阅读 (30)
- HTML文本框样式 (31)
- HTML滚动条样式 (34)
- HTML5 浏览器支持 (33)
- HTML5 新元素 (33)
- HTML5 WebSocket (30)
- HTML5 代码规范 (32)
- HTML5 标签 (717)
- HTML5 标签 (已废弃) (75)
- HTML5电子书 (32)
- HTML5开发工具 (34)
- HTML5小游戏源码 (34)
- HTML5模板下载 (30)
- HTTP 状态消息 (33)
- HTTP 方法:GET 对比 POST (33)
- 键盘快捷键 (35)
- 标签 (226)
- opacity 属性 (32)
- transition 属性 (33)
- 1-1. 变量声明 (31)
