跳到主要内容

AOVIS AI 推送 Pipeline 生产实施规划

本文档面向所有 AI 工具(Claude Code / DeepSeek / Gemini / OpenCode / Codex 等)。 从这里接手 AOVIS 的 AI 推送相关研发任务前必读。

架构 source of truth: docs/aws-ai-architecture.md — 任何与本文档冲突以那份为准 选型依据: /Users/guorui/Projects/cloud-ai-api-test/reports/comparison_20260517.md (commit 7264056)


0. 一句话总结

AOVIS 推送分两种独立链路,都依赖共享的 PushToken + CloudEvent + endpoint-based SNS 三件套;先做 Sprint 1(实时分类推送,1 周端到端打通),再做 Sprint 2(Daily Summary,1 周),最后做 Sprint 3(边端 SoC 缩略图协议对齐,跨团队)。


1. 两种推送的定义

Type 1:实时分类推送(low-latency event push)

触发:边端 SoC 检测到事件(人/车/宠物/包裹) 延迟目标:<2 秒(从设备触发到用户手机收到 push) 内容:单条事件分类提示

🚗 Vehicle detected at Front Gate
2:14 PM · Tap to view

模型us.amazon.nova-lite-v1:0事件缩略图 JPEG(不看视频) 成本:~$0.00013/event(1 张缩略图);600k events/月 ≈ $79/月(1000 订阅)

Type 2:Daily Summary 推送(rolling 24h aggregation)

触发:每天用户本地时区 8 AM(EventBridge Scheduler / Vercel Cron) 延迟目标:分钟级(非实时) 内容:过去 24h 事件汇总 narrative

AOVIS · Daily Summary
Front Gate · Mar 18, 2026

8 events yesterday: 3 vehicles, 4 persons, 1 e-bike
Highlights:
10:20 AM A dark SUV entered the driveway and stopped
4:02 PM A person on an e-bike passed by
Tap to review →

模型us.amazon.nova-lite-v1:0过去 24h 的 N 条 CloudEvent.summary → 二次聚合 narrative 成本:~$0.0005/user/day(30 条事件聚合);1000 用户 × 30 天 ≈ $15/月


2. 当前真实状态(基于 2026-05-17 源码审查)

Type 1:实时分类推送

状态位置
IoT 事件 webhook(接收 + HMAC + dedup + 写 IotEvent 表 + trigger worker)✅ 实现app/api/internal/aws/iot-event/route.ts
IotEvent.processed 字段 + worker 消费✅ 实现prisma/schema.prisma model IotEvent + lib/aws/event-push-worker.ts
Nova Lite 看缩略图分类代码✅ 实现lib/aws/bedrock-thumbnail.tsus.amazon.nova-lite-v1:0 messages-v1)
边端 SoC 抓缩略图上传协议❌ 跨团队 Sprint 3固件/S3 上传协议待对齐;worker 已用 payload.thumbnail_s3_key 约定
PushToken 表✅ 实现prisma/schema.prisma model PushToken
Endpoint-based SNS✅ 实现lib/aws/sns.ts + scripts/deploy/setup-sns-platform-apps.sh
Push token 注册 API✅ 实现app/api/internal/push/register/route.ts

Type 2:Daily Summary

状态位置
/api/ai/summary 手动触发单事件分析✅ 实现(同步调用,无持久化)app/api/ai/summary/route.ts
OpenSearch indexEvent✅ 实现lib/aws/opensearch.ts
Cron infra 模式✅ 可复用app/api/cron/eiotclub-reconcile/route.ts(Bearer token + nodejs runtime)
CloudEvent 表✅ 已实现(D2)prisma/schema.prisma model CloudEvent
DailySummary 表❌ 缺失同上
IoT 事件 → 自动跑 analyzeVideo → 写 CloudEvent❌ 缺失待新建 background worker
按用户时区每日触发的 cron❌ 缺失待新建 app/api/cron/daily-summary/route.ts
24h 事件聚合二次摘要❌ 缺失待新建 lib/aws/bedrock-daily.ts

