1. 新增三表结构: alarm_event(主表), alarm_event_ext(算法扩展), alarm_llm_analysis(大模型分析) 2. 新增 AlarmEventService 服务,支持 MQTT/HTTP 双路创建告警 3. MQTT handler 双写新旧表,平滑过渡 4. 重写 yudao_aiot_alarm 路由,对接新告警服务 5. 集成腾讯云 COS 对象存储:上传、预签名URL、STS临时凭证 6. 新增 storage 路由:upload/presign/upload-url/sts 四个接口 7. COS 未启用时自动降级本地 uploads/ 目录存储 8. 新增数据迁移脚本 migrate_to_alarm_event.py 9. 删除根目录 main.py(非项目入口) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
347 lines
11 KiB
Python
347 lines
11 KiB
Python
"""
|
||
对象存储服务(腾讯云 COS + 本地回退)
|
||
|
||
功能:
|
||
- COS_ENABLED=true 时使用腾讯云 COS 存储
|
||
- COS_ENABLED=false 时回退到本地 uploads/ 目录
|
||
- 后端上传:服务端直接将图片/文件写入 COS
|
||
- 预签名下载:生成带鉴权的临时下载 URL
|
||
- STS 临时凭证:签发前端直传用的临时 AK/SK/Token
|
||
"""
|
||
import uuid
|
||
from datetime import datetime, timezone
|
||
from typing import Optional, Dict
|
||
from pathlib import Path
|
||
|
||
from app.config import settings
|
||
from app.utils.logger import logger
|
||
|
||
|
||
# 按需导入 COS SDK,未安装时不影响本地模式
|
||
_cos_client = None
|
||
_cos_available = False
|
||
|
||
try:
|
||
from qcloud_cos import CosConfig, CosS3Client
|
||
_cos_available = True
|
||
except ImportError:
|
||
_cos_available = False
|
||
|
||
|
||
def _get_cos_client():
|
||
"""懒加载 COS 客户端单例"""
|
||
global _cos_client
|
||
if _cos_client is not None:
|
||
return _cos_client
|
||
|
||
if not _cos_available:
|
||
logger.warning("qcloud_cos 未安装,使用本地存储模式")
|
||
return None
|
||
|
||
cfg = settings.cos
|
||
if not cfg.enabled or not cfg.secret_id or not cfg.bucket:
|
||
return None
|
||
|
||
try:
|
||
cos_config = CosConfig(
|
||
Region=cfg.region,
|
||
SecretId=cfg.secret_id,
|
||
SecretKey=cfg.secret_key,
|
||
Scheme="https",
|
||
)
|
||
_cos_client = CosS3Client(cos_config)
|
||
logger.info(f"COS 客户端初始化成功: bucket={cfg.bucket}, region={cfg.region}")
|
||
return _cos_client
|
||
except Exception as e:
|
||
logger.error(f"COS 客户端初始化失败: {e}")
|
||
return None
|
||
|
||
|
||
def _generate_object_key(prefix: str = "", ext: str = ".jpg") -> str:
|
||
"""
|
||
生成对象存储 Key
|
||
格式: {prefix}/{YYYY}/{MM}/{DD}/{YYYYMMDDHHmmss}_{uuid8}{ext}
|
||
示例: alerts/2026/02/09/20260209153000_A1B2C3D4.jpg
|
||
"""
|
||
now = datetime.now(timezone.utc)
|
||
date_path = now.strftime("%Y/%m/%d")
|
||
timestamp = now.strftime("%Y%m%d%H%M%S")
|
||
unique_id = uuid.uuid4().hex[:8].upper()
|
||
prefix = prefix.strip("/") if prefix else settings.cos.upload_prefix
|
||
return f"{prefix}/{date_path}/{timestamp}_{unique_id}{ext}"
|
||
|
||
|
||
class COSStorage:
|
||
"""
|
||
对象存储统一接口
|
||
|
||
- COS 模式:调用腾讯云 COS SDK
|
||
- 本地模式:写入 uploads/ 目录,返回相对路径
|
||
"""
|
||
|
||
def __init__(self):
|
||
self._client = None
|
||
self._use_local = True
|
||
self._init()
|
||
|
||
def _init(self):
|
||
"""初始化存储后端"""
|
||
if settings.cos.enabled:
|
||
client = _get_cos_client()
|
||
if client:
|
||
self._client = client
|
||
self._use_local = False
|
||
return
|
||
|
||
logger.info("使用本地文件存储模式")
|
||
self._use_local = True
|
||
|
||
@property
|
||
def is_cos_mode(self) -> bool:
|
||
return not self._use_local and self._client is not None
|
||
|
||
# ======================== 上传 ========================
|
||
|
||
def upload_image(self, image_data: bytes, filename: Optional[str] = None) -> str:
|
||
"""
|
||
上传图片,返回 object_key(COS 模式)或本地路径(本地模式)
|
||
|
||
数据库中存储此返回值,下载时通过 get_presigned_url() 获取临时访问地址。
|
||
"""
|
||
if self._use_local:
|
||
return self._upload_local(image_data, filename)
|
||
return self._upload_cos(image_data, filename)
|
||
|
||
def _upload_cos(self, image_data: bytes, filename: Optional[str] = None) -> str:
|
||
"""上传到 COS"""
|
||
object_key = filename or _generate_object_key(ext=".jpg")
|
||
try:
|
||
self._client.put_object(
|
||
Bucket=settings.cos.bucket,
|
||
Body=image_data,
|
||
Key=object_key,
|
||
ContentType="image/jpeg",
|
||
)
|
||
logger.info(f"COS 上传成功: {object_key}")
|
||
return object_key
|
||
except Exception as e:
|
||
logger.error(f"COS 上传失败,回退本地: {e}")
|
||
return self._upload_local(image_data, filename)
|
||
|
||
def upload_file(self, file_data: bytes, object_key: str, content_type: str = "application/octet-stream") -> str:
|
||
"""上传任意文件到 COS"""
|
||
if self._use_local:
|
||
return self._upload_local(file_data, object_key)
|
||
try:
|
||
self._client.put_object(
|
||
Bucket=settings.cos.bucket,
|
||
Body=file_data,
|
||
Key=object_key,
|
||
ContentType=content_type,
|
||
)
|
||
logger.info(f"COS 文件上传成功: {object_key}")
|
||
return object_key
|
||
except Exception as e:
|
||
logger.error(f"COS 文件上传失败: {e}")
|
||
return self._upload_local(file_data, object_key)
|
||
|
||
def _upload_local(self, data: bytes, filename: Optional[str] = None) -> str:
|
||
"""本地存储回退"""
|
||
upload_dir = Path("uploads")
|
||
if filename is None:
|
||
filename = _generate_object_key(ext=".jpg")
|
||
|
||
file_path = upload_dir / filename
|
||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
||
with open(file_path, "wb") as f:
|
||
f.write(data)
|
||
|
||
local_url = f"/uploads/{filename}"
|
||
logger.info(f"本地保存: {local_url}")
|
||
return local_url
|
||
|
||
# ======================== 下载(预签名 URL) ========================
|
||
|
||
def get_presigned_url(self, object_key: str, expire: Optional[int] = None) -> str:
|
||
"""
|
||
获取预签名下载 URL
|
||
|
||
- COS 模式:生成带签名的临时 URL,过期后失效
|
||
- 本地模式:直接返回本地路径
|
||
"""
|
||
if self._use_local or object_key.startswith("/uploads/") or object_key.startswith("http"):
|
||
return object_key
|
||
|
||
expire = expire or settings.cos.presign_expire
|
||
try:
|
||
url = self._client.get_presigned_download_url(
|
||
Bucket=settings.cos.bucket,
|
||
Key=object_key,
|
||
Expired=expire,
|
||
)
|
||
return url
|
||
except Exception as e:
|
||
logger.error(f"生成预签名 URL 失败: {e}")
|
||
return object_key
|
||
|
||
def get_presigned_upload_url(self, object_key: str, expire: Optional[int] = None) -> str:
|
||
"""
|
||
获取预签名上传 URL(供前端直传)
|
||
|
||
前端拿到此 URL 后,直接 PUT 文件即可,无需经过后端中转。
|
||
"""
|
||
if self._use_local:
|
||
return ""
|
||
|
||
expire = expire or settings.cos.presign_expire
|
||
try:
|
||
url = self._client.get_presigned_url(
|
||
Method="PUT",
|
||
Bucket=settings.cos.bucket,
|
||
Key=object_key,
|
||
Expired=expire,
|
||
)
|
||
return url
|
||
except Exception as e:
|
||
logger.error(f"生成预签名上传 URL 失败: {e}")
|
||
return ""
|
||
|
||
# ======================== STS 临时凭证 ========================
|
||
|
||
def get_sts_credential(self, allow_prefix: Optional[str] = None) -> Optional[Dict]:
|
||
"""
|
||
获取 STS 临时凭证(供前端 SDK 直传)
|
||
|
||
需安装: pip install qcloud-python-sts
|
||
|
||
返回:
|
||
{
|
||
"credentials": {"tmpSecretId": ..., "tmpSecretKey": ..., "sessionToken": ...},
|
||
"expiredTime": ...,
|
||
"startTime": ...,
|
||
"bucket": ...,
|
||
"region": ...,
|
||
"allowPrefix": ...
|
||
}
|
||
"""
|
||
if self._use_local:
|
||
return None
|
||
|
||
try:
|
||
from sts.sts import Sts
|
||
|
||
cfg = settings.cos
|
||
prefix = allow_prefix or f"{cfg.upload_prefix}/*"
|
||
|
||
# 提取 appid from bucket (格式: name-appid)
|
||
parts = cfg.bucket.rsplit("-", 1)
|
||
appid = parts[1] if len(parts) == 2 else ""
|
||
|
||
sts_config = {
|
||
"duration_seconds": cfg.sts_expire,
|
||
"secret_id": cfg.secret_id,
|
||
"secret_key": cfg.secret_key,
|
||
"bucket": cfg.bucket,
|
||
"region": cfg.region,
|
||
"allow_prefix": prefix,
|
||
"allow_actions": [
|
||
"name/cos:PutObject",
|
||
"name/cos:PostObject",
|
||
"name/cos:InitiateMultipartUpload",
|
||
"name/cos:ListMultipartUploads",
|
||
"name/cos:ListParts",
|
||
"name/cos:UploadPart",
|
||
"name/cos:CompleteMultipartUpload",
|
||
],
|
||
"policy": {
|
||
"version": "2.0",
|
||
"statement": [
|
||
{
|
||
"action": [
|
||
"name/cos:PutObject",
|
||
"name/cos:PostObject",
|
||
"name/cos:InitiateMultipartUpload",
|
||
"name/cos:ListMultipartUploads",
|
||
"name/cos:ListParts",
|
||
"name/cos:UploadPart",
|
||
"name/cos:CompleteMultipartUpload",
|
||
],
|
||
"effect": "allow",
|
||
"resource": [
|
||
f"qcs::cos:{cfg.region}:uid/{appid}:{cfg.bucket}/{prefix}",
|
||
],
|
||
}
|
||
],
|
||
},
|
||
}
|
||
|
||
sts = Sts(sts_config)
|
||
response = sts.get_credential()
|
||
|
||
result = {
|
||
"credentials": response["credentials"],
|
||
"expiredTime": response["expiredTime"],
|
||
"startTime": response.get("startTime"),
|
||
"bucket": cfg.bucket,
|
||
"region": cfg.region,
|
||
"allowPrefix": prefix,
|
||
}
|
||
|
||
logger.info(f"STS 凭证签发成功, prefix={prefix}")
|
||
return result
|
||
|
||
except ImportError:
|
||
logger.error("qcloud-python-sts 未安装,无法签发 STS 凭证。请运行: pip install qcloud-python-sts")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"STS 凭证签发失败: {e}")
|
||
return None
|
||
|
||
# ======================== 删除 ========================
|
||
|
||
def delete_object(self, object_key: str) -> bool:
|
||
"""删除对象"""
|
||
if self._use_local:
|
||
local_path = Path(object_key.lstrip("/"))
|
||
if local_path.exists():
|
||
local_path.unlink()
|
||
return True
|
||
return False
|
||
|
||
try:
|
||
self._client.delete_object(
|
||
Bucket=settings.cos.bucket,
|
||
Key=object_key,
|
||
)
|
||
logger.info(f"COS 对象已删除: {object_key}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"COS 删除失败: {e}")
|
||
return False
|
||
|
||
# ======================== 兼容旧接口 ========================
|
||
|
||
def get_url(self, path: str) -> str:
|
||
"""获取访问 URL(兼容旧代码调用)"""
|
||
if not path:
|
||
return ""
|
||
if path.startswith("http"):
|
||
return path
|
||
if path.startswith("/uploads/"):
|
||
return path
|
||
# COS object_key → 预签名 URL
|
||
return self.get_presigned_url(path)
|
||
|
||
|
||
# 全局单例
|
||
_cos_storage: Optional[COSStorage] = None
|
||
|
||
|
||
def get_oss_storage() -> COSStorage:
|
||
"""获取存储服务单例(保持旧函数名兼容)"""
|
||
global _cos_storage
|
||
if _cos_storage is None:
|
||
_cos_storage = COSStorage()
|
||
return _cos_storage
|