第1816篇:时序数据库与LLM结合——IoT设备日志的智能分析
我见过太多IoT项目的监控系统,都是一个模子刻出来的:一大堆折线图、一堆阈值告警、值班的人盯着屏幕,看到红灯就去查日志。
这种模式有几个根本性的问题。一是信息密度太低——几十台设备、几百个指标,人根本看不过来;二是告警没有上下文——温度超标了,是正常工况还是真的故障?需要翻日志对比;三是分析能力的天花板就是人——再有经验的运维,也很难在脑子里同时处理多个设备的多维度关联分析。
这篇文章聊的是我在一个数控机床监控项目里做的一套系统:时序数据库(TimescaleDB)+ LLM,把设备日志分析从"人看图表"升级到"AI读懂数据"。
为什么是TimescaleDB
选时序数据库有好几个备选:InfluxDB、TimescaleDB、TDengine、Prometheus等。
我选TimescaleDB的原因很实际:
- 基于PostgreSQL:团队已经熟悉PostgreSQL,学习成本几乎为零;标准SQL可用,不用学新的查询语言
- 时序优化:自动分区(hypertable)、压缩、时间聚合函数(time_bucket、first、last等)
- 与LangChain4j集成方便:可以直接用JDBC,不需要额外的适配层
- 功能完整性:支持continuous aggregates(物化视图+自动刷新),非常适合做多粒度聚合
当然,如果你的团队对InfluxDB更熟悉,或者数据量特别大需要更好的横向扩展,InfluxDB也是不错的选择。工具没有绝对的好坏,适合的才是最好的。
数据模型设计
-- Device metrics table (raw samples; becomes a TimescaleDB hypertable below).
CREATE TABLE device_metrics (
    time TIMESTAMPTZ NOT NULL,
    device_id TEXT NOT NULL,
    metric_name TEXT NOT NULL,
    value DOUBLE PRECISION NOT NULL,
    unit TEXT,
    quality SMALLINT DEFAULT 100 -- data-quality score, 0-100
);
-- Convert to a hypertable partitioned by time, one chunk per day.
SELECT create_hypertable('device_metrics', 'time',
    chunk_time_interval => INTERVAL '1 day');
-- Covering index for the dominant query shape: per device, per metric, newest first.
CREATE INDEX ON device_metrics (device_id, metric_name, time DESC);
-- Device alert/event table.
CREATE TABLE device_alerts (
    id BIGSERIAL,
    time TIMESTAMPTZ NOT NULL,
    device_id TEXT NOT NULL,
    alert_type TEXT NOT NULL,
    severity SMALLINT NOT NULL, -- 1=INFO, 2=WARN, 3=ERROR, 4=CRITICAL
    message TEXT,
    raw_data JSONB,
    resolved_at TIMESTAMPTZ,
    -- hypertables require the time partitioning column in the primary key
    PRIMARY KEY (id, time)
);
SELECT create_hypertable('device_alerts', 'time');
-- Continuous aggregate: 1-minute statistics, refreshed automatically.
CREATE MATERIALIZED VIEW device_metrics_1m
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', time) AS bucket,
    device_id,
    metric_name,
    AVG(value) AS avg_val,
    MAX(value) AS max_val,
    MIN(value) AS min_val,
    STDDEV(value) AS std_val,
    COUNT(*) AS sample_count
