gRPC高性能通信:Protocol Buffers设计、流式RPC、与REST的互操作
gRPC高性能通信:Protocol Buffers设计、流式RPC、与REST的互操作
适读人群:Java后端工程师、微服务架构师 | 阅读时长:约18分钟 | 技术栈:gRPC-Java、Protocol Buffers 3、grpc-gateway
开篇故事
我第一次接触gRPC是在2019年,当时公司从Spring Cloud(REST)迁移到内部服务调用统一走gRPC。那次迁移很顺利,性能提升也很明显——同等业务逻辑下,内部服务调用延迟从平均15ms降到了6ms,带宽消耗降了40%。
但那次迁移里有一个问题困扰了我很久:我们有些服务需要同时对外提供REST接口(给前端)、对内提供gRPC接口(给其他服务)。当时的解法是维护两套接口,代码重复,维护麻烦。
后来了解到grpc-gateway和grpc-web,才找到了一个优雅的解法:定义一次,通过协议转换同时提供REST和gRPC。
今天这篇文章,从Protobuf设计到流式RPC,再到REST互操作,完整地把gRPC的工程实践讲一遍。
一、核心问题:gRPC比REST快在哪里
gRPC基于HTTP/2和Protocol Buffers,和传统REST(HTTP/1.1 + JSON)相比,有几个关键优势:
Protocol Buffers的效率优势:
- JSON序列化:字段名每次都传输,文本格式冗余
- Protobuf序列化:字段用数字编号,二进制格式紧凑
实测数据(我们系统的一个User对象):
- JSON序列化:142字节,耗时0.8μs
- Protobuf序列化:47字节,耗时0.3μs
数据量缩小到1/3,序列化速度快2.5倍,在高频调用场景积累效果显著。
二、原理深度解析
2.1 gRPC的四种RPC模式
2.2 Protocol Buffers编码原理
理解编码原理对Proto设计很重要:字段编号1-15只需要1字节描述,16-2047需要2字节。所以频繁使用的字段应该用小编号。
三、完整代码实现
3.1 Proto文件设计
// user_service.proto
syntax = "proto3";
package com.example.user;
option java_package = "com.example.grpc.user";
option java_outer_classname = "UserServiceProto";
option java_multiple_files = true;
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";
// 注意字段编号分配:
// 1-15 用于最频繁的字段(节省传输字节)
// 16+ 用于不常用字段
message User {
int64 id = 1; // 频繁字段
string username = 2; // 频繁字段
string email = 3; // 频繁字段
UserStatus status = 4; // 频繁字段
// 较少使用的字段用较大编号
string phone = 16;
string avatar_url = 17;
google.protobuf.Timestamp created_at = 18;
map<string, string> metadata = 19;
}
enum UserStatus {
USER_STATUS_UNSPECIFIED = 0; // proto3 enum必须有0值
USER_STATUS_ACTIVE = 1;
USER_STATUS_INACTIVE = 2;
USER_STATUS_BANNED = 3;
}
message GetUserRequest {
int64 user_id = 1;
}
message CreateUserRequest {
string username = 1;
string email = 2;
google.protobuf.StringValue phone = 3; // 可选字段用Wrapper类型
}
message UserListRequest {
int32 page = 1;
int32 page_size = 2;
UserFilter filter = 3;
}
message UserFilter {
google.protobuf.StringValue username = 1;
repeated UserStatus statuses = 2; // 状态列表
}
message UserListResponse {
repeated User users = 1;
int32 total_count = 2;
bool has_next = 3;
}
// 服务定义
service UserService {
// 一元RPC
rpc GetUser(GetUserRequest) returns (User);
rpc CreateUser(CreateUserRequest) returns (User);
// 服务端流:适合大量数据导出
rpc ListUsersStream(UserListRequest) returns (stream User);
// 客户端流:适合批量导入
rpc BatchCreateUsers(stream CreateUserRequest) returns (BatchCreateResult);
// 双向流:实时通信
rpc UserChat(stream ChatMessage) returns (stream ChatMessage);
}
message BatchCreateResult {
int32 success_count = 1;
int32 failure_count = 2;
repeated string failure_reasons = 3;
}3.2 gRPC服务端实现
@GrpcService
public class UserGrpcService extends UserServiceGrpc.UserServiceImplBase {
@Autowired
private UserService userService;
// =============================================
// 一元RPC
// =============================================
@Override
public void getUser(GetUserRequest request, StreamObserver<User> responseObserver) {
try {
UserEntity user = userService.findById(request.getUserId());
if (user == null) {
responseObserver.onError(
Status.NOT_FOUND
.withDescription("用户不存在: " + request.getUserId())
.asRuntimeException()
);
return;
}
responseObserver.onNext(convertToProto(user));
responseObserver.onCompleted();
} catch (Exception e) {
log.error("获取用户失败", e);
responseObserver.onError(
Status.INTERNAL
.withDescription("服务内部错误")
.withCause(e)
.asRuntimeException()
);
}
}
// =============================================
// 服务端流:批量导出
// =============================================
@Override
public void listUsersStream(UserListRequest request,
StreamObserver<User> responseObserver) {
try {
// 流式写出,不需要一次性加载所有数据到内存
userService.findAllAsStream(request.getFilter())
.forEach(user -> {
responseObserver.onNext(convertToProto(user));
// 背压控制:gRPC Java的流写入会自动处理背压
});
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(Status.INTERNAL.withCause(e).asRuntimeException());
}
}
// =============================================
// 客户端流:批量导入
// =============================================
@Override
public StreamObserver<CreateUserRequest> batchCreateUsers(
StreamObserver<BatchCreateResult> responseObserver) {
List<UserEntity> successList = new ArrayList<>();
List<String> failureReasons = new ArrayList<>();
AtomicInteger failureCount = new AtomicInteger(0);
return new StreamObserver<CreateUserRequest>() {
@Override
public void onNext(CreateUserRequest request) {
try {
UserEntity user = userService.create(convertFromProto(request));
successList.add(user);
} catch (Exception e) {
failureCount.incrementAndGet();
failureReasons.add(e.getMessage());
}
}
@Override
public void onError(Throwable t) {
log.error("客户端流发生错误", t);
// 客户端发生错误,返回已处理的结果
sendResult(responseObserver, successList.size(), failureCount.get(), failureReasons);
}
@Override
public void onCompleted() {
// 客户端发送完毕,返回汇总结果
sendResult(responseObserver, successList.size(), failureCount.get(), failureReasons);
}
};
}
private void sendResult(StreamObserver<BatchCreateResult> observer,
int successCount, int failureCount, List<String> reasons) {
BatchCreateResult result = BatchCreateResult.newBuilder()
.setSuccessCount(successCount)
.setFailureCount(failureCount)
.addAllFailureReasons(reasons)
.build();
observer.onNext(result);
observer.onCompleted();
}
}3.3 gRPC客户端实现
@Service
public class UserGrpcClient {
@GrpcClient("user-service")
private UserServiceGrpc.UserServiceBlockingStub blockingStub;
@GrpcClient("user-service")
private UserServiceGrpc.UserServiceStub asyncStub;
/**
* 一元调用
*/
public UserDTO getUser(Long userId) {
GetUserRequest request = GetUserRequest.newBuilder()
.setUserId(userId)
.build();
try {
User user = blockingStub
.withDeadlineAfter(5, TimeUnit.SECONDS) // 超时
.getUser(request);
return convertToDTO(user);
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
throw new UserNotFoundException(userId);
}
throw new ServiceException("获取用户失败", e);
}
}
/**
* 服务端流:接收流式数据
*/
public List<UserDTO> listUsersFromStream(UserFilter filter) throws InterruptedException {
UserListRequest request = UserListRequest.newBuilder()
.setPageSize(Integer.MAX_VALUE)
.setFilter(filter)
.build();
CountDownLatch latch = new CountDownLatch(1);
List<UserDTO> results = new CopyOnWriteArrayList<>();
AtomicReference<Throwable> error = new AtomicReference<>();
asyncStub.listUsersStream(request, new StreamObserver<User>() {
@Override
public void onNext(User user) {
results.add(convertToDTO(user));
}
@Override
public void onError(Throwable t) {
error.set(t);
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
});
latch.await(30, TimeUnit.SECONDS);
if (error.get() != null) {
throw new ServiceException("流式获取用户失败", error.get());
}
return results;
}
/**
* 客户端流:批量发送
*/
public BatchCreateResult batchCreateUsers(List<CreateUserDTO> users) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<BatchCreateResult> result = new AtomicReference<>();
AtomicReference<Throwable> error = new AtomicReference<>();
StreamObserver<CreateUserRequest> requestObserver = asyncStub.batchCreateUsers(
new StreamObserver<BatchCreateResult>() {
@Override
public void onNext(BatchCreateResult r) { result.set(r); }
@Override
public void onError(Throwable t) { error.set(t); latch.countDown(); }
@Override
public void onCompleted() { latch.countDown(); }
}
);
for (CreateUserDTO dto : users) {
requestObserver.onNext(convertToProto(dto));
}
requestObserver.onCompleted(); // 告诉服务端发送完毕
latch.await(60, TimeUnit.SECONDS);
if (error.get() != null) {
throw new ServiceException("批量创建用户失败", error.get());
}
return result.get();
}
}3.4 gRPC与REST互操作(grpc-gateway思路)
/**
* 用Spring MVC包装gRPC服务,实现REST兼容层
* 不需要grpc-gateway,Java原生实现
*/
@RestController
@RequestMapping("/api/v1")
public class UserRestController {
@Autowired
private UserGrpcClient grpcClient; // 内部调用gRPC
@GetMapping("/users/{id}")
public ResponseEntity<UserDTO> getUser(@PathVariable Long id) {
try {
UserDTO user = grpcClient.getUser(id);
return ResponseEntity.ok(user);
} catch (UserNotFoundException e) {
return ResponseEntity.notFound().build();
}
}
@PostMapping("/users/batch")
public ResponseEntity<BatchResult> batchCreate(@RequestBody List<CreateUserDTO> users)
throws InterruptedException {
BatchCreateResult grpcResult = grpcClient.batchCreateUsers(users);
return ResponseEntity.ok(convertToRest(grpcResult));
}
}四、工程实践与最佳实践
4.1 Protobuf设计原则
// 好的Proto设计
message Order {
// 1. 字段编号不能修改(破坏兼容性)
int64 id = 1;
int64 user_id = 2;
// 2. 废弃字段用reserved,不能复用编号
reserved 5, 6;
reserved "old_field";
// 3. 枚举总是有UNSPECIFIED=0
OrderStatus status = 4;
// 4. 可选字段考虑用oneof或wrapper
oneof optional_coupon {
string coupon_code = 10;
}
// 5. 时间用Timestamp而不是string
google.protobuf.Timestamp created_at = 8;
}4.2 拦截器:认证与监控
// 服务端拦截器:JWT认证
@Component
public class AuthInterceptor implements ServerInterceptor {
static final Metadata.Key<String> AUTH_HEADER =
Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
@Override
public <Q, R> ServerCall.Listener<Q> interceptCall(
ServerCall<Q, R> call, Metadata headers, ServerCallHandler<Q, R> next) {
String token = headers.get(AUTH_HEADER);
if (token == null || !validateToken(token)) {
call.close(Status.UNAUTHENTICATED.withDescription("无效的认证令牌"), new Metadata());
return new ServerCall.Listener<Q>() {};
}
// 将用户信息放入Context
Context ctx = Context.current().withValue(USER_CONTEXT_KEY, extractUser(token));
return Contexts.interceptCall(ctx, call, headers, next);
}
}五、踩坑实录
坑一:Proto字段修改导致数据不兼容
这是gRPC最大的坑之一。不兼容的修改包括:修改字段编号、修改字段类型、删除字段(不用reserved)、修改枚举值数字。
// 危险操作!
message User {
// 原来
// int64 id = 1;
// string username = 2;
// 错误:把id改成string类型 - 破坏兼容性
string id = 1;
string username = 2;
// 错误:复用了被删除字段的编号
// string deleted_field 原来是 3
string new_field = 3; // 旧客户端会把new_field解析成deleted_field的数据!
}解决方案:使用reserved声明废弃的编号,新字段用新编号。
坑二:gRPC Java的流式API不够友好
StreamObserver的回调式API在复杂业务逻辑下很难用,特别是双向流。我们后来把它封装成更接近同步的API。
坑三:连接管理与负载均衡
gRPC连接是长连接,在Kubernetes中,默认的轮询负载均衡是基于连接的,不是基于请求的。一旦建立连接,所有请求都走同一个Pod。
解决方案:在gRPC层做客户端负载均衡,或者用服务网格(Istio等)。
坑四:调试困难
gRPC使用二进制协议,不能直接用curl或者浏览器调试。我们统一配置了grpcurl工具,还在开发环境开启了gRPC反射,让工具能动态发现接口。
# 安装grpcurl
brew install grpcurl
# 查看所有服务
grpcurl -plaintext localhost:9090 list
# 调用接口
grpcurl -plaintext -d '{"user_id": 123}' \
localhost:9090 com.example.user.UserService/GetUser六、总结与个人判断
gRPC在内部微服务通信场景下,是目前性价比最高的技术选型之一。性能好、契约清晰、代码生成减少手写代码、四种RPC模式覆盖各种通信场景。
但我要说两点保留意见:
第一,学习成本不低。Proto文件设计、向后兼容规则、流式RPC的API,都需要时间掌握。团队规模小、业务复杂度不高的情况下,用HTTP/JSON的REST依然是更稳妥的选择。
第二,生态相对REST还是差一些。特别是调试工具、文档生成、API网关支持等方面,REST经历了十几年积累,gRPC还在追赶。
我的建议:内部服务间高频调用、对延迟敏感的链路,用gRPC;对外暴露的API、团队不熟悉的新项目,先用REST,有明确性能问题再迁移。不要为了用gRPC而用gRPC。
