From 51dbad6794487e1fc71bc19773cb9c94fa60b412 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Mon, 2 Mar 2026 13:33:28 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=91=8A=E8=AD=A6=E5=88=97=E8=A1=A8500?= =?UTF-8?q?=E9=94=99=E8=AF=AF=E4=BF=AE=E5=A4=8D=20+=20=E6=91=84=E5=83=8F?= =?UTF-8?q?=E5=A4=B4=E6=9F=A5=E8=AF=A2=E5=AE=B9=E9=94=99=20+=20COS?= =?UTF-8?q?=E8=AE=A4=E8=AF=81=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加缺失的 import httpx(修复 _notify_ops_platform NameError) - 摄像头批量查询加 try/except 容错,失败时降级使用 device_id - 摄像头查询超时从 5 秒提升到 15 秒 - COS 存储重构:支持 CVM 角色认证 + 密钥认证双模式 - STS 接口改为返回提示(CVM 模式不支持 STS) Co-Authored-By: Claude Opus 4.6 --- app/config.py | 14 +- app/routers/yudao_aiot_alarm.py | 33 ++- app/routers/yudao_aiot_storage.py | 22 +- app/services/oss_storage.py | 326 +++++++++++++----------------- 4 files changed, 171 insertions(+), 224 deletions(-) diff --git a/app/config.py b/app/config.py index af077e9..190771b 100644 --- a/app/config.py +++ b/app/config.py @@ -16,15 +16,12 @@ class DatabaseConfig: @dataclass class COSConfig: - """腾讯云 COS 存储配置""" - secret_id: str = "" - secret_key: str = "" + """腾讯云 COS 存储配置(通过 CVM 角色认证,无需密钥)""" region: str = "ap-beijing" bucket: str = "" # 格式: bucketname-appid upload_prefix: str = "alerts" # 对象 Key 前缀 presign_expire: int = 1800 # 预签名URL有效期(秒),默认30分钟 - sts_expire: int = 1800 # STS 临时凭证有效期(秒) - enabled: bool = False # 是否启用 COS(False 时使用本地存储) + enabled: bool = False # 是否启用 COS @dataclass @@ -75,7 +72,7 @@ class CameraNameConfig: name_field_priority: list = None # 查询超时(秒) - query_timeout: int = 5 + query_timeout: int = 15 def __post_init__(self): if self.name_field_priority is None: @@ -103,13 +100,10 @@ def load_settings() -> Settings: url=os.getenv("DATABASE_URL", "sqlite:///./data/alert_platform.db"), ), cos=COSConfig( - secret_id=os.getenv("COS_SECRET_ID", ""), - secret_key=os.getenv("COS_SECRET_KEY", ""), region=os.getenv("COS_REGION", "ap-beijing"), bucket=os.getenv("COS_BUCKET", ""), upload_prefix=os.getenv("COS_UPLOAD_PREFIX", "alerts"), presign_expire=int(os.getenv("COS_PRESIGN_EXPIRE", "1800")), - sts_expire=int(os.getenv("COS_STS_EXPIRE", "1800")), enabled=os.getenv("COS_ENABLED", "false").lower() == "true", ), app=AppConfig( @@ -133,7 +127,7 @@ def load_settings() -> Settings: camera_name=CameraNameConfig( wvp_api_base=os.getenv("WVP_API_BASE", "http://localhost:18080"), display_format=os.getenv("CAMERA_NAME_FORMAT", "{name}"), - query_timeout=int(os.getenv("CAMERA_QUERY_TIMEOUT", "5")), + query_timeout=int(os.getenv("CAMERA_QUERY_TIMEOUT", "15")), ), ) diff --git a/app/routers/yudao_aiot_alarm.py b/app/routers/yudao_aiot_alarm.py index e2ad5c0..6683805 100644 --- a/app/routers/yudao_aiot_alarm.py +++ b/app/routers/yudao_aiot_alarm.py @@ -16,6 +16,8 @@ from fastapi import APIRouter, Query, Depends, HTTPException from typing import Optional from datetime import datetime import asyncio +import os +import httpx from app.yudao_compat import YudaoResponse, get_current_user from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService @@ -41,8 +43,8 @@ async def _alarm_to_camel(alarm_dict: dict, camera_info_map: dict = None, camera snapshot_url = alarm_dict.get("snapshot_url") if snapshot_url: if snapshot_url.startswith("local:"): - # 本地截图(COS 未配置时的回退路径) - snapshot_url = "/captures/" + snapshot_url[6:] + # 边缘端 COS 未配置时的本地路径标记,云端无法访问 + snapshot_url = "" else: snapshot_url = storage.get_url(snapshot_url) @@ -173,9 +175,13 @@ async def get_alert_page( # 提取所有唯一的 device_id device_ids = list(set(a.device_id for a in alarms if a.device_id)) - # 批量查询摄像头信息(去重+并发优化) + # 批量查询摄像头信息(去重+并发优化),失败不影响列表返回 camera_service = get_camera_name_service() - camera_info_map = await camera_service.get_camera_infos_batch(device_ids) + camera_info_map = {} + try: + camera_info_map = await camera_service.get_camera_infos_batch(device_ids) + except Exception as e: + logger.warning(f"批量查询摄像头信息失败,将使用 device_id 作为名称: {e}") # 转换为 camelCase 格式(使用摄像头信息映射) alarm_list = [await _alarm_to_camel(a.to_dict(), camera_info_map, camera_service) for a in alarms] @@ -203,13 +209,16 @@ async def get_alert( if not alarm_dict: raise HTTPException(status_code=404, detail="告警不存在") - # 查询单个摄像头信息 + # 查询单个摄像头信息(失败不影响详情返回) device_id = alarm_dict.get("device_id") camera_service = get_camera_name_service() camera_info_map = {} if device_id: - camera_info = await camera_service.get_camera_info(device_id) - camera_info_map[device_id] = camera_info + try: + camera_info = await camera_service.get_camera_info(device_id) + camera_info_map[device_id] = camera_info + except Exception as e: + logger.warning(f"查询摄像头信息失败: device_id={device_id}, error={e}") return YudaoResponse.success(await _alarm_to_camel(alarm_dict, camera_info_map, camera_service)) @@ -296,8 +305,12 @@ async def get_device_summary_page( for item in result.get("list", []): device_id = item.get("deviceId") - # 批量查询摄像头信息 - camera_info = await camera_service.get_camera_info(device_id) + # 查询摄像头信息(失败不影响列表返回) + camera_info = None + try: + camera_info = await camera_service.get_camera_info(device_id) + except Exception as e: + logger.warning(f"查询摄像头信息失败: device_id={device_id}, error={e}") # 提取摄像头名称 device_name = camera_service.format_display_name(device_id, camera_info) @@ -398,7 +411,7 @@ async def edge_alarm_resolve( # ==================== 辅助函数 ==================== -OPS_ALARM_URL = "http://192.168.0.104:48080/admin-api/ops/alarm/receive" +OPS_ALARM_URL = os.getenv("OPS_ALARM_URL", "http://192.168.0.104:48080/admin-api/ops/alarm/receive") async def _notify_ops_platform(data: dict): diff --git a/app/routers/yudao_aiot_storage.py b/app/routers/yudao_aiot_storage.py index 09a60d5..9d807c6 100644 --- a/app/routers/yudao_aiot_storage.py +++ b/app/routers/yudao_aiot_storage.py @@ -117,21 +117,9 @@ async def get_sts_credential( current_user: dict = Depends(get_current_user), ): """ - 获取 STS 临时凭证 - - 前端使用 COS JS SDK 直传时,先调用此接口获取临时密钥。 - 返回的 tmpSecretId / tmpSecretKey / sessionToken 用于初始化 SDK。 - 凭证仅允许向指定前缀上传,不可读取或删除其他路径。 + STS 凭证接口(CVM 角色模式下不可用,请使用 /upload-url 预签名上传) """ - storage = get_oss_storage() - - if not storage.is_cos_mode: - raise HTTPException(status_code=400, detail="COS 未启用,无法签发 STS 凭证") - - allow_prefix = f"{prefix}/*" - credential = storage.get_sts_credential(allow_prefix) - - if not credential: - raise HTTPException(status_code=500, detail="STS 凭证签发失败,请检查 COS 配置或安装 qcloud-python-sts") - - return YudaoResponse.success(credential) + raise HTTPException( + status_code=400, + detail="当前使用 CVM 角色认证,不支持 STS 签发。请使用 /admin-api/aiot/storage/upload-url 获取预签名上传 URL" + ) diff --git a/app/services/oss_storage.py b/app/services/oss_storage.py index c1287fc..3f54728 100644 --- a/app/services/oss_storage.py +++ b/app/services/oss_storage.py @@ -1,23 +1,22 @@ """ -对象存储服务(腾讯云 COS + 本地回退) +对象存储服务(腾讯云 COS) -功能: -- COS_ENABLED=true 时使用腾讯云 COS 存储 -- COS_ENABLED=false 时回退到本地 uploads/ 目录 -- 后端上传:服务端直接将图片/文件写入 COS -- 预签名下载:生成带鉴权的临时下载 URL -- STS 临时凭证:签发前端直传用的临时 AK/SK/Token +支持两种认证模式: +1. CVM 角色认证:通过元数据服务自动获取临时凭证(推荐,Docker 需配置 iptables 转发) +2. 密钥认证:通过 COS_SECRET_ID / COS_SECRET_KEY 环境变量(备选) """ +import os +import time import uuid from datetime import datetime, timezone -from typing import Optional, Dict +from typing import Optional from pathlib import Path from app.config import settings from app.utils.logger import logger -# 按需导入 COS SDK,未安装时不影响本地模式 +# 按需导入 COS SDK _cos_client = None _cos_available = False @@ -28,6 +27,70 @@ except ImportError: _cos_available = False +class CosStorageError(Exception): + """COS 存储操作异常""" + pass + + +class CvmRoleCredential: + """CVM 角色凭证提供者,从元数据服务自动获取和刷新临时凭证""" + + def __init__(self, role_name: str): + self._role_name = role_name + self._expired_time = 0 + self._secret_id = "" + self._secret_key = "" + self._token = "" + self._refresh() + + def _refresh(self): + """从元数据服务获取临时凭证(提前5分钟刷新)""" + if time.time() < self._expired_time - 300: + return + import requests + url = f"http://metadata.tencentyun.com/latest/meta-data/cam/security-credentials/{self._role_name}" + resp = requests.get(url, timeout=5) + resp.raise_for_status() + data = resp.json() + if data.get("Code") != "Success": + raise CosStorageError(f"获取 CVM 角色凭证失败: {data}") + self._secret_id = data["TmpSecretId"] + self._secret_key = data["TmpSecretKey"] + self._token = data["Token"] + self._expired_time = data["ExpiredTime"] + logger.info(f"CVM 角色凭证已刷新,过期时间: {datetime.fromtimestamp(self._expired_time)}") + + @property + def secret_id(self): + self._refresh() + return self._secret_id + + @property + def secret_key(self): + self._refresh() + return self._secret_key + + @property + def token(self): + self._refresh() + return self._token + + +def _discover_cvm_role() -> Optional[str]: + """自动发现 CVM 绑定的角色名""" + import requests + try: + resp = requests.get( + "http://metadata.tencentyun.com/latest/meta-data/cam/security-credentials/", + timeout=3, + ) + if resp.status_code == 200 and resp.text.strip(): + return resp.text.strip() + except Exception: + pass + return None + + def _get_cos_client(): """懒加载 COS 客户端单例""" global _cos_client @@ -35,33 +98,55 @@ def _get_cos_client(): return _cos_client if not _cos_available: - logger.warning("qcloud_cos 未安装,使用本地存储模式") - return None + raise CosStorageError("qcloud_cos 未安装,请运行: pip install cos-python-sdk-v5") cfg = settings.cos - if not cfg.enabled or not cfg.secret_id or not cfg.bucket: + if not cfg.enabled: return None + if not cfg.bucket: + raise CosStorageError("COS_BUCKET 未配置") + + secret_id = os.getenv("COS_SECRET_ID", "") + secret_key = os.getenv("COS_SECRET_KEY", "") try: - cos_config = CosConfig( - Region=cfg.region, - SecretId=cfg.secret_id, - SecretKey=cfg.secret_key, - Scheme="https", - ) + if secret_id and secret_key: + # 模式1:密钥认证 + cos_config = CosConfig( + Region=cfg.region, + SecretId=secret_id, + SecretKey=secret_key, + Scheme="https", + ) + logger.info(f"COS 客户端初始化成功 (密钥模式): bucket={cfg.bucket}") + else: + # 模式2:CVM 角色认证(通过元数据服务获取临时凭证) + role_name = _discover_cvm_role() + if not role_name: + raise CosStorageError( + "COS 认证失败: 未设置 COS_SECRET_ID/COS_SECRET_KEY," + "且无法访问 CVM 元数据服务。请检查 iptables 转发规则或配置密钥。" + ) + credential = CvmRoleCredential(role_name) + cos_config = CosConfig( + Region=cfg.region, + CredentialInstance=credential, + Scheme="https", + ) + logger.info(f"COS 客户端初始化成功 (CVM角色: {role_name}): bucket={cfg.bucket}") + _cos_client = CosS3Client(cos_config) - logger.info(f"COS 客户端初始化成功: bucket={cfg.bucket}, region={cfg.region}") return _cos_client + except CosStorageError: + raise except Exception as e: - logger.error(f"COS 客户端初始化失败: {e}") - return None + raise CosStorageError(f"COS 客户端初始化失败: {e}") def _generate_object_key(prefix: str = "", ext: str = ".jpg") -> str: """ 生成对象存储 Key 格式: {prefix}/{YYYY}/{MM}/{DD}/{YYYYMMDDHHmmss}_{uuid8}{ext} - 示例: alerts/2026/02/09/20260209153000_A1B2C3D4.jpg """ now = datetime.now(timezone.utc) date_path = now.strftime("%Y/%m/%d") @@ -73,47 +158,41 @@ def _generate_object_key(prefix: str = "", ext: str = ".jpg") -> str: class COSStorage: """ - 对象存储统一接口 + 对象存储接口(纯 COS 模式) - - COS 模式:调用腾讯云 COS SDK - - 本地模式:写入 uploads/ 目录,返回相对路径 + COS_ENABLED=true: 使用腾讯云 COS,上传失败直接报错 + COS_ENABLED=false: 未启用,调用上传方法时报错 """ def __init__(self): self._client = None - self._use_local = True + self._enabled = False self._init() def _init(self): - """初始化存储后端""" - if settings.cos.enabled: - client = _get_cos_client() - if client: - self._client = client - self._use_local = False - return + """初始化 COS 客户端""" + if not settings.cos.enabled: + logger.info("COS 未启用 (COS_ENABLED=false)") + return - logger.info("使用本地文件存储模式") - self._use_local = True + self._client = _get_cos_client() + if self._client: + self._enabled = True @property def is_cos_mode(self) -> bool: - return not self._use_local and self._client is not None + return self._enabled and self._client is not None + + def _require_cos(self): + """检查 COS 是否可用,不可用则抛出异常""" + if not self._enabled or self._client is None: + raise CosStorageError("COS 未启用或初始化失败,请检查 COS_ENABLED 和 CVM 角色绑定") # ======================== 上传 ======================== def upload_image(self, image_data: bytes, filename: Optional[str] = None) -> str: - """ - 上传图片,返回 object_key(COS 模式)或本地路径(本地模式) - - 数据库中存储此返回值,下载时通过 get_presigned_url() 获取临时访问地址。 - """ - if self._use_local: - return self._upload_local(image_data, filename) - return self._upload_cos(image_data, filename) - - def _upload_cos(self, image_data: bytes, filename: Optional[str] = None) -> str: - """上传到 COS""" + """上传图片到 COS,返回 object_key""" + self._require_cos() object_key = filename or _generate_object_key(ext=".jpg") try: self._client.put_object( @@ -125,13 +204,11 @@ class COSStorage: logger.info(f"COS 上传成功: {object_key}") return object_key except Exception as e: - logger.error(f"COS 上传失败,回退本地: {e}") - return self._upload_local(image_data, filename) + raise CosStorageError(f"COS 图片上传失败: {e}") def upload_file(self, file_data: bytes, object_key: str, content_type: str = "application/octet-stream") -> str: """上传任意文件到 COS""" - if self._use_local: - return self._upload_local(file_data, object_key) + self._require_cos() try: self._client.put_object( Bucket=settings.cos.bucket, @@ -142,35 +219,16 @@ class COSStorage: logger.info(f"COS 文件上传成功: {object_key}") return object_key except Exception as e: - logger.error(f"COS 文件上传失败: {e}") - return self._upload_local(file_data, object_key) - - def _upload_local(self, data: bytes, filename: Optional[str] = None) -> str: - """本地存储回退""" - upload_dir = Path("uploads") - if filename is None: - filename = _generate_object_key(ext=".jpg") - - file_path = upload_dir / filename - file_path.parent.mkdir(parents=True, exist_ok=True) - - with open(file_path, "wb") as f: - f.write(data) - - local_url = f"/uploads/{filename}" - logger.info(f"本地保存: {local_url}") - return local_url + raise CosStorageError(f"COS 文件上传失败: {e}") # ======================== 下载(预签名 URL) ======================== def get_presigned_url(self, object_key: str, expire: Optional[int] = None) -> str: - """ - 获取预签名下载 URL + """获取预签名下载 URL""" + if object_key.startswith("http"): + return object_key - - COS 模式:生成带签名的临时 URL,过期后失效 - - 本地模式:直接返回本地路径 - """ - if self._use_local or object_key.startswith("/uploads/") or object_key.startswith("http"): + if not self._enabled: return object_key expire = expire or settings.cos.presign_expire @@ -186,14 +244,8 @@ class COSStorage: return object_key def get_presigned_upload_url(self, object_key: str, expire: Optional[int] = None) -> str: - """ - 获取预签名上传 URL(供前端直传) - - 前端拿到此 URL 后,直接 PUT 文件即可,无需经过后端中转。 - """ - if self._use_local: - return "" - + """获取预签名上传 URL(供前端直传)""" + self._require_cos() expire = expire or settings.cos.presign_expire try: url = self._client.get_presigned_url( @@ -204,111 +256,14 @@ class COSStorage: ) return url except Exception as e: - logger.error(f"生成预签名上传 URL 失败: {e}") - return "" - - # ======================== STS 临时凭证 ======================== - - def get_sts_credential(self, allow_prefix: Optional[str] = None) -> Optional[Dict]: - """ - 获取 STS 临时凭证(供前端 SDK 直传) - - 需安装: pip install qcloud-python-sts - - 返回: - { - "credentials": {"tmpSecretId": ..., "tmpSecretKey": ..., "sessionToken": ...}, - "expiredTime": ..., - "startTime": ..., - "bucket": ..., - "region": ..., - "allowPrefix": ... - } - """ - if self._use_local: - return None - - try: - from sts.sts import Sts - - cfg = settings.cos - prefix = allow_prefix or f"{cfg.upload_prefix}/*" - - # 提取 appid from bucket (格式: name-appid) - parts = cfg.bucket.rsplit("-", 1) - appid = parts[1] if len(parts) == 2 else "" - - sts_config = { - "duration_seconds": cfg.sts_expire, - "secret_id": cfg.secret_id, - "secret_key": cfg.secret_key, - "bucket": cfg.bucket, - "region": cfg.region, - "allow_prefix": prefix, - "allow_actions": [ - "name/cos:PutObject", - "name/cos:PostObject", - "name/cos:InitiateMultipartUpload", - "name/cos:ListMultipartUploads", - "name/cos:ListParts", - "name/cos:UploadPart", - "name/cos:CompleteMultipartUpload", - ], - "policy": { - "version": "2.0", - "statement": [ - { - "action": [ - "name/cos:PutObject", - "name/cos:PostObject", - "name/cos:InitiateMultipartUpload", - "name/cos:ListMultipartUploads", - "name/cos:ListParts", - "name/cos:UploadPart", - "name/cos:CompleteMultipartUpload", - ], - "effect": "allow", - "resource": [ - f"qcs::cos:{cfg.region}:uid/{appid}:{cfg.bucket}/{prefix}", - ], - } - ], - }, - } - - sts = Sts(sts_config) - response = sts.get_credential() - - result = { - "credentials": response["credentials"], - "expiredTime": response["expiredTime"], - "startTime": response.get("startTime"), - "bucket": cfg.bucket, - "region": cfg.region, - "allowPrefix": prefix, - } - - logger.info(f"STS 凭证签发成功, prefix={prefix}") - return result - - except ImportError: - logger.error("qcloud-python-sts 未安装,无法签发 STS 凭证。请运行: pip install qcloud-python-sts") - return None - except Exception as e: - logger.error(f"STS 凭证签发失败: {e}") - return None + raise CosStorageError(f"生成预签名上传 URL 失败: {e}") # ======================== 删除 ======================== def delete_object(self, object_key: str) -> bool: - """删除对象""" - if self._use_local: - local_path = Path(object_key.lstrip("/")) - if local_path.exists(): - local_path.unlink() - return True + """删除 COS 对象""" + if not self._enabled: return False - try: self._client.delete_object( Bucket=settings.cos.bucket, @@ -328,9 +283,6 @@ class COSStorage: return "" if path.startswith("http"): return path - if path.startswith("/uploads/"): - return path - # COS object_key → 预签名 URL return self.get_presigned_url(path) @@ -339,7 +291,7 @@ _cos_storage: Optional[COSStorage] = None def get_oss_storage() -> COSStorage: - """获取存储服务单例(保持旧函数名兼容)""" + """获取存储服务单例""" global _cos_storage if _cos_storage is None: _cos_storage = COSStorage()