FROM device_metrics
GROUP BY bucket, device_id, metric_name
WITH NO DATA;
-- Refresh policy: materialize the window [now-10min, now-1min) every minute.
SELECT add_continuous_aggregate_policy('device_metrics_1m',
    start_offset => INTERVAL '10 minutes',
    end_offset => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute');
-- Retention: raw data 7 days, 1-minute aggregates 90 days.
SELECT add_retention_policy('device_metrics', INTERVAL '7 days');
-- FIX: the 90-day aggregate retention was described in the comment above
-- but the statement itself was missing.
SELECT add_retention_policy('device_metrics_1m', INTERVAL '90 days');

Java数据采集服务
@Service
@Slf4j
public class MetricsCollectionService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // NOTE(review): injected but not referenced anywhere in this class —
    // confirm it is used by code outside this excerpt, or remove it.
    @Autowired
    private KafkaTemplate<String, DeviceMetricsBatch> kafkaTemplate;

    /** Bounded staging buffer; overflow is dropped with a warning (back-pressure). */
    private final BlockingQueue<DeviceMetric> writeBuffer =
        new LinkedBlockingQueue<>(10000);

    private final ScheduledExecutorService flushExecutor =
        Executors.newSingleThreadScheduledExecutor();

    @PostConstruct
    public void startFlushTask() {
        // Flush once per second so inserts hit the DB in batches, not row-by-row.
        flushExecutor.scheduleAtFixedRate(this::flushBuffer, 1, 1, TimeUnit.SECONDS);
    }

    /**
     * FIX: the scheduler was never shut down, leaking its thread on context
     * close and silently losing whatever was still buffered. Stop it cleanly
     * and make a capped best-effort drain of the remaining metrics.
     * (Requires the javax/jakarta.annotation.PreDestroy import, the sibling
     * of the @PostConstruct already used above.)
     */
    @PreDestroy
    public void stopFlushTask() {
        flushExecutor.shutdown();
        try {
            if (!flushExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                flushExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            flushExecutor.shutdownNow();
        }
        // Cap the attempts so an unreachable database cannot hang shutdown.
        for (int i = 0; i < 20 && !writeBuffer.isEmpty(); i++) {
            flushBuffer();
        }
    }

    /**
     * Entry point for metrics pushed by the IoT gateway. Non-blocking: when
     * the buffer is full the sample is dropped and logged rather than making
     * the gateway wait.
     */
    public void receiveMetric(DeviceMetric metric) {
        if (!writeBuffer.offer(metric)) {
            log.warn("Write buffer full, dropping metric for device: {}", metric.getDeviceId());
        }
    }

    /**
     * Drains up to 500 buffered samples and writes them as one JDBC batch.
     * On failure the batch is re-offered to the buffer (simplified; production
     * should use a compensation queue) — FIX: a failed re-offer is now logged
     * instead of silently discarding the metric.
     */
    private void flushBuffer() {
        List<DeviceMetric> batch = new ArrayList<>(500);
        writeBuffer.drainTo(batch, 500);
        if (batch.isEmpty()) return;
        try {
            String sql = "INSERT INTO device_metrics (time, device_id, metric_name, value, unit, quality) " +
                "VALUES (?, ?, ?, ?, ?, ?)";
            jdbcTemplate.batchUpdate(sql, batch, 500, (ps, metric) -> {
                ps.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(metric.getTimestamp())));
                ps.setString(2, metric.getDeviceId());
                ps.setString(3, metric.getMetricName());
                ps.setDouble(4, metric.getValue());
                ps.setString(5, metric.getUnit());
                ps.setInt(6, metric.getQuality());
            });
            log.debug("Flushed {} metrics to TimescaleDB", batch.size());
        } catch (Exception e) {
            log.error("Failed to flush metrics batch", e);
            batch.forEach(m -> {
                if (!writeBuffer.offer(m)) {
                    log.warn("Write buffer full, metric lost for device: {}", m.getDeviceId());
                }
            });
        }
    }
}
时序数据查询层
@Repository
@Slf4j
public class TimeSeriesQueryRepository {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * Raw data points for one device/metric over the last {@code minutes} minutes.
     * FIX: the lookback interval is now bound as a real JDBC parameter via
     * PostgreSQL's make_interval() instead of being String.format()-ed into
     * the SQL text.
     */
    public List<MetricDataPoint> queryRecent(String deviceId, String metricName, int minutes) {
        String sql = """
            SELECT time, value, quality
            FROM device_metrics
            WHERE device_id = ?
              AND metric_name = ?
              AND time > NOW() - make_interval(mins => ?)
            ORDER BY time ASC
            """;
        return jdbcTemplate.query(sql, (rs, rowNum) -> MetricDataPoint.builder()
                .timestamp(rs.getTimestamp("time").toInstant().toEpochMilli())
                .value(rs.getDouble("value"))
                .quality(rs.getInt("quality"))
                .build(),
            deviceId, metricName, minutes);
    }

