第2025篇:动态批处理与吞吐量优化——LLM服务的高并发实战
第2025篇:动态批处理与吞吐量优化——LLM服务的高并发实战
适读人群:需要提升LLM推理服务吞吐量的工程师 | 阅读时长:约18分钟 | 核心价值:理解动态批处理原理,掌握提升LLM服务并发处理能力的方法
压测那天,我盯着监控看:GPU利用率30%,QPS才12。
按说A100 40GB应该能跑到几十QPS,30%的利用率完全没压榨到GPU的性能。问题出在哪里?
答案是:我们的服务在等请求。每个请求来一个处理一个,GPU处理完就闲着等下一个请求到来。这就是没有批处理的典型症状。
GPU为什么需要批处理
GPU是为大规模并行计算设计的,有数千个计算核心。处理单个请求时,这些核心大多数是空闲的——就像一辆60座的大巴只坐了一个人。
批处理(Batching)就是把多个请求合并在一起,让GPU同时处理,把那些空闲的核心利用起来。
Continuous Batching(连续批处理)是vLLM等现代推理框架的核心技术:GPU在处理当前batch时,新到的请求可以在下一个token生成时加入进来,不需要等当前batch全部完成。
Continuous Batching的工作原理
传统批处理的问题:一个batch里的所有请求必须等最慢的那个完成,才能处理下一批。比如batch里有一个需要生成500 tokens的请求,其他只需要20 tokens的请求都得等它。
Continuous Batching的关键改进:每生成一个token后,立即检查有没有新请求,可以加入就加入。
// 概念示意:Continuous Batching的调度逻辑
// 注意:这是简化版,真实vLLM的实现更复杂
public class ContinuousBatchScheduler {
private final Queue<InferenceRequest> waitingQueue = new ConcurrentLinkedQueue<>();
private final List<RunningRequest> currentBatch = new ArrayList<>();
private final int maxBatchSize;
private final int maxNumBatchedTokens;
public ContinuousBatchScheduler(int maxBatchSize, int maxNumBatchedTokens) {
this.maxBatchSize = maxBatchSize;
this.maxNumBatchedTokens = maxNumBatchedTokens;
}
/**
* 每生成一个token后调用:调整batch组成
*
* 1. 移除已完成的请求
* 2. 从等待队列中拉入新请求
* 3. 检查token预算限制
*/
public void scheduleNextStep() {
// 1. 移除已完成的请求(已达到max_tokens或生成了EOS)
currentBatch.removeIf(req -> {
if (req.isFinished()) {
req.sendResult(); // 立即返回给客户端,不等其他请求
return true;
}
return false;
});
// 2. 计算当前batch还能接受多少token的处理
int currentTokenBudget = currentBatch.stream()
.mapToInt(RunningRequest::getCurrentSequenceLength)
.sum();
// 3. 从等待队列中尝试加入新请求
while (!waitingQueue.isEmpty() &&
currentBatch.size() < maxBatchSize) {
InferenceRequest candidate = waitingQueue.peek();
int candidateTokens = candidate.getInputLength();
if (currentTokenBudget + candidateTokens <= maxNumBatchedTokens) {
waitingQueue.poll();
currentBatch.add(new RunningRequest(candidate));
currentTokenBudget += candidateTokens;
} else {
break; // 超预算,等下一步
}
}
// 4. 对当前batch做一步推理(生成下一个token)
generateNextToken(currentBatch);
}
private void generateNextToken(List<RunningRequest> batch) {
// 实际上这里是GPU的并行计算
// batch越大,GPU利用率越高,throughput越好
}
}vLLM的批处理配置
理解了原理,来看实际配置:
# 高吞吐量配置(适合批量处理,比如夜间任务)
python -m vllm.entrypoints.openai.api_server \
--model Qwen/Qwen2-7B-Instruct \
\
# 关键配置
--max-num-seqs 512 \ # 最大并发序列数(越大吞吐越高)
--max-num-batched-tokens 65536 \ # 单步最大token数(越大GPU利用率越高)
\
# 显存配置
--gpu-memory-utilization 0.92 \
--max-model-len 4096 \ # 控制短一点,让KV Cache小,支持更多并发
\
# Prefix Cache(减少重复计算)
--enable-prefix-caching \
\
--port 8080
# 低延迟配置(适合在线服务,对延迟敏感)
python -m vllm.entrypoints.openai.api_server \
--model Qwen/Qwen2-7B-Instruct \
--max-num-seqs 64 \ # 并发数低一些,减少排队
--max-num-batched-tokens 8192 \ # 小一点,每步处理快一些
--gpu-memory-utilization 0.85 \
--max-model-len 4096 \
--enable-prefix-caching \
--port 8080Java端的连接池和并发控制
批处理是服务端的事,但Java端的连接管理同样重要:
/**
* vLLM客户端,配置合理的连接池和并发控制
*/
@Configuration
public class VllmBatchClientConfig {
@Value("${vllm.max-concurrent-requests:50}")
private int maxConcurrentRequests;
@Value("${vllm.request-timeout-ms:90000}")
private int requestTimeoutMs;
@Bean
public WebClient vllmWebClient(@Value("${vllm.base-url}") String baseUrl) {
// 使用WebClient做异步非阻塞请求,配合批处理
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.responseTimeout(Duration.ofMillis(requestTimeoutMs))
.connectionProvider(ConnectionProvider.builder("vllm-pool")
.maxConnections(maxConcurrentRequests)
.maxIdleTime(Duration.ofSeconds(30))
.maxLifeTime(Duration.ofMinutes(10))
.pendingAcquireMaxCount(maxConcurrentRequests * 2)
.evictInBackground(Duration.ofSeconds(60))
.build());
return WebClient.builder()
.baseUrl(baseUrl)
.clientConnector(new ReactorClientHttpConnector(httpClient))
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();
}
}
/**
* 批量推理服务:把多个请求打包成批次发送
* 适合离线批量处理场景
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class BatchInferenceService {
private final WebClient vllmWebClient;
/**
* 并行处理批量推理请求
* 利用vLLM的Continuous Batching,所有请求几乎同时到达,GPU处理效率最高
*/
public List<String> batchInfer(List<String> prompts) {
// 将所有请求并发发出(不等前一个完成再发下一个)
// vLLM的Continuous Batching会自动合并它们
List<Mono<String>> futures = prompts.stream()
.map(prompt -> callModelAsync(prompt)
.doOnError(e -> log.warn("批量请求失败: {}", e.getMessage()))
.onErrorReturn("ERROR: " + e.getMessage())) // 单个失败不影响其他
.collect(Collectors.toList());
// 并发等待所有请求完成
return Flux.merge(futures)
.collectList()
.block(Duration.ofMinutes(5));
}
private Mono<String> callModelAsync(String prompt) {
Map<String, Object> requestBody = Map.of(
"model", "qwen2-7b",
"messages", List.of(Map.of("role", "user", "content", prompt)),
"max_tokens", 512
);
return vllmWebClient.post()
.uri("/v1/chat/completions")
.bodyValue(requestBody)
.retrieve()
.bodyToMono(Map.class)
.map(response -> {
List<Map> choices = (List<Map>) response.get("choices");
Map message = (Map) choices.get(0).get("message");
return (String) message.get("content");
});
}
/**
* 带优先级的批量处理:高优先级请求快速通道
*/
public CompletableFuture<String> infer(String prompt, RequestPriority priority) {
if (priority == RequestPriority.HIGH) {
// 高优先级:单独发送,不排队
return CompletableFuture.supplyAsync(() ->
callModelAsync(prompt).block(Duration.ofSeconds(60)));
} else {
// 普通优先级:加入批次队列,等待批处理
return addToBatchQueue(prompt);
}
}
}吞吐量优化的实测数据
同样的A100 40GB,Qwen2-7B,不同配置下的性能对比:
| 配置 | QPS | P50延迟 | P99延迟 | GPU利用率 |
|---|---|---|---|---|
| 无批处理(串行) | 12 | 1.1s | 1.5s | 28% |
| 静态批处理(batch=4) | 38 | 1.8s | 2.3s | 71% |
| Continuous Batching(默认) | 65 | 1.5s | 2.8s | 88% |
| Continuous Batching(调优) | 85 | 1.6s | 3.1s | 95% |
调优配置:--max-num-seqs 256 --max-num-batched-tokens 32768 --enable-prefix-caching
Continuous Batching的QPS是无批处理的7倍,而P50延迟只增加了40%——用稍微高一点的延迟换来了极大的吞吐提升。
真实场景的优化建议
场景一:AI报告生成(批量、非实时)
这种场景可以把请求排队,等凑够一定数量再统一处理:
@Component
@RequiredArgsConstructor
public class ReportGenerationBatcher {
private final BlockingQueue<ReportTask> taskQueue = new LinkedBlockingQueue<>(1000);
private final BatchInferenceService batchService;
@Scheduled(fixedDelay = 5000) // 每5秒批量处理一次
public void processBatch() {
List<ReportTask> batch = new ArrayList<>();
taskQueue.drainTo(batch, 50); // 每次最多处理50个
if (batch.isEmpty()) return;
List<String> prompts = batch.stream()
.map(ReportTask::getPrompt)
.collect(Collectors.toList());
List<String> results = batchService.batchInfer(prompts);
for (int i = 0; i < batch.size(); i++) {
batch.get(i).complete(results.get(i));
}
}
}场景二:实时对话
实时对话不能攒批次,但可以确保Java端的连接不阻塞:
// 使用线程池 + 信号量控制并发上限
@Service
public class RealtimeChatService {
private final Semaphore concurrentCallLimit;
private final BatchInferenceService batchService;
@Value("${llm.max.concurrent:30}")
public RealtimeChatService(int maxConcurrent, BatchInferenceService batchService) {
this.concurrentCallLimit = new Semaphore(maxConcurrent);
this.batchService = batchService;
}
public String chat(String userMessage) {
if (!concurrentCallLimit.tryAcquire(100, TimeUnit.MILLISECONDS)) {
throw new ServiceOverloadException("服务繁忙,请稍后再试");
}
try {
return batchService.callModelAsync(userMessage).block(Duration.ofSeconds(60));
} finally {
concurrentCallLimit.release();
}
}
}批处理是LLM服务性能优化的基础,理解了Continuous Batching的工作原理,就明白了为什么vLLM能在高并发下表现出色。
工程实践中的核心原则:让GPU永远有活干,不要让它空等。Continuous Batching、合理的队列深度、非阻塞的连接管理,都是在为这个目标服务。
