WebFlux + R2DBC 实战——响应式数据库访问,告别阻塞 IO
WebFlux + R2DBC 实战——响应式数据库访问,告别阻塞 IO
适读人群:在 WebFlux 项目中需要接数据库的 Java 工程师 | 阅读时长:约15分钟 | 核心价值:R2DBC 完整接入方案,踩坑记录,实际性能对比
这篇文章的起源是一个让我很尴尬的问题。
我在公司内部分享 WebFlux 的时候,有个同事当场问我:"老张,你说 WebFlux 是全链路非阻塞,但你用的是 JPA + MySQL,JPA 是阻塞的啊,你这不是假响应式吗?"
我当时就愣了一下,然后说:"……你说得对。"
确实,WebFlux 的控制器层是响应式的,但如果数据库访问用的是 JDBC,那整个 IO 链路里最慢的那一段依然是阻塞的。你可以用 Schedulers.boundedElastic() 隔离一下,勉强能用,但本质上没有改变阻塞。
R2DBC 就是为了解决这个问题出现的。
一、R2DBC 是什么,和 JDBC 有什么本质区别
JDBC 是为同步阻塞模型设计的:你调 connection.executeQuery(),当前线程就在那挂着,等数据库返回结果。
R2DBC(Reactive Relational Database Connectivity)是另一套接口规范,专门为响应式设计:你调操作,拿到一个 Publisher,数据到来时通过响应式流通知你,不占用线程等待。
目前主流数据库都有 R2DBC 驱动:
- MySQL:
r2dbc-mysql(由 asyncer 维护) - PostgreSQL:
r2dbc-postgresql(官方维护,最成熟) - H2:
r2dbc-h2(内嵌数据库,测试常用) - Oracle:
oracle-r2dbc - MSSQL:
r2dbc-mssql
需要注意:R2DBC 不支持 JPA。JPA 的设计是建立在 JDBC 之上的,没法直接用于响应式。你需要用 Spring Data R2DBC,它提供了类似 Spring Data JPA 的 Repository 接口,但底层是 R2DBC。
二、项目配置
pom.xml 的关键依赖(以 MySQL 为例):
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<!-- asyncer 维护的 r2dbc-mysql 驱动,2024年已比较稳定 -->
<groupId>io.asyncer</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>1.1.3</version>
</dependency>
<!-- 注意:不要加 spring-boot-starter-data-jpa,会冲突 -->application.yml 配置:
spring:
r2dbc:
url: r2dbc:mysql://localhost:3306/mydb?useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: yourpassword
pool:
initial-size: 5
max-size: 20 # 响应式不需要很多连接,连接本身不会被线程占用
max-idle-time: 30m
validation-query: SELECT 1
# 注意:不需要配 spring.datasource,那是 JDBC 的配置三、定义实体和 Repository
R2DBC 的实体类比 JPA 的更简单,不需要各种生命周期注解:
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
@Table("t_user") // 对应数据库表名
public class User {
@Id
private Long id;
private String username;
private String email;
// 注意:R2DBC 不支持 @OneToMany 这种关联映射
// 关联查询需要自己写 SQL 或者用 DatabaseClient
// 这是 R2DBC 目前最大的局限性,接受不了的话可以考虑别的方案
// getter/setter 省略,或者用 Record/Lombok
}Repository 接口:
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
// 方法命名查询,和 Spring Data JPA 几乎一样
Mono<User> findByUsername(String username);
Flux<User> findByEmailContaining(String keyword);
// 自定义 SQL
@Query("SELECT * FROM t_user WHERE created_at > :since ORDER BY created_at DESC LIMIT :limit")
Flux<User> findRecentUsers(LocalDateTime since, int limit);
// 更新操作
@Query("UPDATE t_user SET email = :email WHERE id = :id")
Mono<Integer> updateEmail(Long id, String email); // 返回影响行数
}使用方式:
@Service
public class UserService {
@Autowired
private UserRepository userRepository;
public Mono<User> findUser(Long id) {
return userRepository.findById(id)
.switchIfEmpty(Mono.error(new UserNotFoundException(id)));
}
public Mono<User> createUser(CreateUserRequest req) {
User user = new User();
user.setUsername(req.getUsername());
user.setEmail(req.getEmail());
// 注意:save() 在 id 为 null 时执行 INSERT,有值时执行 UPDATE
return userRepository.save(user);
}
public Flux<User> findRecentUsers(int days) {
LocalDateTime since = LocalDateTime.now().minusDays(days);
return userRepository.findRecentUsers(since, 50);
}
}四、复杂查询:DatabaseClient
Repository 处理不了的复杂查询,用 DatabaseClient(相当于 R2DBC 版的 JdbcTemplate):
@Service
public class OrderService {
@Autowired
private DatabaseClient databaseClient;
public Flux<OrderDetailVO> getOrderDetails(Long userId) {
// 手写 SQL,JOIN 查询
String sql = """
SELECT o.id, o.order_no, o.total_amount,
u.username, u.email,
COUNT(oi.id) as item_count
FROM t_order o
JOIN t_user u ON o.user_id = u.id
LEFT JOIN t_order_item oi ON o.id = oi.order_id
WHERE o.user_id = :userId AND o.status != 'DELETED'
GROUP BY o.id, o.order_no, o.total_amount, u.username, u.email
ORDER BY o.created_at DESC
""";
return databaseClient.sql(sql)
.bind("userId", userId)
.map(row -> OrderDetailVO.builder()
.orderId(row.get("id", Long.class))
.orderNo(row.get("order_no", String.class))
.totalAmount(row.get("total_amount", BigDecimal.class))
.username(row.get("username", String.class))
.email(row.get("email", String.class))
.itemCount(row.get("item_count", Integer.class))
.build())
.all();
}
}DatabaseClient 还支持插入和批量操作:
// 批量插入
public Flux<User> batchCreate(List<CreateUserRequest> requests) {
return Flux.fromIterable(requests)
.flatMap(req -> {
return databaseClient.sql("INSERT INTO t_user (username, email) VALUES (:username, :email)")
.bind("username", req.getUsername())
.bind("email", req.getEmail())
.fetch()
.rowsUpdated() // 返回影响行数
.thenReturn(req); // 转换成原始请求,后面可以查回插入的数据
}, 10) // 并发度10,不要太高,数据库连接有限
.flatMap(req -> userRepository.findByUsername(req.getUsername()));
}五、事务处理
R2DBC 的事务用 @Transactional 注解,和 JDBC 版基本一样,但有一点要注意:
@Service
public class OrderTransactionService {
@Transactional // Spring Data R2DBC 支持响应式事务
public Mono<Order> createOrder(CreateOrderRequest request) {
return userRepository.findById(request.getUserId())
.switchIfEmpty(Mono.error(new UserNotFoundException(request.getUserId())))
.flatMap(user -> {
// 创建订单
Order order = new Order();
order.setUserId(user.getId());
order.setTotalAmount(request.getTotalAmount());
return orderRepository.save(order);
})
.flatMap(order -> {
// 创建订单明细
return Flux.fromIterable(request.getItems())
.flatMap(item -> {
OrderItem orderItem = new OrderItem();
orderItem.setOrderId(order.getId());
orderItem.setProductId(item.getProductId());
orderItem.setQuantity(item.getQuantity());
return orderItemRepository.save(orderItem);
})
.then(Mono.just(order)); // 所有明细插完,返回订单
});
// 如果链中任何地方抛出异常,事务会自动回滚
}
}有一个踩过的坑要提一下:@Transactional 在 R2DBC 里必须作用在返回 Mono 或 Flux 的方法上。如果你在一个没有响应式返回值的方法上加,事务不会生效。这个问题很隐蔽,我当时排查了一上午才发现。
六、R2DBC 的局限性(真实感受)
用了大半年,我对 R2DBC 的感受是:能用,但有明显的局限性。
1. 不支持 JPA 的关联映射
@OneToMany、@ManyToOne、@JoinColumn……这些 JPA 的好用特性,R2DBC 全都没有。你需要手动写 JOIN 查询,或者拆成多个查询用 zip/flatMap 组合。
对于中等复杂度的业务,这个代价挺高的。原来 JPA 自动帮你处理的懒加载、级联操作,都得自己写。
2. 生态还不够成熟
Hibernate 生态里有大量工具支持(审计、软删除、多租户……),R2DBC 基本靠自己实现。
3. 调试比 JDBC 难
响应式链的栈帧本来就不好看,R2DBC 出了问题更难定位。
我的判断:什么时候值得用 R2DBC?
- 系统是纯 WebFlux 架构,不能有任何阻塞
- 数据库操作相对简单(主要是单表 CRUD,JOIN 不多)
- 团队有一定响应式基础
如果你的系统是以复杂查询为主,或者团队对响应式不熟,继续用 JPA + Schedulers.boundedElastic() 隔离,性价比反而更高。别为了"纯响应式"而强行踩坑。
七、实际性能数据
我在一个用户中心服务上做过对比(MySQL,同一台机器,连接池配置一致):
| 场景 | JDBC + 线程池(200线程) | R2DBC + WebFlux |
|---|---|---|
| 并发100,单表查询 P99 | 34ms | 28ms |
| 并发500,单表查询 P99 | 187ms | 89ms |
| 并发1000,单表查询 P99 | 743ms(线程池开始排队) | 163ms |
| 并发1000,JOIN查询 P99 | 891ms | 247ms |
| 内存占用(稳定状态) | ~320MB | ~198MB |
并发量上来之后,差距确实很明显。但在低并发场景下,两者几乎没有区别,甚至 R2DBC 因为额外的响应式开销会稍慢一些。
下一篇写 WebFlux 的错误处理,这块水很深,onErrorResume、全局异常、fallback 策略,每个都有需要注意的细节。