    /**
     * Bucketed statistics served from the 1-minute continuous aggregate
     * (pre-materialized, so this is fast).
     * NOTE(review): {@code bucketInterval} is accepted but currently ignored —
     * results always come at the view's fixed 1-minute granularity. Kept for
     * interface compatibility; honouring coarser buckets would require a
     * re-aggregation step on top of the view.
     */
    public List<MetricAggregation> queryAggregated(String deviceId, String metricName,
                                                   Instant from, Instant to,
                                                   String bucketInterval) {
        String sql = """
            SELECT bucket, avg_val, max_val, min_val, std_val, sample_count
            FROM device_metrics_1m
            WHERE device_id = ?
              AND metric_name = ?
              AND bucket BETWEEN ? AND ?
            ORDER BY bucket ASC
            """;
        return jdbcTemplate.query(sql, (rs, rowNum) -> MetricAggregation.builder()
                .bucketTime(rs.getTimestamp("bucket").toInstant().toEpochMilli())
                .avg(rs.getDouble("avg_val"))
                .max(rs.getDouble("max_val"))
                .min(rs.getDouble("min_val"))
                .stdDev(rs.getDouble("std_val"))
                .sampleCount(rs.getLong("sample_count"))
                .build(),
            deviceId, metricName,
            Timestamp.from(from), Timestamp.from(to));
    }

    /**
     * Statistical anomaly detection via the 3-sigma rule: points more than
     * 3 standard deviations from the window mean are flagged.
     * FIX: lookback hours bound as a parameter (make_interval) instead of
     * formatted into the SQL string.
     */
    public List<AnomalyPoint> detectStatisticalAnomalies(String deviceId, String metricName,
                                                         int lookbackHours) {
        String sql = """
            WITH stats AS (
                SELECT
                    AVG(value) AS mean_val,
                    STDDEV(value) AS std_val
                FROM device_metrics
                WHERE device_id = ?
                  AND metric_name = ?
                  AND time > NOW() - make_interval(hours => ?)
            )
            SELECT
                m.time,
                m.value,
                s.mean_val,
                s.std_val,
                ABS(m.value - s.mean_val) / NULLIF(s.std_val, 0) AS z_score
            FROM device_metrics m, stats s
            WHERE m.device_id = ?
              AND m.metric_name = ?
              AND m.time > NOW() - make_interval(hours => ?)
              AND ABS(m.value - s.mean_val) > 3 * s.std_val
            ORDER BY m.time DESC
            LIMIT 100
            """;
        return jdbcTemplate.query(sql, (rs, rowNum) -> AnomalyPoint.builder()
                .timestamp(rs.getTimestamp("time").toInstant().toEpochMilli())
                .value(rs.getDouble("value"))
                .meanValue(rs.getDouble("mean_val"))
                .zScore(rs.getDouble("z_score"))
                .build(),
            deviceId, metricName, lookbackHours, deviceId, metricName, lookbackHours);
    }

