Files
iot-device-management-service/app/routers/yudao_aiot_storage.py
16337 6cf1524013 feat(aiot): 告警三表结构升级 + 腾讯云COS对象存储集成
1. 新增三表结构: alarm_event(主表), alarm_event_ext(算法扩展), alarm_llm_analysis(大模型分析)
2. 新增 AlarmEventService 服务,支持 MQTT/HTTP 双路创建告警
3. MQTT handler 双写新旧表,平滑过渡
4. 重写 yudao_aiot_alarm 路由,对接新告警服务
5. 集成腾讯云 COS 对象存储:上传、预签名URL、STS临时凭证
6. 新增 storage 路由:upload/presign/upload-url/sts 四个接口
7. COS 未启用时自动降级本地 uploads/ 目录存储
8. 新增数据迁移脚本 migrate_to_alarm_event.py
9. 删除根目录 main.py(非项目入口)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 17:47:35 +08:00

138 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
AIoT 文件存储路由 - COS 对象存储接口
API 路径规范:
- POST /admin-api/aiot/storage/upload - 后端中转上传
- GET /admin-api/aiot/storage/presign - 获取预签名下载 URL
- GET /admin-api/aiot/storage/sts - 获取 STS 临时凭证(前端直传)
- GET /admin-api/aiot/storage/upload-url - 获取预签名上传 URL前端直传
"""
import os
from fastapi import APIRouter, Query, Depends, HTTPException, UploadFile, File
from app.yudao_compat import YudaoResponse, get_current_user
from app.services.oss_storage import get_oss_storage, COSStorage, _generate_object_key
router = APIRouter(prefix="/admin-api/aiot/storage", tags=["AIoT-文件存储"])
@router.post("/upload")
async def upload_file(
file: UploadFile = File(..., description="上传文件"),
prefix: str = Query("alerts", description="存储路径前缀"),
current_user: dict = Depends(get_current_user),
):
"""
后端中转上传
文件经过后端写入 COS / 本地,返回 object_key。
适用于服务端需要对文件做校验或处理的场景。
"""
storage = get_oss_storage()
# 文件大小检查(限制 20MB
content = await file.read()
if len(content) > 20 * 1024 * 1024:
raise HTTPException(status_code=400, detail="文件大小超过 20MB 限制")
# 文件类型检查
allowed_types = {".jpg", ".jpeg", ".png", ".gif", ".bmp", ".webp", ".mp4", ".mov", ".pdf"}
_, ext = os.path.splitext(file.filename or "unknown.jpg")
ext = ext.lower()
if ext not in allowed_types:
raise HTTPException(status_code=400, detail=f"不支持的文件类型: {ext}")
# 确定 content_type
content_type = file.content_type or "application/octet-stream"
# 生成 object_key
object_key = _generate_object_key(prefix=prefix, ext=ext)
# 上传
result_key = storage.upload_file(content, object_key, content_type)
return YudaoResponse.success({
"objectKey": result_key,
"filename": file.filename,
"size": len(content),
"contentType": content_type,
})
@router.get("/presign")
async def get_presigned_download_url(
objectKey: str = Query(..., description="对象 Key"),
expire: int = Query(1800, description="有效期(秒)"),
current_user: dict = Depends(get_current_user),
):
"""
获取预签名下载 URL
前端查看告警截图时调用此接口,拿到临时 URL 后直接访问 COS。
URL 过期后失效,防止泄露。
"""
storage = get_oss_storage()
url = storage.get_presigned_url(objectKey, expire)
return YudaoResponse.success({
"url": url,
"expire": expire,
})
@router.get("/upload-url")
async def get_presigned_upload_url(
prefix: str = Query("alerts", description="存储路径前缀"),
ext: str = Query(".jpg", description="文件扩展名"),
expire: int = Query(1800, description="有效期(秒)"),
current_user: dict = Depends(get_current_user),
):
"""
获取预签名上传 URL
前端拿到此 URL 后,直接 PUT 文件到 COS无需经过后端中转。
适用于大文件或高频上传场景。
"""
storage = get_oss_storage()
if not storage.is_cos_mode:
raise HTTPException(status_code=400, detail="COS 未启用,请使用 /upload 接口")
object_key = _generate_object_key(prefix=prefix, ext=ext)
url = storage.get_presigned_upload_url(object_key, expire)
if not url:
raise HTTPException(status_code=500, detail="生成上传 URL 失败")
return YudaoResponse.success({
"uploadUrl": url,
"objectKey": object_key,
"expire": expire,
})
@router.get("/sts")
async def get_sts_credential(
prefix: str = Query("alerts", description="允许上传的路径前缀"),
current_user: dict = Depends(get_current_user),
):
"""
获取 STS 临时凭证
前端使用 COS JS SDK 直传时,先调用此接口获取临时密钥。
返回的 tmpSecretId / tmpSecretKey / sessionToken 用于初始化 SDK。
凭证仅允许向指定前缀上传,不可读取或删除其他路径。
"""
storage = get_oss_storage()
if not storage.is_cos_mode:
raise HTTPException(status_code=400, detail="COS 未启用,无法签发 STS 凭证")
allow_prefix = f"{prefix}/*"
credential = storage.get_sts_credential(allow_prefix)
if not credential:
raise HTTPException(status_code=500, detail="STS 凭证签发失败,请检查 COS 配置或安装 qcloud-python-sts")
return YudaoResponse.success(credential)