第1756篇:服务注册与发现的AI适配——Nacos管理动态模型路由
第1756篇:服务注册与发现的AI适配——Nacos管理动态模型路由
有个让我想了很久的问题:AI系统里的"服务"和传统微服务的"服务"到底有什么不同?
传统微服务,你注册的是一个order-service,它的实例无论部署在哪台机器上,功能是完全相同的。但AI服务不一样——同样是"AI对话服务",部署了GPT-4的和部署了GPT-3.5的,能力差了好几个档次;挂载了GPU的实例和只有CPU的实例,响应速度可能相差10倍。
这意味着传统的"服务注册与发现"在AI场景需要升级——不只是找到有这个服务的实例,而是要找到最适合当前请求的实例。
今天就来讲这个:怎么用Nacos实现AI服务的智能路由。
一、Nacos在AI系统中的新角色
Nacos在传统微服务里主要干两件事:服务注册发现 + 配置管理。
在AI系统里,我们要把它的能力用得更深:
动态模型路由:不同的请求路由到不同的模型实例(按模型版本、按功能、按负载)。
模型实例的元数据管理:每个AI服务实例注册时携带丰富的元数据——支持哪些模型、GPU显存多少、当前负载、特殊能力(支持函数调用?支持视觉理解?)。
配置中心的特殊用途:提示词模板、模型参数(temperature、top_p)、路由规则,都通过Nacos配置中心管理,支持热更新。
二、AI服务实例的元数据设计
这是整个方案的基础,元数据设计得好,后续路由就灵活。
@Configuration
@Slf4j
public class AIServiceRegistrationConfig {
@Value("${spring.ai.model.name:gpt-3.5-turbo}")
private String modelName;
@Value("${spring.ai.model.version:unknown}")
private String modelVersion;
@Value("${spring.ai.gpu.memory.total:0}")
private int gpuMemoryTotal;
@Bean
public NamingService namingService(NacosProperties nacosProperties) throws Exception {
Properties properties = new Properties();
properties.setProperty("serverAddr", nacosProperties.getServerAddr());
properties.setProperty("namespace", nacosProperties.getNamespace());
return NamingFactory.createNamingService(properties);
}
@Bean
public CommandLineRunner registerAIService(NamingService namingService) {
return args -> {
// 构建AI实例元数据
Map<String, String> metadata = new HashMap<>();
// 模型能力元数据
metadata.put("model.name", modelName);
metadata.put("model.version", modelVersion);
metadata.put("model.family", getModelFamily(modelName));
// 硬件资源元数据
metadata.put("gpu.memory.total", String.valueOf(gpuMemoryTotal));
metadata.put("gpu.count", String.valueOf(detectGpuCount()));
metadata.put("hardware.type", gpuMemoryTotal > 0 ? "GPU" : "CPU");
// 能力标签元数据
Set<String> capabilities = detectCapabilities();
metadata.put("capabilities", String.join(",", capabilities));
// 优先级(用于负载均衡权重)
metadata.put("weight", String.valueOf(calculateWeight()));
// 注册服务
Instance instance = new Instance();
instance.setIp(getLocalIp());
instance.setPort(getServerPort());
instance.setMetadata(metadata);
instance.setWeight(calculateWeight());
instance.setHealthy(true);
instance.setEnabled(true);
namingService.registerInstance("ai-inference-service",
"ai-prod", instance);
log.info("Registered AI service instance: model={}, capabilities={}, gpu={}MB",
modelName, capabilities, gpuMemoryTotal);
};
}
private Set<String> detectCapabilities() {
Set<String> caps = new HashSet<>();
caps.add("text-generation");
if (modelName.contains("gpt-4") || modelName.contains("claude-3")) {
caps.add("function-calling");
caps.add("long-context");
}
if (modelName.contains("vision") || modelName.contains("4o")) {
caps.add("vision");
}
if (gpuMemoryTotal >= 24000) {
caps.add("large-context"); // 足够显存支持更长上下文
}
return caps;
}
private int calculateWeight() {
// 基础权重100,GPU实例额外加权
int weight = 100;
if (gpuMemoryTotal >= 80000) weight = 300; // 高端GPU
else if (gpuMemoryTotal >= 24000) weight = 200; // 中端GPU
else if (gpuMemoryTotal > 0) weight = 150; // 入门GPU
return weight;
}
private String getModelFamily(String modelName) {
if (modelName.startsWith("gpt-4")) return "gpt-4";
if (modelName.startsWith("gpt-3.5")) return "gpt-3.5";
if (modelName.startsWith("claude")) return "claude";
if (modelName.startsWith("llama")) return "llama";
return "unknown";
}
}三、智能路由选择器
有了丰富的元数据,路由选择就可以做得很精细:
@Service
@Slf4j
public class AIModelRouter {
private final NamingService namingService;
private final NacosConfigService configService;
private final AINodeStatusCache statusCache;
// 路由策略配置(从Nacos配置中心读取,支持热更新)
private volatile RoutingConfig routingConfig;
@PostConstruct
public void initRoutingConfig() {
// 监听路由配置变更
configService.addListener("ai-routing-config", "ai-prod", new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
try {
routingConfig = parseRoutingConfig(configInfo);
log.info("Routing config updated: {}", routingConfig);
} catch (Exception e) {
log.error("Failed to parse routing config: {}", e.getMessage());
}
}
@Override
public Executor getExecutor() {
return null; // 使用默认线程池
}
});
// 初始加载
String config = configService.getConfig("ai-routing-config", "ai-prod", 3000);
routingConfig = parseRoutingConfig(config);
}
/**
* 根据请求特征选择最合适的AI实例
*/
public Instance selectInstance(AIRequestContext context) throws NacosException {
// 获取所有健康实例
List<Instance> allInstances = namingService.selectInstances(
"ai-inference-service", "ai-prod", true);
if (allInstances.isEmpty()) {
throw new NoAvailableInstanceException("No healthy AI instances available");
}
// 按路由规则过滤
List<Instance> candidates = filterByRoutingRules(allInstances, context);
if (candidates.isEmpty()) {
log.warn("No instances match routing rules, falling back to all instances");
candidates = allInstances;
}
// 按实时负载排序
List<Instance> sorted = sortByLoad(candidates);
// 带权重的最终选择
return weightedSelect(sorted);
}
private List<Instance> filterByRoutingRules(List<Instance> instances,
AIRequestContext context) {
List<Instance> candidates = new ArrayList<>(instances);
// 规则1:必要能力过滤
if (context.requiresVision()) {
candidates.removeIf(instance -> {
String caps = instance.getMetadata().getOrDefault("capabilities", "");
return !caps.contains("vision");
});
}
if (context.requiresFunctionCalling()) {
candidates.removeIf(instance -> {
String caps = instance.getMetadata().getOrDefault("capabilities", "");
return !caps.contains("function-calling");
});
}
// 规则2:最小上下文长度过滤
if (context.getContextLength() > 32000) {
candidates.removeIf(instance -> {
String caps = instance.getMetadata().getOrDefault("capabilities", "");
return !caps.contains("long-context");
});
}
// 规则3:用户等级路由(高级用户路由到更好的模型)
if (context.getUserTier() == UserTier.PREMIUM && routingConfig.isPremiumGPT4Enabled()) {
List<Instance> premiumInstances = candidates.stream()
.filter(i -> "gpt-4".equals(i.getMetadata().get("model.family")))
.collect(Collectors.toList());
if (!premiumInstances.isEmpty()) {
return premiumInstances;
}
}
// 规则4:A/B测试路由
if (routingConfig.isABTestEnabled()) {
String abGroup = context.getUserId().hashCode() % 100 < routingConfig.getAbTestRatio()
? "B" : "A";
List<Instance> abInstances = candidates.stream()
.filter(i -> abGroup.equals(i.getMetadata().get("ab.group")))
.collect(Collectors.toList());
if (!abInstances.isEmpty()) {
return abInstances;
}
}
return candidates;
}
private List<Instance> sortByLoad(List<Instance> instances) {
return instances.stream()
.sorted(Comparator.comparingDouble(instance -> {
String instanceId = instance.getIp() + ":" + instance.getPort();
AINodeStatus status = statusCache.get(instanceId);
if (status == null) return 0.5; // 未知状态,给中等优先级
// 综合负载评分
double score = status.getGpuUtilization() * 0.4 +
status.getGpuMemoryUsed() * 0.3 +
(status.getActiveConnections() / 100.0) * 0.3;
return score;
}))
.collect(Collectors.toList());
}
private Instance weightedSelect(List<Instance> instances) {
if (instances.size() == 1) return instances.get(0);
// 取前三名做带权重选择
int topN = Math.min(3, instances.size());
double totalWeight = 0;
List<double[]> weightedInstances = new ArrayList<>();
for (int i = 0; i < topN; i++) {
Instance instance = instances.get(i);
double weight = instance.getWeight();
totalWeight += weight;
weightedInstances.add(new double[]{weight, i});
}
double rand = Math.random() * totalWeight;
double cumulative = 0;
for (double[] item : weightedInstances) {
cumulative += item[0];
if (rand <= cumulative) {
return instances.get((int) item[1]);
}
}
return instances.get(0);
}
}四、Nacos配置中心管理提示词与路由规则
这是我非常推崇的一个实践:把提示词和路由规则从代码里分离出来,放到Nacos配置中心统一管理。
为什么这很重要?因为提示词是AI产品最核心的"业务逻辑",它的修改频率比代码高得多,如果每次改提示词都要走代码发布流程(提交→Review→CI/CD→上线),效率极低,产品也没法快速迭代。
@Service
@Slf4j
public class DynamicPromptManager {
private final NacosConfigService configService;
// 提示词缓存,key=promptId, value=PromptTemplate
private final Map<String, PromptTemplate> promptCache = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
// 注册各种提示词配置的监听器
registerPromptListener("system-prompts.yaml", "ai-prod");
registerPromptListener("tool-prompts.yaml", "ai-prod");
registerPromptListener("rag-prompts.yaml", "ai-prod");
}
private void registerPromptListener(String dataId, String group) {
try {
// 初始加载
String config = configService.getConfig(dataId, group, 5000);
loadPrompts(dataId, config);
// 监听变更
configService.addListener(dataId, group, new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
log.info("Prompt config changed: {}", dataId);
loadPrompts(dataId, configInfo);
}
@Override
public Executor getExecutor() {
return null;
}
});
} catch (NacosException e) {
log.error("Failed to register prompt listener for {}: {}", dataId, e.getMessage());
}
}
private void loadPrompts(String dataId, String yamlContent) {
if (yamlContent == null || yamlContent.isBlank()) return;
try {
Map<String, Object> config = yamlParser.load(yamlContent);
config.forEach((promptId, promptConfig) -> {
if (promptConfig instanceof Map) {
Map<String, Object> promptMap = (Map<String, Object>) promptConfig;
PromptTemplate template = PromptTemplate.builder()
.id(promptId)
.content((String) promptMap.get("content"))
.variables((List<String>) promptMap.getOrDefault("variables", List.of()))
.version((String) promptMap.getOrDefault("version", "1.0"))
.build();
promptCache.put(promptId, template);
log.debug("Loaded prompt: {} v{}", promptId, template.getVersion());
}
});
} catch (Exception e) {
log.error("Failed to load prompts from {}: {}", dataId, e.getMessage());
}
}
public String getPrompt(String promptId, Map<String, String> variables) {
PromptTemplate template = promptCache.get(promptId);
if (template == null) {
throw new PromptNotFoundException("Prompt not found: " + promptId);
}
return template.render(variables);
}
public String getPromptVersion(String promptId) {
PromptTemplate template = promptCache.get(promptId);
return template != null ? template.getVersion() : "unknown";
}
}Nacos里的提示词配置文件长这样(system-prompts.yaml):
customer-service-system:
version: "2.1"
content: |
你是「{{company_name}}」的AI客服助手,名字叫「{{bot_name}}」。
你的职责:
1. 解答用户关于产品的问题
2. 处理退换货请求,指引用户完成相关流程
3. 收集用户反馈
行为准则:
- 始终保持礼貌和专业
- 遇到无法解决的问题,及时转接人工客服
- 不要透露任何公司内部信息
- 不要承诺超出权限范围的事项
当前业务信息:
- 退货政策:7天无理由退货
- 客服热线:400-xxx-xxxx(工作日9:00-18:00)
variables:
- company_name
- bot_name
code-review-system:
version: "1.3"
content: |
你是一位资深Java工程师,专门负责代码审查。
审查重点:
1. 代码逻辑正确性
2. 性能问题(N+1查询、不必要的对象创建等)
3. 安全漏洞(SQL注入、XSS等)
4. 代码可读性和可维护性
5. 是否符合项目现有的编码规范
输出格式:
- 按严重程度(CRITICAL/MAJOR/MINOR)分类
- 每个问题给出具体的修复建议
- 总结性评价(通过/不通过/待讨论)
variables: []五、实时健康状态同步
前面提到AI节点要上报健康状态,这里用Nacos的临时实例特性来做:
@Component
@Slf4j
public class AINodeHealthReporter {
private final NamingService namingService;
private final GPUMetricsCollector gpuMetrics;
private final String instanceId;
@Scheduled(fixedDelay = 5000)
public void reportHealthStatus() {
try {
Instance instance = buildCurrentInstance();
// 更新实例元数据(包含最新的负载信息)
namingService.updateInstance("ai-inference-service", "ai-prod", instance);
} catch (NacosException e) {
log.warn("Failed to update health status in Nacos: {}", e.getMessage());
}
}
private Instance buildCurrentInstance() throws NacosException {
Instance instance = new Instance();
instance.setIp(getLocalIp());
instance.setPort(getServerPort());
instance.setEphemeral(true); // 临时实例,宕机后自动从注册中心删除
Map<String, String> metadata = new HashMap<>(getStaticMetadata());
// 动态健康指标
GPUMetrics metrics = gpuMetrics.collect();
metadata.put("gpu.utilization", String.format("%.2f", metrics.getUtilization()));
metadata.put("gpu.memory.used", String.valueOf(metrics.getMemoryUsed()));
metadata.put("active.connections",
String.valueOf(getActiveConnectionCount()));
metadata.put("queue.depth", String.valueOf(getRequestQueueDepth()));
metadata.put("health.timestamp",
String.valueOf(System.currentTimeMillis()));
// 根据负载动态调整权重
double load = calculateLoad(metrics);
if (load > 0.9) {
instance.setWeight(10); // 超载,几乎不接受新请求
metadata.put("node.state", "OVERLOADED");
} else if (load > 0.7) {
instance.setWeight(50); // 较忙
metadata.put("node.state", "BUSY");
} else {
instance.setWeight(100); // 正常
metadata.put("node.state", "HEALTHY");
}
instance.setMetadata(metadata);
return instance;
}
private double calculateLoad(GPUMetrics metrics) {
return metrics.getUtilization() * 0.5 +
metrics.getMemoryUsed() / (double) metrics.getMemoryTotal() * 0.5;
}
}六、多租户模型隔离
企业级AI系统通常要为不同的租户(客户)提供隔离的模型服务。用Nacos的命名空间和服务分组可以很优雅地实现这个需求:
@Service
public class MultiTenantModelRoutingService {
private final Map<String, NamingService> tenantNamingServices;
private final NacosProperties nacosProperties;
// 为每个租户创建独立的NamingService(对应不同的Nacos命名空间)
public NamingService getNamingServiceForTenant(String tenantId) {
return tenantNamingServices.computeIfAbsent(tenantId, id -> {
try {
Properties props = new Properties();
props.setProperty("serverAddr", nacosProperties.getServerAddr());
// 每个租户用独立的命名空间
props.setProperty("namespace", getTenantNamespace(id));
return NamingFactory.createNamingService(props);
} catch (NacosException e) {
throw new RuntimeException("Failed to create naming service for tenant: " + id, e);
}
});
}
public Instance selectInstanceForTenant(String tenantId, AIRequestContext context) {
try {
NamingService namingService = getNamingServiceForTenant(tenantId);
// 按租户配置的模型组查询
String serviceGroup = getTenantModelGroup(tenantId);
List<Instance> instances = namingService.selectInstances(
"ai-inference-service", serviceGroup, true);
if (instances.isEmpty()) {
// 降级到共享实例池
log.warn("No dedicated instances for tenant {}, falling back to shared pool",
tenantId);
return selectSharedInstance(context);
}
return instances.stream()
.filter(i -> meetsRequirements(i, context))
.min(Comparator.comparingDouble(i -> getCurrentLoad(i)))
.orElseThrow();
} catch (NacosException e) {
log.error("Failed to select instance for tenant {}: {}", tenantId, e.getMessage());
throw new RuntimeException(e);
}
}
private String getTenantNamespace(String tenantId) {
// 可以从数据库或配置里查询每个租户的命名空间
return tenantNamespaceMapping.getOrDefault(tenantId, "public");
}
private String getTenantModelGroup(String tenantId) {
// 租户的服务组名,可以对应不同的模型配置
TenantConfig config = tenantConfigRepo.findByTenantId(tenantId);
return config != null ? config.getModelGroup() : "default";
}
}七、金丝雀发布:新模型的渐进式上线
上线新版本的AI模型时,直接全量切换风险很大。用Nacos的权重机制可以实现金丝雀发布:
@Service
@Slf4j
public class ModelCanaryReleaseManager {
private final NamingService namingService;
private final NacosConfigService configService;
/**
* 开始金丝雀发布:将新版本模型实例的权重从0逐步增加
*/
public void startCanaryRelease(String canaryVersion, int targetTrafficPercent) {
// 在Nacos配置中记录金丝雀状态
CanaryReleaseConfig canaryConfig = CanaryReleaseConfig.builder()
.version(canaryVersion)
.targetPercent(targetTrafficPercent)
.status(CanaryStatus.IN_PROGRESS)
.startTime(LocalDateTime.now())
.build();
try {
configService.publishConfig(
"canary-release-config", "ai-prod",
objectMapper.writeValueAsString(canaryConfig)
);
// 设置金丝雀实例权重
updateCanaryInstanceWeights(canaryVersion, targetTrafficPercent);
log.info("Started canary release: version={}, targetPercent={}",
canaryVersion, targetTrafficPercent);
} catch (Exception e) {
log.error("Failed to start canary release: {}", e.getMessage());
throw new RuntimeException(e);
}
}
private void updateCanaryInstanceWeights(String version, int trafficPercent)
throws NacosException {
List<Instance> allInstances = namingService.selectInstances(
"ai-inference-service", "ai-prod", false);
for (Instance instance : allInstances) {
String instanceVersion = instance.getMetadata().get("model.version");
if (version.equals(instanceVersion)) {
// 新版本实例:权重设为 targetPercent
instance.setWeight(trafficPercent);
} else {
// 旧版本实例:权重设为 (100 - targetPercent)
instance.setWeight(100 - trafficPercent);
}
namingService.updateInstance("ai-inference-service", "ai-prod", instance);
}
}
/**
* 金丝雀发布成功,全量切换
*/
public void promoteCanary(String canaryVersion) throws NacosException {
List<Instance> allInstances = namingService.selectInstances(
"ai-inference-service", "ai-prod", false);
for (Instance instance : allInstances) {
String instanceVersion = instance.getMetadata().get("model.version");
if (version.equals(instanceVersion)) {
instance.setWeight(100);
namingService.updateInstance("ai-inference-service", "ai-prod", instance);
} else {
// 旧版本实例下线
namingService.deregisterInstance("ai-inference-service", "ai-prod", instance);
}
}
log.info("Canary release promoted: version={}", canaryVersion);
}
/**
* 金丝雀发布回滚
*/
public void rollbackCanary(String canaryVersion) throws NacosException {
List<Instance> allInstances = namingService.selectInstances(
"ai-inference-service", "ai-prod", false);
for (Instance instance : allInstances) {
String instanceVersion = instance.getMetadata().get("model.version");
if (canaryVersion.equals(instanceVersion)) {
// 新版本实例设为不可用
instance.setWeight(0);
instance.setEnabled(false);
namingService.updateInstance("ai-inference-service", "ai-prod", instance);
} else {
// 旧版本实例恢复全量
instance.setWeight(100);
instance.setEnabled(true);
namingService.updateInstance("ai-inference-service", "ai-prod", instance);
}
}
log.warn("Canary release rolled back: version={}", canaryVersion);
}
}八、服务发现在Spring Cloud的整合
把上面的能力整合到Spring Cloud Gateway的负载均衡器:
@Configuration
public class NacosAILoadBalancerConfig {
@LoadBalancerClient(name = "ai-inference-service",
configuration = AIModelLoadBalancerConfig.class)
public class LoadBalancerClientConfig {}
}
@Configuration
public class AIModelLoadBalancerConfig {
@Bean
public ReactorLoadBalancer<ServiceInstance> aiModelLoadBalancer(
Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory,
AIModelRouter modelRouter) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new ReactorLoadBalancer<ServiceInstance>() {
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
return Mono.fromCallable(() -> {
// 从请求中提取AI上下文
HttpRequestContext httpContext = (HttpRequestContext) request.getContext();
AIRequestContext aiContext = extractAIContext(httpContext);
// 使用AI感知的路由逻辑选择实例
Instance nacosInstance = modelRouter.selectInstance(aiContext);
// 包装成Spring Cloud的ServiceInstance
ServiceInstance serviceInstance = new NacosServiceInstance(
nacosInstance, name);
return new DefaultResponse(serviceInstance);
});
}
};
}
private AIRequestContext extractAIContext(HttpRequestContext context) {
if (context == null) return AIRequestContext.defaultContext();
ServerHttpRequest request = context.getClientRequest();
String userId = request.getHeaders().getFirst("X-User-Id");
String userTier = request.getHeaders().getFirst("X-User-Tier");
String requiresVision = request.getHeaders().getFirst("X-Requires-Vision");
String contextLength = request.getHeaders().getFirst("X-Context-Length");
return AIRequestContext.builder()
.userId(userId)
.userTier(UserTier.fromString(userTier))
.requiresVision(Boolean.parseBoolean(requiresVision))
.contextLength(contextLength != null ? Integer.parseInt(contextLength) : 0)
.build();
}
}九、踩坑记录
坑一:临时实例心跳超时导致频繁重注册
Nacos默认临时实例心跳间隔5秒,超时阈值15秒。AI推理节点有时候因为处理大请求CPU跑满,心跳线程被饿死,超时后实例被摘除,然后恢复后重注册——这个过程会造成短暂的服务不可用。
解决方案:提高心跳线程优先级,或者把心跳间隔调小到2秒,让心跳更及时。
spring:
cloud:
nacos:
discovery:
heart-beat-interval: 2000 # 心跳间隔2秒
heart-beat-timeout: 6000 # 超时阈值6秒
ip-delete-timeout: 8000坑二:元数据更新有延迟
通过updateInstance更新实例的动态负载信息后,其他实例通过selectInstances查到的还是旧数据。Nacos的服务列表有本地缓存,默认每30秒主动刷新一次,加上Nacos Server端的更新广播,端到端延迟可能达到几十秒。
对于负载信息这种高频变化的数据,建议不要完全依赖Nacos元数据,而是在自己的Redis里维护一份实时状态,路由时从Redis读。Nacos元数据只做备用。
坑三:配置监听器在应用重启后失效
测试时发现,应用重启后配置监听器不会自动恢复,需要重新注册。这是因为NacosConfigService的监听器是基于内存的,重启后丢失。在@PostConstruct里重新注册就能解决,但要确保这个初始化逻辑在应用启动时可靠地执行。
坑四:多租户命名空间的连接数管理
给每个租户创建独立的NamingService连接,如果租户数量多,会创建大量的Nacos连接,耗尽连接池。要么限制最大租户连接数,要么用共享连接+服务分组代替多命名空间。
十、整体效果
用Nacos管理AI服务路由的价值,主要体现在三个地方:
一是模型灰度发布能力。以前上线新版本模型,要么全量切,要么维护两套部署配置,现在通过权重调整就能实现5%→20%→50%→100%的渐进上线,出问题随时回滚。
二是运营成本降低。高峰期自动路由到高性能GPU节点,低峰期轻量请求路由到CPU节点,大概节省了20%的GPU资源。
三是提示词迭代速度提升。提示词改动不需要走代码发布流程,产品同学通过Nacos配置控制台直接修改,十分钟内全量生效。这对AI产品的快速迭代非常重要。