    /**
     * Pearson correlation of every other metric against the target metric,
     * used to give the LLM multi-dimensional context.
     * FIXES: the metric-name discovery query and the join's right-hand side
     * were both unbounded in time (full-table scans on a hypertable); both
     * are now restricted to the same lookback window, and the interval is a
     * bound parameter rather than formatted SQL text.
     */
    public Map<String, Double> computeCorrelations(String deviceId,
                                                   String targetMetric,
                                                   int hours) {
        // Only metrics that actually produced data in the window are candidates.
        List<String> allMetrics = jdbcTemplate.queryForList(
            "SELECT DISTINCT metric_name FROM device_metrics " +
                "WHERE device_id = ? AND time > NOW() - make_interval(hours => ?)",
            String.class, deviceId, hours);
        Map<String, Double> correlations = new HashMap<>();
        for (String metric : allMetrics) {
            if (metric.equals(targetMetric)) continue;
            // Align both series on 1-minute buckets, then use corr().
            String corrSql = """
                WITH aligned AS (
                    SELECT
                        time_bucket('1 minute', t.time) AS bucket,
                        AVG(t.value) AS target_val,
                        AVG(o.value) AS other_val
                    FROM device_metrics t
                    JOIN device_metrics o ON
                        time_bucket('1 minute', t.time) = time_bucket('1 minute', o.time)
                        AND t.device_id = o.device_id
                    WHERE t.device_id = ?
                      AND t.metric_name = ?
                      AND o.metric_name = ?
                      AND t.time > NOW() - make_interval(hours => ?)
                      AND o.time > NOW() - make_interval(hours => ?)
                    GROUP BY bucket
                )
                SELECT corr(target_val, other_val) AS correlation
                FROM aligned
                """;
            try {
                Double corr = jdbcTemplate.queryForObject(corrSql, Double.class,
                    deviceId, targetMetric, metric, hours, hours);
                if (corr != null && !corr.isNaN()) {
                    correlations.put(metric, corr);
                }
            } catch (Exception e) {
                // Best-effort: a single failed pair must not abort the whole map.
                log.debug("Correlation computation failed for {}: {}", metric, e.getMessage());
            }
        }
        return correlations;
    }
}
LLM智能分析服务
把时序数据分析结果喂给LLM,生成可读的诊断报告:
@Service
@Slf4j
public class DeviceIntelligenceService {

    private final TimeSeriesQueryRepository tsRepo;
    private final ChatLanguageModel chatModel;
    private final DeviceKnowledgeBase knowledgeBase; // device manuals, fault history, etc.

    /** Shared thread-safe mapper (FIX: was instantiated on every parse call). */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // FIX: the final fields were never assigned — without an (omitted) Lombok
    // @RequiredArgsConstructor this class does not compile. Explicit ctor added.
    public DeviceIntelligenceService(TimeSeriesQueryRepository tsRepo,
                                     ChatLanguageModel chatModel,
                                     DeviceKnowledgeBase knowledgeBase) {
        this.tsRepo = tsRepo;
        this.chatModel = chatModel;
        this.knowledgeBase = knowledgeBase;
    }

    /**
     * Full device health diagnosis: collects multi-dimensional data, short-
     * circuits to a "normal" report when nothing is anomalous (saving an LLM
     * call), otherwise augments with RAG knowledge and asks the model.
     */
    public DeviceHealthReport analyzeDeviceHealth(String deviceId) {
        long startTime = System.currentTimeMillis();
        // 1. Gather anomalies, aggregations and correlations in parallel.
        DeviceAnalysisData data = collectAnalysisData(deviceId);
        // 2. Cheap pre-check: only call the LLM when something looks wrong.
        boolean hasAnomalies = !data.getAnomalyPoints().isEmpty()
            || !data.getActiveAlerts().isEmpty();
        if (!hasAnomalies) {
            return DeviceHealthReport.normal(deviceId);
        }
        // 3. Retrieve supporting knowledge (RAG).
        String knowledgeContext = knowledgeBase.retrieveRelevantKnowledge(
            deviceId, data.getAnomalyTypes());
        // 4. Build the analysis prompt.
        String prompt = buildAnalysisPrompt(deviceId, data, knowledgeContext);
        // 5. LLM analysis.
        String analysis = chatModel.generate(prompt);
        // 6. Parse into a structured report.
        DeviceHealthReport report = parseAnalysisResult(deviceId, data, analysis);
        report.setAnalysisTimeMs(System.currentTimeMillis() - startTime);
        return report;
    }

