WebFlux 错误处理深度实战——onErrorResume、全局异常、fallback 策略
WebFlux 错误处理深度实战——onErrorResume、全局异常、fallback 策略
适读人群:正在用 WebFlux 构建生产系统、需要完善错误处理的工程师 | 阅读时长:约14分钟 | 核心价值:从真实案例出发,覆盖 WebFlux 错误处理的完整方案
去年我接手了一个同事写的 WebFlux 服务,当时他已经离职了,我来做功能迭代。第一次看那个代码,有点懵——错误处理的写法非常随意,有些地方直接 onErrorReturn(null),有些地方完全没有错误处理,下游抛个 500 整个请求就挂了。
用了一段时间之后,那个服务的报警频率让我有点崩溃。不是系统不稳定,是每次出了问题,日志里什么信息都没有,根本不知道发生了什么。
所以我花了差不多两周,把那个服务的错误处理完整重构了一遍。这篇文章是这段经历的总结。
一、WebFlux 里的错误传播是怎么工作的
要理解如何处理错误,得先搞清楚响应式链里错误是怎么传播的。
在 Spring MVC 里,异常从方法里抛出,沿着调用栈往上冒,到 @ControllerAdvice 被捕获。这个过程很直观,因为有物理意义上的"调用栈"。
WebFlux 里没有真正意义上的调用栈。错误在响应式链中是以"信号"的形式传播的:
// 这是一个错误传播的示例
Mono<UserVO> result = userRepository.findById(id) // 步骤1
.map(user -> expensiveTransform(user)) // 步骤2
.flatMap(user -> orderService.getOrders(user.getId())); // 步骤3
// 假设步骤2里的 expensiveTransform 抛出了异常
// 步骤3 不会执行
// 这个 Mono 会以 error 状态完成,携带那个异常
// 如果没有错误处理,最终会到达全局异常处理器任何位置发生的未处理异常,都会把当前的响应式链"染色"为错误状态,后续的 map、flatMap 等操作都会被跳过,直到遇到专门处理错误的操作符,或者到达终点。
二、链内错误处理的几个操作符
onErrorReturn:最简单的降级,返回默认值
public Mono<UserVO> getUser(Long id) {
return userRepository.findById(id)
.map(UserVO::from)
.onErrorReturn(UserVO.empty()); // 任何错误都返回空 VO
}
// 也可以指定只处理某类异常
public Mono<UserVO> getUserSafe(Long id) {
return userRepository.findById(id)
.map(UserVO::from)
.onErrorReturn(DataAccessException.class, UserVO.empty()) // 只处理 DB 异常
// 其他异常会继续传播,不在这里处理
;
}onErrorResume:更灵活的降级,切换到另一个 Mono
public Mono<UserVO> getUser(Long id) {
return userRepository.findById(id)
.map(UserVO::from)
.onErrorResume(DataAccessException.class, e -> {
// 数据库查不到,试试从缓存拿
log.warn("DB查询失败,尝试缓存: userId={}", id, e);
return cacheService.getUserFromCache(id)
.onErrorReturn(UserVO.empty()); // 缓存也失败,返回空
});
}onErrorMap:转换异常类型
// 把底层的技术异常转换成业务异常,避免技术细节暴露给上层
public Mono<User> findUser(Long id) {
return userRepository.findById(id)
.onErrorMap(R2dbcException.class,
e -> new ServiceException("用户查询失败: " + id, e));
// 注意:这里转换了异常类型,但错误信号还在继续传播
// 只是换了个"皮",并没有"消化"这个错误
}doOnError:不处理错误,只做副作用(打日志)
public Mono<User> findUser(Long id) {
return userRepository.findById(id)
.doOnError(e -> log.error("查询用户失败: id={}", id, e));
// 错误继续传播,只是顺道打了个日志
}三、全局异常处理器
在 WebFlux 里,全局异常处理有两个主要方式:@ControllerAdvice 和 WebExceptionHandler。
@ControllerAdvice 方式(处理控制器层的异常):
@RestControllerAdvice
public class GlobalExceptionHandler {
// 处理自定义业务异常
@ExceptionHandler(BusinessException.class)
public Mono<ResponseEntity<ErrorResponse>> handleBusinessException(
BusinessException e, ServerWebExchange exchange) {
log.warn("业务异常: {}", e.getMessage());
return Mono.just(ResponseEntity
.status(HttpStatus.BAD_REQUEST)
.body(ErrorResponse.of(e.getCode(), e.getMessage())));
}
// 处理参数校验异常
@ExceptionHandler(WebExchangeBindException.class)
public Mono<ResponseEntity<ErrorResponse>> handleValidationException(
WebExchangeBindException e) {
String message = e.getBindingResult().getFieldErrors().stream()
.map(fe -> fe.getField() + ": " + fe.getDefaultMessage())
.collect(Collectors.joining(", "));
return Mono.just(ResponseEntity
.status(HttpStatus.BAD_REQUEST)
.body(ErrorResponse.of("VALIDATION_ERROR", message)));
}
// 兜底处理
@ExceptionHandler(Exception.class)
public Mono<ResponseEntity<ErrorResponse>> handleUnknownException(
Exception e, ServerWebExchange exchange) {
// 不要把内部错误信息返回给客户端
log.error("未知异常: path={}", exchange.getRequest().getPath(), e);
return Mono.just(ResponseEntity
.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(ErrorResponse.of("INTERNAL_ERROR", "服务暂时不可用,请稍后重试")));
}
}这个和 Spring MVC 的写法几乎一样,迁移成本很低。
WebExceptionHandler 方式(更底层,处理 Filter 层的异常):
@Component
@Order(Ordered.HIGHEST_PRECEDENCE) // 优先级要高
public class GlobalWebExceptionHandler implements WebExceptionHandler {
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
ServerHttpResponse response = exchange.getResponse();
HttpStatus status;
String errorCode;
String message;
if (ex instanceof BusinessException be) {
status = HttpStatus.BAD_REQUEST;
errorCode = be.getCode();
message = be.getMessage();
} else if (ex instanceof AuthenticationException) {
status = HttpStatus.UNAUTHORIZED;
errorCode = "UNAUTHORIZED";
message = "请先登录";
} else {
status = HttpStatus.INTERNAL_SERVER_ERROR;
errorCode = "INTERNAL_ERROR";
message = "服务异常";
log.error("未处理的异常", ex);
}
response.setStatusCode(status);
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
ErrorResponse errorResponse = ErrorResponse.of(errorCode, message);
byte[] bytes = objectMapper.writeValueAsBytes(errorResponse);
DataBuffer buffer = response.bufferFactory().wrap(bytes);
return response.writeWith(Mono.just(buffer));
}
}一般来说,@ControllerAdvice 够用了。WebExceptionHandler 只在需要处理 Filter 层异常(比如鉴权失败)时才需要。
四、Fallback 策略的完整实现
Fallback 策略是指:当主要调用路径失败时,自动降级到备用方案。这在微服务场景里非常重要。
@Service
public class ProductService {
@Autowired
private ProductRepository productRepository;
@Autowired
private ProductCacheService cacheService;
@Autowired
private ProductDefaultService defaultService;
/**
* 三级降级策略:
* 1. 先查数据库(主路径)
* 2. DB 失败查缓存(一级降级)
* 3. 缓存也失败返回默认数据(二级降级)
*/
public Mono<ProductVO> getProduct(Long productId) {
return productRepository.findById(productId)
.map(ProductVO::from)
// 注意:switchIfEmpty 处理数据不存在(空),不是异常
.switchIfEmpty(Mono.error(new ProductNotFoundException(productId)))
.onErrorResume(ProductNotFoundException.class, e -> {
// 商品不存在,不需要降级,直接返回 404 错误
return Mono.error(e);
})
.onErrorResume(DataAccessException.class, e -> {
// 数据库访问失败,降级到缓存
log.warn("DB查询商品失败,降级到缓存: productId={}", productId, e);
return cacheService.getProduct(productId)
.doOnNext(vo -> log.info("缓存命中: productId={}", productId))
.onErrorResume(CacheException.class, ce -> {
// 缓存也失败,降级到默认数据
log.warn("缓存也失败,使用默认数据: productId={}", productId, ce);
return defaultService.getDefaultProduct(productId);
});
})
.timeout(Duration.ofMillis(800)) // 整个操作不能超过800ms
.onErrorResume(TimeoutException.class, e -> {
log.warn("查询商品超时,返回空数据: productId={}", productId);
return Mono.just(ProductVO.empty());
});
}
}这个链子写出来,有点长,但逻辑清晰:每一层 onErrorResume 处理特定类型的错误,不同的错误有不同的降级策略。
五、错误处理的几个常见反模式
反模式一:吞掉异常,什么都不做
// 危险!异常被吞掉,调用方以为成功了,但什么都没发生
.onErrorResume(e -> Mono.empty());
// 至少要打日志
.onErrorResume(e -> {
log.error("操作失败,但继续执行(降级为空结果)", e);
return Mono.empty();
});反模式二:在 onErrorReturn 里返回 null
// 这很危险,下游如果不判空会 NPE
.onErrorReturn(null);
// 用空对象模式代替
.onErrorReturn(UserVO.empty());反模式三:错误处理顺序错误
// 错误的写法!先设置了 timeout,onErrorResume 放在了 timeout 之前
// 这样 TimeoutException 不会被 onErrorResume 捕获
.onErrorResume(e -> fallback())
.timeout(Duration.ofMillis(500));
// 正确的写法:按照你希望处理的顺序放
.timeout(Duration.ofMillis(500))
.onErrorResume(TimeoutException.class, e -> fallback());响应式链的操作符是有顺序的,这和 Spring MVC 的异常处理不一样,不是按照异常类型匹配,而是按照在链中的位置。我当时在这里栽了个跟头,记忆深刻。
六、业务异常的设计规范
WebFlux 里的业务异常,我推荐统一的设计:
// 基础业务异常
public class BusinessException extends RuntimeException {
private final String code;
private final HttpStatus httpStatus;
public BusinessException(String code, String message, HttpStatus httpStatus) {
super(message);
this.code = code;
this.httpStatus = httpStatus;
}
}
// 具体的异常类型
public class UserNotFoundException extends BusinessException {
public UserNotFoundException(Long userId) {
super("USER_NOT_FOUND", "用户不存在: " + userId, HttpStatus.NOT_FOUND);
}
}
public class InsufficientBalanceException extends BusinessException {
public InsufficientBalanceException(BigDecimal required, BigDecimal actual) {
super("INSUFFICIENT_BALANCE",
String.format("余额不足,需要 %.2f,当前余额 %.2f", required, actual),
HttpStatus.BAD_REQUEST);
}
}在全局异常处理器里统一处理 BusinessException,根据 httpStatus 设置响应状态码。这样就不需要在每个 Controller 里写 onErrorMap 了。
七、实际案例:重构那个告警频繁的服务
回到文章开头说的那个服务,重构之后的主要变化:
- 统一了异常体系:所有业务异常继承
BusinessException,全局处理器统一响应 - 链内日志标准化:所有
onErrorResume前加doOnError打日志,再也不怕"无声的失败" - fallback 策略明确化:哪里降级、降级到什么、降级的日志级别(warn 还是 error),都明确规定
- 超时全部补上:所有外部调用(DB、HTTP)都加了
timeout,再也不会因为下游慢导致线程(event loop)被拖住
重构之后的效果:告警数量从每天平均47条降到了6条以内,而且每一条告警都有完整的错误信息,排查时间从平均1小时降到了15分钟左右。
这个改进说实话不需要什么高深技术,就是把错误处理的纪律确立起来。
下一篇写背压机制,这个是 WebFlux 里很多人知道概念但不知道怎么用的东西,我来说说实际场景里怎么处理生产者比消费者快的问题。