3. 共享前置依赖

共享缺口Type 1 用Type 2 用工作量
PushToken 表 + 注册 API半天
CloudEvent✅(写 AI 分类结果)✅(24h 聚合数据源)半天
SNS Topic-based → Endpoint-based 改造1-2 天
lib/aws/bedrock.ts 拆分(视频 / 缩略图 / daily aggregator)半天

这四项是 Type 1 + Type 2 的公共基础,必须先做。


4. Sprint 1:Type 1 端到端打通(1 周)

目标

用户手机收到来自真实 IoT 事件的实时分类推送("Vehicle detected at Front Gate")。

任务清单

D1:Prisma — 加 PushToken 表 + 注册 API(半天)

新增 Prisma modelprisma/schema.prisma):

model PushToken {
id String @id @default(cuid())
userId String
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
deviceId String? // 可选:绑定到特定 IPC 设备
platform String // "ios" | "android"
token String // APNS device token / FCM registration token
endpointArn String? // SNS Platform Endpoint ARN(创建后回填)
locale String? @default("en")
timezone String? @default("America/New_York")
isActive Boolean @default(true)
lastUsedAt DateTime?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

@@unique([userId, platform, token])
@@index([userId, isActive])
}

User model 加反向关系 pushTokens PushToken[]

新增 API: POST /api/internal/push/register/route.ts

  • 鉴权: App Bearer Token(沿用 lib/aws/device-service-access.ts 模式)
  • Body: { platform, token, deviceId?, locale?, timezone? }
  • 行为: upsert PushToken + 调 createPlatformEndpoint 拿 ARN 回填

Definition of Done:

  • Prisma migration 跑通
  • 单测验证 upsert 逻辑
  • App 可调用 register API 成功返回 { success: true, push_token_id }

D2:Prisma — 加 CloudEvent 表(半天)

新增 Prisma model

model CloudEvent {
id String @id @default(cuid())
deviceId String
device Device @relation(fields: [deviceId], references: [id], onDelete: Cascade)
aoviseDeviceId String
eventId String @unique // 设备生成的事件 ID
iotEventId String? @unique // 关联 IotEvent.id (来源追溯)

s3VideoKey String? // clips/{aoviseDeviceId}/{eventId}.mp4(可空,链路 A 不一定有视频)
s3ThumbnailKey String? // thumbnails/{aoviseDeviceId}/{eventId}.jpg

startTime DateTime
endTime DateTime?
durationSec Int?

status String @default("uploaded")
// uploaded → classifying → classified | failed

// 链路 A 分类结果(Nova Lite 看缩略图)
classifyEventType String? // person | vehicle | pet | package | other
classifyConfidence Float?
classifySummaryShort String? // <=80 chars push 用

// 链路 B 摘要结果(Nova Lite 看视频,按需触发)
videoSummary Json? // { summary, eventType, tags, confidence } 来自 analyzeVideo
videoAnalyzedAt DateTime?

// 推送状态
pushSent Boolean @default(false)
pushSentAt DateTime?

// 索引状态
indexedAt DateTime? // OpenSearch indexed time

createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

@@index([deviceId, startTime])
@@index([status, createdAt])
@@index([aoviseDeviceId, eventId])
@@index([pushSent, createdAt])
}

Device model 加反向 cloudEvents CloudEvent[]

Definition of Done: migration 通过,不需要 API(这一步只是表结构)


D3:lib/aws/bedrock-thumbnail.ts — Nova Lite 看缩略图(半天)

新增文件: lib/aws/bedrock-thumbnail.ts

接口签名:

export interface ThumbnailClassificationResult {
eventType: "person" | "vehicle" | "pet" | "package" | "other";
confidence: number;
summary: string; // <=80 chars English, push-notification style
rawJson: string;
usage: { inputTokens: number; outputTokens: number };
durationMs: number;
}

