AI 应用的 gRPC + 双向流——比 WebSocket 更适合 AI 通信吗
AI 应用的 gRPC + 双向流——比 WebSocket 更适合 AI 通信吗
去年底我们团队在做一个实时 AI 助手功能,需要把 LLM 的流式输出推给前端。最开始用的是 SSE,简单快上。后来随着功能迭代,需要支持用户在生成过程中发送中断指令、追加上下文——这时候 SSE 的单向性就成了硬伤,我们切到了 WebSocket。
再后来,后端服务之间也开始有 AI 流式调用的需求:一个 orchestration service 需要把任务分发给多个 AI worker,每个 worker 返回的是流式结果,orchestration 还需要实时聚合。这种场景下,WebSocket 的管理成本开始变得难以接受。
就是在这个背景下,我开始认真看 gRPC 双向流(bidirectional streaming)在 AI 通信场景下的适用性。今天这篇文章,把这三种通信协议的对比和 gRPC + Spring AI 的实际集成方案系统梳理一遍。
三种协议的本质差异
先把概念理清楚,再谈适用场景。
SSE(Server-Sent Events)
SSE 是 HTTP 协议上的单向流。服务器可以持续向客户端推送数据,但客户端不能在同一个连接上向服务器发送数据(只能发起新的 HTTP 请求)。
协议本质:HTTP/1.1 长连接,Content-Type: text/event-stream
Client → Server: GET /chat/stream HTTP/1.1
Server → Client: data: {"token": "你好"}\n\n
Server → Client: data: {"token": ",我是"}\n\n
Server → Client: data: {"token": "AI助手"}\n\n
Server → Client: data: [DONE]\n\nWebSocket
WebSocket 是全双工的持久连接,客户端和服务器都可以在任意时刻发送消息。基于 HTTP Upgrade 握手建立,之后走独立的 WebSocket 帧协议。
gRPC 双向流
gRPC 基于 HTTP/2,支持四种 RPC 类型:
- Unary(一问一答)
- Server streaming(服务器流)
- Client streaming(客户端流)
- Bidirectional streaming(双向流)
双向流模式下,客户端和服务器都可以独立发送消息流,互不阻塞。这是在协议层面的多路复用,不同于 WebSocket 的帧序列化方式。
三者的详细对比
| 维度 | SSE | WebSocket | gRPC 双向流 |
|---|---|---|---|
| 方向性 | 单向(服务→客户端) | 全双工 | 全双工 |
| 协议基础 | HTTP/1.1 | 独立协议(HTTP升级) | HTTP/2 |
| 浏览器原生支持 | 是 | 是 | 需要 grpc-web |
| 消息格式 | 文本 | 文本/二进制 | Protobuf(二进制) |
| 负载均衡 | 标准 HTTP 负载均衡 | 粘性连接,难均衡 | 内置多种策略 |
| 服务间调用 | 不适合 | 勉强可用 | 天然适合 |
| 强类型契约 | 无 | 无 | Proto 定义 |
| 流量控制 | 无 | 基础 | HTTP/2 原生支持 |
gRPC 双向流在 AI 场景的实际优劣
优势
1. 服务间通信的天然契合
如果你的 AI 调用链是 API Gateway → Orchestration Service → AI Worker Service,服务之间用 gRPC 双向流可以拿到完整的流式结果,同时维持强类型的接口契约。WebSocket 在这里显得格外笨拙,因为你需要自己定义消息协议,还要处理 WebSocket 的连接管理。
2. 多路复用降低连接开销
HTTP/2 的多路复用意味着一个 TCP 连接上可以同时跑多个独立的 gRPC 流。在并发 AI 请求场景下,这比 WebSocket 的每连接独立 TCP 要节省很多资源。
3. 背压(Backpressure)支持
AI 模型生成 token 的速率有时候比客户端消费的速率快(比如批量处理场景),HTTP/2 的流量控制机制可以天然地处理这种背压情况,而 WebSocket 的背压需要自己在应用层实现。
4. Proto 定义的类型安全
AI 应用的消息格式往往比较复杂(thinking block、tool call、content block 等),用 Proto 定义可以保证服务间的格式一致性,避免 JSON 解析错误。
劣势
1. 浏览器端需要 grpc-web
gRPC 原生不能在浏览器里跑,需要引入 grpc-web 代理层(通常是 Envoy),增加了部署复杂度。如果你的 AI 接口需要直接面向浏览器,这是个真实的成本。
2. 调试和可观测性难度更高
WebSocket 的消息你可以在浏览器 DevTools 里直接看;gRPC 的 Protobuf 消息是二进制的,需要专门的工具(grpcurl、Postman gRPC 等)。
3. 与 Spring AI 的集成不如 HTTP 成熟
Spring AI 内置的 ChatClient 是基于 HTTP 的,如果你要用 gRPC 传输 AI 流式结果,需要自己做适配层,不是开箱即用的。
结论:gRPC 双向流更适合 AI 后端服务间通信,而不是前端直连场景。
代码:gRPC streaming + Spring AI 的集成实现
第一步:定义 Proto 文件
syntax = "proto3";
package ai.service;
option java_package = "com.laozhang.ai.grpc";
option java_outer_classname = "AiServiceProto";
// AI 流式服务定义
service AiStreamService {
// 双向流:客户端发送对话消息,服务端流式返回生成结果
rpc ChatStream(stream ChatRequest) returns (stream ChatResponse);
// 服务端流:适合单次查询的流式输出
rpc GenerateStream(GenerateRequest) returns (stream GenerateResponse);
}
message ChatRequest {
string session_id = 1;
string message = 2;
MessageType type = 3;
map<string, string> metadata = 4;
}
enum MessageType {
USER_MESSAGE = 0;
INTERRUPT = 1; // 中断生成
CONTEXT_APPEND = 2; // 追加上下文
}
message ChatResponse {
string session_id = 1;
string token = 2; // 流式 token
bool is_complete = 3; // 是否完成
string full_content = 4; // 完成时的完整内容
ResponseType response_type = 5;
TokenUsage usage = 6;
}
enum ResponseType {
TOKEN_CHUNK = 0;
TOOL_CALL = 1;
COMPLETE = 2;
ERROR = 3;
}
message TokenUsage {
int32 input_tokens = 1;
int32 output_tokens = 2;
}
message GenerateRequest {
string prompt = 1;
int32 max_tokens = 2;
}
message GenerateResponse {
string token = 1;
bool is_done = 2;
}第二步:服务端实现(Spring Boot + Spring AI)
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.beans.factory.annotation.Autowired;
import reactor.core.publisher.Flux;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@GrpcService
public class AiStreamServiceImpl extends AiStreamServiceGrpc.AiStreamServiceImplBase {
@Autowired
private ChatModel chatModel;
// 管理活跃会话
private final Map<String, AtomicBoolean> sessionInterruptFlags = new ConcurrentHashMap<>();
@Override
public StreamObserver<ChatRequest> chatStream(StreamObserver<ChatResponse> responseObserver) {
return new StreamObserver<ChatRequest>() {
private String currentSessionId;
private final StringBuilder contextBuffer = new StringBuilder();
@Override
public void onNext(ChatRequest request) {
currentSessionId = request.getSessionId();
// 处理不同类型的消息
switch (request.getType()) {
case INTERRUPT -> handleInterrupt(request.getSessionId(), responseObserver);
case CONTEXT_APPEND -> contextBuffer.append("\n").append(request.getMessage());
case USER_MESSAGE -> handleUserMessage(request, responseObserver);
default -> {} // 未知类型忽略
}
}
@Override
public void onError(Throwable t) {
cleanup(currentSessionId);
// 客户端断连,清理资源
}
@Override
public void onCompleted() {
cleanup(currentSessionId);
responseObserver.onCompleted();
}
private void handleUserMessage(ChatRequest request,
StreamObserver<ChatResponse> responseObserver) {
String sessionId = request.getSessionId();
AtomicBoolean interrupted = new AtomicBoolean(false);
sessionInterruptFlags.put(sessionId, interrupted);
String fullPrompt = contextBuffer.length() > 0
? contextBuffer + "\n\nUser: " + request.getMessage()
: request.getMessage();
// 使用 Spring AI 的流式调用
Flux<String> tokenStream = chatModel.stream(
new Prompt(new UserMessage(fullPrompt))
).map(response -> response.getResult().getOutput().getContent());
StringBuilder fullContent = new StringBuilder();
tokenStream.subscribe(
token -> {
if (interrupted.get()) return; // 已中断,丢弃后续 token
fullContent.append(token);
// 发送 token chunk
ChatResponse chunkResponse = ChatResponse.newBuilder()
.setSessionId(sessionId)
.setToken(token)
.setIsComplete(false)
.setResponseType(ResponseType.TOKEN_CHUNK)
.build();
responseObserver.onNext(chunkResponse);
},
error -> {
// 发送错误响应
ChatResponse errorResponse = ChatResponse.newBuilder()
.setSessionId(sessionId)
.setResponseType(ResponseType.ERROR)
.setFullContent(error.getMessage())
.setIsComplete(true)
.build();
responseObserver.onNext(errorResponse);
},
() -> {
// 发送完成响应
if (!interrupted.get()) {
ChatResponse completeResponse = ChatResponse.newBuilder()
.setSessionId(sessionId)
.setIsComplete(true)
.setFullContent(fullContent.toString())
.setResponseType(ResponseType.COMPLETE)
.build();
responseObserver.onNext(completeResponse);
}
sessionInterruptFlags.remove(sessionId);
}
);
}
private void handleInterrupt(String sessionId,
StreamObserver<ChatResponse> responseObserver) {
AtomicBoolean flag = sessionInterruptFlags.get(sessionId);
if (flag != null) {
flag.set(true);
}
// 可以发送一个中断确认消息
ChatResponse interruptAck = ChatResponse.newBuilder()
.setSessionId(sessionId)
.setResponseType(ResponseType.COMPLETE)
.setIsComplete(true)
.setFullContent("[已中断]")
.build();
responseObserver.onNext(interruptAck);
}
private void cleanup(String sessionId) {
if (sessionId != null) {
sessionInterruptFlags.remove(sessionId);
}
}
};
}
}第三步:客户端调用(服务间调用示例)
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@Service
public class AiGrpcClientService {
@GrpcClient("ai-worker-service")
private AiStreamServiceGrpc.AiStreamServiceStub asyncStub;
/**
* 双向流调用 AI 服务
*/
public void chatWithAI(String sessionId, String message,
Consumer<String> tokenHandler,
Runnable onComplete) throws InterruptedException {
CountDownLatch completeLatch = new CountDownLatch(1);
// 设置服务端响应处理器
StreamObserver<ChatResponse> responseObserver = new StreamObserver<ChatResponse>() {
@Override
public void onNext(ChatResponse response) {
if (response.getResponseType() == ResponseType.TOKEN_CHUNK) {
tokenHandler.accept(response.getToken());
} else if (response.getIsComplete()) {
onComplete.run();
}
}
@Override
public void onError(Throwable t) {
completeLatch.countDown();
// 错误处理
}
@Override
public void onCompleted() {
completeLatch.countDown();
}
};
// 获取客户端流 Observer
StreamObserver<ChatRequest> requestObserver = asyncStub.chatStream(responseObserver);
// 发送用户消息
ChatRequest request = ChatRequest.newBuilder()
.setSessionId(sessionId)
.setMessage(message)
.setType(MessageType.USER_MESSAGE)
.build();
requestObserver.onNext(request);
// 等待完成(最多 120 秒)
completeLatch.await(120, TimeUnit.SECONDS);
requestObserver.onCompleted();
}
/**
* 发送中断指令
*/
public void interruptGeneration(StreamObserver<ChatRequest> requestObserver,
String sessionId) {
ChatRequest interruptRequest = ChatRequest.newBuilder()
.setSessionId(sessionId)
.setType(MessageType.INTERRUPT)
.build();
requestObserver.onNext(interruptRequest);
}
}第四步:Spring Boot 配置
# application.yml
grpc:
server:
port: 9090
max-inbound-message-size: 10485760 # 10MB
client:
ai-worker-service:
address: "discovery:///ai-worker-service" # 使用服务发现
negotiation-type: plaintext
keepAlive:
time: 30s
timeout: 10s
without-calls: true@Configuration
public class GrpcConfig {
@Bean
public ServerInterceptor authInterceptor() {
return new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// 提取 JWT token 做认证
String token = headers.get(
Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER));
if (!isValidToken(token)) {
call.close(Status.UNAUTHENTICATED.withDescription("Invalid token"), headers);
return new ServerCall.Listener<ReqT>() {};
}
return next.startCall(call, headers);
}
private boolean isValidToken(String token) {
// JWT 验证逻辑
return token != null && token.startsWith("Bearer ");
}
};
}
}实际场景下三种协议的选型建议
选型总结:
- 浏览器直连 + 只需要服务端推送:用 SSE,最简单,兼容性最好
- 浏览器直连 + 需要双向交互(用户中断、追加上下文):用 WebSocket
- 后端服务间 AI 流式调用:首选 gRPC 双向流
- 服务间调用但团队没有 gRPC 经验:用 HTTP/2 + SSE 作为过渡方案
总结
gRPC 双向流不是比 WebSocket "更好",而是解决了不同场景的问题。
对于前后端 AI 流式交互,WebSocket 或 SSE 仍然是更务实的选择——部署简单,调试方便,生态成熟。
gRPC 双向流的真正价值在于微服务架构下的 AI 能力调用链:强类型契约、多路复用、内置流量控制,这些优势在服务间通信场景下是实实在在的工程收益。
如果你的 AI 应用还处于早期阶段,不要为了 gRPC 而 gRPC。等到服务间 AI 调用变得复杂,流量压力上来之后,再做迁移是更理性的选择。
