响应式微服务实战——WebFlux + WebClient 服务间调用完整方案
响应式微服务实战——WebFlux + WebClient 服务间调用完整方案
适读人群:微服务架构下使用 WebFlux 的 Java 工程师 | 阅读时长:约15分钟 | 核心价值:WebClient 最佳实践、超时重试、负载均衡、链路追踪,完整覆盖
接了一个某物流公司的改造项目,他们的核心订单服务是 Spring MVC,下游有7、8个微服务,用的是 Feign 调用,全部同步阻塞。业务高峰期,订单服务的线程池频繁打满,P99 延迟涨到了好几秒。
需求很明确:把订单服务的服务间调用改成异步的,降低线程占用,提升并发能力。
最初的方案是把 Spring MVC 整个切换成 WebFlux,但评估之后发现改造成本太高——订单服务的代码逻辑非常复杂,大量的 JPA 查询没法短时间内迁移到 R2DBC。
最后选的折中方案:Spring MVC 的架构不动,把 Feign 换成 WebClient,HTTP 调用层从同步变成异步(用 block() 桥接),先把 IO 等待的时间重叠起来。这不是完美的响应式,但是工程上可行的第一步。
这篇文章会覆盖 WebClient 的完整用法,包括纯 WebFlux 场景和 MVC 桥接场景。
一、WebClient 基础配置
不要用 WebClient.create(),要配置一个有合理参数的实例:
@Configuration
public class WebClientConfig {
@Bean
public WebClient.Builder webClientBuilder() {
// 配置 Netty HTTP 客户端
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) // 连接超时3s
.responseTimeout(Duration.ofSeconds(10)) // 响应超时10s
.doOnConnected(conn -> conn
.addHandlerLast(new ReadTimeoutHandler(10)) // 读超时
.addHandlerLast(new WriteTimeoutHandler(5))); // 写超时
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.defaultHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
.codecs(configurer ->
configurer.defaultCodecs().maxInMemorySize(10 * 1024 * 1024)); // 响应体最大10MB
}
// 为不同的下游服务创建不同的 WebClient 实例
@Bean("orderServiceClient")
public WebClient orderServiceClient(WebClient.Builder builder) {
return builder
.baseUrl("http://order-service") // 配合服务发现使用
.build();
}
@Bean("userServiceClient")
public WebClient userServiceClient(WebClient.Builder builder) {
return builder
.baseUrl("http://user-service")
.build();
}
}二、基础 HTTP 调用
@Service
public class UserApiClient {
@Qualifier("userServiceClient")
@Autowired
private WebClient webClient;
// GET 请求
public Mono<UserDTO> getUserById(Long userId) {
return webClient.get()
.uri("/api/users/{id}", userId)
.retrieve()
.bodyToMono(UserDTO.class);
}
// 带查询参数的 GET
public Flux<UserDTO> searchUsers(String keyword, int page, int size) {
return webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/api/users/search")
.queryParam("keyword", keyword)
.queryParam("page", page)
.queryParam("size", size)
.build())
.retrieve()
.bodyToFlux(UserDTO.class);
}
// POST 请求
public Mono<UserDTO> createUser(CreateUserRequest request) {
return webClient.post()
.uri("/api/users")
.bodyValue(request)
.retrieve()
.bodyToMono(UserDTO.class);
}
// 带请求头的调用(传递 Token)
public Mono<UserDTO> getUserWithAuth(Long userId, String token) {
return webClient.get()
.uri("/api/users/{id}", userId)
.header(HttpHeaders.AUTHORIZATION, "Bearer " + token)
.retrieve()
.bodyToMono(UserDTO.class);
}
}三、错误处理
WebClient 的 retrieve() 方法,对 4xx 和 5xx 默认会抛出 WebClientResponseException,需要处理:
public Mono<UserDTO> getUserById(Long userId) {
return webClient.get()
.uri("/api/users/{id}", userId)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals, response -> {
// 404 时返回自定义异常
return Mono.error(new UserNotFoundException(userId));
})
.onStatus(HttpStatusCode::is5xxServerError, response -> {
// 下游服务 5xx,记录并返回业务异常
return response.bodyToMono(String.class)
.flatMap(body -> {
log.error("下游用户服务 5xx: userId={}, body={}", userId, body);
return Mono.error(new DownstreamServiceException("用户服务异常"));
});
})
.bodyToMono(UserDTO.class);
}四、超时和重试
每个服务调用都要有超时和重试,这是构建健壮微服务的基本纪律:
public Mono<UserDTO> getUserByIdWithRetry(Long userId) {
return webClient.get()
.uri("/api/users/{id}", userId)
.retrieve()
.bodyToMono(UserDTO.class)
.timeout(Duration.ofMillis(800)) // 超时800ms
.retryWhen(
Retry.backoff(3, Duration.ofMillis(100)) // 退避重试
.maxBackoff(Duration.ofSeconds(2))
.filter(throwable ->
// 只重试网络超时和 5xx,不重试 4xx(4xx 是客户端错误,重试没意义)
throwable instanceof TimeoutException
|| throwable instanceof DownstreamServiceException
)
.onRetryExhaustedThrow((spec, signal) ->
new ServiceUnavailableException("用户服务暂时不可用"))
)
.onErrorResume(UserNotFoundException.class, e -> Mono.empty()); // 404 降级为空
}五、并发调用多个下游服务
这是微服务场景里用得最多的模式:
@Service
public class OrderDetailService {
@Autowired
private UserApiClient userApiClient;
@Autowired
private ProductApiClient productApiClient;
@Autowired
private LogisticsApiClient logisticsApiClient;
public Mono<OrderDetailVO> getOrderDetail(Long orderId) {
// 第一步:查订单基本信息(必须先有订单)
return orderRepository.findById(orderId)
.switchIfEmpty(Mono.error(new OrderNotFoundException(orderId)))
.flatMap(order -> {
// 三个下游调用并发发出,互不依赖
Mono<UserDTO> userMono = userApiClient.getUserById(order.getUserId())
.timeout(Duration.ofMillis(500))
.onErrorReturn(UserDTO.unknown());
Mono<ProductDTO> productMono = productApiClient.getProduct(order.getProductId())
.timeout(Duration.ofMillis(500))
.onErrorReturn(ProductDTO.unknown());
Mono<LogisticsDTO> logisticsMono = logisticsApiClient.getByOrderId(orderId)
.timeout(Duration.ofMillis(800))
.onErrorReturn(LogisticsDTO.notAvailable());
// zip 等待三个都完成(并发执行)
return Mono.zip(userMono, productMono, logisticsMono)
.map(tuple -> OrderDetailVO.builder()
.order(order)
.user(tuple.getT1())
.product(tuple.getT2())
.logistics(tuple.getT3())
.build());
});
}
}这个模式在那个物流项目里效果很好:原来三个服务串行调用,总时间是三者之和,大约780ms;改成并发之后,总时间取决于最慢的那个,降到了大约310ms。
六、负载均衡和服务发现
WebClient 集成 Spring Cloud LoadBalancer 很简单:
// pom.xml 添加
// spring-cloud-starter-loadbalancer
@Configuration
public class WebClientConfig {
@Bean
@LoadBalanced // 加这个注解,WebClient.Builder 就会自动集成负载均衡
public WebClient.Builder loadBalancedWebClientBuilder() {
return WebClient.builder();
}
@Bean
public WebClient userServiceClient(@LoadBalanced WebClient.Builder builder) {
return builder
.baseUrl("http://user-service") // 这里写服务名,不是 IP:PORT
.build();
}
}七、链路追踪:传递 Trace ID
在微服务链路里,Trace ID 的透传很重要。响应式场景里需要借助 Reactor Context 来传递:
@Component
public class TraceIdFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
String traceId = exchange.getRequest().getHeaders()
.getFirst("X-Trace-Id");
if (traceId == null) {
traceId = UUID.randomUUID().toString().replace("-", "");
}
final String finalTraceId = traceId;
exchange.getResponse().getHeaders().add("X-Trace-Id", finalTraceId);
return chain.filter(exchange)
.contextWrite(ctx -> ctx.put("traceId", finalTraceId));
}
}
// 在 WebClient 调用时透传 Trace ID
public Mono<UserDTO> getUserById(Long userId) {
return Mono.deferContextual(ctx -> {
String traceId = ctx.getOrDefault("traceId", "unknown");
return webClient.get()
.uri("/api/users/{id}", userId)
.header("X-Trace-Id", traceId) // 透传 Trace ID 给下游
.retrieve()
.bodyToMono(UserDTO.class);
});
}八、在 Spring MVC 里用 WebClient(桥接方案)
回到开头说的那个物流项目,MVC + WebClient 的桥接方案:
// 在 Spring MVC 的 Service 里使用 WebClient
@Service
public class OrderService {
@Autowired
private UserApiClient userApiClient; // 返回 Mono 的 WebClient 封装
// Spring MVC 的同步方法
public OrderDetailVO getOrderDetail(Long orderId) {
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
// 并发调用三个下游服务
Mono<OrderDetailVO> detailMono = Mono.zip(
userApiClient.getUserById(order.getUserId()),
productApiClient.getProduct(order.getProductId()),
logisticsApiClient.getByOrderId(orderId)
).map(tuple -> OrderDetailVO.builder()
.order(order)
.user(tuple.getT1())
.product(tuple.getT2())
.logistics(tuple.getT3())
.build());
// 在 MVC 里必须用 block() 等待结果
// 但三个下游调用是并发的,总时间是最慢的那个,而不是三者之和
return detailMono.block(Duration.ofSeconds(5));
}
}这个方案不算"纯响应式",但对于存量 MVC 项目来说,是一个实际可行的性能优化路径。那个物流项目用这个方案,P99 从 1840ms 降到了 487ms,改造成本可控。
WebClient 是 WebFlux 技术栈里最容易"见到效果"的部分,即使你的系统还没全面迁移到响应式,仅仅把 HTTP 调用从 Feign 换成 WebClient,在多服务并发调用的场景里就能有明显的提升。
下一篇写 WebFlux 的性能调优,线程模型、调度器选择、跟 Spring MVC 的实测对比,这块有很多有意思的细节。