    /**
     * Runs the four time-series queries concurrently with a 5s overall budget.
     * NOTE(review): supplyAsync without an explicit executor uses the common
     * ForkJoinPool for blocking JDBC calls — consider a dedicated executor.
     */
    private DeviceAnalysisData collectAnalysisData(String deviceId) {
        CompletableFuture<List<AnomalyPoint>> anomaliesFuture = CompletableFuture.supplyAsync(() ->
            tsRepo.detectStatisticalAnomalies(deviceId, "temperature", 1));
        CompletableFuture<List<MetricAggregation>> tempAggFuture = CompletableFuture.supplyAsync(() ->
            tsRepo.queryAggregated(deviceId, "temperature",
                Instant.now().minus(1, ChronoUnit.HOURS), Instant.now(), "1m"));
        CompletableFuture<List<MetricAggregation>> vibAggFuture = CompletableFuture.supplyAsync(() ->
            tsRepo.queryAggregated(deviceId, "vibration",
                Instant.now().minus(1, ChronoUnit.HOURS), Instant.now(), "1m"));
        CompletableFuture<Map<String, Double>> correlationsFuture = CompletableFuture.supplyAsync(() ->
            tsRepo.computeCorrelations(deviceId, "temperature", 2));
        try {
            CompletableFuture.allOf(anomaliesFuture, tempAggFuture, vibAggFuture, correlationsFuture)
                .get(5, TimeUnit.SECONDS);
            return DeviceAnalysisData.builder()
                .anomalyPoints(anomaliesFuture.get())
                .temperatureAggregations(tempAggFuture.get())
                .vibrationAggregations(vibAggFuture.get())
                .correlations(correlationsFuture.get())
                .build();
        } catch (Exception e) {
            // Degrade to an empty payload rather than failing the diagnosis.
            log.error("Failed to collect analysis data for device: {}", deviceId, e);
            return DeviceAnalysisData.empty();
        }
    }

    /** Assembles the Chinese-language expert prompt from the collected data. */
    private String buildAnalysisPrompt(String deviceId, DeviceAnalysisData data,
                                       String knowledgeContext) {
        StringBuilder sb = new StringBuilder();
        sb.append("你是一位资深的数控机床维护专家。请分析以下设备数据并给出诊断报告。\n\n");
        sb.append("## 设备ID: ").append(deviceId).append("\n\n");
        // Statistical anomalies, if any.
        if (!data.getAnomalyPoints().isEmpty()) {
            sb.append("## 检测到的统计异常(过去1小时)\n");
            data.getAnomalyPoints().forEach(ap ->
                sb.append(String.format("- 时间: %s, 值: %.2f (Z-score: %.1f)\n",
                    formatTime(ap.getTimestamp()), ap.getValue(), ap.getZScore())));
            sb.append("\n");
        }
        // Temperature trend summary.
        if (!data.getTemperatureAggregations().isEmpty()) {
            sb.append("## 温度趋势摘要(过去1小时)\n");
            addAggregationSummary(sb, data.getTemperatureAggregations(), "°C");
            sb.append("\n");
        }
        // FIX: vibration aggregations were collected but never included in the
        // prompt. (Unit assumed mm/s — TODO confirm against the gateway schema.)
        if (!data.getVibrationAggregations().isEmpty()) {
            sb.append("## 振动趋势摘要(过去1小时)\n");
            addAggregationSummary(sb, data.getVibrationAggregations(), "mm/s");
            sb.append("\n");
        }
        // Cross-metric correlations (strong ones only).
        if (!data.getCorrelations().isEmpty()) {
            sb.append("## 温度与其他指标相关性\n");
            data.getCorrelations().entrySet().stream()
                .filter(e -> Math.abs(e.getValue()) > 0.6)
                .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                .forEach(e -> sb.append(String.format("- %s: %.2f\n", e.getKey(), e.getValue())));
            sb.append("\n");
        }
        // RAG context from the knowledge base.
        if (knowledgeContext != null && !knowledgeContext.isBlank()) {
            sb.append("## 参考资料\n").append(knowledgeContext).append("\n\n");
        }
        sb.append("请提供:\n");
        sb.append("1. 健康状态评估(正常/注意/警告/危险)\n");
        sb.append("2. 可能的故障原因分析\n");
        sb.append("3. 建议的维护措施(按优先级排序)\n");
        sb.append("4. 下次预防性维护的建议时间\n\n");
        sb.append("以JSON格式返回:{\"status\": \"...\", \"causes\": [], \"actions\": [], \"nextMaintenance\": \"...\"}");
        return sb.toString();
    }

