Spring Cloud LoadBalancer: Custom Load-Balancing Strategies and Tag-Based Routing
Audience: backend engineers familiar with Spring Cloud | Reading time: about 22 minutes | Spring Boot 3.2 / Spring Cloud 2023.0
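Opening story
Last year we migrated a database, and the old and new databases ran side by side for about two weeks. During the migration, one batch of users' data had already moved to the new database while another batch was still in the old one. Each database was served by its own set of differently configured service instances (same service name, different database connection).
The ideal setup was this: requests from migrated users go to the new-database instances, requests from not-yet-migrated users go to the old-database instances, and the whole process stays transparent to users. With round-robin load balancing, requests would land on either set of instances regardless of which user sent them, which was bound to cause problems.
So we tagged the instances in their Nacos service metadata and wrote a custom LoadBalancer that matched the user identifier in each request against that metadata and routed the request to the correct instance. This tag-routing approach worked very well, and we later reused it for canary releases and blue-green deployments. This article walks through the complete implementation.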
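1. Core Problem Analysis
Spring Cloud removed Ribbon in the 2020.0 release train and introduced Spring Cloud LoadBalancer as its replacement. The biggest difference is that Ribbon is synchronous and blocking, while Spring Cloud LoadBalancer's core API is reactive (built on Project Reactor), so it integrates well with WebFlux and WebClient. A blocking client is still provided for RestTemplate and Feign.
Spring Cloud LoadBalancer ships two built-in strategies: round robin (RoundRobinLoadBalancer) and random (RandomLoadBalancer). Neither is aware of instance attributes such as the version number, the zone, or whether an instance is a canary.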
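For reference, the selection rules of the two built-in strategies can be approximated in a few lines of plain Java. This is a framework-free sketch, not the library source. `BuiltInStrategies` and its methods are hypothetical names. The counter is masked with `& Integer.MAX_VALUE` so the index stays non-negative after integer overflow, which is the same guard RoundRobinLoadBalancer uses.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, framework-free approximation of the two built-in selection rules. */
final class BuiltInStrategies {
    private final AtomicInteger position;

    BuiltInStrategies(int initialPosition) {
        this.position = new AtomicInteger(initialPosition);
    }

    /** Round robin: advance a shared counter and map it onto the list. */
    <T> T roundRobin(List<T> instances) {
        // Clearing the sign bit keeps the index non-negative even after the counter wraps around
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }

    /** Random: every instance is equally likely, independent of earlier picks. */
    static <T> T random(List<T> instances) {
        return instances.get(ThreadLocalRandom.current().nextInt(instances.size()));
    }
}
```

With `initialPosition` 0, successive `roundRobin` calls on `[a, b, c]` return `b`, `c`, `a`. Neither rule ever looks at what an instance is, which is the gap the scenarios below expose.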
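Real production environments often need finer-grained routing control. The main scenarios are:
- Version routing: clients of a given version call service instances of the matching version, isolating API versions
- Zone awareness: services prefer instances in the same data center, reducing cross-data-center latency
- Tag routing: instances are selected by tags carried on the request (canary flag, user group, and so on)
- Weighted routing: instances with more capacity receive a larger share of traffic
Meeting these requirements means understanding Spring Cloud LoadBalancer's extension mechanism and implementing a custom ReactorServiceInstanceLoadBalancer.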
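All four scenarios share one shape. First, narrow the candidate list using instance metadata. If nothing matches, fall back to the full list. Then apply an ordinary selection rule. The following is a minimal JDK-only sketch of that shape. It assumes a hypothetical `Instance` record in place of Spring's `ServiceInstance`, and exact-match tag semantics (every requested tag must equal the instance's metadata value).

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

/** Hypothetical stand-in for ServiceInstance: an address plus its metadata. */
record Instance(String address, Map<String, String> metadata) {}

final class FilterThenFallback {
    private FilterThenFallback() {}

    /** True when every requested tag is present in the metadata with an equal value. */
    static Predicate<Instance> matchesAll(Map<String, String> tags) {
        return instance -> tags.entrySet().stream()
                .allMatch(e -> Objects.equals(e.getValue(), instance.metadata().get(e.getKey())));
    }

    /** Keep the matching instances; if none match, degrade to the full list. */
    static List<Instance> candidates(List<Instance> all, Predicate<Instance> filter) {
        List<Instance> matched = all.stream().filter(filter).toList();
        return matched.isEmpty() ? all : matched;
    }
}
```

Zone awareness fits the same shape with a predicate such as `i -> zone.equals(i.metadata().get("zone"))`. Weighted routing changes only the final selection rule.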
2. How It Works: A Deep Dive
2.1 Core LoadBalancer architecture
2.2 Extension points for a custom LoadBalancer
2.3 Data flow of tag routing
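In the implementation in section 3, tags travel along this path:

1. Business code puts them into RouteTagContext (a ThreadLocal).
2. RouteTagFeignInterceptor copies each tag into an `X-Route-Tag-<key>` request header.
3. The LoadBalancer reads the headers back and matches them against instance metadata.

The header encode/decode step can be sketched without any framework. `RouteTagHeaders` is a hypothetical helper. Prefix matching is case-insensitive because HTTP header names are case-insensitive.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper mirroring how route tags are encoded as headers and decoded again. */
final class RouteTagHeaders {
    static final String PREFIX = "X-Route-Tag-";

    private RouteTagHeaders() {}

    /** Caller side: one header per tag, e.g. version=gray becomes "X-Route-Tag-version: gray". */
    static Map<String, List<String>> encode(Map<String, String> tags) {
        Map<String, List<String>> headers = new LinkedHashMap<>();
        tags.forEach((key, value) -> headers.put(PREFIX + key, List.of(value)));
        return headers;
    }

    /** LoadBalancer side: keep only prefixed headers and strip the prefix to recover tag keys. */
    static Map<String, String> decode(Map<String, List<String>> headers) {
        Map<String, String> tags = new LinkedHashMap<>();
        headers.forEach((name, values) -> {
            boolean prefixed = name.regionMatches(true, 0, PREFIX, 0, PREFIX.length());
            if (prefixed && !values.isEmpty()) {
                tags.put(name.substring(PREFIX.length()), values.get(0));
            }
        });
        return tags;
    }
}
```

`decode(encode(tags))` returns the original tags. A lower-cased header such as `x-route-tag-zone` is still recognized, and unrelated headers are ignored.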
3. Complete Code Implementation
3.1 Project dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-loadbalancer</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <!-- Caffeine backs the LoadBalancer's instance-list cache; without it a default in-memory cache is used -->
    <dependency>
        <groupId>com.github.ben-manes.caffeine</groupId>
        <artifactId>caffeine</artifactId>
    </dependency>
</dependencies>
3.2 Core implementation of the tag-routing LoadBalancer
package com.laozhang.lb.loadbalancer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.RequestDataContext;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.http.HttpHeaders;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
 * Tag-aware load balancer.
 * Prefers instances whose metadata matches every route tag on the request; if none match,
 * it falls back to round robin over all instances.
 * Route tags arrive as X-Route-Tag-<key> request headers (written by RouteTagFeignInterceptor).
 */
@Slf4j
public class TagAwareLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private static final String ROUTE_TAG_HEADER_PREFIX = "X-Route-Tag-";
    private final String serviceId;
    private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
    // Start at a random offset (as RoundRobinLoadBalancer does) so clients don't all begin with the same instance
    private final AtomicInteger position = new AtomicInteger(ThreadLocalRandom.current().nextInt(1000));
    public TagAwareLoadBalancer(
            String serviceId,
            ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider
    ) {
        this.serviceId = serviceId;
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
    }
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider.getIfAvailable();
        if (supplier == null) {
            log.warn("No ServiceInstanceListSupplier available, serviceId={}", serviceId);
            return Mono.just(new EmptyResponse());
        }
        // The supplier emits the current instance list; use the first emission
        return supplier.get(request).next().map(instances -> {
            if (instances.isEmpty()) {
                log.warn("No service instances available, serviceId={}", serviceId);
                return new EmptyResponse();
            }
            return selectInstance(request, instances);
        });
    }
    private Response<ServiceInstance> selectInstance(Request<?> request, List<ServiceInstance> instances) {
        // Read the route tags carried by this request
        Map<String, String> routeTags = extractRouteTags(request);
        if (!routeTags.isEmpty()) {
            // Keep only instances that match every tag
            List<ServiceInstance> taggedInstances = instances.stream()
                    .filter(instance -> matchesTags(instance, routeTags))
                    .collect(Collectors.toList());
            if (!taggedInstances.isEmpty()) {
                ServiceInstance selected = roundRobin(taggedInstances);
                log.debug("Tag route hit, serviceId={}, tags={}, selected={}:{}",
                        serviceId, routeTags, selected.getHost(), selected.getPort());
                return new DefaultResponse(selected);
            }
            log.debug("No instance matches the tags, falling back to all instances, serviceId={}, tags={}",
                    serviceId, routeTags);
        }
        // Fallback: round robin over all instances
        return new DefaultResponse(roundRobin(instances));
    }
    private Map<String, String> extractRouteTags(Request<?> request) {
        // For RestTemplate, Feign and WebClient calls the context is a RequestDataContext
        // whose RequestData carries the outgoing request headers
        if (!(request.getContext() instanceof RequestDataContext context)
                || context.getClientRequest() == null) {
            return Map.of();
        }
        HttpHeaders headers = context.getClientRequest().getHeaders();
        Map<String, String> tags = new HashMap<>();
        headers.forEach((name, values) -> {
            // Header names are case-insensitive, so compare the prefix ignoring case
            if (name.regionMatches(true, 0, ROUTE_TAG_HEADER_PREFIX, 0, ROUTE_TAG_HEADER_PREFIX.length())
                    && !values.isEmpty()) {
                tags.put(name.substring(ROUTE_TAG_HEADER_PREFIX.length()), values.get(0));
            }
        });
        return tags;
    }
    private boolean matchesTags(ServiceInstance instance, Map<String, String> routeTags) {
        Map<String, String> instanceMetadata = instance.getMetadata();
        if (instanceMetadata == null) {
            return false;
        }
        return routeTags.entrySet().stream()
                .allMatch(entry -> entry.getValue().equals(instanceMetadata.get(entry.getKey())));
    }
    private ServiceInstance roundRobin(List<ServiceInstance> instances) {
        // Clear the sign bit instead of using Math.abs: Math.abs(Integer.MIN_VALUE) is still negative,
        // which would produce a negative index once the counter overflows
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }
}
3.3 Binding the LoadBalancer to specific services
package com.laozhang.lb.config;
import com.laozhang.lb.loadbalancer.TagAwareLoadBalancer;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
/**
 * Custom LoadBalancer configuration.
 * Note: do not annotate this class with @Configuration; it is attached to specific services via @LoadBalancerClient.
 */
public class TagAwareLoadBalancerConfig {
    @Bean
    public ReactorServiceInstanceLoadBalancer tagAwareLoadBalancer(
            Environment environment,
            LoadBalancerClientFactory loadBalancerClientFactory
    ) {
        // Each client's child context exposes its service name under this property
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        ObjectProvider<ServiceInstanceListSupplier> supplier =
                loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class);
        return new TagAwareLoadBalancer(name, supplier);
    }
}
package com.laozhang.lb;
import com.laozhang.lb.config.TagAwareLoadBalancerConfig;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClient;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.context.annotation.Configuration;
/**
 * Assigns the custom LoadBalancer to specific services.
 * To apply it to every service instead, use @LoadBalancerClients(defaultConfiguration = ...).
 */
@Configuration
@LoadBalancerClients({
        @LoadBalancerClient(name = "order-service", configuration = TagAwareLoadBalancerConfig.class),
        @LoadBalancerClient(name = "user-service", configuration = TagAwareLoadBalancerConfig.class)
})
public class LoadBalancerConfiguration {
}
3.4 Route-tag propagation utilities
In Feign and RestTemplate calls, the route tags must be passed on to the LoadBalancer:
package com.laozhang.lb.context;
import java.util.HashMap;
import java.util.Map;
/**
 * Route-tag context: a ThreadLocal holding the route tags of the current request.
 * Business code sets the tags before making a call; RouteTagFeignInterceptor reads them
 * and writes them into request headers, from which the LoadBalancer reads them.
 * Call clear() when the request finishes, otherwise tags leak into later work on a pooled thread.
 */
public class RouteTagContext {
    private static final ThreadLocal<Map<String, String>> TAG_HOLDER =
            ThreadLocal.withInitial(HashMap::new);
    public static void setTag(String key, String value) {
        TAG_HOLDER.get().put(key, value);
    }
    public static Map<String, String> getTags() {
        return TAG_HOLDER.get();
    }
    public static void clear() {
        TAG_HOLDER.remove();
    }
    // Convenience: set the version tag
    public static void setVersion(String version) {
        setTag("version", version);
    }
    // Convenience: mark the request as canary ("gray") or stable via the version tag
    public static void setGray(boolean isGray) {
        setTag("version", isGray ? "gray" : "stable");
    }
}
package com.laozhang.lb.interceptor;
import com.laozhang.lb.context.RouteTagContext;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Feign request interceptor: writes the route tags into request headers (X-Route-Tag-<key>).
 * The LoadBalancer reads the tag information from these headers.
 */
@Component
public class RouteTagFeignInterceptor implements RequestInterceptor {
    private static final String ROUTE_TAG_HEADER_PREFIX = "X-Route-Tag-";
    @Override
    public void apply(RequestTemplate template) {
        Map<String, String> tags = RouteTagContext.getTags();
        tags.forEach((key, value) -> template.header(ROUTE_TAG_HEADER_PREFIX + key, value));
    }
}
3.5 Zone-aware LoadBalancer
Zone awareness is an important application of tag routing: instances in the same zone are preferred.
package com.laozhang.lb.loadbalancer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
 * Zone-aware load balancer.
 * Prefers instances in the local zone (metadata key "zone"); crosses zones only when the local zone has none.
 */
@Slf4j
public class ZoneAwareLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private final String serviceId;
    private final String localZone;
    private final ObjectProvider<ServiceInstanceListSupplier> supplierProvider;
    private final AtomicInteger position = new AtomicInteger(0);
    public ZoneAwareLoadBalancer(
            String serviceId,
            String localZone,
            ObjectProvider<ServiceInstanceListSupplier> supplierProvider
    ) {
        this.serviceId = serviceId;
        this.localZone = localZone;
        this.supplierProvider = supplierProvider;
    }
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        ServiceInstanceListSupplier supplier = supplierProvider.getIfAvailable();
        if (supplier == null) {
            return Mono.just(new EmptyResponse());
        }
        return supplier.get(request).next().map(instances -> {
            if (instances.isEmpty()) {
                return new EmptyResponse();
            }
            // Prefer the local zone; if no local zone is configured there is no preference
            List<ServiceInstance> sameZoneInstances = localZone == null
                    ? List.of()
                    : instances.stream()
                            .filter(i -> localZone.equals(i.getMetadata().get("zone")))
                            .collect(Collectors.toList());
            if (!sameZoneInstances.isEmpty()) {
                log.debug("Zone-aware routing: using same-zone instances, zone={}, count={}",
                        localZone, sameZoneInstances.size());
                return new DefaultResponse(roundRobin(sameZoneInstances));
            }
            // Fallback: cross-zone
            log.debug("No same-zone instances, falling back to cross-zone, serviceId={}", serviceId);
            return new DefaultResponse(roundRobin(instances));
        });
    }
    private ServiceInstance roundRobin(List<ServiceInstance> instances) {
        // Mask the sign bit so the index stays non-negative after the counter overflows
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }
}
3.6 LoadBalancer cache configuration
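spring:
  cloud:
    loadbalancer:
      # Cache instance lists locally (Caffeine when on the classpath) instead of querying Nacos on every call
      cache:
        enabled: true
        ttl: 35s          # cache TTL, slightly longer than the Nacos heartbeat timeout (35s is also the framework default)
        capacity: 1000    # maximum number of cached entries (default 256)
      # Health checks; these settings only apply when a health-check supplier is in use
      # (for example spring.cloud.loadbalancer.configurations: health-check)
      health-check:
        initial-delay: 0
        interval: 25s
      # Retries; blocking clients need Spring Retry on the classpath,
      # and by default only GET requests are retried (see retry-on-all-operations)
      retry:
        enabled: true
        max-retries-on-same-service-instance: 0
        max-retries-on-next-service-instance: 2
        retryable-status-codes: 500, 502, 503, 504
4. Production Configuration and Tuning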
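4.1 Integrating Nacos weights
Instance weights can be adjusted in the Nacos console, so a custom LoadBalancer should honor them. Nacos Discovery exposes each instance's weight in the ServiceInstance metadata under the nacos.weight key: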
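package com.laozhang.lb.loadbalancer;
import org.springframework.cloud.client.ServiceInstance;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
/**
 * Weighted random selection based on the "nacos.weight" metadata of each instance.
 */
public final class WeightedRandomSelector {
    private WeightedRandomSelector() {
    }
    public static ServiceInstance select(List<ServiceInstance> instances) {
        if (instances.isEmpty()) {
            throw new IllegalArgumentException("instances must not be empty");
        }
        double[] weights = new double[instances.size()];
        double totalWeight = 0;
        for (int i = 0; i < weights.length; i++) {
            weights[i] = weightOf(instances.get(i));
            totalWeight += weights[i];
        }
        // All weights zero: nextDouble(0) would throw, so fall back to uniform random
        if (totalWeight <= 0) {
            return instances.get(ThreadLocalRandom.current().nextInt(instances.size()));
        }
        // randomValue is in [0, totalWeight); strict < means zero-weight instances are never picked
        double randomValue = ThreadLocalRandom.current().nextDouble(totalWeight);
        double cumWeight = 0;
        for (int i = 0; i < weights.length; i++) {
            cumWeight += weights[i];
            if (randomValue < cumWeight) {
                return instances.get(i);
            }
        }
        return instances.get(instances.size() - 1); // guard against floating-point rounding
    }
    private static double weightOf(ServiceInstance instance) {
        Map<String, String> metadata = instance.getMetadata();
        String raw = metadata == null ? null : metadata.get("nacos.weight");
        try {
            double weight = raw == null ? 1.0 : Double.parseDouble(raw);
            return weight > 0 ? weight : 0; // negative or NaN weights count as zero
        } catch (NumberFormatException e) {
            return 1.0;
        }
    }
}
5. Pitfalls We Hit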
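Pitfall 1: the @LoadBalancerClient configuration class must not be picked up by @ComponentScan.
This is the most classic pitfall in Spring Cloud LoadBalancer, exactly the same as with Ribbon in the Spring Cloud Netflix days. Suppose the configuration class passed to @LoadBalancerClient(configuration=...) is annotated with @Configuration and sits in the main package or one of its subpackages. Then @ComponentScan registers it in the main application context. It becomes global configuration that applies to every service, not only the one you named.
Solution: either leave @Configuration off the class, as TagAwareLoadBalancerConfig does, or move it to a separate package such as com.laozhang.lb.config.isolated. In the second case, make sure component scanning does not cover that package. For example, list only the other packages in @SpringBootApplication(scanBasePackages = ...), or place the package outside the main package altogether.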
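Pitfall 2: lag between the LoadBalancer cache and Nacos instance changes.
With the Caffeine cache enabled, cache.ttl is set to 35 seconds. If an instance goes offline in Nacos, the LoadBalancer notices only after the cached entry expires, up to 35 seconds later. During that window, requests can still be routed to the offline instance and fail.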
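The staleness window follows directly from how a TTL cache behaves. The toy model below is a hypothetical `TtlCache` class, not the library's Caffeine-based cache. It uses an injectable clock so time can be advanced in a test. It keeps serving the cached list until the TTL elapses, unless it is invalidated explicitly.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Toy TTL cache for one service's instance list; not Spring Cloud's implementation. */
final class TtlCache {
    private final Supplier<List<String>> source; // e.g. a registry lookup
    private final long ttlNanos;
    private final LongSupplier clock;            // nanosecond clock, injectable for tests
    private List<String> cached;
    private long loadedAt;

    TtlCache(Supplier<List<String>> source, Duration ttl, LongSupplier clock) {
        this.source = source;
        this.ttlNanos = ttl.toNanos();
        this.clock = clock;
    }

    /** Returns the cached list while it is younger than the TTL, otherwise reloads it. */
    synchronized List<String> get() {
        long now = clock.getAsLong();
        if (cached == null || now - loadedAt >= ttlNanos) {
            cached = source.get();
            loadedAt = now;
        }
        return cached;
    }

    /** What a registry-change listener would call: the next get() reloads immediately. */
    synchronized void invalidate() {
        cached = null;
    }
}
```

With a 35 s TTL, a list loaded at t = 0 is still served at t = 20 s even if the registry has changed in the meantime. Calling `invalidate()` makes the next `get()` reload at once, which is what an instance-change listener achieves.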
The correct approach is to register a Nacos service-change listener and proactively clear the LoadBalancer cache whenever instances change:
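// Listen for Nacos instance changes and evict the LoadBalancer's cached instance lists proactively
// (LoadBalancerCacheManager: org.springframework.cloud.loadbalancer.cache; Cache: org.springframework.cache)
@Autowired
private LoadBalancerCacheManager loadBalancerCacheManager;
// Call this from the NamingEvent callback
Cache cache = loadBalancerCacheManager.getCache(CachingServiceInstanceListSupplier.SERVICE_INSTANCE_CACHE_NAME);
if (cache != null) {
    cache.clear(); // drop cached instance lists so the next call re-reads the registry
}
Pitfall 3: ThreadLocal tags are lost in asynchronous Feign calls.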
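RouteTagContext stores the tags in a ThreadLocal. If the Feign call is made from an async thread pool (for example via CompletableFuture.supplyAsync), the ThreadLocal in the worker thread is empty and the route tags are lost.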
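The loss is easy to reproduce with plain JDK classes. In the sketch below, the names are hypothetical: `TAGS` stands in for RouteTagContext's ThreadLocal, and `withTags` is an illustrative helper. The sketch shows the empty map on the worker thread. It then wraps the capture, restore, and clear steps into a reusable `Supplier` decorator.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class TagPropagation {
    /** Stand-in for RouteTagContext's ThreadLocal. */
    static final ThreadLocal<Map<String, String>> TAGS = ThreadLocal.withInitial(HashMap::new);

    private TagPropagation() {}

    /** Captures the caller's tags now; restores them on the worker thread and clears them afterwards. */
    static <T> Supplier<T> withTags(Supplier<T> task) {
        Map<String, String> snapshot = new HashMap<>(TAGS.get());
        return () -> {
            TAGS.get().putAll(snapshot);
            try {
                return task.get();
            } finally {
                TAGS.remove(); // pooled threads are reused, so never leave tags behind
            }
        };
    }

    /** Runs the same read on a worker thread, first without and then with propagation. */
    static String[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TAGS.get().put("version", "gray");
            Supplier<String> read = () -> TAGS.get().getOrDefault("version", "<none>");
            String lost = CompletableFuture.supplyAsync(read, pool).join();
            String kept = CompletableFuture.supplyAsync(withTags(read), pool).join();
            return new String[] {lost, kept};
        } finally {
            TAGS.remove();
            pool.shutdown();
        }
    }
}
```

`demo()` returns `<none>` for the plain task and `gray` for the wrapped one. The snapshot is taken when `withTags` is called, which happens on the submitting thread; that timing is what makes the propagation work.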
The fix is to hand the current thread's tags to the worker thread when submitting the async task:
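// Take a snapshot on the calling thread; copying avoids sharing the caller's mutable map
Map<String, String> currentTags = new HashMap<>(RouteTagContext.getTags());
CompletableFuture.supplyAsync(() -> {
    RouteTagContext.getTags().putAll(currentTags);
    try {
        return orderServiceClient.getOrder(orderId);
    } finally {
        // Clear so the tags don't leak into the next task on this pooled thread
        RouteTagContext.clear();
    }
});
6. Summary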
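Spring Cloud LoadBalancer's extension points are clearly separated. A custom ReactorServiceInstanceLoadBalancer implements the instance-selection logic, and a ServiceInstanceListSupplier implements instance filtering. Tag routing, zone awareness, and weighted routing can all be built on these two extension points.
Key takeaways: keep the configuration class out of component scanning (otherwise it becomes global configuration), set the cache TTL sensibly, and pass tag information explicitly in async calls.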
