第1704篇:Java函数式编程与AI流水线——Function Composition构建Prompt链
第1704篇:Java函数式编程与AI流水线——Function Composition构建Prompt链
去年我参与了一个RAG系统的重构,最让我头疼的不是向量检索,而是Prompt的预处理流水线。
用户输入进来之后,要经历:去掉SQL注入风险的特殊字符、截断到合适长度、注入上下文信息、添加格式化指令、最后组装成最终Prompt。原来的代码是一个700行的Service类,一堆私有方法,像一盘意大利面。
后来用Function Composition重写,同样的逻辑压缩到不到200行,而且每个步骤可以单独测试,随时增删步骤,还能在不同场景下复用各个处理器。今天把这个思路完整讲出来。
一、Function Composition的核心概念
Java 8引入了 java.util.function 包,里面有几个关键接口:Function<T, R>、UnaryOperator<T>、Predicate<T>、Consumer<T>。
函数组合(Function Composition)就是把多个函数串联成一个新函数。这不是什么新概念,函数式语言里叫"函数管道"或"函数合成"。
Function<String, String> f1 = String::trim; // 去空白
Function<String, String> f2 = String::toLowerCase; // 转小写
Function<String, Integer> f3 = String::length; // 获取长度
// andThen:先执行当前函数,再执行后面的函数
Function<String, Integer> pipeline = f1.andThen(f2).andThen(f3);
int result = pipeline.apply(" HELLO WORLD "); // 结果是 11
// compose:先执行参数函数,再执行当前函数(顺序相反)
Function<String, Integer> reversed = f3.compose(f2).compose(f1);
// 效果等同于 f1 -> f2 -> f3andThen 和 compose 是两种组合方式,我通常用 andThen,因为它的执行顺序和代码阅读顺序一致,更直观。
二、为什么AI的Prompt处理适合函数式流水线
Prompt处理天然是一个线性变换序列:输入字符串 → 经过若干转换 → 输出字符串。
而且这些转换通常:
- 相互独立,没有副作用(理想情况下)
- 可以按需组合,不同场景用不同的步骤组合
- 每个步骤可以单独测试
- 有时候需要条件性地跳过某个步骤
这完全符合函数式编程的适用场景。
我们先定义一个Prompt处理器的基础抽象:
// 基础接口:Prompt变换器
@FunctionalInterface
public interface PromptTransformer {
String transform(PromptContext context, String prompt);
// 组合两个变换器:先执行this,再执行other
default PromptTransformer andThen(PromptTransformer other) {
return (ctx, prompt) -> other.transform(ctx, this.transform(ctx, prompt));
}
// 条件执行:只在条件满足时执行变换
default PromptTransformer when(Predicate<PromptContext> condition) {
return (ctx, prompt) -> condition.test(ctx) ? this.transform(ctx, prompt) : prompt;
}
// 包装一个带统计的版本
default PromptTransformer withMetrics(String transformerName) {
return (ctx, prompt) -> {
int before = prompt.length();
String result = this.transform(ctx, prompt);
int after = result.length();
// 可以在这里记录metrics
return result;
};
}
}
// Prompt上下文:携带处理所需的元数据
public record PromptContext(
String userId,
String sessionId,
String language, // 用户语言
Map<String, String> variables, // 模板变量
List<String> retrievedDocs, // RAG检索到的文档
boolean safeMode, // 是否启用安全过滤
int maxLength // 最大长度
) {
public PromptContext {
Objects.requireNonNull(userId, "userId required");
variables = variables != null ? Map.copyOf(variables) : Map.of();
retrievedDocs = retrievedDocs != null ? List.copyOf(retrievedDocs) : List.of();
if (maxLength <= 0) maxLength = 4000;
}
public boolean hasRetrievedDocs() {
return !retrievedDocs.isEmpty();
}
public boolean isChinese() {
return "zh".equals(language) || "zh-CN".equals(language);
}
}三、构建标准Prompt变换器库
现在来实现各种可复用的变换器:
// Prompt变换器工具集
public final class PromptTransformers {
private PromptTransformers() {}
// 1. 清理特殊字符(防止prompt注入)
public static PromptTransformer sanitize() {
return (ctx, prompt) -> {
// 移除可能的prompt注入攻击字符
return prompt
.replaceAll("\\[SYSTEM\\]", "[S]")
.replaceAll("\\[INST\\]", "[I]")
.replaceAll("(?i)ignore previous instructions", "[已过滤]")
.replaceAll("(?i)you are now", "[已过滤]")
.strip();
};
}
// 2. 截断到最大长度(按字符,智能截断到句子边界)
public static PromptTransformer truncate() {
return (ctx, prompt) -> {
int maxLen = ctx.maxLength();
if (prompt.length() <= maxLen) return prompt;
// 尝试在句子边界截断
String truncated = prompt.substring(0, maxLen);
int lastPeriod = Math.max(
truncated.lastIndexOf('。'),
Math.max(truncated.lastIndexOf('.'), truncated.lastIndexOf('!'))
);
if (lastPeriod > maxLen * 0.7) {
// 找到了合适的句子边界
return truncated.substring(0, lastPeriod + 1);
}
return truncated + "...(已截断)";
};
}
// 3. 变量替换(模板系统)
public static PromptTransformer resolveVariables() {
return (ctx, prompt) -> {
String result = prompt;
for (Map.Entry<String, String> entry : ctx.variables().entrySet()) {
result = result.replace("{{" + entry.getKey() + "}}", entry.getValue());
}
return result;
};
}
// 4. 注入RAG上下文
public static PromptTransformer injectRetrievedContext() {
return (ctx, prompt) -> {
if (!ctx.hasRetrievedDocs()) return prompt;
StringBuilder contextBlock = new StringBuilder();
contextBlock.append("以下是相关背景资料:\n\n");
for (int i = 0; i < ctx.retrievedDocs().size(); i++) {
contextBlock.append("[资料").append(i + 1).append("]\n");
contextBlock.append(ctx.retrievedDocs().get(i));
contextBlock.append("\n\n");
}
contextBlock.append("---\n\n");
contextBlock.append("基于以上资料,请回答:\n");
contextBlock.append(prompt);
return contextBlock.toString();
};
}
// 5. 添加语言指令
public static PromptTransformer enforceLanguage() {
return (ctx, prompt) -> {
if (ctx.isChinese()) {
return prompt + "\n\n请用中文回答。";
}
return prompt;
};
}
// 6. 添加格式化要求
public static PromptTransformer addFormatInstruction(String format) {
return (ctx, prompt) -> switch (format) {
case "json" -> prompt + "\n\n请以JSON格式返回,不要包含markdown代码块标记。";
case "markdown" -> prompt + "\n\n请使用Markdown格式,包括适当的标题和列表。";
case "concise" -> prompt + "\n\n请简洁回答,不超过200字。";
case "detailed" -> prompt + "\n\n请详细说明,包含原因、步骤和示例。";
default -> prompt;
};
}
// 7. 去重(有时候RAG注入会导致重复内容)
public static PromptTransformer deduplicateLines() {
return (ctx, prompt) -> {
String[] lines = prompt.split("\n");
Set<String> seen = new LinkedHashSet<>();
for (String line : lines) {
seen.add(line.trim());
}
return String.join("\n", seen);
};
}
// 8. 添加用户上下文信息
public static PromptTransformer addUserContext() {
return (ctx, prompt) -> {
String contextInfo = "用户ID: " + ctx.userId() +
"\n会话ID: " + ctx.sessionId() + "\n\n";
// 这通常放在系统提示里,这里只是演示
return contextInfo + prompt;
};
}
// 工具方法:把多个变换器合并成一个
public static PromptTransformer compose(PromptTransformer... transformers) {
return Arrays.stream(transformers)
.reduce((a, b) -> a.andThen(b))
.orElse((ctx, prompt) -> prompt);
}
}四、组装流水线:针对不同场景
有了这些构建块,可以针对不同业务场景快速组装流水线:
@Service
public class PromptPipelineFactory {
// 标准问答流水线
public PromptTransformer standardQAPipeline() {
return PromptTransformers.compose(
PromptTransformers.sanitize(),
PromptTransformers.resolveVariables(),
PromptTransformers.truncate(),
PromptTransformers.enforceLanguage()
);
}
// RAG增强流水线
public PromptTransformer ragPipeline() {
return PromptTransformers.compose(
PromptTransformers.sanitize(),
PromptTransformers.resolveVariables(),
// 只在有检索结果时注入上下文
PromptTransformers.injectRetrievedContext()
.when(PromptContext::hasRetrievedDocs),
PromptTransformers.truncate(),
PromptTransformers.enforceLanguage()
);
}
// JSON输出流水线(严格格式要求)
public PromptTransformer jsonOutputPipeline() {
return PromptTransformers.compose(
PromptTransformers.sanitize(),
PromptTransformers.resolveVariables(),
PromptTransformers.addFormatInstruction("json"),
PromptTransformers.truncate()
);
}
// 代码生成流水线(需要截断但不需要语言指令)
public PromptTransformer codeGenerationPipeline() {
return PromptTransformers.compose(
PromptTransformers.sanitize(),
PromptTransformers.resolveVariables(),
PromptTransformers.truncate(),
PromptTransformers.addFormatInstruction("markdown")
);
}
// 安全模式流水线(更严格的过滤)
public PromptTransformer safePipeline() {
return PromptTransformers.compose(
PromptTransformers.sanitize(),
PromptTransformers.resolveVariables(),
PromptTransformers.truncate().when(ctx -> ctx.maxLength() > 0),
PromptTransformers.enforceLanguage(),
// 只在安全模式下添加额外提示
((PromptTransformer) (ctx, prompt) ->
ctx.safeMode()
? prompt + "\n\n注意:请确保回答内容安全、积极、符合社区准则。"
: prompt
)
);
}
}五、Function组合处理AI响应
同样的思路也适用于AI响应的后处理:
// 响应后处理器
@FunctionalInterface
public interface ResponsePostProcessor {
String process(String rawResponse);
default ResponsePostProcessor andThen(ResponsePostProcessor next) {
return raw -> next.process(this.process(raw));
}
}
// 常用后处理器
public final class ResponsePostProcessors {
// 清除markdown代码块标记
public static ResponsePostProcessor stripMarkdownCodeBlocks() {
return response -> response
.replaceAll("```[a-zA-Z]*\\n", "")
.replaceAll("```\\s*$", "")
.replaceAll("^```\\s*", "")
.strip();
}
// 提取JSON
public static ResponsePostProcessor extractJson() {
return response -> {
Pattern jsonPattern = Pattern.compile("\\{[\\s\\S]*\\}|\\[[\\s\\S]*\\]");
Matcher matcher = jsonPattern.matcher(response);
return matcher.find() ? matcher.group() : response;
};
}
// 规范化列表格式
public static ResponsePostProcessor normalizeListFormat() {
return response -> response
.replaceAll("^\\s*[•·]\\s*", "- ") // 统一bullet符号
.replaceAll("^\\s*[0-9]+[.)、]\\s*", "$0"); // 保留数字列表
}
// 截断到目标长度
public static ResponsePostProcessor truncateTo(int maxLength) {
return response -> {
if (response.length() <= maxLength) return response;
return response.substring(0, maxLength) + "...";
};
}
// 过滤掉AI的免责声明(有时候不需要)
public static ResponsePostProcessor removeDisclaimers() {
return response -> response
.replaceAll("(?i)as an ai language model,?\\s*", "")
.replaceAll("(?i)as an artificial intelligence,?\\s*", "")
.replaceAll("作为一个AI助手,?\\s*", "")
.strip();
}
}六、带错误处理的函数式流水线
纯函数流水线有个问题:异常处理比较尴尬。要么用try-catch包住整个流水线,要么让异常逃出流水线。更好的方式是用类似 Either 的容器来传递成功/失败:
// 简单的Either类型
public sealed interface ProcessResult<T>
permits ProcessResult.Success, ProcessResult.Failure {
record Success<T>(T value) implements ProcessResult<T> {}
record Failure<T>(String error, Throwable cause) implements ProcessResult<T> {}
static <T> ProcessResult<T> success(T value) {
return new Success<>(value);
}
static <T> ProcessResult<T> failure(String error, Throwable cause) {
return new Failure<>(error, cause);
}
default boolean isSuccess() {
return this instanceof Success;
}
// 链式变换:成功时应用变换,失败时传递失败
default <R> ProcessResult<R> map(Function<T, R> mapper) {
return switch (this) {
case Success<T> s -> {
try {
yield ProcessResult.success(mapper.apply(s.value()));
} catch (Exception e) {
yield ProcessResult.failure("变换失败: " + e.getMessage(), e);
}
}
case Failure<T> f -> ProcessResult.failure(f.error(), f.cause());
};
}
// flatMap:允许变换返回ProcessResult
default <R> ProcessResult<R> flatMap(Function<T, ProcessResult<R>> mapper) {
return switch (this) {
case Success<T> s -> {
try {
yield mapper.apply(s.value());
} catch (Exception e) {
yield ProcessResult.failure("变换失败: " + e.getMessage(), e);
}
}
case Failure<T> f -> ProcessResult.failure(f.error(), f.cause());
};
}
// 获取值,失败时使用默认值
default T getOrElse(T defaultValue) {
return switch (this) {
case Success<T> s -> s.value();
case Failure<T> ignored -> defaultValue;
};
}
}
// 使用Either风格的流水线
@Service
public class SafePromptPipeline {
public ProcessResult<String> process(String rawInput, PromptContext ctx) {
return ProcessResult.success(rawInput)
.map(s -> PromptTransformers.sanitize().transform(ctx, s))
.map(s -> PromptTransformers.resolveVariables().transform(ctx, s))
.flatMap(s -> {
if (s.isBlank()) {
return ProcessResult.failure("处理后Prompt为空", null);
}
return ProcessResult.success(s);
})
.map(s -> PromptTransformers.truncate().transform(ctx, s))
.map(s -> PromptTransformers.enforceLanguage().transform(ctx, s));
}
}七、Prompt链(Prompt Chaining)
上面讲的是单个Prompt的处理。在更复杂的场景里,一个用户请求需要多次调用LLM,每次调用的输出作为下次的输入,这叫Prompt链(Prompt Chain)或者LLM Chain。
// 一次AI调用的配置
public record ChainStep(
String name,
String systemPrompt,
Function<ChainContext, String> userPromptBuilder,
Function<String, ChainContext> outputParser
) {}
// 链的上下文
public record ChainContext(
String originalInput,
Map<String, String> intermediateResults
) {
public ChainContext withResult(String stepName, String result) {
Map<String, String> newResults = new HashMap<>(intermediateResults);
newResults.put(stepName, result);
return new ChainContext(originalInput, Collections.unmodifiableMap(newResults));
}
public String getResult(String stepName) {
return intermediateResults.getOrDefault(stepName, "");
}
}
// Prompt链执行器
@Service
public class PromptChainExecutor {
private final ChatClient chatClient;
// 执行一个链,依次处理每个步骤
public ChainContext execute(String input, List<ChainStep> steps) {
ChainContext context = new ChainContext(input, new HashMap<>());
for (ChainStep step : steps) {
String userPrompt = step.userPromptBuilder().apply(context);
String response = chatClient.prompt()
.system(step.systemPrompt())
.user(userPrompt)
.call()
.content();
context = step.outputParser().apply(response)
.withResult(step.name(), response);
// 添加当前步骤结果到上下文
}
return context;
}
// 用Builder模式构建链
public static ChainBuilder builder() {
return new ChainBuilder();
}
public static class ChainBuilder {
private final List<ChainStep> steps = new ArrayList<>();
public ChainBuilder step(String name, String systemPrompt,
Function<ChainContext, String> promptBuilder) {
steps.add(new ChainStep(
name, systemPrompt, promptBuilder,
response -> new ChainContext("", new HashMap<>()) // 默认不解析
));
return this;
}
public ChainBuilder stepWithParser(String name, String systemPrompt,
Function<ChainContext, String> promptBuilder,
Function<String, ChainContext> parser) {
steps.add(new ChainStep(name, systemPrompt, promptBuilder, parser));
return this;
}
public List<ChainStep> build() {
return Collections.unmodifiableList(steps);
}
}
}使用这个链执行器,构建一个"简历优化"的多步骤流程:
@Service
public class ResumeOptimizationService {
private final PromptChainExecutor chainExecutor;
public String optimizeResume(String resumeText) {
List<ChainStep> chain = PromptChainExecutor.builder()
// 第一步:分析简历的问题
.step(
"analyze",
"你是一位资深HR专家,擅长简历优化",
ctx -> "请分析以下简历的主要问题和改进点:\n\n" + ctx.originalInput()
)
// 第二步:基于分析结果提供具体改进建议
.step(
"suggest",
"你是一位简历优化顾问,给出具体可执行的建议",
ctx -> "基于以下分析:\n" + ctx.getResult("analyze") +
"\n\n请给出具体的改进建议,每条建议包含原文和改后的示例"
)
// 第三步:直接输出优化后的完整简历
.step(
"rewrite",
"你是一位专业的简历写作专家,用词精练、突出亮点",
ctx -> "原简历:\n" + ctx.originalInput() +
"\n\n改进建议:\n" + ctx.getResult("suggest") +
"\n\n请基于以上信息,输出完整的优化后简历"
)
.build();
ChainContext result = chainExecutor.execute(resumeText, chain);
return result.getResult("rewrite");
}
}这种链式调用的好处是每步的结果都可以检查,出问题了很容易定位是哪一步出的问题。
八、并行分支与结果合并
有时候需要并行执行多个Prompt,然后合并结果:
@Service
public class ParallelPromptProcessor {
private final ChatClient chatClient;
// 并行执行多个子任务,合并结果
public <T> List<T> processParallel(
List<String> inputs,
Function<String, String> promptBuilder,
Function<String, T> resultParser) {
return inputs.parallelStream()
.map(input -> {
String prompt = promptBuilder.apply(input);
String response = chatClient.prompt()
.user(prompt)
.call()
.content();
return resultParser.apply(response);
})
.toList();
}
// 多角度分析:同时从几个维度分析同一段内容
public Map<String, String> multiPerspectiveAnalysis(String content) {
Map<String, String> perspectives = Map.of(
"technical", "从技术角度分析:",
"business", "从商业价值角度分析:",
"risk", "从风险角度分析潜在问题:"
);
return perspectives.entrySet().parallelStream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> chatClient.prompt()
.user(entry.getValue() + "\n\n" + content)
.call()
.content()
));
}
}九、架构图:Prompt流水线全貌
小结
函数式编程在AI Prompt工程里的核心价值:
- 每个变换器单一职责,可独立测试
- 流水线组合灵活,按场景搭积木
- 条件执行(
.when())避免了if-else的散落 - Either风格的错误处理让失败情况显式可见
- Prompt链让多步骤AI任务结构清晰可调试
这种思路放在团队里的另一个好处:新人加入后,添加一个新的处理步骤只需要实现一个 PromptTransformer,然后在合适的地方插入流水线,不会改动其他逻辑,风险极低。