    /** Appends a one-line max/min/avg/trend summary for a metric series. */
    private void addAggregationSummary(StringBuilder sb, List<MetricAggregation> aggs, String unit) {
        if (aggs.isEmpty()) return;
        double overallMax = aggs.stream().mapToDouble(MetricAggregation::getMax).max().orElse(0);
        double overallMin = aggs.stream().mapToDouble(MetricAggregation::getMin).min().orElse(0);
        double overallAvg = aggs.stream().mapToDouble(MetricAggregation::getAvg).average().orElse(0);
        String trend = detectTrend(aggs);
        sb.append(String.format("最高: %.2f%s, 最低: %.2f%s, 平均: %.2f%s, 趋势: %s\n",
            overallMax, unit, overallMin, unit, overallAvg, unit, trend));
    }

    /**
     * Crude trend detection: compares the mean of the first half of the series
     * with the second half. FIX: guard the division when the first-half mean
     * is zero (previously produced Infinity/NaN in the percentage).
     */
    private String detectTrend(List<MetricAggregation> aggs) {
        if (aggs.size() < 5) return "数据不足";
        int half = aggs.size() / 2;
        double firstHalfAvg = aggs.subList(0, half).stream()
            .mapToDouble(MetricAggregation::getAvg).average().orElse(0);
        double secondHalfAvg = aggs.subList(half, aggs.size()).stream()
            .mapToDouble(MetricAggregation::getAvg).average().orElse(0);
        if (firstHalfAvg == 0) return "平稳";
        double change = (secondHalfAvg - firstHalfAvg) / firstHalfAvg * 100;
        if (change > 5) return String.format("上升趋势(+%.1f%%)", change);
        if (change < -5) return String.format("下降趋势(%.1f%%)", change);
        return "平稳";
    }

    // NOTE(review): formats in UTC — confirm operators expect UTC rather than
    // the plant's local timezone.
    private String formatTime(long timestamp) {
        return LocalDateTime.ofEpochSecond(timestamp/1000, 0, ZoneOffset.UTC)
            .format(DateTimeFormatter.ofPattern("HH:mm:ss"));
    }

    /**
     * Parses the LLM's JSON answer into a report.
     * FIX: uses path() instead of get() so a single missing field degrades
     * gracefully (empty list / default status) instead of throwing NPE and
     * collapsing the whole result into the generic fallback report.
     */
    private DeviceHealthReport parseAnalysisResult(String deviceId, DeviceAnalysisData data,
                                                   String analysisJson) {
        try {
            String jsonStr = extractJson(analysisJson);
            JsonNode node = MAPPER.readTree(jsonStr);
            List<String> causes = new ArrayList<>();
            node.path("causes").forEach(c -> causes.add(c.asText()));
            List<String> actions = new ArrayList<>();
            node.path("actions").forEach(a -> actions.add(a.asText()));
            return DeviceHealthReport.builder()
                .deviceId(deviceId)
                .status(node.path("status").asText("注意"))
                .causes(causes)
                .actions(actions)
                .nextMaintenance(node.path("nextMaintenance").asText())
                .anomalyCount(data.getAnomalyPoints().size())
                .generatedAt(LocalDateTime.now())
                .build();
        } catch (Exception e) {
            log.error("Failed to parse LLM analysis result", e);
            return DeviceHealthReport.builder()
                .deviceId(deviceId)
                .status("需要人工检查")
                .causes(List.of("分析结果解析失败"))
                .actions(List.of("请人工查看原始日志"))
                .generatedAt(LocalDateTime.now())
                .build();
        }
    }