/**
* Classify event thumbnails using Bedrock Nova Lite.
* Input: 1-3 JPEG thumbnails (bytes)
* Output: structured classification + English summary suitable for push
*
* Model: us.amazon.nova-lite-v1:0 (cross-region inference profile, MUST use us. prefix)
* Verified 2026-05-17: ~$0.00013/event with 1 thumbnail, \<2s latency
*/
export async function classifyThumbnails(
thumbnails: Buffer[],
options?: { region?: string }
): Promise<ThumbnailClassificationResult>;

实现要点:

  • Port from /Users/guorui/Projects/cloud-ai-api-test/src/bedrock/nova_lite_thumbnail.py(测试床已 verified 的 prompt + schema)
  • 用 messages-v1 schema,content 数组含 N 张 image (base64) + 1 段 text
  • system prompt 严格约束 JSON 输出格式(参考测试床 prompt)
  • 调用 os.environ.pop('AWS_BEARER_TOKEN_BEDROCK') 等价的 Node.js 处理:delete process.env.AWS_BEARER_TOKEN_BEDROCK(兜底 Bedrock 鉴权陷阱,见 memory)
  • model id 硬编码 "us.amazon.nova-lite-v1:0" 或读 env.bedrockThumbnailModelId

Definition of Done:

  • 单测:传一张本地 fixture JPEG,返回 valid ThumbnailClassificationResult
  • 集成测试:从 S3 拉一张缩略图分类,<2s 完成

D4:lib/aws/sns.ts 改造 — Topic → Platform Endpoint(1-2 天)

重写lib/aws/sns.ts(保留向后兼容的 Topic-based sendPushNotification,新增 endpoint-based 函数)

新增接口签名:

/** 为新设备 token 注册 SNS Platform Endpoint,返回 EndpointArn */
export async function createPlatformEndpoint(
platform: "ios" | "android",
token: string,
customUserData?: string
): Promise<string>;

/** 删除失效的 endpoint(清理 token) */
export async function deletePlatformEndpoint(endpointArn: string): Promise<void>;

/** 向单个用户的设备 endpoint 推送 */
export async function sendPushToEndpoint(
endpointArn: string,
payload: {
title: string;
body: string;
deepLink?: string; // aovis://event/{eventId} 等
data?: Record<string, string>;
}
): Promise<{ messageId: string } | { failed: "endpoint_disabled" | "throttled" | "unknown"; error: Error }>;

配置依赖

  • 新增 env: SNS_APNS_PLATFORM_ARN / SNS_FCM_PLATFORM_ARN
  • 运行自动化脚本创建:bash scripts/deploy/setup-sns-platform-apps.sh both
    • 幂等:平台应用已存在时跳过,直接输出 ARN
    • 输出两行 SNS_APNS_PLATFORM_ARN=... / SNS_FCM_PLATFORM_ARN=...,追加到 .env.production
    • APNS 使用 Apple p8 key(PlatformPrincipal=key ID, PlatformCredential=p8 content, ApplePlatformTeamID, ApplePlatformBundleID)
    • Production → platform APNS;Sandbox → platform APNS_SANDBOX
    • FCM 优先使用 HTTP v1 service account JSON(--service-json-file);保留 legacy --server-key 兼容
    • 所有 aws sns 命令显式传 --region(默认 us-east-1,支持 SNS_AWS_REGION / AWS_REGION 覆盖)
    • 运行时凭据不应存放在仓库内;参见 scripts/deploy/setup-sns-platform-apps.sh 顶部注释了解 env var 用法

失效处理

  • SNS Publish 返回 EndpointDisabled → 标记 PushToken isActive=false(不删,留作 audit)
  • 客户端定期重新调 register API 刷新 token

Definition of Done:

  • 单测:mock SNSClient 验证 payload format(APNS / GCM 双格式)
  • Staging 集成测试:用一台真机 push token 收到推送

