第1912篇:Redis Vector Search的工程实践——RediSearch在实时推荐中的应用
第1912篇:Redis Vector Search的工程实践——RediSearch在实时推荐中的应用
做推荐系统的同学都清楚,实时推荐有一个绕不过去的矛盾:又要快,又要准。
快,意味着毫秒级响应,用户刷动态的时候你不能让他等;准,意味着召回的内容要和用户真实兴趣匹配。传统的基于规则或协同过滤的推荐已经很难满足现代应用的要求,但上了向量搜索之后,延迟又往往成了新的瓶颈。
我在做一个内容推荐平台的时候,尝试过 Milvus、pgvector,最终在实时推荐这个场景里选择了 Redis + RediSearch。这篇文章把当时的选型逻辑和工程实践全部拆开讲一遍。
一、为什么是 Redis,而不是专用向量数据库
首先说清楚适用场景:Redis Vector Search 不是要替代 Milvus、pgvector 这些东西,它有自己的定位。
Redis 的核心优势是内存存储,天然低延迟。RediSearch 模块在 Redis 7.2 之后已经内置(不需要单独安装模块),支持向量字段的 HNSW 和 Flat 索引。
对于实时推荐这个场景,数据特点是:
- 向量维度适中(256~768 维居多)
- 数据量可控(千万以内的热点数据)
- 对延迟极其敏感(P99 要在 5ms 以内)
- 业务数据本身就存在 Redis 里(用户会话、实时特征等)
如果你的业务数据本身就在 Redis 里,那把向量索引也放进来,省去了跨系统的网络开销,架构更简洁。这个收益比想象中要大。
二、环境搭建与 RediSearch 配置
2.1 Docker 启动
# docker-compose.yml
version: '3.8'
services:
redis:
image: redis/redis-stack:latest
ports:
- "6379:6379"
- "8001:8001" # RedisInsight 管理界面
environment:
REDIS_ARGS: "--requirepass redis123"
volumes:
- redis_data:/data
command: >
redis-stack-server
--save 60 1
--loglevel warning
--maxmemory 8gb
--maxmemory-policy allkeys-lru
volumes:
redis_data:注意 maxmemory-policy 设置为 allkeys-lru,这样 Redis 内存不够时会自动淘汰最久未访问的数据。向量数据通常有时效性,这个策略比较合理。
2.2 Java 依赖
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Redis JSON 支持 -->
<dependency>
<groupId>com.redis</groupId>
<artifactId>redis-om-spring</artifactId>
<version>0.9.0</version>
</dependency>
</dependencies>三、索引设计:向量字段 + 业务字段的联合索引
RediSearch 索引的设计是最关键的一步,字段类型和索引类型直接影响后续查询能力。
3.1 创建向量索引(通过 Jedis 命令行)
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.search.IndexDefinition;
import redis.clients.jedis.search.IndexOptions;
import redis.clients.jedis.search.Schema;
@Component
@RequiredArgsConstructor
public class RedisVectorIndexManager {
private final UnifiedJedis jedis;
// 内容向量索引名
private static final String CONTENT_INDEX = "content_vector_idx";
public void createContentIndex() {
try {
// 检查索引是否已存在
jedis.ftInfo(CONTENT_INDEX);
log.info("索引 {} 已存在,跳过创建", CONTENT_INDEX);
return;
} catch (Exception e) {
// 索引不存在,继续创建
}
// 定义 Hash 结构的索引(比 JSON 格式性能更好)
IndexDefinition definition = new IndexDefinition(IndexDefinition.Type.HASH)
.setPrefixes("content:");
Schema schema = new Schema()
// 内容 ID(可用于精确查找)
.addTagField("content_id")
// 内容类型:article/video/post
.addTagField("content_type")
// 作者 ID
.addTagField("author_id")
// 发布时间戳(用于时间衰减排序)
.addNumericField("publish_ts")
// 热度分(点赞+评论+转发的加权)
.addNumericField("hot_score")
// 向量字段(HNSW 索引,余弦相似度,768维)
.addVectorField("embedding",
Schema.VectorField.VectorAlgo.HNSW,
Map.of(
"TYPE", "FLOAT32",
"DIM", "768",
"DISTANCE_METRIC", "COSINE",
"M", "16",
"EF_CONSTRUCTION", "200",
"EF_RUNTIME", "100"
)
);
jedis.ftCreate(CONTENT_INDEX,
IndexOptions.defaultOptions().setDefinition(definition),
schema);
log.info("向量索引 {} 创建成功", CONTENT_INDEX);
}
}这里有几个细节:
TYPE用FLOAT32,不用FLOAT64,内存占用减半,精度损失可以忽略EF_RUNTIME建议比EF_CONSTRUCTION小,这是运行时搜索宽度,控制查询精度和速度的平衡- 前缀
content:是 Redis Key 的前缀,RediSearch 会索引所有以这个前缀开头的 Hash
3.2 数据写入
@Service
@RequiredArgsConstructor
@Slf4j
public class ContentVectorService {
private final UnifiedJedis jedis;
private final EmbeddingClient embeddingClient;
/**
* 写入单条内容向量
*/
public void indexContent(ContentDTO content) {
// 1. 获取或计算 embedding
float[] embedding = embeddingClient.embed(
content.getTitle() + " " + content.getSummary()
);
// 2. 序列化向量为字节数组(FLOAT32 Little-Endian)
byte[] embeddingBytes = floatArrayToBytes(embedding);
// 3. 构建 Redis Hash 字段
String key = "content:" + content.getContentId();
Map<String, String> fields = new HashMap<>();
fields.put("content_id", content.getContentId());
fields.put("content_type", content.getContentType());
fields.put("author_id", content.getAuthorId());
fields.put("publish_ts",
String.valueOf(content.getPublishTime().toEpochSecond()));
fields.put("hot_score", String.valueOf(content.getHotScore()));
// 向量字段用二进制写入
jedis.hset(key.getBytes(), "embedding".getBytes(), embeddingBytes);
// 其他字段用文本写入
jedis.hset(key, fields);
// 设置过期时间(7天),避免数据无限膨胀
jedis.expire(key, 7 * 24 * 3600);
}
/**
* float[] 转 FLOAT32 字节数组(Little-Endian)
*/
private byte[] floatArrayToBytes(float[] floats) {
ByteBuffer buffer = ByteBuffer.allocate(floats.length * 4)
.order(ByteOrder.LITTLE_ENDIAN);
for (float f : floats) {
buffer.putFloat(f);
}
return buffer.array();
}
}这里最容易踩的坑是字节序。Redis Vector Search 要求向量必须是 Little-Endian 的 FLOAT32 字节数组,如果用默认的 Big-Endian 写进去,查询结果会完全乱掉,但不会报错,这个错误很难排查。
四、实时推荐的查询逻辑
4.1 基础向量搜索
/**
* 向量搜索 + 业务过滤
*/
public List<ContentSearchResult> searchSimilarContent(
float[] userEmbedding,
String contentType,
long minPublishTs,
int topK) {
byte[] queryBytes = floatArrayToBytes(userEmbedding);
// 构建 KNN 查询
// 语法:*=>[KNN {k} @{vectorField} $vec AS score]
String baseFilter = "*";
if (contentType != null) {
baseFilter = "@content_type:{" + contentType + "}";
}
if (minPublishTs > 0) {
String tsFilter = "@publish_ts:[" + minPublishTs + " +inf]";
baseFilter = baseFilter.equals("*")
? tsFilter
: "(" + baseFilter + " " + tsFilter + ")";
}
String queryStr = baseFilter + "=>[KNN " + topK
+ " @embedding $vec AS vec_score]";
Query query = new Query(queryStr)
.addParam("vec", queryBytes)
.returnFields("content_id", "content_type", "author_id",
"hot_score", "vec_score")
.setSortBy("vec_score", true) // 升序(距离越小越相似)
.limit(0, topK)
.dialect(2); // 必须用 dialect 2 才支持向量搜索
SearchResult result = jedis.ftSearch(CONTENT_INDEX, query);
return result.getDocuments().stream()
.map(doc -> ContentSearchResult.builder()
.contentId(doc.getString("content_id"))
.contentType(doc.getString("content_type"))
.hotScore(Double.parseDouble(doc.getString("hot_score")))
.vectorScore(Double.parseDouble(doc.getString("vec_score")))
.build())
.collect(Collectors.toList());
}注意 .dialect(2) 这一行不能省,RediSearch 2.6+ 用 dialect 2 才能正确解析向量查询语法。这个坑我在文档里没看到明显提示,是在 GitHub issue 里找到的。
4.2 混合排序:向量相似度 + 时间衰减 + 热度
纯向量相似度排序在推荐场景里往往效果不好,因为用户对内容的时效性和热度也有诉求。我们在 Java 层做了一个二次排序:
/**
* 混合排序算法
* 最终得分 = 相似度分 * α + 热度分 * β + 时效分 * γ
*/
public List<ContentSearchResult> hybridRanking(
List<ContentSearchResult> candidates,
RankingWeights weights) {
long now = System.currentTimeMillis() / 1000;
return candidates.stream()
.map(item -> {
// 相似度分:cosine distance 转换为 [0,1] 的相似度
double simScore = 1.0 - item.getVectorScore();
// 热度分:归一化到 [0,1]
double hotScore = Math.log1p(item.getHotScore()) / Math.log1p(10000);
hotScore = Math.min(1.0, hotScore);
// 时效分:指数衰减,24小时衰减到0.5
double ageHours = (now - item.getPublishTs()) / 3600.0;
double timeScore = Math.exp(-0.693 * ageHours / 24.0);
double finalScore = simScore * weights.getSimilarity()
+ hotScore * weights.getHotScore()
+ timeScore * weights.getTimeDecay();
return item.toBuilder().finalScore(finalScore).build();
})
.sorted(Comparator.comparingDouble(ContentSearchResult::getFinalScore)
.reversed())
.collect(Collectors.toList());
}权重参数 α=0.5, β=0.2, γ=0.3 是我们通过离线实验得到的,不同业务场景差异很大,建议做 A/B 测试来调整。
五、用户兴趣向量的实时更新
推荐系统的核心是用户画像,用户兴趣向量要随着用户的实时行为动态更新。
5.1 用户向量更新策略
@Service
@RequiredArgsConstructor
public class UserInterestVectorService {
private final UnifiedJedis jedis;
private final EmbeddingClient embeddingClient;
// 用户兴趣向量的 Redis Key 前缀
private static final String USER_VECTOR_KEY = "user:interest:";
// 滑动窗口大小:最近 50 次交互
private static final int WINDOW_SIZE = 50;
/**
* 用户产生行为时,更新兴趣向量
* 行为类型:click/like/comment/share,权重不同
*/
public void updateUserInterest(String userId, String contentId,
BehaviorType behaviorType) {
// 1. 获取内容的向量
String contentKey = "content:" + contentId;
byte[] contentEmbBytes = (byte[]) jedis.hget(
contentKey.getBytes(), "embedding".getBytes());
if (contentEmbBytes == null) {
log.warn("内容 {} 的向量不存在,跳过用户兴趣更新", contentId);
return;
}
float[] contentEmbedding = bytesToFloatArray(contentEmbBytes);
// 2. 获取当前用户兴趣向量(不存在则初始化为零向量)
String userKey = USER_VECTOR_KEY + userId;
byte[] userEmbBytes = (byte[]) jedis.get(userKey.getBytes());
float[] userEmbedding = userEmbBytes != null
? bytesToFloatArray(userEmbBytes)
: new float[768];
// 3. 指数移动平均更新(EMA)
// α 由行为类型决定:点击 0.05,点赞 0.1,评论 0.15,分享 0.2
double alpha = behaviorType.getAlpha();
float[] newEmbedding = new float[768];
for (int i = 0; i < 768; i++) {
newEmbedding[i] = (float) (
alpha * contentEmbedding[i] + (1 - alpha) * userEmbedding[i]
);
}
// 4. L2 归一化
newEmbedding = normalize(newEmbedding);
// 5. 写回 Redis(30 天过期)
jedis.set(userKey.getBytes(), floatArrayToBytes(newEmbedding));
jedis.expire(userKey, 30 * 24 * 3600);
log.debug("用户 {} 兴趣向量已更新,行为类型={}", userId, behaviorType);
}
private float[] normalize(float[] vector) {
double norm = 0;
for (float v : vector) norm += v * v;
norm = Math.sqrt(norm);
if (norm == 0) return vector;
float[] result = new float[vector.length];
for (int i = 0; i < vector.length; i++) {
result[i] = (float) (vector[i] / norm);
}
return result;
}
private float[] bytesToFloatArray(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
float[] floats = new float[bytes.length / 4];
for (int i = 0; i < floats.length; i++) {
floats[i] = buffer.getFloat();
}
return floats;
}
}EMA(指数移动平均)更新方式的好处是:不需要维护用户历史行为列表,只需要一个向量就能捕获兴趣漂移。α 值越小,新行为的影响越小,兴趣向量越稳定;α 值越大,实时性越高,但也更容易被单次行为带偏。
5.2 完整推荐流程
六、性能优化:Pipeline 与连接池
6.1 批量写入用 Pipeline
单条写入 Redis 的延迟主要来自网络 RTT,批量写入时务必用 Pipeline:
public void batchIndexContents(List<ContentDTO> contents) {
// 批量获取 embedding
List<String> texts = contents.stream()
.map(c -> c.getTitle() + " " + c.getSummary())
.collect(Collectors.toList());
List<float[]> embeddings = embeddingClient.embedBatch(texts);
// Pipeline 批量写入
Pipeline pipeline = jedis.pipelined();
for (int i = 0; i < contents.size(); i++) {
ContentDTO content = contents.get(i);
byte[] embBytes = floatArrayToBytes(embeddings.get(i));
String key = "content:" + content.getContentId();
byte[] keyBytes = key.getBytes();
pipeline.hset(keyBytes, "embedding".getBytes(), embBytes);
pipeline.hset(key, Map.of(
"content_id", content.getContentId(),
"content_type", content.getContentType(),
"author_id", content.getAuthorId(),
"publish_ts", String.valueOf(
content.getPublishTime().toEpochSecond()),
"hot_score", String.valueOf(content.getHotScore())
));
pipeline.expire(key, 7 * 24 * 3600);
}
pipeline.sync();
log.info("Pipeline 批量写入 {} 条内容向量完成", contents.size());
}Pipeline 的写入速度能比逐条写入快 5~10 倍,在批量初始化数据时收益非常明显。
6.2 连接池配置
@Configuration
public class RedisConfig {
@Bean
public UnifiedJedis jedis(@Value("${redis.host}") String host,
@Value("${redis.port}") int port,
@Value("${redis.password}") String password) {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(50); // 最大连接数
poolConfig.setMaxIdle(20); // 最大空闲连接
poolConfig.setMinIdle(5); // 最小空闲连接
poolConfig.setMaxWait(Duration.ofMillis(2000)); // 获取连接超时
poolConfig.setTestOnBorrow(true);
poolConfig.setTestWhileIdle(true);
JedisPool pool = new JedisPool(poolConfig, host, port, 2000, password);
return new UnifiedJedis(pool);
}
}连接数的设置要结合实际 QPS 和每次查询的耗时计算,不是越多越好。我们线上 2000 QPS 的推荐服务,50 个连接绰绰有余。
七、踩过的坑
坑1:索引更新不实时
往 Redis 里写了新数据,立刻查发现没有。RediSearch 的索引更新是异步的,写入和索引建立之间有一个很短的延迟(通常几十毫秒)。这个延迟在写入压力大的时候会拉长。
解决方案:写入后不要立刻查询,或者用 FT.SEARCH 的 NOCONTENT 标志配合业务 ID 查。
坑2:FLOAT32 字节序问题
上面提到过,Little-Endian 是必须的。如果从 Python 那边生成向量存过来,默认是 Little-Endian 没问题;如果是 Java 这边生成,ByteBuffer 默认是 Big-Endian,务必指定 .order(ByteOrder.LITTLE_ENDIAN)。
坑3:向量维度在索引创建后无法修改
RediSearch 的向量字段维度是在创建索引时固定的,后续不能修改。如果要换 embedding 模型(比如从 768 维换到 1536 维),必须删除索引重建,同时所有历史数据都要重新写入。
做好文档,写清楚当前用的 embedding 模型和维度,是个好习惯。
坑4:内存估算要留 buffer
RediSearch HNSW 索引的内存占用大约是:数据量 × 维度 × 4字节 × (1 + M × 2)。100 万条 768 维的数据,m=16 时大约需要 100万 × 768 × 4 × 33 ≈ 100GB。
Redis 内存规划必须把索引内存算进去,不然生产环境会 OOM。实际经验是预留 2 倍的 buffer。
八、监控与运维
@Component
@RequiredArgsConstructor
public class RedisVectorMonitor {
private final UnifiedJedis jedis;
private final MeterRegistry meterRegistry;
@Scheduled(fixedDelay = 60000) // 每分钟上报一次
public void reportIndexStats() {
try {
Map<String, Object> info = jedis.ftInfo(CONTENT_INDEX);
long numDocs = Long.parseLong(info.get("num_docs").toString());
long indexingTime = Long.parseLong(
info.get("total_indexing_time").toString());
meterRegistry.gauge("redis.search.num_docs",
Tags.of("index", CONTENT_INDEX), numDocs);
meterRegistry.gauge("redis.search.indexing_time_ms",
Tags.of("index", CONTENT_INDEX), indexingTime);
log.debug("RediSearch 索引统计:文档数={}, 索引耗时={}ms",
numDocs, indexingTime);
} catch (Exception e) {
log.error("RediSearch 监控数据采集失败", e);
}
}
}重点监控几个指标:
num_docs:索引文档数,应该和写入量一致total_indexing_time:总索引耗时,持续增长说明索引压力大- 向量搜索 P99 延迟:生产环境应该保持在 5ms 以内
九、什么时候不应该用 Redis Vector Search
说完优点,也要说局限性。
数据量超过亿级:纯内存存储,成本太高。
向量维度超过 1536:维度越高,内存占用越大,查询越慢。
需要复杂的标量过滤:RediSearch 的过滤能力不如 pgvector + PostgreSQL 强大,复杂的 SQL 表达式没法用。
数据持久化要求高:Redis 的 RDB/AOF 持久化写放大比较严重,向量数据量大时 dump 很慢。
选技术栈没有银弹。Redis Vector Search 在实时推荐、在线特征检索、会话相关的向量搜索这几个场景里确实很合适。超出这个范围,还是老老实实用专用向量数据库。
