AI 视频生成的工程化——Sora/Kling 在企业场景的接入
AI 视频生成的工程化——Sora/Kling 在企业场景的接入
适读人群:想在企业项目里接入 AI 视频生成的工程师 | 阅读时长:约15分钟 | 核心价值:工程化接入的完整方案,加上对当前 API 稳定性的实话实说
去年我在一个内容生产项目里接了 Kling 的 API,把 AI 视频生成接入到了电商的短视频素材生产流程里。
接之前我以为会很顺,不就是调个 API 吗?结果踩了一堆坑,有些坑是视频生成本身的特殊性导致的,有些是我对这类 API 的稳定性预判不准。
这篇文章把这个经历里的工程实践写出来,重点放在异步任务管理、存储方案、失败重试,以及——我要说真话——当前视频生成 API 的稳定性到底是什么状况。
为什么视频生成 API 和普通 API 不一样
调 LLM 的文本 API,请求发出去,几秒钟到几十秒内拿到结果,这是同步交互模式。
视频生成不一样。生成一段 5 秒的视频,快则 2-3 分钟,慢则 10-15 分钟,高质量模式甚至更长。这意味着:
必须异步。 你不能让 HTTP 连接挂着等 10 分钟,客户端会超时,连接资源也耗不起。视频生成必须是"提交任务 -> 获取任务 ID -> 轮询或回调"的异步模式。
失败概率高得多。 文本生成极少失败(除非 token 超限),视频生成的失败率在 5%-15% 之间——有时候是服务器内部错误,有时候是内容审核拒绝,有时候是排队超时。重试策略是刚需,不是可选项。
存储是大头。 文本结果直接存数据库就行,视频文件动辄几十 MB,必须有专门的存储方案。
成本难以预测。 文本 API 按 token 计费,成本可以精确预测。视频 API 有的按秒计费,有的按"积分"计费,视频时长、质量选项都影响成本,必须在任务提交前做预算控制。
我用的 API:Kling 的实际体验
市面上主要的视频生成 API 有 Kling(快手旗下)、Runway、Pika、以及 Sora(目前企业 API 还没有完全开放)。
我选 Kling 主要是因为:文本转视频效果在中文场景里相对较好,API 有中文官方文档,价格相对可接受。
先说 Kling API 的基本调用流程:
import httpx
import asyncio
import time
from dataclasses import dataclass
from typing import Optional
from enum import Enum
class VideoStatus(Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class VideoTask:
task_id: str
prompt: str
status: VideoStatus
video_url: Optional[str] = None
error_message: Optional[str] = None
created_at: float = 0
completed_at: Optional[float] = None
class KlingVideoClient:
def __init__(self, api_key: str, api_secret: str):
self.api_key = api_key
self.api_secret = api_secret
self.base_url = "https://api.klingai.com"
self.client = httpx.AsyncClient(timeout=30.0)
def _get_auth_token(self) -> str:
"""生成 JWT 鉴权 token(Kling 使用 JWT 而不是直接用 API Key)"""
import jwt
now = int(time.time())
payload = {
"iss": self.api_key,
"exp": now + 1800, # 30分钟有效期
"nbf": now - 5
}
return jwt.encode(payload, self.api_secret, algorithm="HS256")
async def submit_text_to_video(
self,
prompt: str,
duration: int = 5, # 视频时长,5 或 10 秒
mode: str = "std", # std(标准质量)或 pro(高质量)
aspect_ratio: str = "16:9"
) -> str:
"""提交文本生视频任务,返回 task_id"""
headers = {
"Authorization": f"Bearer {self._get_auth_token()}",
"Content-Type": "application/json"
}
payload = {
"model": "kling-v1",
"prompt": prompt,
"negative_prompt": "低质量,模糊,文字水印",
"cfg_scale": 0.5,
"mode": mode,
"duration": str(duration),
"aspect_ratio": aspect_ratio
}
response = await self.client.post(
f"{self.base_url}/v1/videos/text2video",
json=payload,
headers=headers
)
response.raise_for_status()
data = response.json()
if data.get("code") != 0:
raise ValueError(f"API 错误: {data.get('message')}")
return data["data"]["task_id"]
async def get_task_status(self, task_id: str) -> VideoTask:
"""查询任务状态"""
headers = {"Authorization": f"Bearer {self._get_auth_token()}"}
response = await self.client.get(
f"{self.base_url}/v1/videos/text2video/{task_id}",
headers=headers
)
response.raise_for_status()
data = response.json()
task_data = data["data"]
status_map = {
"submitted": VideoStatus.PENDING,
"processing": VideoStatus.PROCESSING,
"succeed": VideoStatus.COMPLETED,
"failed": VideoStatus.FAILED
}
task = VideoTask(
task_id=task_id,
prompt=task_data.get("task_info", {}).get("prompt", ""),
status=status_map.get(task_data["task_status"], VideoStatus.PENDING),
created_at=task_data.get("created_at", 0)
)
if task.status == VideoStatus.COMPLETED:
videos = task_data.get("task_result", {}).get("videos", [])
if videos:
task.video_url = videos[0].get("url")
task.completed_at = task_data.get("updated_at")
elif task.status == VideoStatus.FAILED:
task.error_message = task_data.get("task_status_msg", "未知错误")
return task异步任务管理:用 Celery + Redis
视频生成任务不能在请求线程里轮询,必须用异步任务队列。我用的是 Celery + Redis。
from celery import Celery
from celery.utils.log import get_task_logger
import redis
import json
import asyncio
app = Celery('video_tasks', broker='redis://localhost:6379/0')
logger = get_task_logger(__name__)
# Redis 客户端,用于存储任务状态
r = redis.Redis(host='localhost', port=6379, db=1)
@app.task(
bind=True,
max_retries=3,
default_retry_delay=60, # 失败后等待60秒重试
soft_time_limit=900, # 15分钟软超时
time_limit=1200 # 20分钟硬超时
)
def generate_video_task(self, job_id: str, prompt: str, config: dict):
"""
视频生成的 Celery 任务
包含完整的重试和状态管理逻辑
"""
try:
# 更新任务状态为处理中
update_job_status(job_id, "processing", {"message": "提交到视频生成 API"})
# 创建客户端(在同步上下文里运行异步代码)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
kling_client = KlingVideoClient(
api_key=config['api_key'],
api_secret=config['api_secret']
)
# 提交任务
task_id = loop.run_until_complete(
kling_client.submit_text_to_video(
prompt=prompt,
duration=config.get('duration', 5),
mode=config.get('mode', 'std')
)
)
update_job_status(job_id, "processing", {"api_task_id": task_id, "message": "等待生成中"})
# 轮询等待结果
max_wait = 900 # 最长等待15分钟
interval = 15 # 每15秒查询一次
elapsed = 0
while elapsed < max_wait:
loop.run_until_complete(asyncio.sleep(interval))
elapsed += interval
video_task = loop.run_until_complete(
kling_client.get_task_status(task_id)
)
if video_task.status == VideoStatus.COMPLETED:
# 下载并上传到我们自己的 CDN
cdn_url = upload_to_cdn(job_id, video_task.video_url)
update_job_status(job_id, "completed", {
"video_url": cdn_url,
"original_url": video_task.video_url,
"generation_time": elapsed
})
logger.info(f"Job {job_id} 完成,耗时 {elapsed}s")
return {"job_id": job_id, "status": "completed", "url": cdn_url}
elif video_task.status == VideoStatus.FAILED:
raise ValueError(f"视频生成失败: {video_task.error_message}")
# PENDING 或 PROCESSING 继续等待
update_job_status(job_id, "processing", {
"message": f"生成中,已等待 {elapsed}s"
})
# 超时
raise TimeoutError(f"视频生成超时({max_wait}s)")
except (ValueError, TimeoutError) as exc:
# 这类错误不重试(内容问题或超时),直接标记失败
update_job_status(job_id, "failed", {"error": str(exc)})
raise
except Exception as exc:
# 其他错误(网络问题、API 错误)才重试
logger.warning(f"Job {job_id} 失败,准备重试: {exc}")
update_job_status(job_id, "pending", {"message": f"重试中(第{self.request.retries+1}次)"})
raise self.retry(exc=exc)
def update_job_status(job_id: str, status: str, extra: dict = None):
"""更新 Redis 中的任务状态"""
data = {"status": status, "updated_at": time.time()}
if extra:
data.update(extra)
r.setex(f"video_job:{job_id}", 86400 * 7, json.dumps(data)) # 保留7天视频存储和 CDN 方案
第三方 API 生成的视频 URL 有有效期,通常 24-72 小时就失效了。必须及时下载并存到自己的存储。
import boto3
import httpx
import uuid
def upload_to_cdn(job_id: str, source_url: str) -> str:
"""
下载视频并上传到自有 OSS,返回 CDN URL
"""
s3 = boto3.client(
's3',
endpoint_url='https://your-oss-endpoint.com',
aws_access_key_id='your-key',
aws_secret_access_key='your-secret'
)
# 下载视频(加超时和重试)
with httpx.Client(timeout=120.0) as client:
response = client.get(source_url)
response.raise_for_status()
video_bytes = response.content
# 生成存储路径
storage_key = f"ai-videos/{job_id}/output.mp4"
# 上传到 OSS
s3.put_object(
Bucket='your-video-bucket',
Key=storage_key,
Body=video_bytes,
ContentType='video/mp4',
CacheControl='max-age=31536000' # CDN 缓存1年
)
# 返回 CDN URL
cdn_url = f"https://cdn.yourdomain.com/{storage_key}"
return cdn_url稳定性:说真话
好,说点真话。
Kling API 在我用的这几个月里,稳定性不算特别理想。主要问题:
高峰时段排队严重。 工作日下午3-6点,任务提交后排队时间可以长达 20-30 分钟,比通常的 3-8 分钟长很多。我们的方案是设置动态超时——低峰时期 15 分钟超时,高峰时期放到 30 分钟,同时在前端提示用户预期等待时间。
内容审核不透明。 有时候一个 prompt 被拒,错误信息只有"内容违规",但完全不知道哪里违规了。同样的 prompt 改一两个字,又能过。这对批量化生产很头痛,我们后来在提交前先用 LLM 做一次"预审",把明显有风险的 prompt 先过滤掉。
偶发的无声失败。 最诡异的问题:有时候任务状态显示 succeed,但视频 URL 访问返回 404。出现频率约 1%-2%,我们加了一个验证步骤:收到完成状态后,先下载验证视频文件可访问、格式正确,再标记任务完成。
def verify_video_accessible(url: str) -> bool:
"""验证视频文件可访问且格式正确"""
try:
with httpx.Client(timeout=30.0) as client:
# 用 HEAD 请求验证,不下载全文件
response = client.head(url, follow_redirects=True)
if response.status_code != 200:
return False
content_type = response.headers.get('content-type', '')
content_length = int(response.headers.get('content-length', 0))
# 检查是否是视频文件,且大小合理(> 100KB)
is_video = 'video' in content_type
is_not_empty = content_length > 100 * 1024
return is_video and is_not_empty
except Exception:
return FalseSora 的状态。 截至我写这篇文章时,Sora 的企业 API 开放非常有限,排队时间漫长,不建议现阶段把它纳入生产依赖。想用的话,作为备用方案可以,别指望它成为主链路。
成本控制:必须做的事
视频生成是所有 AI 功能里最贵的,不做成本控制会翻车。
我们的做法:
预算上限。 每个客户账号设置每月视频生成上限(按秒计)。超了就不让提交,提示客户升级套餐。
质量分级。 预览阶段用 std 模式(便宜),用户确认后再用 pro 模式生成最终版本。价格差大约是 3-5 倍,很多用户 std 效果就满意了。
失败不计费? 要看 API 的计费规则。Kling 的规则是生成失败通常不扣费,但要确认,别因为大量失败重试把预算烧掉。
AI 视频生成这个方向是真实的机会,但现阶段 API 的成熟度还在提升中,工程上要比接文本 API 做更多防御性设计。能把这套流程跑稳的团队,在客户眼里就已经有壁垒了。
