gRPC + AI——流式响应的高性能传输方案
gRPC + AI——流式响应的高性能传输方案
适读人群:Java 后端工程师 / 对 AI 高并发感兴趣 | 阅读时长:约18分钟 | 核心价值:从 REST 换到 gRPC 的完整方案,含性能对比和踩坑记录
那是去年双十一前一个月,我接到一个需求:把我们内部的 AI 写作辅助工具做成一个可以给十几个业务系统调用的服务。
最开始我用的 REST + SSE,也就是标准的 Server-Sent Events 做流式响应。单机测没问题,一测并发就露馅了:50 个并发连接撑着流式输出,服务器 CPU 打满,GC 频繁,内存抖动。更烦的是,同一个下游系统对接了好几次语言版本(Java、Python、Go),SSE 每次都要处理连接断开重连的逻辑,各自有各自的 bug。
当时我做了一个决定:换 gRPC。
这个决定在组里有点争议。有人说 gRPC 太重了,调试不方便,Postman 还不支持,学习成本高。我说行,我先做出来跑个对比,数据说话。
一周后,数据出来了:相同并发下,P99 延迟从 4.2 秒降到 1.8 秒,内存使用降了约 40%。之后再没人反对了。
今天把这套方案完整写下来。
gRPC vs HTTP SSE:不是谁好谁坏,是场景适配
先说清楚,不是"gRPC 比 SSE 好",是在特定场景下 gRPC 更合适。
SSE 适合的场景:
- 浏览器直连(SSE 是原生 Web 技术,gRPC-Web 在浏览器里受限较多)
- 服务少、连接数不高
- 团队没有 gRPC 经验,快速上线优先
- 只有单向流(服务端推送)
gRPC 适合的场景:
- 服务间通信(不是浏览器直连)
- 高并发、低延迟要求
- 多语言客户端(proto 文件自动生成客户端代码,省大量工作)
- 双向流(客户端也需要在流式响应过程中发数据)
- 需要强类型契约
我那个项目恰好满足"服务间通信 + 高并发 + 多语言客户端"三个条件,所以 gRPC 是对的选择。
Proto 文件定义
先定义服务契约,这是 gRPC 的核心。
syntax = "proto3";
package ai.service.v1;
option java_package = "com.yourcompany.ai.proto";
option java_outer_classname = "AiServiceProto";
option java_multiple_files = true;
// AI 聊天服务
service AiChatService {
// 普通聊天(非流式)
rpc Chat(ChatRequest) returns (ChatResponse);
// 流式聊天(服务端流)——AI 生成时逐 token 推送
rpc ChatStream(ChatRequest) returns (stream ChatChunk);
// 双向流(支持用户中途打断、修改要求)
rpc ChatBidirectional(stream ChatMessage) returns (stream ChatChunk);
}
// 请求消息
message ChatRequest {
string conversation_id = 1; // 会话 ID,用于追踪
string user_message = 2; // 用户消息
repeated Message history = 3; // 历史消息
ChatOptions options = 4; // 可选参数
}
// 单条历史消息
message Message {
enum Role {
USER = 0;
ASSISTANT = 1;
SYSTEM = 2;
}
Role role = 1;
string content = 2;
}
// 聊天配置
message ChatOptions {
float temperature = 1; // 温度,0-1
int32 max_tokens = 2; // 最大生成长度
string model = 3; // 模型名称(不填用默认)
}
// 非流式响应
message ChatResponse {
string conversation_id = 1;
string content = 2;
UsageStats usage = 3; // token 用量
int64 latency_ms = 4; // 耗时(方便客户端监控)
}
// 流式响应的一个 chunk
message ChatChunk {
string conversation_id = 1;
string delta = 2; // 这次增量内容
bool is_final = 3; // 是否是最后一个 chunk
string finish_reason = 4; // 结束原因:stop/length/error
UsageStats usage = 5; // 只在 is_final=true 时有值
// 错误信息(is_final=true 且发生错误时)
ErrorInfo error = 6;
}
// 双向流的消息(可以在流式响应过程中发新消息)
message ChatMessage {
enum MessageType {
USER_INPUT = 0; // 用户输入
INTERRUPT = 1; // 打断当前生成
CONTEXT_UPDATE = 2; // 更新上下文
}
MessageType type = 1;
string content = 2;
string conversation_id = 3;
}
// Token 用量
message UsageStats {
int32 prompt_tokens = 1;
int32 completion_tokens = 2;
int32 total_tokens = 3;
}
// 错误信息
message ErrorInfo {
string code = 1; // 错误码
string message = 2; // 错误描述
bool retryable = 3; // 是否可以重试
}Spring Boot 3 服务端实现
<!-- pom.xml 关键依赖 -->
<dependencies>
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-server-spring-boot-starter</artifactId>
<version>3.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.25.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.59.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>@GrpcService
@Slf4j
public class AiChatServiceImpl extends AiChatServiceGrpc.AiChatServiceImplBase {
@Autowired
private ChatClient chatClient;
@Autowired
private ConversationHistoryService historyService;
/**
* 普通聊天(非流式)
*/
@Override
public void chat(ChatRequest request, StreamObserver<ChatResponse> responseObserver) {
String conversationId = request.getConversationId();
long startTime = System.currentTimeMillis();
try {
// 构建提示词,带历史上下文
ChatClient.CallResponseSpec response = chatClient.prompt()
.messages(buildMessages(request))
.options(ChatOptionsBuilder.builder()
.withTemperature(request.getOptions().getTemperature())
.withMaxTokens(request.getOptions().getMaxTokens())
.build())
.call();
String content = response.content();
ChatResponse.Builder builder = ChatResponse.newBuilder()
.setConversationId(conversationId)
.setContent(content)
.setLatencyMs(System.currentTimeMillis() - startTime);
// 保存对话历史
historyService.save(conversationId, request.getUserMessage(), content);
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Chat failed, conversationId={}", conversationId, e);
responseObserver.onError(
Status.INTERNAL
.withDescription("AI service error: " + e.getMessage())
.withCause(e)
.asRuntimeException()
);
}
}
/**
* 流式聊天——这是重点
* 注意:StreamObserver 不是线程安全的,必须在同一个线程里调用
*/
@Override
public void chatStream(ChatRequest request, StreamObserver<ChatChunk> responseObserver) {
String conversationId = request.getConversationId();
log.info("Stream chat started, conversationId={}", conversationId);
try {
// Spring AI 的流式调用
Flux<String> contentFlux = chatClient.prompt()
.messages(buildMessages(request))
.stream()
.content();
// 注意:这里订阅时必须处理背压
// gRPC 的 StreamObserver 有自己的流控机制
contentFlux
.doOnNext(delta -> {
// 每来一个 token,立刻发给客户端
ChatChunk chunk = ChatChunk.newBuilder()
.setConversationId(conversationId)
.setDelta(delta)
.setIsFinal(false)
.build();
// StreamObserver.onNext 不是线程安全的
// 但 Project Reactor 默认单线程 subscribe,这里是安全的
responseObserver.onNext(chunk);
})
.doOnComplete(() -> {
// 发送结束标记
ChatChunk finalChunk = ChatChunk.newBuilder()
.setConversationId(conversationId)
.setIsFinal(true)
.setFinishReason("stop")
.build();
responseObserver.onNext(finalChunk);
responseObserver.onCompleted();
log.info("Stream chat completed, conversationId={}", conversationId);
})
.doOnError(error -> {
log.error("Stream chat error, conversationId={}", conversationId, error);
// 发送错误 chunk,给客户端机会处理
ChatChunk errorChunk = ChatChunk.newBuilder()
.setConversationId(conversationId)
.setIsFinal(true)
.setFinishReason("error")
.setError(ErrorInfo.newBuilder()
.setCode("AI_ERROR")
.setMessage(error.getMessage())
.setRetryable(isRetryableError(error))
.build())
.build();
try {
responseObserver.onNext(errorChunk);
responseObserver.onCompleted();
} catch (Exception e) {
// 客户端可能已断开
responseObserver.onError(
Status.INTERNAL.withDescription(error.getMessage()).asRuntimeException()
);
}
})
.subscribe();
} catch (Exception e) {
log.error("Failed to start stream chat, conversationId={}", conversationId, e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException()
);
}
}
/**
* 构建 Spring AI 需要的 Message 列表
*/
private List<org.springframework.ai.chat.messages.Message> buildMessages(ChatRequest request) {
List<org.springframework.ai.chat.messages.Message> messages = new ArrayList<>();
// 加入历史消息
for (Message histMsg : request.getHistoryList()) {
switch (histMsg.getRole()) {
case USER -> messages.add(new UserMessage(histMsg.getContent()));
case ASSISTANT -> messages.add(new AssistantMessage(histMsg.getContent()));
case SYSTEM -> messages.add(new SystemMessage(histMsg.getContent()));
}
}
// 加入当前用户消息
messages.add(new UserMessage(request.getUserMessage()));
return messages;
}
private boolean isRetryableError(Throwable error) {
String msg = error.getMessage();
if (msg == null) return false;
return msg.contains("rate limit") || msg.contains("timeout") || msg.contains("503");
}
}gRPC 服务端配置
grpc:
server:
port: 9090
# 关键配置:AI 响应慢,必须设够长
keep-alive-time: 60s
keep-alive-timeout: 20s
permit-keep-alive-without-calls: true
# 消息大小限制:AI 响应可能很长
max-inbound-message-size: 10MB
max-outbound-message-size: 10MB
# 并发流数量
# 根据服务器性能和 AI 调用并发量调整
flow-control-window: 1048576 # 1MBJava 客户端实现
@Service
@Slf4j
public class AiChatGrpcClient {
@GrpcClient("ai-service")
private AiChatServiceGrpc.AiChatServiceStub asyncStub;
@GrpcClient("ai-service")
private AiChatServiceGrpc.AiChatServiceBlockingStub blockingStub;
/**
* 流式调用,返回 Flux
* 这样调用方可以用响应式方式处理流式数据
*/
public Flux<String> streamChat(String conversationId, String userMessage) {
ChatRequest request = ChatRequest.newBuilder()
.setConversationId(conversationId)
.setUserMessage(userMessage)
.build();
return Flux.create(sink -> {
asyncStub.chatStream(request, new StreamObserver<ChatChunk>() {
private final StringBuilder fullContent = new StringBuilder();
@Override
public void onNext(ChatChunk chunk) {
if (!chunk.getIsFinal()) {
// 有新 token,推给订阅者
sink.next(chunk.getDelta());
fullContent.append(chunk.getDelta());
} else {
// 流结束
if (chunk.hasError()) {
log.error("AI stream error: {}", chunk.getError().getMessage());
if (chunk.getError().getRetryable()) {
sink.error(new RetryableAiException(chunk.getError().getMessage()));
} else {
sink.error(new AiException(chunk.getError().getMessage()));
}
}
// 可以在这里保存完整内容
}
}
@Override
public void onError(Throwable t) {
log.error("gRPC stream error", t);
sink.error(t);
}
@Override
public void onCompleted() {
sink.complete();
}
});
});
}
}踩的几个坑
坑1:StreamObserver 的线程安全问题
gRPC 的 StreamObserver.onNext() 不是线程安全的。如果你在多线程里调用它(比如用了 Schedulers.parallel()),就会出现消息乱序或者直接报错。
解决方案:要么用 synchronized,要么确保在同一个线程里调用。我选的是后者,用 Schedulers.single() 限定在单线程订阅。
坑2:大 Prompt 超出消息大小限制
gRPC 默认的消息大小限制是 4MB。如果你把整个文档内容放到消息里传,分分钟超限。我犯过这个错误,直接报 RESOURCE_EXHAUSTED: gRPC message exceeds maximum size。
解决方案:一是在配置里调大限制(如上面 yml 里的配置),二是更好的做法——把长文档存到对象存储,传引用不传内容。
坑3:客户端连接数不够
gRPC 默认用 HTTP/2,一个 TCP 连接可以复用多个 stream。但如果你的客户端配置了连接数上限太小,高并发时会排队。
# 客户端配置
grpc:
client:
ai-service:
address: static://localhost:9090
negotiation-type: plaintext
# 这两个配置控制连接池大小
keep-alive-time: 60s
keep-alive-timeout: 20s坑4:中文乱码
proto 的 string 类型本身支持 UTF-8,不会乱码。但如果你的代码里有地方做了 bytes 转换,指定了错误的 charset,就会乱码。检查一遍所有 new String(bytes) 的地方,确保都加了 StandardCharsets.UTF_8。
性能对比数据
这是我在同一台机器(8核16G)上,用 50 并发打了 5 分钟的数据:
测试场景:用户发问,AI 流式生成约 300 字的回复
REST + SSE:
P50: 1.2s
P95: 3.8s
P99: 4.2s
内存使用:峰值 4.1GB
GC 频率:约每 30 秒一次 Full GC(有问题)
gRPC Server-Side Stream:
P50: 0.9s
P95: 1.5s
P99: 1.8s
内存使用:峰值 2.4GB
GC 频率:基本没有 Full GC差距最大的是尾延迟(P99)。P50 差距不大,但 P99 从 4.2 秒降到 1.8 秒,对用户体验的影响是质的变化。
内存下降是因为 gRPC + HTTP/2 的连接复用,减少了大量的连接对象创建和回收。
什么时候不用 gRPC
这不是在劝所有人都上 gRPC。这几种情况就别折腾了:
- 前端直连:浏览器原生不支持 gRPC(gRPC-Web 有限制),用 SSE 更简单
- 单体应用:服务内部调用,直接函数调用就行,网络传输是多余的
- 团队没人熟悉 proto:学习成本在赶工期时是实际代价
- 并发低:QPS 低于 100,REST 完全够用,没必要引入复杂性
gRPC 是工具,不是信仰。并发高、多语言客户端、需要双向流这几个条件满足了再考虑。
