第1942篇:Claude的Extended Thinking在复杂推理任务中的工程应用
第1942篇:Claude的Extended Thinking在复杂推理任务中的工程应用
有一段时间我对Extended Thinking这个特性的态度是:能不用就不用,太贵了。直到有个项目逼着我去认真搞清楚它的边界,我才意识到这个判断太草率了。
Extended Thinking不只是"让Claude想久一点",它在架构上的设计思路和对工程实践的影响,值得专门聊一聊。
Extended Thinking是什么,不是什么
很多人把Extended Thinking理解成"更长的思维链",这个理解有点对但不全对。
思维链(CoT)在标准模式下也是存在的,只是隐藏了中间步骤,直接给你最终答案。Extended Thinking做的事情是:把这个中间推理过程暴露出来,而且给它分配了独立的token预算,并且保证这部分内容不计入输出限制,模型可以"想"得更彻底。
更重要的是,Extended Thinking的推理过程是不受输出格式限制的。正式输出你可能要求JSON格式或者Markdown,但thinking block里模型可以随意写,类似人在草稿纸上推演——乱、直接、自由。这对需要大量中间推导的任务来说,是有实质性帮助的。
从工程角度看,Extended Thinking响应的结构变了:
{
"id": "msg_xxx",
"content": [
{
"type": "thinking",
"thinking": "让我先理解这个问题的结构...\n首先需要确认...\n然后分析...\n所以结论是..."
},
{
"type": "text",
"text": "根据分析,最终答案是..."
}
]
}这个结构在处理响应的时候要特别注意,不能直接拿content[0].text,要先找type为text的块。
什么任务用Extended Thinking有收益
这个问题我测试了不少场景,结论是:对任务的推理深度要求,比任务的领域复杂度更关键。
有收益的场景:
- 多步逻辑推导:比如给定一段业务规则描述,推导出所有边界情况
- 数学/算法问题:需要精确计算或者证明
- 复杂代码分析:理解一段有多层依赖的代码逻辑,找出潜在bug
- 多约束优化:在多个相互冲突的条件下找平衡方案
- 长文档摘要+推断:不只是摘要,还需要从文档内容推断出没有明说的结论
收益不明显的场景:
- 信息检索类:问题本身没有推理链,就是查个知识点
- 创意写作:创意不靠推理,靠发散,Extended Thinking反而可能让输出变得"过于严谨"
- 简单分类任务:给一段文本打标签,这种任务标准模式已经够用
- 实时交互场景:Extended Thinking延迟明显更高,聊天机器人这种场景用户体验会变差
Java代码实现
Spring AI目前对Extended Thinking的支持还在完善中,我这里直接用Anthropic的官方Java HTTP客户端实现,更灵活。
首先是基础调用:
@Service
public class ExtendedThinkingService {
private final String apiKey;
private final OkHttpClient httpClient;
private final ObjectMapper objectMapper;
public ExtendedThinkingService(@Value("${anthropic.api.key}") String apiKey) {
this.apiKey = apiKey;
this.httpClient = new OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(300, TimeUnit.SECONDS) // Extended Thinking可能需要很长时间
.writeTimeout(30, TimeUnit.SECONDS)
.build();
this.objectMapper = new ObjectMapper();
}
/**
* 带Extended Thinking的推理请求
*
* @param prompt 用户问题
* @param thinkingBudget thinking tokens预算,建议1000-10000
* @return 包含thinking过程和最终答案的结果
*/
public ThinkingResult reasonWithThinking(String prompt, int thinkingBudget) {
Map<String, Object> requestBody = buildRequest(prompt, thinkingBudget);
String json;
try {
json = objectMapper.writeValueAsString(requestBody);
} catch (JsonProcessingException e) {
throw new RuntimeException("请求序列化失败", e);
}
Request request = new Request.Builder()
.url("https://api.anthropic.com/v1/messages")
.post(RequestBody.create(json, MediaType.parse("application/json")))
.header("x-api-key", apiKey)
.header("anthropic-version", "2023-06-01")
.header("anthropic-beta", "interleaved-thinking-2025-05-14")
.build();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
String errorBody = response.body() != null ? response.body().string() : "unknown";
throw new RuntimeException("API调用失败: " + response.code() + " " + errorBody);
}
String responseBody = response.body().string();
return parseThinkingResponse(responseBody);
} catch (IOException e) {
throw new RuntimeException("HTTP请求失败", e);
}
}
private Map<String, Object> buildRequest(String prompt, int thinkingBudget) {
return Map.of(
"model", "claude-opus-4-5",
"max_tokens", thinkingBudget + 8000, // max_tokens必须大于thinking budget
"thinking", Map.of(
"type", "enabled",
"budget_tokens", thinkingBudget
),
"messages", List.of(
Map.of("role", "user", "content", prompt)
)
);
}
private ThinkingResult parseThinkingResponse(String responseBody) throws IOException {
JsonNode root = objectMapper.readTree(responseBody);
JsonNode contentArray = root.get("content");
String thinkingText = "";
String answerText = "";
for (JsonNode block : contentArray) {
String type = block.get("type").asText();
if ("thinking".equals(type)) {
thinkingText = block.get("thinking").asText();
} else if ("text".equals(type)) {
answerText = block.get("text").asText();
}
}
// 提取token使用信息
JsonNode usage = root.get("usage");
int inputTokens = usage.get("input_tokens").asInt();
int outputTokens = usage.get("output_tokens").asInt();
return new ThinkingResult(thinkingText, answerText, inputTokens, outputTokens);
}
}结果对象:
@Data
@AllArgsConstructor
public class ThinkingResult {
private String thinkingProcess; // 推理过程
private String finalAnswer; // 最终答案
private int inputTokens;
private int outputTokens;
public int getThinkingTokenCount() {
// 粗略估算thinking部分的token数
return thinkingProcess.length() / 4;
}
public double estimateCost() {
// claude-opus-4-5的价格参考:input $15/M, output $75/M
double inputCost = inputTokens * 15.0 / 1_000_000;
double outputCost = outputTokens * 75.0 / 1_000_000;
return inputCost + outputCost;
}
}流式处理Extended Thinking
实际项目里,Extended Thinking的响应时间可能达到几十秒甚至更长,如果用同步请求,用户等待体验很差。要用流式输出。
@Service
public class StreamingThinkingService {
/**
* 流式Extended Thinking,实时推送思考过程和答案
*/
public Flux<ThinkingEvent> streamReasoning(String prompt, int thinkingBudget) {
return Flux.create(sink -> {
Map<String, Object> requestBody = buildStreamRequest(prompt, thinkingBudget);
// 使用WebClient做流式请求
WebClient.create("https://api.anthropic.com")
.post()
.uri("/v1/messages")
.header("x-api-key", apiKey)
.header("anthropic-version", "2023-06-01")
.header("anthropic-beta", "interleaved-thinking-2025-05-14")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(requestBody)
.retrieve()
.bodyToFlux(String.class)
.subscribe(
line -> parseStreamLine(line, sink),
sink::error,
sink::complete
);
});
}
private Map<String, Object> buildStreamRequest(String prompt, int thinkingBudget) {
Map<String, Object> body = new HashMap<>();
body.put("model", "claude-opus-4-5");
body.put("max_tokens", thinkingBudget + 8000);
body.put("stream", true);
body.put("thinking", Map.of("type", "enabled", "budget_tokens", thinkingBudget));
body.put("messages", List.of(Map.of("role", "user", "content", prompt)));
return body;
}
private void parseStreamLine(String line, FluxSink<ThinkingEvent> sink) {
if (!line.startsWith("data: ")) return;
String data = line.substring(6).trim();
if ("[DONE]".equals(data)) return;
try {
JsonNode event = objectMapper.readTree(data);
String eventType = event.path("type").asText();
switch (eventType) {
case "content_block_start":
handleBlockStart(event, sink);
break;
case "content_block_delta":
handleBlockDelta(event, sink);
break;
case "content_block_stop":
sink.next(ThinkingEvent.blockEnd());
break;
}
} catch (Exception e) {
// SSE流里的解析错误通常可以忽略,继续处理下一行
log.debug("SSE行解析跳过: {}", e.getMessage());
}
}
private void handleBlockStart(JsonNode event, FluxSink<ThinkingEvent> sink) {
JsonNode block = event.path("content_block");
String type = block.path("type").asText();
if ("thinking".equals(type)) {
sink.next(ThinkingEvent.thinkingStart());
} else if ("text".equals(type)) {
sink.next(ThinkingEvent.answerStart());
}
}
private void handleBlockDelta(JsonNode event, FluxSink<ThinkingEvent> sink) {
JsonNode delta = event.path("delta");
String deltaType = delta.path("type").asText();
if ("thinking_delta".equals(deltaType)) {
sink.next(ThinkingEvent.thinkingChunk(delta.path("thinking").asText()));
} else if ("text_delta".equals(deltaType)) {
sink.next(ThinkingEvent.answerChunk(delta.path("text").asText()));
}
}
}前端展示时,可以把thinking过程做成一个可展开的"推理过程"折叠框,既不干扰用户看最终答案,又让用户有信心知道AI是经过认真推理的。这个交互设计的细节很值得做,我们测过,有thinking过程展示的界面,用户对结果的信任度明显更高。
精细控制thinking预算
这是最容易被忽视的优化点。thinking budget设多少直接影响效果和成本。
我做了一套动态预算分配:
@Component
public class ThinkingBudgetCalculator {
/**
* 根据问题特征计算合适的thinking budget
*/
public int calculateBudget(String question) {
int baseBudget = 1000;
// 问题长度加权
int questionLength = question.length();
if (questionLength > 2000) baseBudget += 2000;
else if (questionLength > 500) baseBudget += 1000;
// 关键词检测
if (containsComplexIndicators(question)) {
baseBudget += 3000;
}
// 数学/代码问题需要更多thinking
if (containsMathOrCode(question)) {
baseBudget += 2000;
}
// 上限控制
return Math.min(baseBudget, 16000);
}
private boolean containsComplexIndicators(String question) {
List<String> indicators = Arrays.asList(
"分析", "推导", "证明", "比较", "评估",
"analyze", "prove", "compare", "evaluate",
"为什么", "如何", "原因", "机制"
);
return indicators.stream().anyMatch(question::contains);
}
private boolean containsMathOrCode(String question) {
return question.contains("计算") ||
question.contains("算法") ||
question.contains("代码") ||
question.contains("程序") ||
question.matches(".*[\\d+\\-*/=]+.*");
}
}实际案例:用Extended Thinking做代码安全审计
分享一个我们用在生产上的场景:代码安全审计。给定一段Java代码,让Claude找出潜在的安全漏洞。
这个任务用标准模式效果一般,因为很多安全漏洞需要追踪数据流、理解多层调用,模型需要"想清楚"整个调用链才能正确判断是否有漏洞。Extended Thinking在这里的提升很明显。
@Service
public class SecurityAuditService {
@Autowired
private ExtendedThinkingService thinkingService;
private static final String AUDIT_PROMPT_TEMPLATE = """
请对以下Java代码进行安全审计,找出所有潜在的安全漏洞。
需要重点检查:
1. SQL注入漏洞
2. XSS漏洞
3. 权限绑过漏洞
4. 敏感数据泄露
5. 不安全的反序列化
6. 路径遍历漏洞
代码如下:
```java
%s
```
请按严重程度(高/中/低)列出每个问题,并给出修复建议。
""";
public SecurityAuditReport audit(String javaCode) {
String prompt = String.format(AUDIT_PROMPT_TEMPLATE, javaCode);
// 代码审计给较大的thinking预算
int budget = calculateAuditBudget(javaCode);
ThinkingResult result = thinkingService.reasonWithThinking(prompt, budget);
return SecurityAuditReport.builder()
.rawAnswer(result.getFinalAnswer())
.thinkingProcess(result.getThinkingProcess())
.estimatedCost(result.estimateCost())
.vulnerabilities(parseVulnerabilities(result.getFinalAnswer()))
.build();
}
private int calculateAuditBudget(String code) {
// 按代码行数估算需要的thinking量
long lineCount = code.lines().count();
if (lineCount > 500) return 16000;
if (lineCount > 200) return 8000;
if (lineCount > 50) return 4000;
return 2000;
}
}实测下来,对200行以上的复杂业务代码,Extended Thinking能发现一些标准模式漏掉的间接数据流问题。
成本控制的实用建议
Extended Thinking的费用是实实在在的。claude-opus-4-5大概$15/M输入,$75/M输出,thinking tokens计入输出。一次10000 token的thinking,光这部分就要$0.75。
我们的控制策略:
按问题复杂度分流:搭一个轻量分类器,简单问题走标准模式,复杂问题才开thinking。这一步单独可以节省60-70%的成本。
thinking结果缓存:相同或近似的复杂问题,缓存thinking结果。用embedding相似度做命中判断,相似度>0.92就用缓存。
budget动态调整:用上面的自动预算计算,不要无脑给最大值。大多数问题4000以内就够了。
异步批处理:不需要实时响应的审计任务,批量处理,可以用cheaper的模型先筛一遍,只把筛出的高风险代码送给Extended Thinking深度分析。
一个反直觉的发现
最后分享一个我觉得挺有意思的发现:Extended Thinking在某些情况下会让结果更差,不是更好。
具体场景是:当问题本身是开放性的、没有"正确答案"的时候,比如"给我写一首关于秋天的诗"。这时候thinking过程会让模型过度分析"什么是好诗",反而输出变得僵化,缺乏自然的创意感。
Extended Thinking适合"有答案需要找到"的任务,不适合"需要发散创造"的任务。这个判断基准比"任务复杂不复杂"更准。
