第2349篇:Java AI微服务通信——gRPC在AI服务间调用中的工程优势
2026/4/30大约 4 分钟
第2349篇:Java AI微服务通信——gRPC在AI服务间调用中的工程优势
适读人群:构建AI微服务架构的Java工程师,关注服务间通信性能和类型安全的架构师 | 阅读时长:约16分钟 | 核心价值:理解gRPC在AI微服务场景的优势,掌握gRPC流式接口在AI输出传递中的实战用法
AI微服务体系里,服务间的通信量和对延迟的要求都更高:
- 推理服务 → 业务服务:传递LLM的流式输出
- 业务服务 → 向量服务:高频的Embedding查询
- 编排服务 → 多个AI子服务:并发调用多个AI能力
用REST/HTTP+JSON在这些场景里有一些明显的痛点:序列化开销大(大量文本JSON化再传输)、流式支持不原生、接口契约靠文档维护容易漂移。
gRPC在这些场景里有优势:Protocol Buffers序列化比JSON小得多、原生双向流、IDL定义强制契约。
为什么AI微服务适合gRPC
场景1:LLM流式输出在微服务间传递
用户 → 网关(REST/SSE) → 编排服务(gRPC Server-Streaming) → 推理服务
推理服务用gRPC Server Streaming给编排服务发送token流,编排服务转成SSE给前端。这比在每个服务间都用HTTP SSE要干净得多。
场景2:高频向量检索
Embedding查询可能每秒发生几百次,JSON序列化的开销在这个量级下不可忽视。gRPC + Protobuf序列化效率比JSON高50-80%。
场景3:多AI服务并发调用
编排服务同时调用"摘要服务""分类服务""情感分析服务",gRPC的多路复用和HTTP/2特性让并发更高效。
实战:AI推理服务的gRPC接口定义
// ai_inference.proto
syntax = "proto3";
package com.example.ai;
option java_package = "com.example.ai.grpc";
option java_multiple_files = true;
// AI inference service: unary and streaming chat plus batch text embedding.
service AiInferenceService {
// Synchronous chat (simple Q&A): one request, one complete reply.
rpc Chat (ChatRequest) returns (ChatResponse);
// Server-streaming chat: the LLM token stream, one token per message.
rpc ChatStream (ChatRequest) returns (stream ChatStreamResponse);
// Batch embedding (vectorization): many texts in a single round trip.
rpc EmbedTexts (EmbedRequest) returns (EmbedResponse);
// Bidirectional stream (ongoing conversation + real-time results).
rpc InteractiveChat (stream ChatRequest) returns (stream ChatStreamResponse);
}
message ChatRequest {
string conversation_id = 1;
string user_message = 2;
string system_prompt = 3;
ChatOptions options = 4;
}
message ChatOptions {
string model = 1;
float temperature = 2;
int32 max_tokens = 3;
}
message ChatResponse {
string conversation_id = 1;
string reply = 2;
UsageStats usage = 3;
}
message ChatStreamResponse {
string conversation_id = 1;
string token = 2; // a single token chunk
bool is_final = 3; // true on the last message of the stream
UsageStats usage = 4; // only populated when is_final = true
}
message EmbedRequest {
repeated string texts = 1; // texts to embed in one batch
string model = 2;
}
message EmbedResponse {
repeated Embedding embeddings = 1;
}
message Embedding {
repeated float vector = 1; // embedding values; packed encoding is the proto3 default for repeated scalars
int32 dimensions = 2;
}
message UsageStats {
int32 prompt_tokens = 1;
int32 completion_tokens = 2;
int64 processing_time_ms = 3;
}
服务端实现:Spring Boot gRPC Server
<!-- pom.xml -->
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-spring-boot-starter</artifactId>
<version>3.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
# application.yml
grpc:
server:
port: 9090
max-inbound-message-size: 10MB # AI场景可能传递大文档
@GrpcService
@RequiredArgsConstructor
@Slf4j
public class AiInferenceGrpcService extends AiInferenceServiceGrpc.AiInferenceServiceImplBase {
private final ChatClient.Builder chatClientBuilder;
private final EmbeddingModel embeddingModel;
@Override
public void chat(ChatRequest request, StreamObserver<ChatResponse> responseObserver) {
    // Unary RPC: run one blocking LLM call and answer with a single ChatResponse.
    try {
        ChatClient chatClient = buildChatClient(request.getSystemPrompt(), request.getOptions());
        long startedAt = System.currentTimeMillis();

        String answer = chatClient.prompt()
                .user(request.getUserMessage())
                .call()
                .content();

        // Only wall-clock time is filled in here; token counts are not available
        // from this call path.
        UsageStats usage = UsageStats.newBuilder()
                .setProcessingTimeMs(System.currentTimeMillis() - startedAt)
                .build();
        ChatResponse reply = ChatResponse.newBuilder()
                .setConversationId(request.getConversationId())
                .setReply(answer)
                .setUsage(usage)
                .build();

        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    } catch (Exception e) {
        // Map any failure onto a gRPC INTERNAL status so the caller sees a
        // well-formed error instead of a hung call.
        log.error("gRPC chat失败:conversationId={}", request.getConversationId(), e);
        responseObserver.onError(Status.INTERNAL
                .withDescription(e.getMessage())
                .asRuntimeException());
    }
}
@Override
// Server-streaming RPC: bridges Spring AI's reactive token stream into the
// gRPC response stream — one ChatStreamResponse per token, then a final
// empty marker with is_final = true.
public void chatStream(ChatRequest request, StreamObserver<ChatStreamResponse> responseObserver) {
try {
ChatClient client = buildChatClient(request.getSystemPrompt(), request.getOptions());
// Spring AI Flux stream -> gRPC stream.
// NOTE(review): the Disposable returned by subscribe() is dropped and client
// cancellation is never observed — if the gRPC caller cancels, the LLM stream
// keeps running and onNext may throw into the Flux error path. Consider
// ServerCallStreamObserver#setOnCancelHandler to dispose the subscription;
// confirm against the grpc-java version in use.
client.prompt()
.user(request.getUserMessage())
.stream()
.content()
.subscribe(
token -> {
// Each token is sent as one gRPC stream message (is_final = false).
ChatStreamResponse chunk = ChatStreamResponse.newBuilder()
.setConversationId(request.getConversationId())
.setToken(token)
.setIsFinal(false)
.build();
responseObserver.onNext(chunk);
},
error -> {
// Mid-stream failure: surface it as a gRPC INTERNAL status.
log.error("流式输出错误", error);
responseObserver.onError(Status.INTERNAL
.withDescription(error.getMessage())
.asRuntimeException());
},
() -> {
// Send the completion marker (no token, is_final = true).
ChatStreamResponse finalChunk = ChatStreamResponse.newBuilder()
.setConversationId(request.getConversationId())
.setIsFinal(true)
.build();
responseObserver.onNext(finalChunk);
responseObserver.onCompleted();
}
);
} catch (Exception e) {
// Failure before the subscription started (e.g. building the client).
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.asRuntimeException());
}
}
@Override
public void embedTexts(EmbedRequest request, StreamObserver<EmbedResponse> responseObserver) {
    // Unary RPC: embed every text in the batch and return the vectors in the
    // same order as the input texts.
    // NOTE(review): texts are embedded one at a time; if the underlying model
    // exposes a batch API, a single batched call would cut round trips.
    try {
        EmbedResponse.Builder reply = EmbedResponse.newBuilder();
        for (String text : request.getTextsList()) {
            float[] values = embeddingModel.embed(TextSegment.from(text)).content().vector();
            Embedding.Builder item = Embedding.newBuilder()
                    .setDimensions(values.length);
            for (float component : values) {
                item.addVector(component);
            }
            reply.addEmbeddings(item.build());
        }
        responseObserver.onNext(reply.build());
        responseObserver.onCompleted();
    } catch (Exception e) {
        // Surface any embedding failure as a gRPC INTERNAL status.
        responseObserver.onError(Status.INTERNAL
                .withDescription(e.getMessage())
                .asRuntimeException());
    }
}
// Builds the ChatClient for one request, applying the optional system prompt.
// NOTE(review): the 'options' parameter (model / temperature / max_tokens from
// the proto) is accepted but never applied — confirm whether it should be
// mapped onto the underlying chat options before build().
// NOTE(review): ChatClient.Builder implementations are typically mutable, so
// defaultSystem(...) here may mutate the shared injected builder and leak one
// request's system prompt into later requests — verify, and copy the builder
// per call if so.
private ChatClient buildChatClient(String systemPrompt, ChatOptions options) {
ChatClient.Builder builder = chatClientBuilder;
if (systemPrompt != null && !systemPrompt.isBlank()) {
builder = builder.defaultSystem(systemPrompt);
}
return builder.build();
}
}
客户端调用
@Service
@Slf4j
public class AiInferenceClient {
@GrpcClient("ai-inference-service")
private AiInferenceServiceGrpc.AiInferenceServiceBlockingStub blockingStub;
@GrpcClient("ai-inference-service")
private AiInferenceServiceGrpc.AiInferenceServiceStub asyncStub;
// Synchronous request/response chat with a 30-second deadline.
public String chat(String conversationId, String message) {
    ChatRequest req = ChatRequest.newBuilder()
            .setConversationId(conversationId)
            .setUserMessage(message)
            .build();
    ChatResponse resp = blockingStub
            .withDeadlineAfter(30, TimeUnit.SECONDS)
            .chat(req);
    return resp.getReply();
}
// Streaming chat: adapts the async gRPC token stream into a Reactor Flux.
// NOTE(review): cancelling the returned Flux does not cancel the underlying
// gRPC call; wire sink.onDispose to the client call if that matters.
public Flux<String> chatStream(String conversationId, String message) {
    ChatRequest req = ChatRequest.newBuilder()
            .setConversationId(conversationId)
            .setUserMessage(message)
            .build();
    return Flux.create(sink -> {
        StreamObserver<ChatStreamResponse> relay = new StreamObserver<>() {
            @Override
            public void onNext(ChatStreamResponse chunk) {
                // Forward real tokens only; the empty final marker is skipped.
                if (!chunk.getIsFinal() && !chunk.getToken().isEmpty()) {
                    sink.next(chunk.getToken());
                }
            }

            @Override
            public void onError(Throwable t) {
                sink.error(t);
            }

            @Override
            public void onCompleted() {
                sink.complete();
            }
        };
        asyncStub.chatStream(req, relay);
    });
}
// Batch embedding: sends all texts in one RPC (60-second deadline) and unpacks
// each proto Embedding into a primitive float[], preserving input order.
public List<float[]> embedTexts(List<String> texts) {
    EmbedRequest request = EmbedRequest.newBuilder()
            .addAllTexts(texts)
            .build();
    return blockingStub
            .withDeadlineAfter(60, TimeUnit.SECONDS)
            .embedTexts(request)
            .getEmbeddingsList()
            .stream()
            .map(e -> {
                // FIX: size the array from the actual value count, not the
                // server-reported 'dimensions' field. If the two disagreed,
                // the old code either threw ArrayIndexOutOfBoundsException
                // (dimensions < vector count) or silently zero-padded
                // (dimensions > vector count).
                float[] vector = new float[e.getVectorCount()];
                for (int i = 0; i < vector.length; i++) {
                    vector[i] = e.getVector(i);
                }
                return vector;
            })
            .toList();
}
}
gRPC在AI微服务场景的引入要权衡:它带来了更好的性能和类型安全,但也引入了.proto文件管理、代码生成、调试复杂度等额外成本。对于流量大、对延迟敏感的核心AI服务间通信,gRPC是值得投入的;对于低频调用的辅助服务,REST已经足够。