D5:IoT 事件 → 异步分类 + push 链路打通(1-2 天)

改造app/api/internal/aws/iot-event/route.ts(保留现有 dedup + 写 IotEvent 表逻辑,加 background hook)

伪流程:

// 现有逻辑:写 IotEvent 表(保留)
const iotEvent = await prisma.iotEvent.create(...);

// 新增:判断是否需要触发 AI 分类
if (payload.event_type === "motion" || payload.event_type === "alarm") {
// fire-and-forget background worker
void processEventForPush(iotEvent.id, payload).catch(err => {
console.error("[iot-event] push processing failed", err);
});
}

新增 worker 函数 lib/aws/event-push-worker.ts:

export async function processEventForPush(iotEventId: string, payload: IotEventPayload) {
// 1. 从 payload 取缩略图 S3 key(约定字段:payload.thumbnail_s3_key)
// 2. S3 GetObject 拿 JPEG bytes
// 3. 调 classifyThumbnails
// 4. 写 CloudEvent 表(status=classified, classifyEventType, classifySummaryShort)
// 5. 找设备所属 user 的 active PushToken
// 6. 对每个 token 调 sendPushToEndpoint
// 7. 更新 CloudEvent.pushSent=true
// 8. 更新 IotEvent.processed=true
}

Definition of Done:

  • Staging 环境用 mock IoT 事件(含 fixture 缩略图 S3 key)触发 → 真机收到 push
  • CloudEvent 表有完整记录
  • IotEvent.processed=true
  • 失败场景(缩略图缺失 / Nova 调用失败 / SNS 失败)都有清晰 log + audit 写入

Sprint 1 验收清单

  • pnpm prisma migrate dev 跑通新 schema(PushToken + CloudEvent)
  • App 可调 /api/internal/push/register 注册 token
  • Staging 推一条 mock IoT motion 事件 → 用户手机 1-3 秒收到 "Vehicle detected" push
  • CloudEvent 表数据完整
  • npm run build 通过
  • 文档更新:本文档 §2 状态表更新,已完成项打 ✅

Sprint 1 当前完成边界 / Staging 手动验证

代码闭环已完成: IoT webhook → processEventForPush → S3 下载缩略图 → Nova Lite thumbnail classification → CloudEvent 写入 → SNS endpoint push