    /** Extracts the outermost {...} span from free-form LLM output; "{}" if none. */
    private String extractJson(String text) {
        int start = text.indexOf('{');
        int end = text.lastIndexOf('}');
        return (start >= 0 && end > start) ? text.substring(start, end + 1) : "{}";
    }
}
自然语言查询:让运维能"问"数据
这是我最喜欢的功能:运维不需要写SQL,直接用自然语言提问:
@Service
@Slf4j
public class NaturalLanguageQueryService {

    private final ChatLanguageModel chatModel;
    private final JdbcTemplate jdbcTemplate;

    // FIX: the final fields were never assigned — without an (omitted) Lombok
    // @RequiredArgsConstructor this class does not compile. Explicit ctor added.
    public NaturalLanguageQueryService(ChatLanguageModel chatModel, JdbcTemplate jdbcTemplate) {
        this.chatModel = chatModel;
        this.jdbcTemplate = jdbcTemplate;
    }

    private static final String SQL_GENERATION_SYSTEM_PROMPT = """
        你是一个TimescaleDB SQL专家。根据用户的自然语言问题,生成正确的SQL查询。
        数据库表结构:
        - device_metrics(time TIMESTAMPTZ, device_id TEXT, metric_name TEXT, value DOUBLE PRECISION)
        - device_metrics_1m(bucket TIMESTAMPTZ, device_id TEXT, metric_name TEXT, avg_val FLOAT, max_val FLOAT, min_val FLOAT)
        - device_alerts(time TIMESTAMPTZ, device_id TEXT, alert_type TEXT, severity SMALLINT, message TEXT)
        TimescaleDB特殊函数:
        - time_bucket('间隔', time_column) -- 时间分桶
        - first(value, time) -- 时间最早的值
        - last(value, time) -- 时间最新的值
        规则:
        1. 只生成SELECT语句,禁止生成修改数据的SQL
        2. 时间范围用NOW() - INTERVAL 'xxx'表示
        3. 只返回SQL,不要有其他说明文字
        """;

    /** Forbidden statement types, matched on word boundaries (see isSafeSQL). */
    private static final List<String> DANGEROUS_KEYWORDS = List.of(
        "DROP", "DELETE", "UPDATE", "INSERT", "TRUNCATE", "ALTER", "CREATE",
        "EXEC", "EXECUTE", "GRANT", "REVOKE", "COPY");

    /**
     * NL question -> SQL (via LLM) -> safety check -> execute -> NL answer.
     */
    public QueryResult executeNaturalLanguageQuery(String question) {
        // Step 1: translate the question into SQL.
        String sqlGenerationPrompt = "用户问题: " + question + "\n\n生成对应的SQL查询:";
        String generatedSQL;
        try {
            generatedSQL = chatModel.generate(
                SystemMessage.from(SQL_GENERATION_SYSTEM_PROMPT),
                UserMessage.from(sqlGenerationPrompt)
            );
            generatedSQL = cleanSQL(generatedSQL);
        } catch (Exception e) {
            return QueryResult.error("SQL生成失败: " + e.getMessage());
        }
        log.info("Generated SQL: {}", generatedSQL);
        // Step 2: allow-list safety check (SELECT only).
        if (!isSafeSQL(generatedSQL)) {
            return QueryResult.error("生成的SQL不安全,已被拦截");
        }
        // Step 3: run the query.
        List<Map<String, Object>> rawResults;
        try {
            rawResults = jdbcTemplate.queryForList(generatedSQL);
        } catch (Exception e) {
            log.error("SQL execution failed: {}", generatedSQL, e);
            return QueryResult.error("查询执行失败: " + e.getMessage());
        }
        // Step 4: let the LLM phrase the result set as a natural-language answer.
        String answer = generateNaturalLanguageAnswer(question, rawResults);
        return QueryResult.builder()
            .question(question)
            .generatedSQL(generatedSQL)
            .rawResults(rawResults)
            .answer(answer)
            .build();
    }

