Post #1757: The Advantages of gRPC for AI Service-to-Service Communication: Bidirectional Streaming and Protocol Buffers in Practice
I used to be a loyal REST user. REST seemed good enough, and bringing in gRPC felt like over-engineering.
That belief was shaken while building my first AI system. It had to vectorize large chunks of text and store them in a vector database. The payloads carried lots of embedding vectors (1536-dimensional float arrays), and over REST the CPU cost of JSON serialization/deserialization was surprisingly high; the loss of floating-point precision in JSON was another lurking risk.
After switching to gRPC, transferring the same vector data used 60% less CPU and cut latency by 40%.
Since then, gRPC has been my default choice for internal communication between AI services. Today I want to talk about why, and how to use it.
1. Why AI services are a particularly good fit for gRPC
Protocol Buffers' encoding efficiency
The data AI services pass around most often: embedding vectors (lots of floats), tensor data, and long blocks of text. Representing these as JSON is very inefficient: a float32 encoded in JSON averages 8-12 bytes, while Protobuf needs only 4 bytes (the size of the float32 itself). For a 1536-dimensional vector, JSON takes roughly 15KB versus about 6KB for Protobuf, a size reduction of more than 60%.
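If you want to sanity-check those numbers, a rough measurement like the sketch below will do. It assumes the EmbeddingData message generated from the proto shown later in this article is on the classpath, and uses Jackson only to produce a comparable JSON payload:
// Back-of-the-envelope check: one 1536-dim vector as Protobuf vs. JSON
static void compareEncodedSizes() throws Exception {
    float[] vector = new float[1536];
    java.util.Arrays.fill(vector, 0.123456789f);

    EmbeddingData.Builder builder = EmbeddingData.newBuilder().setIndex(0);
    for (float f : vector) {
        builder.addEmbedding(f);
    }
    // proto3 packs repeated floats: roughly 1536 * 4 bytes plus a few bytes of header
    int protoBytes = builder.build().getSerializedSize();

    // Jackson is used here only to produce a comparable JSON representation
    String json = new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(vector);
    int jsonBytes = json.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;

    System.out.printf("protobuf=%d bytes, json=%d bytes%n", protoBytes, jsonBytes);
}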
HTTP/2 multiplexing
gRPC runs on HTTP/2, so a single connection can carry many concurrent streams, and there is no need to open a new connection per request. Traffic between AI services tends to be high-frequency (each conversation turn may trigger several embedding lookups, for example), and HTTP/2 connection reuse noticeably cuts the overhead of connection setup.
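In code this simply means sharing one ManagedChannel across stubs and calls. A minimal sketch, using the embedding RPC defined later in this article (the host, port, and inputs are placeholders); all three requests below are multiplexed over the same TCP connection:
// One shared channel; HTTP/2 multiplexes all concurrent calls over a single connection
ManagedChannel channel = ManagedChannelBuilder
        .forAddress("ai-inference-service", 9090)   // placeholder host/port
        .usePlaintext()
        .build();

AIInferenceServiceGrpc.AIInferenceServiceFutureStub stub =
        AIInferenceServiceGrpc.newFutureStub(channel);

List<ListenableFuture<EmbeddingResponse>> inFlight = new ArrayList<>();
for (String text : List.of("query-1", "query-2", "query-3")) {
    inFlight.add(stub.getEmbeddings(EmbeddingRequest.newBuilder()
            .setModel("text-embedding-3-small")
            .addInput(text)
            .build()));
}
// all three RPCs are in flight at once, with no extra connection handshakes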
Native support for streaming
Streaming inference responses (SSE) take some workarounds on HTTP/1.1, whereas gRPC treats streaming as a first-class citizen: server streaming, client streaming, and bidirectional streaming. That maps very naturally onto the various streaming interactions in AI scenarios.
Strongly typed interface definitions
Interfaces defined in Protobuf are strongly typed: client and server must follow the same contract. That matters a lot for team collaboration, especially when several services share one set of AI interfaces; type safety prevents many runtime errors.
2. The basic proto definition
Let's start by defining the Protobuf interface for the AI services:
// ai_service.proto
syntax = "proto3";
package ai.service.v1;
option java_package = "com.example.ai.grpc";
option java_outer_classname = "AIServiceProto";
option java_multiple_files = true;
// Common message types
message TokenUsage {
int32 prompt_tokens = 1;
int32 completion_tokens = 2;
int32 total_tokens = 3;
}
// =====================
// Chat service
// =====================
message ChatMessage {
string role = 1; // system/user/assistant/tool
string content = 2;
string name = 3; // optional, display name for the role
repeated ToolCall tool_calls = 4; // present when the assistant invokes tools
string tool_call_id = 5; // present for the tool role
}
message ToolCall {
string id = 1;
string type = 2; // "function"
FunctionCall function = 3;
}
message FunctionCall {
string name = 1;
string arguments = 2; // JSON string
}
message ChatRequest {
string model = 1;
repeated ChatMessage messages = 2;
float temperature = 3;
int32 max_tokens = 4;
bool stream = 5;
repeated ToolDefinition tools = 6;
map<string, string> metadata = 7; // tracing info, etc.
}
message ChatResponse {
string id = 1;
string model = 2;
repeated ChatChoice choices = 3;
TokenUsage usage = 4;
int64 created = 5;
}
message ChatChoice {
int32 index = 1;
ChatMessage message = 2;
string finish_reason = 3;
}
// A single chunk of a streaming response
message ChatStreamChunk {
string id = 1;
string model = 2;
repeated StreamChoice choices = 3;
bool is_final = 4;
TokenUsage usage = 5; // only populated in the final chunk
}
message StreamChoice {
int32 index = 1;
StreamDelta delta = 2;
string finish_reason = 3;
}
message StreamDelta {
string role = 1;
string content = 2;
repeated ToolCall tool_calls = 3;
}
// =====================
// Embedding service
// =====================
message EmbeddingRequest {
string model = 1;
repeated string input = 2; // batching supported
string encoding_format = 3; // float or base64
}
message EmbeddingResponse {
string model = 1;
repeated EmbeddingData data = 2;
TokenUsage usage = 3;
}
message EmbeddingData {
int32 index = 1;
repeated float embedding = 2; // repeated float in Protobuf is far more compact than JSON
}
// =====================
// Service definition
// =====================
service AIInferenceService {
// Plain chat (unary, single response)
rpc Chat(ChatRequest) returns (ChatResponse);
// Streaming chat (server streaming)
rpc ChatStream(ChatRequest) returns (stream ChatStreamChunk);
// Embedding
rpc GetEmbeddings(EmbeddingRequest) returns (EmbeddingResponse);
// Batch embeddings (client streaming: the client sends batches)
rpc GetEmbeddingsBatch(stream EmbeddingRequest) returns (EmbeddingResponse);
// Bidirectional streaming: real-time collaborative chat
rpc CollaborativeChat(stream ChatMessage) returns (stream ChatStreamChunk);
}
3. Server-side implementation
Maven dependency configuration:
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.59.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.59.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.59.0</version>
</dependency>
<!-- Spring Boot integration -->
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-server-spring-boot-starter</artifactId>
<version>2.15.0.RELEASE</version>
</dependency>
</dependencies>
The server-side implementation:
@GrpcService
@Slf4j
@RequiredArgsConstructor
public class AIInferenceServiceImpl extends AIInferenceServiceGrpc.AIInferenceServiceImplBase {
private final OpenAiClient openAiClient;
private final EmbeddingModel embeddingModel;
private final MeterRegistry meterRegistry;
@Override
public void chat(ChatRequest request, StreamObserver<ChatResponse> responseObserver) {
String requestId = request.getMetadataOrDefault("trace_id",
UUID.randomUUID().toString());
log.debug("Handling chat request: id={}, model={}", requestId, request.getModel());
try {
// Convert the Protobuf messages into the internal format
List<Message> messages = request.getMessagesList().stream()
.map(this::convertMessage)
.collect(Collectors.toList());
// Call the LLM
LLMResponse llmResponse = openAiClient.chat(
LLMRequest.builder()
.model(request.getModel())
.messages(messages)
.temperature(request.getTemperature())
.maxTokens(request.getMaxTokens())
.build()
);
// Build the Protobuf response
ChatResponse response = ChatResponse.newBuilder()
.setId(llmResponse.getId())
.setModel(llmResponse.getModel())
.addChoices(ChatChoice.newBuilder()
.setIndex(0)
.setMessage(ChatMessage.newBuilder()
.setRole("assistant")
.setContent(llmResponse.getContent())
.build())
.setFinishReason(llmResponse.getFinishReason())
.build())
.setUsage(TokenUsage.newBuilder()
.setPromptTokens(llmResponse.getUsage().getPromptTokens())
.setCompletionTokens(llmResponse.getUsage().getCompletionTokens())
.setTotalTokens(llmResponse.getUsage().getTotalTokens())
.build())
.setCreated(System.currentTimeMillis() / 1000)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
meterRegistry.counter("grpc.chat.success", "model", request.getModel()).increment();
} catch (Exception e) {
log.error("Chat request failed: {}", e.getMessage());
meterRegistry.counter("grpc.chat.error").increment();
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage())
.withCause(e).asRuntimeException()
);
}
}
@Override
public void chatStream(ChatRequest request,
StreamObserver<ChatStreamChunk> responseObserver) {
log.debug("Handling streaming chat request, model={}", request.getModel());
try {
List<Message> messages = request.getMessagesList().stream()
.map(this::convertMessage)
.collect(Collectors.toList());
AtomicInteger chunkIndex = new AtomicInteger(0);
AtomicReference<TokenUsage> finalUsage = new AtomicReference<>();
// Call the streaming LLM API and subscribe to each token
openAiClient.chatStream(LLMRequest.builder()
.model(request.getModel())
.messages(messages)
.stream(true)
.build()
).subscribe(
chunk -> {
// Emit one chunk per token
ChatStreamChunk.Builder chunkBuilder = ChatStreamChunk.newBuilder()
.setId(chunk.getId())
.setModel(chunk.getModel())
.setIsFinal(false);
if (chunk.getDelta() != null) {
chunkBuilder.addChoices(StreamChoice.newBuilder()
.setIndex(0)
.setDelta(StreamDelta.newBuilder()
.setContent(chunk.getDelta().getContent() != null ?
chunk.getDelta().getContent() : "")
.build())
.setFinishReason(chunk.getFinishReason() != null ?
chunk.getFinishReason() : "")
.build());
}
if (chunk.getUsage() != null) {
finalUsage.set(TokenUsage.newBuilder()
.setPromptTokens(chunk.getUsage().getPromptTokens())
.setCompletionTokens(chunk.getUsage().getCompletionTokens())
.setTotalTokens(chunk.getUsage().getTotalTokens())
.build());
}
responseObserver.onNext(chunkBuilder.build());
chunkIndex.incrementAndGet();
},
error -> {
log.error("Streaming chat error: {}", error.getMessage());
responseObserver.onError(
Status.INTERNAL.withDescription(error.getMessage()).asRuntimeException()
);
},
() -> {
// Send the final chunk, carrying the complete token usage stats
ChatStreamChunk finalChunk = ChatStreamChunk.newBuilder()
.setIsFinal(true)
.setUsage(finalUsage.get() != null ?
finalUsage.get() : TokenUsage.getDefaultInstance())
.build();
responseObserver.onNext(finalChunk);
responseObserver.onCompleted();
log.debug("Streaming chat completed, chunks={}", chunkIndex.get());
}
);
} catch (Exception e) {
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException()
);
}
}
@Override
public void getEmbeddings(EmbeddingRequest request,
StreamObserver<EmbeddingResponse> responseObserver) {
try {
List<float[]> embeddings = embeddingModel.embed(request.getInputList());
EmbeddingResponse.Builder responseBuilder = EmbeddingResponse.newBuilder()
.setModel(request.getModel());
for (int i = 0; i < embeddings.size(); i++) {
EmbeddingData.Builder dataBuilder = EmbeddingData.newBuilder()
.setIndex(i);
// Copy the float[] into the repeated float field
for (float f : embeddings.get(i)) {
dataBuilder.addEmbedding(f);
}
responseBuilder.addData(dataBuilder.build());
}
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).asRuntimeException()
);
}
}
/**
* Bidirectional streaming: collaborative chat.
* Scenario: the client sends user input in real time (possibly in fragments) and the server replies in real time.
*/
@Override
public StreamObserver<ChatMessage> collaborativeChat(
StreamObserver<ChatStreamChunk> responseObserver) {
List<ChatMessage> receivedMessages = new CopyOnWriteArrayList<>();
return new StreamObserver<ChatMessage>() {
@Override
public void onNext(ChatMessage message) {
// Collect the messages sent by the client
receivedMessages.add(message);
// When a user message arrives, start inference right away and stream the reply back
if ("user".equals(message.getRole())) {
startStreamingResponse(receivedMessages, responseObserver);
}
}
@Override
public void onError(Throwable t) {
log.error("Collaborative chat client error: {}", t.getMessage());
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
log.debug("Collaborative chat session ended");
}
};
}
private void startStreamingResponse(List<ChatMessage> messages,
StreamObserver<ChatStreamChunk> observer) {
// Run asynchronously so the receiving thread is not blocked
CompletableFuture.runAsync(() -> {
List<Message> internalMessages = messages.stream()
.map(this::convertMessage)
.collect(Collectors.toList());
openAiClient.chatStream(LLMRequest.builder()
.messages(internalMessages)
.build()
).subscribe(
chunk -> {
if (chunk.getDelta() != null && chunk.getDelta().getContent() != null) {
observer.onNext(ChatStreamChunk.newBuilder()
.addChoices(StreamChoice.newBuilder()
.setDelta(StreamDelta.newBuilder()
.setContent(chunk.getDelta().getContent())
.build())
.build())
.build());
}
},
error -> observer.onError(Status.INTERNAL
.withDescription(error.getMessage()).asRuntimeException()),
() -> observer.onNext(ChatStreamChunk.newBuilder()
.setIsFinal(true).build())
);
});
}
}
4. Client-side calls
@Service
@Slf4j
public class AIInferenceGrpcClient {
@GrpcClient("ai-inference-service")
private AIInferenceServiceGrpc.AIInferenceServiceBlockingStub blockingStub;
@GrpcClient("ai-inference-service")
private AIInferenceServiceGrpc.AIInferenceServiceStub asyncStub;
/**
* Synchronous chat
*/
public ChatResponse chat(String userMessage, String systemPrompt) {
ChatRequest request = ChatRequest.newBuilder()
.setModel("gpt-4")
.addMessages(ChatMessage.newBuilder()
.setRole("system")
.setContent(systemPrompt)
.build())
.addMessages(ChatMessage.newBuilder()
.setRole("user")
.setContent(userMessage)
.build())
.setTemperature(0.7f)
.setMaxTokens(2048)
.putMetadata("trace_id", getCurrentTraceId())
.build();
try {
return blockingStub
.withDeadlineAfter(60, TimeUnit.SECONDS)
.chat(request);
} catch (StatusRuntimeException e) {
log.error("gRPC chat failed: status={}, message={}",
e.getStatus().getCode(), e.getMessage());
throw new AIServiceException("Chat failed: " + e.getMessage(), e);
}
}
/**
* Streaming chat, returned as a Flux
*/
public Flux<String> chatStream(String userMessage, String systemPrompt) {
ChatRequest request = ChatRequest.newBuilder()
.setModel("gpt-4")
.addMessages(ChatMessage.newBuilder()
.setRole("system")
.setContent(systemPrompt)
.build())
.addMessages(ChatMessage.newBuilder()
.setRole("user")
.setContent(userMessage)
.build())
.setStream(true)
.build();
return Flux.create(sink -> {
asyncStub.chatStream(request, new StreamObserver<ChatStreamChunk>() {
@Override
public void onNext(ChatStreamChunk chunk) {
if (!chunk.getIsFinal() && !chunk.getChoicesList().isEmpty()) {
String content = chunk.getChoices(0).getDelta().getContent();
if (!content.isEmpty()) {
sink.next(content);
}
}
}
@Override
public void onError(Throwable t) {
sink.error(t);
}
@Override
public void onCompleted() {
sink.complete();
}
});
});
}
/**
* Batch embeddings (client streaming)
*/
public List<float[]> getEmbeddingsBatch(List<String> texts, int batchSize) {
List<float[]> results = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Throwable> error = new AtomicReference<>();
StreamObserver<EmbeddingResponse> responseObserver = new StreamObserver<EmbeddingResponse>() {
@Override
public void onNext(EmbeddingResponse response) {
response.getDataList().forEach(data -> {
float[] embedding = new float[data.getEmbeddingCount()];
for (int i = 0; i < data.getEmbeddingCount(); i++) {
embedding[i] = data.getEmbedding(i);
}
results.add(embedding);
});
}
@Override
public void onError(Throwable t) {
error.set(t);
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
};
StreamObserver<EmbeddingRequest> requestObserver =
asyncStub.getEmbeddingsBatch(responseObserver);
// Send in batches
for (int i = 0; i < texts.size(); i += batchSize) {
List<String> batch = texts.subList(i, Math.min(i + batchSize, texts.size()));
requestObserver.onNext(EmbeddingRequest.newBuilder()
.setModel("text-embedding-3-small")
.addAllInput(batch)
.build());
}
requestObserver.onCompleted();
try {
if (!latch.await(120, TimeUnit.SECONDS)) {
throw new RuntimeException("Timed out waiting for batch embeddings");
}
if (error.get() != null) {
throw new RuntimeException("Batch embedding failed", error.get());
}
return results;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for embeddings", e);
}
}
}
5. Interceptors: authentication and tracing
gRPC interceptors play a similar role to Spring MVC filters and are a good place to handle cross-cutting concerns:
/**
* Server-side authentication interceptor
*/
@Component
@RequiredArgsConstructor
public class AuthServerInterceptor implements ServerInterceptor {
private static final Metadata.Key<String> AUTH_TOKEN_KEY =
Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
private final JwtTokenValidator tokenValidator;
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
String token = headers.get(AUTH_TOKEN_KEY);
if (token == null || !token.startsWith("Bearer ")) {
call.close(Status.UNAUTHENTICATED.withDescription("Missing or invalid token"),
new Metadata());
return new ServerCall.Listener<>() {};
}
try {
UserPrincipal principal = tokenValidator.validate(token.substring(7));
// Store the user principal in the Context for downstream handlers
Context ctx = Context.current()
.withValue(UserContextKey.USER_PRINCIPAL, principal);
return Contexts.interceptCall(ctx, call, headers, next);
} catch (TokenExpiredException e) {
call.close(Status.UNAUTHENTICATED.withDescription("Token expired"), new Metadata());
return new ServerCall.Listener<>() {};
} catch (Exception e) {
call.close(Status.INTERNAL.withDescription("Auth error: " + e.getMessage()),
new Metadata());
return new ServerCall.Listener<>() {};
}
}
}
/**
* Tracing interceptor: extracts the trace context from the metadata
*/
@Component
@RequiredArgsConstructor
public class TracingServerInterceptor implements ServerInterceptor {
private static final Metadata.Key<String> TRACE_ID_KEY =
Metadata.Key.of("x-trace-id", Metadata.ASCII_STRING_MARSHALLER);
private static final Metadata.Key<String> SPAN_ID_KEY =
Metadata.Key.of("x-span-id", Metadata.ASCII_STRING_MARSHALLER);
private final Tracer tracer;
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
String traceId = headers.get(TRACE_ID_KEY);
String spanId = headers.get(SPAN_ID_KEY);
Span span;
if (traceId != null && spanId != null) {
// Continue the trace from the upstream caller
SpanContext remoteContext = SpanContext.createFromRemoteParent(
traceId,
spanId,
TraceFlags.getSampled(),
TraceState.getDefault()
);
span = tracer.spanBuilder("grpc." + call.getMethodDescriptor().getFullMethodName())
.setParent(io.opentelemetry.context.Context.root().with(
Span.wrap(remoteContext)))
.startSpan();
} else {
span = tracer.spanBuilder("grpc." + call.getMethodDescriptor().getFullMethodName())
.startSpan();
}
try (Scope scope = span.makeCurrent()) {
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
next.startCall(call, headers)) {
@Override
public void onComplete() {
span.end();
super.onComplete();
}
@Override
public void onCancel() {
span.setStatus(StatusCode.ERROR, "Cancelled");
span.end();
super.onCancel();
}
};
}
}
}
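The server-side interceptors above rely on the caller actually putting x-trace-id into the gRPC metadata. A matching client-side interceptor is the natural counterpart; here is a sketch, where TraceContextHolder.currentTraceId() is just a placeholder for however you obtain the current trace id:
/**
 * Client-side interceptor: propagate the current trace id as gRPC metadata
 * so the TracingServerInterceptor can pick it up on the other side.
 */
public class TracePropagationClientInterceptor implements ClientInterceptor {
    private static final Metadata.Key<String> TRACE_ID_KEY =
        Metadata.Key.of("x-trace-id", Metadata.ASCII_STRING_MARSHALLER);

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
            MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
                next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                String traceId = TraceContextHolder.currentTraceId(); // placeholder helper
                if (traceId != null) {
                    headers.put(TRACE_ID_KEY, traceId);
                }
                super.start(responseListener, headers);
            }
        };
    }
}
It can be attached per stub with stub.withInterceptors(new TracePropagationClientInterceptor()), or registered globally if your gRPC/Spring integration provides a hook for global client interceptors.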
6. Performance tuning
Connection configuration
gRPC connections are long-lived, but if all requests go over a single connection, it can become a bottleneck under high concurrency. Configure the connection behavior sensibly:
grpc:
  client:
    ai-inference-service:
      address: 'discovery:///ai-inference-service'
      negotiation-type: plaintext
      # Keep connections alive so they can be reused
      enable-keep-alive: true
      keep-alive-time: 30s
      keep-alive-timeout: 10s
      keep-alive-without-calls: true
      max-inbound-message-size: 50MB # embedding payloads can be large
      max-outbound-message-size: 10MB
Flow control and backpressure
With streaming responses, if the consumer cannot keep up with the producer you need backpressure, otherwise memory will blow up:
// Throttle how fast incoming messages are processed inside the StreamObserver
@Slf4j
public class BackpressureStreamObserver<T> implements StreamObserver<T> {
private final Semaphore semaphore;
private final Consumer<T> handler;
public BackpressureStreamObserver(int maxConcurrent, Consumer<T> handler) {
this.semaphore = new Semaphore(maxConcurrent);
this.handler = handler;
}
@Override
public void onNext(T value) {
try {
semaphore.acquire(); // bound the number of messages being processed concurrently
try {
handler.accept(value);
} finally {
semaphore.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void onError(Throwable t) {
log.error("Stream error: {}", t.getMessage());
}
@Override
public void onCompleted() {
log.debug("Stream completed");
}
}
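The semaphore above only throttles local processing. grpc-java can also push back on the sender directly: disable automatic inbound flow control and call request() for the next message only after the previous one has been handled. A sketch for consuming ChatStream this way; handleChunk is a placeholder for your own (possibly slow) consumer logic, and the exact method names have shifted across grpc-java versions (newer ones prefer disableAutoRequestWithInitial):
// Manual inbound flow control on the consumer side: pull one chunk at a time
ClientResponseObserver<ChatRequest, ChatStreamChunk> observer =
        new ClientResponseObserver<ChatRequest, ChatStreamChunk>() {
    private ClientCallStreamObserver<ChatRequest> call;

    @Override
    public void beforeStart(ClientCallStreamObserver<ChatRequest> requestStream) {
        this.call = requestStream;
        requestStream.disableAutoInboundFlowControl(); // from now on we call request() ourselves
    }

    @Override
    public void onNext(ChatStreamChunk chunk) {
        handleChunk(chunk); // placeholder: do the slow consumer work first
        call.request(1);    // only then ask the server for the next chunk
    }

    @Override
    public void onError(Throwable t) { /* surface the error to the caller */ }

    @Override
    public void onCompleted() { /* stream finished */ }
};
asyncStub.chatStream(request, observer);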
7. Coexisting with REST: gRPC-Gateway
Sometimes you need to support gRPC and REST at the same time (browser clients, for example, can only speak HTTP/1.1). You can use grpc-gateway, or go with a simpler option: expose both a gRPC port and a REST port from the same Spring Boot application:
@RestController
@RequestMapping("/api/v1/ai")
@Slf4j
@RequiredArgsConstructor
public class AIServiceRestAdapter {
private final AIInferenceGrpcClient grpcClient;
// The REST endpoint delegates to gRPC internally
@PostMapping("/chat")
public ResponseEntity<ChatResponseDTO> chat(@RequestBody ChatRequestDTO request) {
ChatResponse grpcResponse = grpcClient.chat(
request.getMessage(), request.getSystemPrompt());
return ResponseEntity.ok(ChatResponseDTO.from(grpcResponse));
}
// Streaming REST endpoint
@GetMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chatStream(@RequestParam String message) {
return grpcClient.chatStream(message, "You are a helpful assistant.")
.map(content -> ServerSentEvent.builder(content).build())
.onErrorReturn(ServerSentEvent.builder("[ERROR]").build());
}
}
8. Pitfalls I ran into
Pitfall 1: proto3 default values
In proto3, every scalar field has a default value: ints default to 0, strings to the empty string, bools to false. That means you cannot tell "the caller set temperature=0" apart from "the caller never set temperature", because both serialize to exactly the same bytes.
The fix is to use oneof or a wrapper type (google.protobuf.FloatValue), which lets you distinguish null from 0:
import "google/protobuf/wrappers.proto";
message ChatRequest {
// Wrapper type: lets you tell unset (null) apart from 0
google.protobuf.FloatValue temperature = 3;
}
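On the server side, the generated code then exposes a has-check for the wrapper field, so a default is applied only when the caller truly omitted it. A minimal sketch against the ChatRequest above (the 0.7f default is just an example value):
// Only fall back to a service-side default when the caller really did not set the field
float temperature = request.hasTemperature()
        ? request.getTemperature().getValue()
        : 0.7f;   // hypothetical default, pick whatever suits your service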
Pitfall 2: deadlines on streaming calls
Be careful when using withDeadlineAfter on streaming calls: the deadline covers the whole stream, not each individual message. If AI inference runs long, the deadline has to be generous enough, otherwise the stream gets cut off midway:
// Wrong: 60 seconds may not be enough for a long-running stream
asyncStub.withDeadlineAfter(60, TimeUnit.SECONDS).chatStream(request, observer);
// Better: give streaming calls a much longer deadline
asyncStub.withDeadlineAfter(300, TimeUnit.SECONDS).chatStream(request, observer);
Pitfall 3: memory issues with large messages
When vector data or large blocks of text go over gRPC, the default 4MB limit makes the transfer fail unless you raise max-inbound-message-size. But setting it too high can cause memory problems of its own.
My advice: for payloads larger than 1MB, consider sending them in batches (via client streaming) rather than as a single huge message.
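If you do need to raise the limit on a hand-built channel rather than through the starter's YAML, the channel builder exposes it directly; a small sketch with placeholder host and port:
// Raising the 4MB default on a hand-built channel (mirrors the YAML setting above)
ManagedChannel channel = ManagedChannelBuilder
        .forAddress("ai-inference-service", 9090)   // placeholder host/port
        .maxInboundMessageSize(50 * 1024 * 1024)    // 50MB
        .usePlaintext()
        .build();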
Pitfall 4: integrating service discovery with gRPC
Service discovery needs special handling with gRPC. By default, once a connection is established gRPC keeps talking to the same server and does not re-route as the set of instances registered in Nacos changes. You need a discovery-aware load balancer so that every call can pick from the latest available instances.
Using the grpc-spring-boot-starter's discovery:/// address format together with Spring Cloud service discovery solves this.
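Under the hood this comes down to two channel-level settings: a name resolver that watches the registry, and a load-balancing policy that spreads calls across the resolved instances (the default, pick_first, pins a single address). A rough sketch of what it looks like when building a channel by hand, assuming a discovery-aware NameResolver is registered for the discovery:/// scheme:
// Spread calls across all currently registered instances instead of pinning one backend
ManagedChannel channel = ManagedChannelBuilder
        .forTarget("discovery:///ai-inference-service")  // requires a discovery-aware NameResolver
        .defaultLoadBalancingPolicy("round_robin")       // pick_first would stick to one address
        .usePlaintext()
        .build();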
9. Performance comparison numbers
We ran a set of comparison tests on our system (conditions: 1,000 concurrent clients, each sending a 1536-dimensional vector):
| Metric | REST+JSON | gRPC+Protobuf |
|---|---|---|
| Message size | ~24KB | ~6.2KB |
| Serialization time | 1.8ms | 0.4ms |
| P99 latency | 120ms | 45ms |
| CPU usage | 78% | 32% |
| Throughput | 8,200 req/s | 22,000 req/s |
The gap widens as the volume of vector data grows. If your AI system passes embedding vectors around frequently, the gains from gRPC are very tangible.
