AOVIS AI 推送 Pipeline 生产实施规划
本文档面向所有 AI 工具(Claude Code / DeepSeek / Gemini / OpenCode / Codex 等)。 从这里接手 AOVIS 的 AI 推送相关研发任务前必读。
架构 source of truth:
docs/aws-ai-architecture.md— 任何与本文档冲突以那份为准 选型依据:/Users/guorui/Projects/cloud-ai-api-test/reports/comparison_20260517.md(commit 7264056)
0. 一句话总结
AOVIS 推送分两种独立链路,都依赖共享的 PushToken + CloudEvent + endpoint-based SNS 三件套;先做 Sprint 1(实时分类推送,1 周端到端打通),再做 Sprint 2(Daily Summary,1 周),最后做 Sprint 3(边端 SoC 缩略图协议对齐,跨团队)。
1. 两种推送的定义
Type 1:实时分类推送(low-latency event push)
触发:边端 SoC 检测到事件(人/车/宠物/包裹) 延迟目标:<2 秒(从设备触发到用户手机收到 push) 内容:单条事件分类提示
🚗 Vehicle detected at Front Gate
2:14 PM · Tap to view
模型:us.amazon.nova-lite-v1:0 看事件缩略图 JPEG(不看视频)
成本:~$0.00013/event(1 张缩略图);600k events/月 ≈ $79/月(1000 订阅)
Type 2:Daily Summary 推送(rolling 24h aggregation)
触发:每天用户本地时区 8 AM(EventBridge Scheduler / Vercel Cron) 延迟目标:分钟级(非实时) 内容:过去 24h 事件汇总 narrative
AOVIS · Daily Summary
Front Gate · Mar 18, 2026
8 events yesterday: 3 vehicles, 4 persons, 1 e-bike
Highlights:
10:20 AM A dark SUV entered the driveway and stopped
4:02 PM A person on an e-bike passed by
Tap to review →
模型:us.amazon.nova-lite-v1:0 看过去 24h 的 N 条 CloudEvent.summary → 二次聚合 narrative
成本:~$0.0005/user/day(30 条事件聚合);1000 用户 × 30 天 ≈ $15/月
2. 当前真实状态(基于 2026-05-17 源码审查)
Type 1:实时分类推送
| 项 | 状态 | 位置 |
|---|---|---|
| IoT 事件 webhook(接收 + HMAC + dedup + 写 IotEvent 表 + trigger worker) | ✅ 实现 | app/api/internal/aws/iot-event/route.ts |
IotEvent.processed 字段 + worker 消费 | ✅ 实现 | prisma/schema.prisma model IotEvent + lib/aws/event-push-worker.ts |
| Nova Lite 看缩略图分类代码 | ✅ 实现 | lib/aws/bedrock-thumbnail.ts(us.amazon.nova-lite-v1:0 messages-v1) |
| 边端 SoC 抓缩略图上传协议 | ❌ 跨团队 Sprint 3 | 固件/S3 上传协议待对齐;worker 已用 payload.thumbnail_s3_key 约定 |
| PushToken 表 | ✅ 实现 | prisma/schema.prisma model PushToken |
| Endpoint-based SNS | ✅ 实现 | lib/aws/sns.ts + scripts/deploy/setup-sns-platform-apps.sh |
| Push token 注册 API | ✅ 实现 | app/api/internal/push/register/route.ts |
Type 2:Daily Summary
| 项 | 状态 | 位置 |
|---|---|---|
/api/ai/summary 手动触发单事件分析 | ✅ 实现(同步调用,无持久化) | app/api/ai/summary/route.ts |
| OpenSearch indexEvent | ✅ 实现 | lib/aws/opensearch.ts |
| Cron infra 模式 | ✅ 可复用 | app/api/cron/eiotclub-reconcile/route.ts(Bearer token + nodejs runtime) |
| CloudEvent 表 | ✅ 已实现(D2) | prisma/schema.prisma model CloudEvent |
| DailySummary 表 | ❌ 缺失 | 同上 |
| IoT 事件 → 自动跑 analyzeVideo → 写 CloudEvent | ❌ 缺失 | 待新建 background worker |
| 按用户时区每日触发的 cron | ❌ 缺失 | 待新建 app/api/cron/daily-summary/route.ts |
| 24h 事件聚合二次摘要 | ❌ 缺失 | 待新建 lib/aws/bedrock-daily.ts |
3. 共享前置依赖
| 共享缺口 | Type 1 用 | Type 2 用 | 工作量 |
|---|---|---|---|
PushToken 表 + 注册 API | ✅ | ✅ | 半天 |
CloudEvent 表 | ✅(写 AI 分类结果) | ✅(24h 聚合数据源) | 半天 |
| SNS Topic-based → Endpoint-based 改造 | ✅ | ✅ | 1-2 天 |
lib/aws/bedrock.ts 拆分(视频 / 缩略图 / daily aggregator) | ✅ | ✅ | 半天 |
这四项是 Type 1 + Type 2 的公共基础,必须先做。
4. Sprint 1:Type 1 端到端打通(1 周)
目标
用户手机收到来自真实 IoT 事件的实时分类推送("Vehicle detected at Front Gate")。
任务清单
D1:Prisma — 加 PushToken 表 + 注册 API(半天)
新增 Prisma model(prisma/schema.prisma):
model PushToken {
id String @id @default(cuid())
userId String
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
deviceId String? // 可选:绑定到特定 IPC 设备
platform String // "ios" | "android"
token String // APNS device token / FCM registration token
endpointArn String? // SNS Platform Endpoint ARN(创建后回填)
locale String? @default("en")
timezone String? @default("America/New_York")
isActive Boolean @default(true)
lastUsedAt DateTime?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([userId, platform, token])
@@index([userId, isActive])
}
User model 加反向关系 pushTokens PushToken[]。
新增 API: POST /api/internal/push/register/route.ts
- 鉴权: App Bearer Token(沿用
lib/aws/device-service-access.ts模式) - Body:
{ platform, token, deviceId?, locale?, timezone? } - 行为: upsert PushToken + 调
createPlatformEndpoint拿 ARN 回填
Definition of Done:
- Prisma migration 跑通
- 单测验证 upsert 逻辑
- App 可调用 register API 成功返回
{ success: true, push_token_id }
D2:Prisma — 加 CloudEvent 表(半天)
新增 Prisma model:
model CloudEvent {
id String @id @default(cuid())
deviceId String
device Device @relation(fields: [deviceId], references: [id], onDelete: Cascade)
aoviseDeviceId String
eventId String @unique // 设备生成的事件 ID
iotEventId String? @unique // 关联 IotEvent.id (来源追溯)
s3VideoKey String? // clips/{aoviseDeviceId}/{eventId}.mp4(可空,链路 A 不一定有视频)
s3ThumbnailKey String? // thumbnails/{aoviseDeviceId}/{eventId}.jpg
startTime DateTime
endTime DateTime?
durationSec Int?
status String @default("uploaded")
// uploaded → classifying → classified | failed
// 链路 A 分类结果(Nova Lite 看缩略图)
classifyEventType String? // person | vehicle | pet | package | other
classifyConfidence Float?
classifySummaryShort String? // <=80 chars push 用
// 链路 B 摘要结果(Nova Lite 看视频,按需触发)
videoSummary Json? // { summary, eventType, tags, confidence } 来自 analyzeVideo
videoAnalyzedAt DateTime?
// 推送状态
pushSent Boolean @default(false)
pushSentAt DateTime?
// 索引状态
indexedAt DateTime? // OpenSearch indexed time
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@index([deviceId, startTime])
@@index([status, createdAt])
@@index([aoviseDeviceId, eventId])
@@index([pushSent, createdAt])
}
Device model 加反向 cloudEvents CloudEvent[]。
Definition of Done: migration 通过,不需要 API(这一步只是表结构)
D3:lib/aws/bedrock-thumbnail.ts — Nova Lite 看缩略图(半天)
新增文件: lib/aws/bedrock-thumbnail.ts
接口签名:
export interface ThumbnailClassificationResult {
eventType: "person" | "vehicle" | "pet" | "package" | "other";
confidence: number;
summary: string; // <=80 chars English, push-notification style
rawJson: string;
usage: { inputTokens: number; outputTokens: number };
durationMs: number;
}
/**
* Classify event thumbnails using Bedrock Nova Lite.
* Input: 1-3 JPEG thumbnails (bytes)
* Output: structured classification + English summary suitable for push
*
* Model: us.amazon.nova-lite-v1:0 (cross-region inference profile, MUST use us. prefix)
* Verified 2026-05-17: ~$0.00013/event with 1 thumbnail, \<2s latency
*/
export async function classifyThumbnails(
thumbnails: Buffer[],
options?: { region?: string }
): Promise<ThumbnailClassificationResult>;
实现要点:
- Port from
/Users/guorui/Projects/cloud-ai-api-test/src/bedrock/nova_lite_thumbnail.py(测试床已 verified 的 prompt + schema) - 用 messages-v1 schema,content 数组含 N 张 image (base64) + 1 段 text
- system prompt 严格约束 JSON 输出格式(参考测试床 prompt)
- 调用
os.environ.pop('AWS_BEARER_TOKEN_BEDROCK')等价的 Node.js 处理:delete process.env.AWS_BEARER_TOKEN_BEDROCK(兜底 Bedrock 鉴权陷阱,见 memory) - model id 硬编码
"us.amazon.nova-lite-v1:0"或读env.bedrockThumbnailModelId
Definition of Done:
- 单测:传一张本地 fixture JPEG,返回 valid
ThumbnailClassificationResult - 集成测试:从 S3 拉一张缩略图分类,<2s 完成
D4:lib/aws/sns.ts 改造 — Topic → Platform Endpoint(1-2 天)
重写:lib/aws/sns.ts(保留向后兼容的 Topic-based sendPushNotification,新增 endpoint-based 函数)
新增接口签名:
/** 为新设备 token 注册 SNS Platform Endpoint,返回 EndpointArn */
export async function createPlatformEndpoint(
platform: "ios" | "android",
token: string,
customUserData?: string
): Promise<string>;
/** 删除失效的 endpoint(清理 token) */
export async function deletePlatformEndpoint(endpointArn: string): Promise<void>;
/** 向单个用户的设备 endpoint 推送 */
export async function sendPushToEndpoint(
endpointArn: string,
payload: {
title: string;
body: string;
deepLink?: string; // aovis://event/{eventId} 等
data?: Record<string, string>;
}
): Promise<{ messageId: string } | { failed: "endpoint_disabled" | "throttled" | "unknown"; error: Error }>;
配置依赖:
- 新增 env:
SNS_APNS_PLATFORM_ARN/SNS_FCM_PLATFORM_ARN - 运行自动化脚本创建:
bash scripts/deploy/setup-sns-platform-apps.sh both- 幂等:平台应用已存在时跳过,直接输出 ARN
- 输出两行
SNS_APNS_PLATFORM_ARN=.../SNS_FCM_PLATFORM_ARN=...,追加到.env.production - APNS 使用 Apple p8 key(PlatformPrincipal=key ID, PlatformCredential=p8 content, ApplePlatformTeamID, ApplePlatformBundleID)
- Production → platform
APNS;Sandbox → platformAPNS_SANDBOX - FCM 优先使用 HTTP v1 service account JSON(
--service-json-file);保留 legacy--server-key兼容 - 所有 aws sns 命令显式传
--region(默认us-east-1,支持SNS_AWS_REGION/AWS_REGION覆盖) - 运行时凭据不应存放在仓库内;参见
scripts/deploy/setup-sns-platform-apps.sh顶部注释了解 env var 用法
失效处理:
- SNS Publish 返回
EndpointDisabled→ 标记 PushTokenisActive=false(不删,留作 audit) - 客户端定期重新调 register API 刷新 token
Definition of Done:
- 单测:mock SNSClient 验证 payload format(APNS / GCM 双格式)
- Staging 集成测试:用一台真机 push token 收到推送
D5:IoT 事件 → 异步分类 + push 链路打通(1-2 天)
改造:app/api/internal/aws/iot-event/route.ts(保留现有 dedup + 写 IotEvent 表逻辑,加 background hook)
伪流程:
// 现有逻辑:写 IotEvent 表(保留)
const iotEvent = await prisma.iotEvent.create(...);
// 新增:判断是否需要触发 AI 分类
if (payload.event_type === "motion" || payload.event_type === "alarm") {
// fire-and-forget background worker
void processEventForPush(iotEvent.id, payload).catch(err => {
console.error("[iot-event] push processing failed", err);
});
}
新增 worker 函数 lib/aws/event-push-worker.ts:
export async function processEventForPush(iotEventId: string, payload: IotEventPayload) {
// 1. 从 payload 取缩略图 S3 key(约定字段:payload.thumbnail_s3_key)
// 2. S3 GetObject 拿 JPEG bytes
// 3. 调 classifyThumbnails
// 4. 写 CloudEvent 表(status=classified, classifyEventType, classifySummaryShort)
// 5. 找设备所属 user 的 active PushToken
// 6. 对每个 token 调 sendPushToEndpoint
// 7. 更新 CloudEvent.pushSent=true
// 8. 更新 IotEvent.processed=true
}
Definition of Done:
- Staging 环境用 mock IoT 事件(含 fixture 缩略图 S3 key)触发 → 真机收到 push
- CloudEvent 表有完整记录
- IotEvent.processed=true
- 失败场景(缩略图缺失 / Nova 调用失败 / SNS 失败)都有清晰 log + audit 写入
Sprint 1 验收清单
-
pnpm prisma migrate dev跑通新 schema(PushToken + CloudEvent) - App 可调
/api/internal/push/register注册 token - Staging 推一条 mock IoT motion 事件 → 用户手机 1-3 秒收到 "Vehicle detected" push
- CloudEvent 表数据完整
-
npm run build通过 - 文档更新:本文档 §2 状态表更新,已完成项打 ✅
Sprint 1 当前完成边界 / Staging 手动验证
代码闭环已完成:
IoT webhook → processEventForPush → S3 下载缩略图 → Nova Lite thumbnail classification → CloudEvent 写入 → SNS endpoint push
真实端到端 push 仍依赖以下前提:
SNS_APNS_PLATFORM_ARN/SNS_FCM_PLATFORM_ARN已配置(Apple p8 key + Firebase HTTP v1 凭据)- App 已调用
POST /api/internal/push/register注册真实 APNS/FCM push token 并回填endpointArn - IoT 事件
payload.thumbnail_s3_key指向 S3 上真实 JPEG(320×240 缩略图) - AWS credentials / Bedrock Nova Lite / S3 / SNS 权限可用(IAM role + region us-east-1)
- staging 域名(如
https://staging.aovis.app)可访问
Staging 手动验证命令(使用占位符,不含真实 secret):
# 1. 准备测试缩略图(fixtures/ 目录已有 .gitkeep,放入 JPEG 即可)
aws s3 cp ./fixtures/thumbnail-test.jpg \
s3://aovis-video-storage/thumbnails/test-device/test-event.jpg \
--region us-east-1
# 2. 构造 HMAC 签名并发送 mock IoT event
IOT_EVENT_WEBHOOK_SECRET="<your-secret>"
BODY='{"thing_name":"<aws-iot-thing-name>","event_type":"motion","timestamp":'$(date +%s)',"event_id":"test-event","thumbnail_s3_key":"thumbnails/test-device/test-event.jpg"}'
SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$IOT_EVENT_WEBHOOK_SECRET" -hex | awk '{print $2}')
curl -X POST https://<staging-domain>/api/internal/aws/iot-event \
-H "content-type: application/json" \
-H "x-iot-signature: $SIG" \
--data "$BODY"
# 3. 验证 CloudEvent 记录
# 连接 staging DB 查询:
# SELECT id, status, classify_event_type, push_sent FROM "CloudEvent"
# WHERE event_id = 'test-event' ORDER BY created_at DESC;
验证成功标准:
- webhook 返回
{ "received": true }(触发 worker) - CloudEvent 表出现一条
status=classified+classify_event_type非空 +push_sent=true的记录 - 目标手机 1-3 秒内收到 push notification("Vehicle detected" 或 "Person detected")
如果 push 未到达,依次排查:
- DB 确认 CloudEvent 已创建且
status≠failed endpointArn是否非空:SELECT endpoint_arn FROM "PushToken" WHERE user_id = '<user>'SNS_APNS_PLATFORM_ARN对应的 SNS Platform Application 是否已用正确 Apple p8 key 创建- APNS/FCM device token 是否已过期
生产/测试验证归档见 docs/deploy-archive-2026-05-17-ai-push-sprint1.md。
5. Sprint 2:Daily Summary(1 周)
目标
每天用户本地时区 8 AM 收到过去 24h 事件汇总 push,点击进入 App 看详情页。
前置
- Sprint 1 已完成(CloudEvent 表里有 ≥24h 真实数据)
任务清单
D6:Prisma — 加 DailySummary 表(半天)
model DailySummary {
id String @id @default(cuid())
deviceId String
device Device @relation(fields: [deviceId], references: [id], onDelete: Cascade)
userId String
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
date DateTime @db.Date
timezone String @default("America/New_York")
eventCount Int @default(0)
eventTypes String[] // ["person", "vehicle", ...]
keyEvents Json[] // [{cloudEventId, time, type, summary, thumbnailUrl}, ...]
summary String? // English narrative (140 chars push-friendly)
pushSent Boolean @default(false)
pushSentAt DateTime?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([deviceId, userId, date])
@@index([userId, date])
@@index([pushSent, date])
}
D7:lib/aws/bedrock-daily.ts — 24h 事件聚合摘要(半天)
export interface DailySummaryGenerationInput {
deviceName: string;
date: string; // ISO date
events: Array<{
time: string; // HH:MM
eventType: string;
summaryShort: string;
}>;
}
export interface DailySummaryGenerationResult {
narrative: string; // <=400 chars English, push 折叠后可展开
pushTitle: string; // <=50 chars
pushBody: string; // <=140 chars
keyMomentIndices: number[]; // events 数组里被选为 "key moment" 的 index
}
export async function generateDailySummary(
input: DailySummaryGenerationInput
): Promise<DailySummaryGenerationResult>;
实现:用 us.amazon.nova-lite-v1:0,prompt 输入 N 条 event short summary → 输出聚合 narrative。
D8:app/api/cron/daily-summary/route.ts(半天)
仿照 app/api/cron/eiotclub-reconcile/route.ts:
- Bearer token 鉴权(env
CRON_SECRET) - Query:
?dryRun=true仅打印不写库 - 逻辑:
- 找出当前 UTC 时间对应本地时区是 8 AM ± 30min 的所有 user(用 PushToken.timezone)
- 对每个 (user, device) 拿过去 24h 的 CloudEvent(status=classified)
- 调
generateDailySummary - upsert DailySummary 表
- 调
sendPushToEndpoint推送 - 更新
pushSent=true
D9:调度配置(半天)
两种选项:
- A. Vercel Cron:在
vercel.json配crons字段,每小时跑一次/api/cron/daily-summary?hour={UTC},路由内部按用户时区筛 8 AM - B. EventBridge Scheduler:AWS Console 配 hourly schedule 调上面这个 endpoint
推荐 A(vercel.json 维护简单,与现有 cron 体系一致;future 高并发再切 B)
D10:App 端 Daily Summary 详情页 API(半天)
GET /api/account/devices/[deviceId]/daily-summaries?start=...&end=... 返回 DailySummary 列表
Sprint 2 验收清单
- DailySummary migration 通过
- Staging 跑 cron
?dryRun=true打印预期 push 内容 - Staging 真跑:用户手机 8 AM 收到 daily summary push
- App 详情页 API 返回正确数据
- 时区边界测试(US East / West / Hawaii / Alaska)
6. Sprint 3:边端 SoC 缩略图协议对齐(跨团队,时间未定)
阻塞依赖
NEXA Prime 4K 样机出来后才能做。需要跟 ODM / 固件团队对齐:
- 边端 SoC 检测到事件时,额外抓 1 张缩略图(推荐 320×240 JPEG)
- 通过 IoT MQTT 上行 + S3 直传(用 IoT Credentials Provider 临时凭证)
- S3 路径约定:
s3://aovis-video-storage/thumbnails/{aoviseDeviceId}/{eventId}.jpg - 在 IoT 事件 MQTT payload 中带上
thumbnail_s3_key字段
Sprint 3 之前的兼容方案
Sprint 1 的 worker 在 payload.thumbnail_s3_key 不存在时:
- 选项 A:回退到 GetClip + ffmpeg 抽帧(增加 ~3-5s 延迟,违背"实时推送 <2s"目标)
- 选项 B:跳过 Type 1 push,只走 Type 2 Daily Summary
样机就绪前生产推荐选项 B;样机就绪后切到协议直传。
7. 从测试床迁移的 artifact 对照表
| 测试床产物 | 落到 aovis | Sprint |
|---|---|---|
cloud-ai-api-test/src/bedrock/nova_lite_thumbnail.py (Python) | lib/aws/bedrock-thumbnail.ts (TypeScript port) | Sprint 1 D3 |
cloud-ai-api-test/src/bedrock/nova_video.py 的 prompt 模板 | 替换 lib/aws/bedrock.ts 中 analyzeVideo() 的 system prompt | Sprint 2 D7(顺手) |
cloud-ai-api-test/src/common/cost.py 价目表 | 新建 lib/aws/cost-tracker.ts(记录每次 InvokeModel 真实成本到 DB) | Sprint 1 D3 或独立 |
测试床确定的 model id us.amazon.nova-lite-v1:0 | 改 env.bedrockModelId 默认值 + 加 env.bedrockThumbnailModelId | Sprint 1 D3 |
| 测试床 IAM policy(前缀 + 模型白名单) | 生产 IAM role 加上 bedrock:InvokeModel on us.amazon.nova-* resource | Sprint 1 之前 |
8. AI 工具执行指引
接到 AOVIS AI 推送相关任务时,按以下顺序:
- 读
aws-ai-architecture.md§1-§3,确认架构边界 - 读本文档,定位你的任务在哪个 Sprint / D 编号
- 看 §2 当前状态表,确认你要做的是"补缺失项"还是"修已实现项"
- 不要跳过 Sprint 顺序:Sprint 1 D1-D5 严格串行;Sprint 2 必须在 Sprint 1 完成后才动
- 每个 D 完成后更新本文档:把对应 §2 状态表的 ❌ 改成 ✅,commit
- 不要扩范围:本文档没列的功能(如 Smart Search 改造、套餐分层、视频压缩流水线)不在 Sprint 1/2 范围
禁止行为:
- 不要引入 Amazon Rekognition(见
aws-ai-architecture.md§3.1) - 不要写中文 push / UI 文案(见 §3.3)
- 不要用 foundation model id(必须
us.前缀,见 §3.6) - 不要硬编码 S3 bucket / region(用 env / config)
- 不要在 IoT webhook 里同步等 Bedrock 调用(必须 fire-and-forget background worker)
文档版本:v1.0 (2026-05-17)
关联:docs/aws-ai-architecture.md | 测试床选型报告