第1774篇:本地模型与云端模型的混合调度——按敏感度和成本动态切换
第1774篇:本地模型与云端模型的混合调度——按敏感度和成本动态切换
这个话题我聊了很多次,但每次都有新的角度。
最近帮一个医疗科技公司做AI架构评审,他们的问题很典型:业务上要用AI,但合规要求患者数据不能出境,甚至不能离开他们自己的数据中心。所以云端API根本没法用,只能本地部署。
但本地部署的问题是:硬件成本高,模型能力弱,维护复杂。完全放弃云端API,又损失了很多能力。
解决方案:混合调度——敏感数据走本地模型,非敏感数据走云端,成本和能力都不妥协。
这个思路不只适合医疗行业,任何有数据安全要求或者想降低API依赖的团队都适用。
混合调度的核心逻辑
混合调度要解决三个问题:
1. 数据敏感度分级:哪些数据能出去,哪些不能。 2. 本地模型能力评估:本地模型能搞定哪些任务,搞不定的才交给云端。 3. 成本最优分配:在满足敏感度约束的前提下,选成本最低的方案。
数据敏感度分级体系
先建立分级标准,这是整个系统的基础。
public enum DataSensitivityLevel {
/**
* Level 1 - 严格保密:绝对不能出本地环境
* 例:患者个人信息、金融账户数据、商业核心机密
*/
CONFIDENTIAL(1, "严格保密", false),
/**
* Level 2 - 内部敏感:脱敏后可以外部处理
* 例:用户行为数据、内部流程数据、员工信息(脱敏后)
*/
INTERNAL(2, "内部敏感", true), // canDesensitize = true
/**
* Level 3 - 一般使用:可以在云端处理
* 例:产品描述、公开文档、FAQ内容
*/
GENERAL(3, "一般使用", true),
/**
* Level 4 - 完全公开
*/
PUBLIC(4, "公开数据", true);
private final int level;
private final String description;
private final boolean canSendToCloud; // 脱敏后是否可送云端
DataSensitivityLevel(int level, String description, boolean canSendToCloud) {
this.level = level;
this.description = description;
this.canSendToCloud = canSendToCloud;
}
}数据敏感度检测器
@Component
public class DataSensitivityDetector {
// 敏感信息的正则模式
private static final Map<String, Pattern> SENSITIVE_PATTERNS = Map.of(
"ID_CARD", Pattern.compile("[1-9]\\d{5}(18|19|20)\\d{2}(0[1-9]|1[0-2])(0[1-9]|[12]\\d|3[01])\\d{3}[0-9Xx]"),
"PHONE", Pattern.compile("1[3-9]\\d{9}"),
"BANK_CARD", Pattern.compile("\\b[1-9]\\d{15,18}\\b"),
"PATIENT_ID", Pattern.compile("P\\d{8}"), // 医疗业务的患者ID格式
"EMAIL", Pattern.compile("[a-zA-Z0-9._%+\\-]+@[a-zA-Z0-9.\\-]+\\.[a-zA-Z]{2,}")
);
// 业务敏感关键词(医疗场景)
private static final Set<String> MEDICAL_SENSITIVE_KEYWORDS = Set.of(
"诊断", "病历", "处方", "手术", "住院", "出院", "用药", "化验", "检查结果"
);
/**
* 检测文本的敏感级别
*/
public SensitivityCheckResult check(String text, String featureCode) {
List<String> foundPatterns = new ArrayList<>();
List<String> maskedPositions = new ArrayList<>();
// 1. 检测结构化敏感信息(身份证、手机号等)
for (Map.Entry<String, Pattern> entry : SENSITIVE_PATTERNS.entrySet()) {
Matcher matcher = entry.getValue().matcher(text);
while (matcher.find()) {
foundPatterns.add(entry.getKey() + ":" + matcher.group());
maskedPositions.add(entry.getKey() + "@[" + matcher.start() + "," + matcher.end() + "]");
}
}
// 2. 检测业务敏感关键词
boolean hasMedicalContent = MEDICAL_SENSITIVE_KEYWORDS.stream()
.anyMatch(text::contains);
// 3. 判断敏感级别
DataSensitivityLevel level;
if (!foundPatterns.isEmpty()) {
level = DataSensitivityLevel.CONFIDENTIAL;
} else if (hasMedicalContent && "medical_context".equals(featureCode)) {
level = DataSensitivityLevel.INTERNAL;
} else {
level = DataSensitivityLevel.GENERAL;
}
return SensitivityCheckResult.builder()
.level(level)
.foundPatterns(foundPatterns)
.maskablePositions(maskedPositions)
.canDesensitize(!foundPatterns.isEmpty())
.build();
}
/**
* 对文本进行脱敏处理
*/
public String desensitize(String text) {
String result = text;
for (Map.Entry<String, Pattern> entry : SENSITIVE_PATTERNS.entrySet()) {
result = entry.getValue().matcher(result).replaceAll(m -> {
String matched = m.group();
// 保留首尾,中间替换为*
if (matched.length() <= 4) return "****";
return matched.substring(0, 2) + "*".repeat(matched.length() - 4) + matched.substring(matched.length() - 2);
});
}
return result;
}
}本地模型能力评估
知道了数据敏感度,还要知道本地模型能不能搞定这个任务。
本地模型的能力通常弱于云端旗舰,我们需要维护一个能力矩阵。
@Component
public class LocalModelCapabilityMatrix {
/**
* 本地模型能力评分(0-1)
* 数据来自离线测试 benchmark,定期更新
*/
private final Map<String, Map<String, Double>> capabilityScores;
@PostConstruct
public void init() {
// 格式:模型名 -> 任务类型 -> 能力分数
capabilityScores = new HashMap<>();
// Qwen2.5-7B 本地部署
Map<String, Double> qwen7b = new HashMap<>();
qwen7b.put("text_classification", 0.88);
qwen7b.put("sentiment_analysis", 0.85);
qwen7b.put("simple_qa", 0.82);
qwen7b.put("text_summarization", 0.75);
qwen7b.put("data_extraction", 0.80);
qwen7b.put("code_generation", 0.55); // 代码能力较弱
qwen7b.put("complex_reasoning", 0.60);
capabilityScores.put("qwen2.5-7b", qwen7b);
// Llama-3.1-8B 本地部署
Map<String, Double> llama8b = new HashMap<>();
llama8b.put("text_classification", 0.85);
llama8b.put("sentiment_analysis", 0.83);
llama8b.put("simple_qa", 0.80);
llama8b.put("text_summarization", 0.72);
llama8b.put("data_extraction", 0.75);
llama8b.put("code_generation", 0.65);
llama8b.put("complex_reasoning", 0.63);
capabilityScores.put("llama-3.1-8b", llama8b);
}
/**
* 判断本地模型是否胜任某类任务
* @param modelName 本地模型名
* @param taskType 任务类型
* @param minAcceptableScore 最低可接受分数
*/
public boolean isCapable(String modelName, String taskType, double minAcceptableScore) {
Map<String, Double> scores = capabilityScores.get(modelName);
if (scores == null) return false;
Double score = scores.get(taskType);
return score != null && score >= minAcceptableScore;
}
/**
* 获取最佳的本地模型
*/
public Optional<String> getBestLocalModel(String taskType, double minScore) {
return capabilityScores.entrySet().stream()
.filter(e -> {
Double score = e.getValue().get(taskType);
return score != null && score >= minScore;
})
.max(Comparator.comparingDouble(e -> e.getValue().getOrDefault(taskType, 0.0)))
.map(Map.Entry::getKey);
}
}混合调度器核心实现
把数据敏感度检测和能力评估结合起来,做出调度决策。
@Service
@Slf4j
public class HybridScheduler {
@Autowired
private DataSensitivityDetector sensitivityDetector;
@Autowired
private LocalModelCapabilityMatrix capabilityMatrix;
@Autowired
private LocalModelInferenceService localInference;
@Autowired
private CloudModelService cloudService;
@Autowired
private TokenCostRecorder costRecorder;
/**
* 核心调度方法
*/
public SchedulerResult dispatch(SchedulerRequest request) {
String fullText = buildFullText(request);
// Step 1: 检测数据敏感度
SensitivityCheckResult sensitivityResult =
sensitivityDetector.check(fullText, request.getFeatureCode());
DataSensitivityLevel level = sensitivityResult.getLevel();
log.info("敏感度检测: featureCode={}, level={}, patterns={}",
request.getFeatureCode(), level, sensitivityResult.getFoundPatterns());
// Step 2: 根据敏感度决定路由策略
if (level == DataSensitivityLevel.CONFIDENTIAL) {
// 严格保密:只能本地
return dispatchToLocal(request, "CONFIDENTIAL_POLICY");
}
if (level == DataSensitivityLevel.INTERNAL) {
// 内部敏感:先试本地,本地搞不定就脱敏后云端
return dispatchInternalSensitive(request, sensitivityResult);
}
// 一般数据:成本优先路由
return dispatchByCost(request);
}
/**
* 内部敏感数据的路由逻辑
*/
private SchedulerResult dispatchInternalSensitive(
SchedulerRequest request,
SensitivityCheckResult sensitivityResult) {
// 先尝试本地模型
String taskType = request.getTaskType();
double minQuality = request.getMinQualityScore();
Optional<String> localModel = capabilityMatrix.getBestLocalModel(taskType, minQuality);
if (localModel.isPresent()) {
log.info("内部敏感数据,使用本地模型: model={}", localModel.get());
return dispatchToLocal(request, "LOCAL_PREFERRED");
}
// 本地模型能力不足,脱敏后送云端
if (sensitivityResult.isCanDesensitize()) {
String desensitized = sensitivityDetector.desensitize(buildFullText(request));
SchedulerRequest desensitizedRequest = request.withDesensitizedText(desensitized);
log.info("本地模型能力不足,脱敏后使用云端: featureCode={}", request.getFeatureCode());
return dispatchToCloud(desensitizedRequest, "DESENSITIZED_CLOUD");
}
// 无法脱敏,必须走本地(即使能力不足)
log.warn("无法脱敏且本地能力不足,强制本地处理: featureCode={}", request.getFeatureCode());
return dispatchToLocal(request, "FORCED_LOCAL");
}
/**
* 一般数据的成本优先路由
*/
private SchedulerResult dispatchByCost(SchedulerRequest request) {
String taskType = request.getTaskType();
double minQuality = request.getMinQualityScore();
// 计算本地 vs 云端的成本对比
double localCost = estimateLocalCost(request);
double cloudCheapestCost = estimateCloudCheapestCost(request);
// 判断本地模型是否胜任
boolean localCapable = capabilityMatrix
.getBestLocalModel(taskType, minQuality).isPresent();
if (localCapable && localCost < cloudCheapestCost * 0.5) {
// 本地成本低于云端50%,优先本地
return dispatchToLocal(request, "COST_OPTIMAL_LOCAL");
}
return dispatchToCloud(request, "COST_OPTIMAL_CLOUD");
}
private SchedulerResult dispatchToLocal(SchedulerRequest request, String reason) {
String taskType = request.getTaskType();
String modelName = capabilityMatrix
.getBestLocalModel(taskType, 0.0)
.orElse("qwen2.5-7b"); // fallback到默认本地模型
try {
long start = System.currentTimeMillis();
LocalInferenceResult result = localInference.infer(modelName, request);
long latencyMs = System.currentTimeMillis() - start;
// 记录本地推理成本(电费+硬件折旧)
recordLocalCost(request, modelName, result);
return SchedulerResult.builder()
.content(result.getContent())
.modelName(modelName)
.route("LOCAL")
.routeReason(reason)
.latencyMs(latencyMs)
.build();
} catch (Exception e) {
log.error("本地模型推理失败: model={}", modelName, e);
// 如果数据允许,fallback到云端
if (request.getSensitivityLevel() != DataSensitivityLevel.CONFIDENTIAL) {
return dispatchToCloud(request, "LOCAL_FAILURE_FALLBACK");
}
throw e;
}
}
private SchedulerResult dispatchToCloud(SchedulerRequest request, String reason) {
// 云端路由逻辑(可接入上一篇讲的智能路由)
AIResponse response = cloudService.call(request);
return SchedulerResult.builder()
.content(response.getContent())
.modelName(response.getModelName())
.route("CLOUD")
.routeReason(reason)
.build();
}
}本地模型推理服务(Ollama集成)
本地推理目前最方便的方案是 Ollama,Java 可以通过 HTTP API 调用。
@Service
@Slf4j
public class OllamaInferenceService implements LocalModelInferenceService {
@Value("${ollama.base-url:http://localhost:11434}")
private String ollamaBaseUrl;
private final RestTemplate restTemplate = new RestTemplate();
@Override
public LocalInferenceResult infer(String modelName, SchedulerRequest request) {
String url = ollamaBaseUrl + "/api/chat";
Map<String, Object> requestBody = buildRequestBody(modelName, request);
try {
ResponseEntity<String> response = restTemplate.postForEntity(
url, requestBody, String.class
);
return parseResponse(response.getBody());
} catch (Exception e) {
log.error("Ollama调用失败: model={}, url={}", modelName, url, e);
throw new LocalInferenceException("Ollama推理失败: " + e.getMessage(), e);
}
}
private Map<String, Object> buildRequestBody(String modelName, SchedulerRequest request) {
List<Map<String, String>> messages = new ArrayList<>();
if (request.getSystemPrompt() != null) {
messages.add(Map.of(
"role", "system",
"content", request.getSystemPrompt()
));
}
messages.add(Map.of(
"role", "user",
"content", request.getUserMessage()
));
return Map.of(
"model", modelName,
"messages", messages,
"stream", false,
"options", Map.of(
"temperature", request.getTemperature(),
"num_predict", request.getMaxOutputTokens()
)
);
}
/**
* 检查Ollama服务健康状态
*/
public boolean isHealthy() {
try {
ResponseEntity<String> response = restTemplate.getForEntity(
ollamaBaseUrl + "/api/tags", String.class
);
return response.getStatusCode().is2xxSuccessful();
} catch (Exception e) {
return false;
}
}
/**
* 获取本地可用模型列表
*/
public List<String> getAvailableModels() {
try {
ResponseEntity<Map> response = restTemplate.getForEntity(
ollamaBaseUrl + "/api/tags", Map.class
);
List<Map<String, Object>> models = (List<Map<String, Object>>) response.getBody().get("models");
return models.stream()
.map(m -> (String) m.get("name"))
.collect(Collectors.toList());
} catch (Exception e) {
log.error("获取Ollama模型列表失败", e);
return Collections.emptyList();
}
}
}本地推理的成本计算
本地推理不花API费,但有硬件成本,不能当做"免费"。
@Component
public class LocalCostCalculator {
// 服务器配置(示例)
// GPU: 2 * A100 80G, 购买价格约200万,使用年限5年
// CPU: 双路 AMD EPYC, 服务器成本约50万,使用年限5年
// 电费: 20kW功耗,工业电0.8元/度
@Value("${local.hardware.annual-depreciation-cny:500000}")
private BigDecimal annualDepreciationCny; // 年折旧(含硬件+机房)
@Value("${local.hardware.annual-power-cost-cny:140160}")
private BigDecimal annualPowerCostCny; // 年电费 20kW * 8760h * 0.8元
@Value("${local.hardware.annual-ops-cost-cny:100000}")
private BigDecimal annualOpsCostCny; // 年运维成本
// 每年能处理的Token数(基于GPU利用率和推理速度)
@Value("${local.hardware.annual-token-capacity:10000000000}")
private long annualTokenCapacity; // 100亿 token/年
/**
* 计算每1000个token的本地成本
*/
public BigDecimal getCostPerKToken() {
BigDecimal totalAnnualCost = annualDepreciationCny
.add(annualPowerCostCny)
.add(annualOpsCostCny);
// 元/token
BigDecimal costPerToken = totalAnnualCost
.divide(new BigDecimal(annualTokenCapacity), 10, RoundingMode.HALF_UP);
// 元/1K token
return costPerToken.multiply(new BigDecimal(1000));
}
/**
* 计算一次本地推理的成本
*/
public BigDecimal calcInferenceCost(int inputTokens, int outputTokens) {
BigDecimal costPerK = getCostPerKToken();
return costPerK.multiply(new BigDecimal(inputTokens + outputTokens))
.divide(new BigDecimal(1000), 6, RoundingMode.HALF_UP);
}
}这个成本计算很重要,因为很多人认为本地部署"不花钱"。实际上当请求量不大时,本地部署的每token成本可能比云端API还贵——云端API的边际成本低,本地部署的固定成本高。只有当请求量大到一定程度,本地部署才真的比云端便宜。
混合调度的监控
有个关键指标叫成本分流率,就是本地处理的请求占总请求的比例。这个比例不是越高越好,而是要在成本和质量之间找平衡点。
典型场景的调度决策
| 场景 | 数据敏感度 | 任务复杂度 | 推荐路由 | 原因 |
|---|---|---|---|---|
| 患者病历摘要 | CONFIDENTIAL | 中 | 本地 | 绝对不能外传 |
| 工单情感分析 | INTERNAL | 低 | 本地 | 本地能搞定 |
| 合同关键信息提取 | INTERNAL | 中 | 脱敏后云端 | 本地能力不足,脱敏后安全 |
| 产品FAQ回答 | GENERAL | 低 | 云端mini | 云端更快更准 |
| 技术文档代码审查 | GENERAL | 高 | 云端旗舰 | 需要强推理 |
混合调度这套方案落地时最难的不是技术,而是和安全合规团队、业务方一起对齐"什么数据算敏感"。技术实现是一周的事,但敏感度分级标准的讨论可能要拉好几轮会。
别跳过这个对齐,后面返工比直接对齐要贵得多。