    /**
     * Strips Markdown code fences from LLM output.
     * FIXES: fence matching is now case-insensitive ("```SQL" was previously
     * left in place), and a single trailing semicolon is removed so the
     * statement-stacking check in isSafeSQL does not reject legitimate queries.
     */
    private String cleanSQL(String sql) {
        String cleaned = sql.replaceAll("(?i)```sql", "")
            .replace("```", "")
            .trim();
        if (cleaned.endsWith(";")) {
            cleaned = cleaned.substring(0, cleaned.length() - 1).trim();
        }
        return cleaned;
    }

    /**
     * Allow-list check: only a single SELECT statement may run.
     * FIXES over the old substring check: (a) it missed stacked statements
     * ("SELECT 1; DROP TABLE ...") and comment tricks — semicolons and SQL
     * comments are now rejected outright; (b) it false-positived on
     * identifiers merely containing a keyword (e.g. a "created_at" column
     * contains "CREATE") — keywords are now matched on word boundaries.
     */
    private boolean isSafeSQL(String sql) {
        String trimmed = sql.trim();
        String upperSQL = trimmed.toUpperCase();
        if (!upperSQL.startsWith("SELECT")) return false;
        if (trimmed.contains(";") || trimmed.contains("--") || trimmed.contains("/*")) {
            return false;
        }
        for (String kw : DANGEROUS_KEYWORDS) {
            if (upperSQL.matches("(?s).*\\b" + kw + "\\b.*")) {
                return false;
            }
        }
        return true;
    }

    /** Summarizes the result set in natural language; truncates to 20 rows for token budget. */
    private String generateNaturalLanguageAnswer(String question,
                                                 List<Map<String, Object>> results) {
        if (results.isEmpty()) {
            return "没有找到相关数据。";
        }
        List<Map<String, Object>> limitedResults = results.size() > 20
            ? results.subList(0, 20) : results;
        String prompt = String.format("""
            用户问题:%s
            查询结果(共%d条):
            %s
            请用简洁的自然语言回答用户的问题,重点突出关键信息和趋势。
            """, question, results.size(), limitedResults.toString());
        try {
            return chatModel.generate(prompt);
        } catch (Exception e) {
            // Degrade to pointing at the raw rows rather than failing the query.
            return "查询到" + results.size() + "条结果,请查看原始数据。";
        }
    }
}
实际使用效果
这套系统上线后,运维团队最喜欢的功能排名:
第一是自然语言查询。以前要翻Grafana各个面板找数据,现在直接问"3号机床昨天温度最高是什么时候?那个时间点振动数据怎么样?",5秒钟出答案。
第二是健康诊断报告。每天早上值班人员打开系统,看的不是一堆图表,而是AI生成的每台设备的健康摘要和需要关注的点。整个晨会检查时间从30分钟降到了10分钟。
第三是异常关联分析。以前温度报警,运维要自己去翻振动、电流等其他指标,现在系统自动做相关性计算,"温度异常与振动强相关(0.87),建议检查冷却系统和轴承"这种结论直接给出来。
当然也有不足的地方:自然语言转SQL有时会生成错误的SQL,特别是复杂的时间范围表达。这个问题目前通过给运维培训一些标准问法模板来缓解,但从工程角度讲,还需要更好的SQL验证和修复机制。