真实端到端 push 仍依赖以下前提

  1. SNS_APNS_PLATFORM_ARN / SNS_FCM_PLATFORM_ARN 已配置(Apple p8 key + Firebase HTTP v1 凭据)
  2. App 已调用 POST /api/internal/push/register 注册真实 APNS/FCM push token 并回填 endpointArn
  3. IoT 事件 payload.thumbnail_s3_key 指向 S3 上真实 JPEG(320×240 缩略图)
  4. AWS credentials / Bedrock Nova Lite / S3 / SNS 权限可用(IAM role + region us-east-1)
  5. staging 域名(如 https://staging.aovis.app)可访问

Staging 手动验证命令(使用占位符,不含真实 secret):

# 1. 准备测试缩略图(fixtures/ 目录已有 .gitkeep,放入 JPEG 即可)
aws s3 cp ./fixtures/thumbnail-test.jpg \
s3://aovis-video-storage/thumbnails/test-device/test-event.jpg \
--region us-east-1

# 2. 构造 HMAC 签名并发送 mock IoT event
IOT_EVENT_WEBHOOK_SECRET="<your-secret>"
BODY='{"thing_name":"<aws-iot-thing-name>","event_type":"motion","timestamp":'$(date +%s)',"event_id":"test-event","thumbnail_s3_key":"thumbnails/test-device/test-event.jpg"}'
SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$IOT_EVENT_WEBHOOK_SECRET" -hex | awk '{print $2}')
curl -X POST https://<staging-domain>/api/internal/aws/iot-event \
-H "content-type: application/json" \
-H "x-iot-signature: $SIG" \
--data "$BODY"

# 3. 验证 CloudEvent 记录
# 连接 staging DB 查询:
# SELECT id, status, classify_event_type, push_sent FROM "CloudEvent"
# WHERE event_id = 'test-event' ORDER BY created_at DESC;

验证成功标准

  • webhook 返回 { "received": true }(触发 worker)
  • CloudEvent 表出现一条 status=classified + classify_event_type 非空 + push_sent=true 的记录
  • 目标手机 1-3 秒内收到 push notification("Vehicle detected" 或 "Person detected")

如果 push 未到达,依次排查:

  1. DB 确认 CloudEvent 已创建且 status≠failed
  2. endpointArn 是否非空:SELECT endpoint_arn FROM "PushToken" WHERE user_id = '<user>'
  3. SNS_APNS_PLATFORM_ARN 对应的 SNS Platform Application 是否已用正确 Apple p8 key 创建
  4. APNS/FCM device token 是否已过期

生产/测试验证归档docs/deploy-archive-2026-05-17-ai-push-sprint1.md


5. Sprint 2:Daily Summary(1 周)

目标

每天用户本地时区 8 AM 收到过去 24h 事件汇总 push,点击进入 App 看详情页。

前置

  • Sprint 1 已完成(CloudEvent 表里有 ≥24h 真实数据)

任务清单

D6:Prisma — 加 DailySummary 表(半天)

model DailySummary {
id String @id @default(cuid())
deviceId String
device Device @relation(fields: [deviceId], references: [id], onDelete: Cascade)
userId String
user User @relation(fields: [userId], references: [id], onDelete: Cascade)

date DateTime @db.Date
timezone String @default("America/New_York")

eventCount Int @default(0)
eventTypes String[] // ["person", "vehicle", ...]
keyEvents Json[] // [{cloudEventId, time, type, summary, thumbnailUrl}, ...]

summary String? // English narrative (140 chars push-friendly)

pushSent Boolean @default(false)
pushSentAt DateTime?

createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

@@unique([deviceId, userId, date])
@@index([userId, date])
@@index([pushSent, date])
}

D7:lib/aws/bedrock-daily.ts — 24h 事件聚合摘要(半天)

export interface DailySummaryGenerationInput {
deviceName: string;
date: string; // ISO date
events: Array<{
time: string; // HH:MM
eventType: string;
summaryShort: string;
}>;
}

export interface DailySummaryGenerationResult {
narrative: string; // <=400 chars English, push 折叠后可展开
pushTitle: string; // <=50 chars
pushBody: string; // <=140 chars
keyMomentIndices: number[]; // events 数组里被选为 "key moment" 的 index
}

export async function generateDailySummary(
input: DailySummaryGenerationInput
): Promise<DailySummaryGenerationResult>;

实现:用 us.amazon.nova-lite-v1:0,prompt 输入 N 条 event short summary → 输出聚合 narrative。

D8:app/api/cron/daily-summary/route.ts(半天)

仿照 app/api/cron/eiotclub-reconcile/route.ts

  • Bearer token 鉴权(env CRON_SECRET
  • Query:?dryRun=true 仅打印不写库
  • 逻辑:
    1. 找出当前 UTC 时间对应本地时区是 8 AM ± 30min 的所有 user(用 PushToken.timezone)
    2. 对每个 (user, device) 拿过去 24h 的 CloudEvent(status=classified)
    3. generateDailySummary
    4. upsert DailySummary 表
    5. sendPushToEndpoint 推送
    6. 更新 pushSent=true

D9:调度配置(半天)

两种选项:

  • A. Vercel Cron:在 vercel.jsoncrons 字段,每小时跑一次 /api/cron/daily-summary?hour={UTC},路由内部按用户时区筛 8 AM
  • B. EventBridge Scheduler:AWS Console 配 hourly schedule 调上面这个 endpoint

推荐 A(vercel.json 维护简单,与现有 cron 体系一致;future 高并发再切 B)

D10:App 端 Daily Summary 详情页 API(半天)

GET /api/account/devices/[deviceId]/daily-summaries?start=...&end=... 返回 DailySummary 列表

Sprint 2 验收清单

  • DailySummary migration 通过
  • Staging 跑 cron ?dryRun=true 打印预期 push 内容
  • Staging 真跑:用户手机 8 AM 收到 daily summary push
  • App 详情页 API 返回正确数据
  • 时区边界测试(US East / West / Hawaii / Alaska)

6. Sprint 3:边端 SoC 缩略图协议对齐(跨团队,时间未定)

阻塞依赖

NEXA Prime 4K 样机出来后才能做。需要跟 ODM / 固件团队对齐:

  • 边端 SoC 检测到事件时,额外抓 1 张缩略图(推荐 320×240 JPEG)
  • 通过 IoT MQTT 上行 + S3 直传(用 IoT Credentials Provider 临时凭证)
  • S3 路径约定:s3://aovis-video-storage/thumbnails/{aoviseDeviceId}/{eventId}.jpg
  • 在 IoT 事件 MQTT payload 中带上 thumbnail_s3_key 字段

Sprint 3 之前的兼容方案

Sprint 1 的 worker 在 payload.thumbnail_s3_key 不存在时:

  • 选项 A:回退到 GetClip + ffmpeg 抽帧(增加 ~3-5s 延迟,违背"实时推送 <2s"目标)
  • 选项 B:跳过 Type 1 push,只走 Type 2 Daily Summary

样机就绪前生产推荐选项 B;样机就绪后切到协议直传。


7. 从测试床迁移的 artifact 对照表

测试床产物落到 aovisSprint
cloud-ai-api-test/src/bedrock/nova_lite_thumbnail.py (Python)lib/aws/bedrock-thumbnail.ts (TypeScript port)Sprint 1 D3
cloud-ai-api-test/src/bedrock/nova_video.py 的 prompt 模板替换 lib/aws/bedrock.tsanalyzeVideo() 的 system promptSprint 2 D7(顺手)
cloud-ai-api-test/src/common/cost.py 价目表新建 lib/aws/cost-tracker.ts(记录每次 InvokeModel 真实成本到 DB)Sprint 1 D3 或独立
测试床确定的 model id us.amazon.nova-lite-v1:0env.bedrockModelId 默认值 + 加 env.bedrockThumbnailModelIdSprint 1 D3
测试床 IAM policy(前缀 + 模型白名单)生产 IAM role 加上 bedrock:InvokeModel on us.amazon.nova-* resourceSprint 1 之前

8. AI 工具执行指引

接到 AOVIS AI 推送相关任务时,按以下顺序:

  1. aws-ai-architecture.md §1-§3,确认架构边界
  2. 读本文档,定位你的任务在哪个 Sprint / D 编号
  3. 看 §2 当前状态表,确认你要做的是"补缺失项"还是"修已实现项"
  4. 不要跳过 Sprint 顺序:Sprint 1 D1-D5 严格串行;Sprint 2 必须在 Sprint 1 完成后才动
  5. 每个 D 完成后更新本文档:把对应 §2 状态表的 ❌ 改成 ✅,commit
  6. 不要扩范围:本文档没列的功能(如 Smart Search 改造、套餐分层、视频压缩流水线)不在 Sprint 1/2 范围

禁止行为

  • 不要引入 Amazon Rekognition(见 aws-ai-architecture.md §3.1)
  • 不要写中文 push / UI 文案(见 §3.3)
  • 不要用 foundation model id(必须 us. 前缀,见 §3.6)
  • 不要硬编码 S3 bucket / region(用 env / config)
  • 不要在 IoT webhook 里同步等 Bedrock 调用(必须 fire-and-forget background worker)

文档版本:v1.0 (2026-05-17) 关联docs/aws-ai-architecture.md | 测试床选型报告