From 9cee0a2caca1497af9b4ef55b6e96a4e476fa4e7 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Mon, 2 Mar 2026 13:18:35 +0800 Subject: [PATCH 1/3] =?UTF-8?q?fix:=20=E6=B7=BB=E5=8A=A0=20pymysql=20?= =?UTF-8?q?=E4=BE=9D=E8=B5=96=20+=20Edge=20=E5=85=BC=E5=AE=B9=E8=B7=AF?= =?UTF-8?q?=E7=94=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - requirements.txt 添加 pymysql、cos-python-sdk-v5 - 新增 /api/ai/alert/edge/report 和 /edge/resolve 路由(无认证) - 使 Edge 设备可直接上报到 FastAPI 服务 Co-Authored-By: Claude Opus 4.6 --- app/main.py | 13 ++++--- app/routers/__init__.py | 2 ++ app/routers/edge_compat.py | 71 ++++++++++++++++++++++++++++++++++++++ requirements.txt | 2 ++ 4 files changed, 81 insertions(+), 7 deletions(-) create mode 100644 app/routers/edge_compat.py diff --git a/app/main.py b/app/main.py index c265a22..e5c8385 100644 --- a/app/main.py +++ b/app/main.py @@ -27,7 +27,7 @@ from app.services.ai_analyzer import trigger_async_analysis from app.services.notification_service import get_notification_service from app.services.device_service import get_device_service from app.utils.logger import logger -from app.routers import yudao_alert_router, yudao_auth_router, yudao_aiot_alarm_router, yudao_aiot_edge_router, yudao_aiot_storage_router +from app.routers import yudao_alert_router, yudao_auth_router, yudao_aiot_alarm_router, yudao_aiot_edge_router, yudao_aiot_storage_router, edge_compat_router from app.yudao_compat import yudao_exception_handler import json @@ -83,19 +83,18 @@ app.include_router(yudao_aiot_alarm_router) app.include_router(yudao_aiot_edge_router) app.include_router(yudao_aiot_storage_router) +# ==================== Edge 兼容路由 ==================== +# Edge 设备使用 /api/ai/alert/edge/* 路径上报(与 WVP 一致),无需认证 +app.include_router(edge_compat_router) + # 注册芋道格式异常处理器 app.add_exception_handler(HTTPException, yudao_exception_handler) -# ==================== 静态文件(本地截图) ==================== +# ==================== 静态文件 ==================== _uploads_dir = Path("uploads") _uploads_dir.mkdir(parents=True, exist_ok=True) app.mount("/uploads", StaticFiles(directory=str(_uploads_dir)), name="uploads") -# Edge 本地截图目录(COS 未配置时的回退访问路径) -_edge_captures_dir = Path(r"C:\Users\16337\PycharmProjects\ai_edge\data\captures") -if _edge_captures_dir.exists(): - app.mount("/captures", StaticFiles(directory=str(_edge_captures_dir)), name="captures") - def get_alert_svc(): return alert_service diff --git a/app/routers/__init__.py b/app/routers/__init__.py index 57ffb00..f66b691 100644 --- a/app/routers/__init__.py +++ b/app/routers/__init__.py @@ -9,6 +9,7 @@ from app.routers.yudao_alert import router as yudao_alert_router from app.routers.yudao_aiot_alarm import router as yudao_aiot_alarm_router from app.routers.yudao_aiot_edge import router as yudao_aiot_edge_router from app.routers.yudao_aiot_storage import router as yudao_aiot_storage_router +from app.routers.edge_compat import router as edge_compat_router __all__ = [ "yudao_auth_router", @@ -16,4 +17,5 @@ __all__ = [ "yudao_aiot_alarm_router", "yudao_aiot_edge_router", "yudao_aiot_storage_router", + "edge_compat_router", ] diff --git a/app/routers/edge_compat.py b/app/routers/edge_compat.py new file mode 100644 index 0000000..daa1088 --- /dev/null +++ b/app/routers/edge_compat.py @@ -0,0 +1,71 @@ +""" +Edge 设备兼容路由 + +Edge 设备使用 /api/ai/alert/edge/report 和 /api/ai/alert/edge/resolve 路径上报告警, +该路径与 WVP 端点一致。本模块提供相同路径的路由,无需认证, +使 Edge 设备可以直接上报到 FastAPI 服务。 +""" + +import asyncio +from datetime import datetime +from fastapi import APIRouter, Depends +from typing import Optional + +from app.yudao_compat import YudaoResponse +from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService +from app.services.notification_service import get_notification_service +from app.schemas import EdgeAlarmReport, EdgeAlarmResolve +from app.utils.logger import logger + +router = APIRouter(prefix="/api/ai/alert", tags=["Edge-兼容路由"]) + + +@router.post("/edge/report") +async def edge_alarm_report( + report: EdgeAlarmReport, + service: AlarmEventService = Depends(get_alarm_event_service), +): + """ + Edge 告警上报(无认证) + + 与 /admin-api/aiot/alarm/edge/report 功能相同, + 但不要求认证,供 Edge 设备直接调用。 + """ + alarm = service.create_from_edge_report(report.model_dump()) + + if alarm is None: + return YudaoResponse.error(500, "告警创建失败") + + # WebSocket 通知 + try: + notification_svc = get_notification_service() + notification_svc.notify_sync("new_alert", alarm.to_dict()) + except Exception: + pass + + return YudaoResponse.success({ + "alarmId": alarm.alarm_id, + "created": True, + }) + + +@router.post("/edge/resolve") +async def edge_alarm_resolve( + resolve: EdgeAlarmResolve, + service: AlarmEventService = Depends(get_alarm_event_service), +): + """ + Edge 告警结束通知(无认证) + + 与 /admin-api/aiot/alarm/edge/resolve 功能相同, + 但不要求认证,供 Edge 设备直接调用。 + """ + success = service.resolve_alarm( + alarm_id=resolve.alarm_id, + duration_ms=resolve.duration_ms, + last_frame_time=resolve.last_frame_time, + resolve_type=resolve.resolve_type, + ) + if not success: + return YudaoResponse.error(404, "告警不存在") + return YudaoResponse.success(True) diff --git a/requirements.txt b/requirements.txt index 1169390..9e6c524 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,5 @@ paho-mqtt==2.1.0 python-dotenv==1.0.1 websockets==12.0 redis>=5.0.0 +pymysql>=1.1.0 +cos-python-sdk-v5>=1.9.30 From 51dbad6794487e1fc71bc19773cb9c94fa60b412 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Mon, 2 Mar 2026 13:33:28 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix:=20=E5=91=8A=E8=AD=A6=E5=88=97=E8=A1=A8?= =?UTF-8?q?500=E9=94=99=E8=AF=AF=E4=BF=AE=E5=A4=8D=20+=20=E6=91=84?= =?UTF-8?q?=E5=83=8F=E5=A4=B4=E6=9F=A5=E8=AF=A2=E5=AE=B9=E9=94=99=20+=20CO?= =?UTF-8?q?S=E8=AE=A4=E8=AF=81=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加缺失的 import httpx(修复 _notify_ops_platform NameError) - 摄像头批量查询加 try/except 容错,失败时降级使用 device_id - 摄像头查询超时从 5 秒提升到 15 秒 - COS 存储重构:支持 CVM 角色认证 + 密钥认证双模式 - STS 接口改为返回提示(CVM 模式不支持 STS) Co-Authored-By: Claude Opus 4.6 --- app/config.py | 14 +- app/routers/yudao_aiot_alarm.py | 33 ++- app/routers/yudao_aiot_storage.py | 22 +- app/services/oss_storage.py | 326 +++++++++++++----------------- 4 files changed, 171 insertions(+), 224 deletions(-) diff --git a/app/config.py b/app/config.py index af077e9..190771b 100644 --- a/app/config.py +++ b/app/config.py @@ -16,15 +16,12 @@ class DatabaseConfig: @dataclass class COSConfig: - """腾讯云 COS 存储配置""" - secret_id: str = "" - secret_key: str = "" + """腾讯云 COS 存储配置(通过 CVM 角色认证,无需密钥)""" region: str = "ap-beijing" bucket: str = "" # 格式: bucketname-appid upload_prefix: str = "alerts" # 对象 Key 前缀 presign_expire: int = 1800 # 预签名URL有效期(秒),默认30分钟 - sts_expire: int = 1800 # STS 临时凭证有效期(秒) - enabled: bool = False # 是否启用 COS(False 时使用本地存储) + enabled: bool = False # 是否启用 COS @dataclass @@ -75,7 +72,7 @@ class CameraNameConfig: name_field_priority: list = None # 查询超时(秒) - query_timeout: int = 5 + query_timeout: int = 15 def __post_init__(self): if self.name_field_priority is None: @@ -103,13 +100,10 @@ def load_settings() -> Settings: url=os.getenv("DATABASE_URL", "sqlite:///./data/alert_platform.db"), ), cos=COSConfig( - secret_id=os.getenv("COS_SECRET_ID", ""), - secret_key=os.getenv("COS_SECRET_KEY", ""), region=os.getenv("COS_REGION", "ap-beijing"), bucket=os.getenv("COS_BUCKET", ""), upload_prefix=os.getenv("COS_UPLOAD_PREFIX", "alerts"), presign_expire=int(os.getenv("COS_PRESIGN_EXPIRE", "1800")), - sts_expire=int(os.getenv("COS_STS_EXPIRE", "1800")), enabled=os.getenv("COS_ENABLED", "false").lower() == "true", ), app=AppConfig( @@ -133,7 +127,7 @@ def load_settings() -> Settings: camera_name=CameraNameConfig( wvp_api_base=os.getenv("WVP_API_BASE", "http://localhost:18080"), display_format=os.getenv("CAMERA_NAME_FORMAT", "{name}"), - query_timeout=int(os.getenv("CAMERA_QUERY_TIMEOUT", "5")), + query_timeout=int(os.getenv("CAMERA_QUERY_TIMEOUT", "15")), ), ) diff --git a/app/routers/yudao_aiot_alarm.py b/app/routers/yudao_aiot_alarm.py index e2ad5c0..6683805 100644 --- a/app/routers/yudao_aiot_alarm.py +++ b/app/routers/yudao_aiot_alarm.py @@ -16,6 +16,8 @@ from fastapi import APIRouter, Query, Depends, HTTPException from typing import Optional from datetime import datetime import asyncio +import os +import httpx from app.yudao_compat import YudaoResponse, get_current_user from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService @@ -41,8 +43,8 @@ async def _alarm_to_camel(alarm_dict: dict, camera_info_map: dict = None, camera snapshot_url = alarm_dict.get("snapshot_url") if snapshot_url: if snapshot_url.startswith("local:"): - # 本地截图(COS 未配置时的回退路径) - snapshot_url = "/captures/" + snapshot_url[6:] + # 边缘端 COS 未配置时的本地路径标记,云端无法访问 + snapshot_url = "" else: snapshot_url = storage.get_url(snapshot_url) @@ -173,9 +175,13 @@ async def get_alert_page( # 提取所有唯一的 device_id device_ids = list(set(a.device_id for a in alarms if a.device_id)) - # 批量查询摄像头信息(去重+并发优化) + # 批量查询摄像头信息(去重+并发优化),失败不影响列表返回 camera_service = get_camera_name_service() - camera_info_map = await camera_service.get_camera_infos_batch(device_ids) + camera_info_map = {} + try: + camera_info_map = await camera_service.get_camera_infos_batch(device_ids) + except Exception as e: + logger.warning(f"批量查询摄像头信息失败,将使用 device_id 作为名称: {e}") # 转换为 camelCase 格式(使用摄像头信息映射) alarm_list = [await _alarm_to_camel(a.to_dict(), camera_info_map, camera_service) for a in alarms] @@ -203,13 +209,16 @@ async def get_alert( if not alarm_dict: raise HTTPException(status_code=404, detail="告警不存在") - # 查询单个摄像头信息 + # 查询单个摄像头信息(失败不影响详情返回) device_id = alarm_dict.get("device_id") camera_service = get_camera_name_service() camera_info_map = {} if device_id: - camera_info = await camera_service.get_camera_info(device_id) - camera_info_map[device_id] = camera_info + try: + camera_info = await camera_service.get_camera_info(device_id) + camera_info_map[device_id] = camera_info + except Exception as e: + logger.warning(f"查询摄像头信息失败: device_id={device_id}, error={e}") return YudaoResponse.success(await _alarm_to_camel(alarm_dict, camera_info_map, camera_service)) @@ -296,8 +305,12 @@ async def get_device_summary_page( for item in result.get("list", []): device_id = item.get("deviceId") - # 批量查询摄像头信息 - camera_info = await camera_service.get_camera_info(device_id) + # 查询摄像头信息(失败不影响列表返回) + camera_info = None + try: + camera_info = await camera_service.get_camera_info(device_id) + except Exception as e: + logger.warning(f"查询摄像头信息失败: device_id={device_id}, error={e}") # 提取摄像头名称 device_name = camera_service.format_display_name(device_id, camera_info) @@ -398,7 +411,7 @@ async def edge_alarm_resolve( # ==================== 辅助函数 ==================== -OPS_ALARM_URL = "http://192.168.0.104:48080/admin-api/ops/alarm/receive" +OPS_ALARM_URL = os.getenv("OPS_ALARM_URL", "http://192.168.0.104:48080/admin-api/ops/alarm/receive") async def _notify_ops_platform(data: dict): diff --git a/app/routers/yudao_aiot_storage.py b/app/routers/yudao_aiot_storage.py index 09a60d5..9d807c6 100644 --- a/app/routers/yudao_aiot_storage.py +++ b/app/routers/yudao_aiot_storage.py @@ -117,21 +117,9 @@ async def get_sts_credential( current_user: dict = Depends(get_current_user), ): """ - 获取 STS 临时凭证 - - 前端使用 COS JS SDK 直传时,先调用此接口获取临时密钥。 - 返回的 tmpSecretId / tmpSecretKey / sessionToken 用于初始化 SDK。 - 凭证仅允许向指定前缀上传,不可读取或删除其他路径。 + STS 凭证接口(CVM 角色模式下不可用,请使用 /upload-url 预签名上传) """ - storage = get_oss_storage() - - if not storage.is_cos_mode: - raise HTTPException(status_code=400, detail="COS 未启用,无法签发 STS 凭证") - - allow_prefix = f"{prefix}/*" - credential = storage.get_sts_credential(allow_prefix) - - if not credential: - raise HTTPException(status_code=500, detail="STS 凭证签发失败,请检查 COS 配置或安装 qcloud-python-sts") - - return YudaoResponse.success(credential) + raise HTTPException( + status_code=400, + detail="当前使用 CVM 角色认证,不支持 STS 签发。请使用 /admin-api/aiot/storage/upload-url 获取预签名上传 URL" + ) diff --git a/app/services/oss_storage.py b/app/services/oss_storage.py index c1287fc..3f54728 100644 --- a/app/services/oss_storage.py +++ b/app/services/oss_storage.py @@ -1,23 +1,22 @@ """ -对象存储服务(腾讯云 COS + 本地回退) +对象存储服务(腾讯云 COS) -功能: -- COS_ENABLED=true 时使用腾讯云 COS 存储 -- COS_ENABLED=false 时回退到本地 uploads/ 目录 -- 后端上传:服务端直接将图片/文件写入 COS -- 预签名下载:生成带鉴权的临时下载 URL -- STS 临时凭证:签发前端直传用的临时 AK/SK/Token +支持两种认证模式: +1. CVM 角色认证:通过元数据服务自动获取临时凭证(推荐,Docker 需配置 iptables 转发) +2. 密钥认证:通过 COS_SECRET_ID / COS_SECRET_KEY 环境变量(备选) """ +import os +import time import uuid from datetime import datetime, timezone -from typing import Optional, Dict +from typing import Optional from pathlib import Path from app.config import settings from app.utils.logger import logger -# 按需导入 COS SDK,未安装时不影响本地模式 +# 按需导入 COS SDK _cos_client = None _cos_available = False @@ -28,6 +27,70 @@ except ImportError: _cos_available = False +class CosStorageError(Exception): + """COS 存储操作异常""" + pass + + +class CvmRoleCredential: + """CVM 角色凭证提供者,从元数据服务自动获取和刷新临时凭证""" + + def __init__(self, role_name: str): + self._role_name = role_name + self._expired_time = 0 + self._secret_id = "" + self._secret_key = "" + self._token = "" + self._refresh() + + def _refresh(self): + """从元数据服务获取临时凭证(提前5分钟刷新)""" + if time.time() < self._expired_time - 300: + return + import requests + url = f"http://metadata.tencentyun.com/latest/meta-data/cam/security-credentials/{self._role_name}" + resp = requests.get(url, timeout=5) + resp.raise_for_status() + data = resp.json() + if data.get("Code") != "Success": + raise CosStorageError(f"获取 CVM 角色凭证失败: {data}") + self._secret_id = data["TmpSecretId"] + self._secret_key = data["TmpSecretKey"] + self._token = data["Token"] + self._expired_time = data["ExpiredTime"] + logger.info(f"CVM 角色凭证已刷新,过期时间: {datetime.fromtimestamp(self._expired_time)}") + + @property + def secret_id(self): + self._refresh() + return self._secret_id + + @property + def secret_key(self): + self._refresh() + return self._secret_key + + @property + def token(self): + self._refresh() + return self._token + + +def _discover_cvm_role() -> Optional[str]: + """自动发现 CVM 绑定的角色名""" + import requests + try: + resp = requests.get( + "http://metadata.tencentyun.com/latest/meta-data/cam/security-credentials/", + timeout=3, + ) + if resp.status_code == 200 and resp.text.strip(): + return resp.text.strip() + except Exception: + pass + return None + + def _get_cos_client(): """懒加载 COS 客户端单例""" global _cos_client @@ -35,33 +98,55 @@ def _get_cos_client(): return _cos_client if not _cos_available: - logger.warning("qcloud_cos 未安装,使用本地存储模式") - return None + raise CosStorageError("qcloud_cos 未安装,请运行: pip install cos-python-sdk-v5") cfg = settings.cos - if not cfg.enabled or not cfg.secret_id or not cfg.bucket: + if not cfg.enabled: return None + if not cfg.bucket: + raise CosStorageError("COS_BUCKET 未配置") + + secret_id = os.getenv("COS_SECRET_ID", "") + secret_key = os.getenv("COS_SECRET_KEY", "") try: - cos_config = CosConfig( - Region=cfg.region, - SecretId=cfg.secret_id, - SecretKey=cfg.secret_key, - Scheme="https", - ) + if secret_id and secret_key: + # 模式1:密钥认证 + cos_config = CosConfig( + Region=cfg.region, + SecretId=secret_id, + SecretKey=secret_key, + Scheme="https", + ) + logger.info(f"COS 客户端初始化成功 (密钥模式): bucket={cfg.bucket}") + else: + # 模式2:CVM 角色认证(通过元数据服务获取临时凭证) + role_name = _discover_cvm_role() + if not role_name: + raise CosStorageError( + "COS 认证失败: 未设置 COS_SECRET_ID/COS_SECRET_KEY," + "且无法访问 CVM 元数据服务。请检查 iptables 转发规则或配置密钥。" + ) + credential = CvmRoleCredential(role_name) + cos_config = CosConfig( + Region=cfg.region, + CredentialInstance=credential, + Scheme="https", + ) + logger.info(f"COS 客户端初始化成功 (CVM角色: {role_name}): bucket={cfg.bucket}") + _cos_client = CosS3Client(cos_config) - logger.info(f"COS 客户端初始化成功: bucket={cfg.bucket}, region={cfg.region}") return _cos_client + except CosStorageError: + raise except Exception as e: - logger.error(f"COS 客户端初始化失败: {e}") - return None + raise CosStorageError(f"COS 客户端初始化失败: {e}") def _generate_object_key(prefix: str = "", ext: str = ".jpg") -> str: """ 生成对象存储 Key 格式: {prefix}/{YYYY}/{MM}/{DD}/{YYYYMMDDHHmmss}_{uuid8}{ext} - 示例: alerts/2026/02/09/20260209153000_A1B2C3D4.jpg """ now = datetime.now(timezone.utc) date_path = now.strftime("%Y/%m/%d") @@ -73,47 +158,41 @@ def _generate_object_key(prefix: str = "", ext: str = ".jpg") -> str: class COSStorage: """ - 对象存储统一接口 + 对象存储接口(纯 COS 模式) - - COS 模式:调用腾讯云 COS SDK - - 本地模式:写入 uploads/ 目录,返回相对路径 + COS_ENABLED=true: 使用腾讯云 COS,上传失败直接报错 + COS_ENABLED=false: 未启用,调用上传方法时报错 """ def __init__(self): self._client = None - self._use_local = True + self._enabled = False self._init() def _init(self): - """初始化存储后端""" - if settings.cos.enabled: - client = _get_cos_client() - if client: - self._client = client - self._use_local = False - return + """初始化 COS 客户端""" + if not settings.cos.enabled: + logger.info("COS 未启用 (COS_ENABLED=false)") + return - logger.info("使用本地文件存储模式") - self._use_local = True + self._client = _get_cos_client() + if self._client: + self._enabled = True @property def is_cos_mode(self) -> bool: - return not self._use_local and self._client is not None + return self._enabled and self._client is not None + + def _require_cos(self): + """检查 COS 是否可用,不可用则抛出异常""" + if not self._enabled or self._client is None: + raise CosStorageError("COS 未启用或初始化失败,请检查 COS_ENABLED 和 CVM 角色绑定") # ======================== 上传 ======================== def upload_image(self, image_data: bytes, filename: Optional[str] = None) -> str: - """ - 上传图片,返回 object_key(COS 模式)或本地路径(本地模式) - - 数据库中存储此返回值,下载时通过 get_presigned_url() 获取临时访问地址。 - """ - if self._use_local: - return self._upload_local(image_data, filename) - return self._upload_cos(image_data, filename) - - def _upload_cos(self, image_data: bytes, filename: Optional[str] = None) -> str: - """上传到 COS""" + """上传图片到 COS,返回 object_key""" + self._require_cos() object_key = filename or _generate_object_key(ext=".jpg") try: self._client.put_object( @@ -125,13 +204,11 @@ class COSStorage: logger.info(f"COS 上传成功: {object_key}") return object_key except Exception as e: - logger.error(f"COS 上传失败,回退本地: {e}") - return self._upload_local(image_data, filename) + raise CosStorageError(f"COS 图片上传失败: {e}") def upload_file(self, file_data: bytes, object_key: str, content_type: str = "application/octet-stream") -> str: """上传任意文件到 COS""" - if self._use_local: - return self._upload_local(file_data, object_key) + self._require_cos() try: self._client.put_object( Bucket=settings.cos.bucket, @@ -142,35 +219,16 @@ class COSStorage: logger.info(f"COS 文件上传成功: {object_key}") return object_key except Exception as e: - logger.error(f"COS 文件上传失败: {e}") - return self._upload_local(file_data, object_key) - - def _upload_local(self, data: bytes, filename: Optional[str] = None) -> str: - """本地存储回退""" - upload_dir = Path("uploads") - if filename is None: - filename = _generate_object_key(ext=".jpg") - - file_path = upload_dir / filename - file_path.parent.mkdir(parents=True, exist_ok=True) - - with open(file_path, "wb") as f: - f.write(data) - - local_url = f"/uploads/{filename}" - logger.info(f"本地保存: {local_url}") - return local_url + raise CosStorageError(f"COS 文件上传失败: {e}") # ======================== 下载(预签名 URL) ======================== def get_presigned_url(self, object_key: str, expire: Optional[int] = None) -> str: - """ - 获取预签名下载 URL + """获取预签名下载 URL""" + if object_key.startswith("http"): + return object_key - - COS 模式:生成带签名的临时 URL,过期后失效 - - 本地模式:直接返回本地路径 - """ - if self._use_local or object_key.startswith("/uploads/") or object_key.startswith("http"): + if not self._enabled: return object_key expire = expire or settings.cos.presign_expire @@ -186,14 +244,8 @@ class COSStorage: return object_key def get_presigned_upload_url(self, object_key: str, expire: Optional[int] = None) -> str: - """ - 获取预签名上传 URL(供前端直传) - - 前端拿到此 URL 后,直接 PUT 文件即可,无需经过后端中转。 - """ - if self._use_local: - return "" - + """获取预签名上传 URL(供前端直传)""" + self._require_cos() expire = expire or settings.cos.presign_expire try: url = self._client.get_presigned_url( @@ -204,111 +256,14 @@ class COSStorage: ) return url except Exception as e: - logger.error(f"生成预签名上传 URL 失败: {e}") - return "" - - # ======================== STS 临时凭证 ======================== - - def get_sts_credential(self, allow_prefix: Optional[str] = None) -> Optional[Dict]: - """ - 获取 STS 临时凭证(供前端 SDK 直传) - - 需安装: pip install qcloud-python-sts - - 返回: - { - "credentials": {"tmpSecretId": ..., "tmpSecretKey": ..., "sessionToken": ...}, - "expiredTime": ..., - "startTime": ..., - "bucket": ..., - "region": ..., - "allowPrefix": ... - } - """ - if self._use_local: - return None - - try: - from sts.sts import Sts - - cfg = settings.cos - prefix = allow_prefix or f"{cfg.upload_prefix}/*" - - # 提取 appid from bucket (格式: name-appid) - parts = cfg.bucket.rsplit("-", 1) - appid = parts[1] if len(parts) == 2 else "" - - sts_config = { - "duration_seconds": cfg.sts_expire, - "secret_id": cfg.secret_id, - "secret_key": cfg.secret_key, - "bucket": cfg.bucket, - "region": cfg.region, - "allow_prefix": prefix, - "allow_actions": [ - "name/cos:PutObject", - "name/cos:PostObject", - "name/cos:InitiateMultipartUpload", - "name/cos:ListMultipartUploads", - "name/cos:ListParts", - "name/cos:UploadPart", - "name/cos:CompleteMultipartUpload", - ], - "policy": { - "version": "2.0", - "statement": [ - { - "action": [ - "name/cos:PutObject", - "name/cos:PostObject", - "name/cos:InitiateMultipartUpload", - "name/cos:ListMultipartUploads", - "name/cos:ListParts", - "name/cos:UploadPart", - "name/cos:CompleteMultipartUpload", - ], - "effect": "allow", - "resource": [ - f"qcs::cos:{cfg.region}:uid/{appid}:{cfg.bucket}/{prefix}", - ], - } - ], - }, - } - - sts = Sts(sts_config) - response = sts.get_credential() - - result = { - "credentials": response["credentials"], - "expiredTime": response["expiredTime"], - "startTime": response.get("startTime"), - "bucket": cfg.bucket, - "region": cfg.region, - "allowPrefix": prefix, - } - - logger.info(f"STS 凭证签发成功, prefix={prefix}") - return result - - except ImportError: - logger.error("qcloud-python-sts 未安装,无法签发 STS 凭证。请运行: pip install qcloud-python-sts") - return None - except Exception as e: - logger.error(f"STS 凭证签发失败: {e}") - return None + raise CosStorageError(f"生成预签名上传 URL 失败: {e}") # ======================== 删除 ======================== def delete_object(self, object_key: str) -> bool: - """删除对象""" - if self._use_local: - local_path = Path(object_key.lstrip("/")) - if local_path.exists(): - local_path.unlink() - return True + """删除 COS 对象""" + if not self._enabled: return False - try: self._client.delete_object( Bucket=settings.cos.bucket, @@ -328,9 +283,6 @@ class COSStorage: return "" if path.startswith("http"): return path - if path.startswith("/uploads/"): - return path - # COS object_key → 预签名 URL return self.get_presigned_url(path) @@ -339,7 +291,7 @@ _cos_storage: Optional[COSStorage] = None def get_oss_storage() -> COSStorage: - """获取存储服务单例(保持旧函数名兼容)""" + """获取存储服务单例""" global _cos_storage if _cos_storage is None: _cos_storage = COSStorage() From 94ad8d8045fb2c50859b1ec1eb07ff4935802ea3 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Mon, 2 Mar 2026 14:21:11 +0800 Subject: [PATCH 3/3] =?UTF-8?q?fix(aiot):=20COS=20=E6=9C=AA=E5=90=AF?= =?UTF-8?q?=E7=94=A8=E6=97=B6=20get=5Furl=20=E8=BF=94=E5=9B=9E=E7=A9=BA?= =?UTF-8?q?=E5=AD=97=E7=AC=A6=E4=B8=B2=E8=80=8C=E9=9D=9E=E8=A3=B8=20object?= =?UTF-8?q?=5Fkey?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修复告警截图显示为损坏图片的根因: - get_url() 保证返回值为 http(s) URL 或空字符串 - _init() 捕获 COS 初始化异常,优雅降级而非崩溃 Co-Authored-By: Claude Opus 4.6 --- app/services/oss_storage.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/app/services/oss_storage.py b/app/services/oss_storage.py index 3f54728..49b0cb9 100644 --- a/app/services/oss_storage.py +++ b/app/services/oss_storage.py @@ -175,9 +175,13 @@ class COSStorage: logger.info("COS 未启用 (COS_ENABLED=false)") return - self._client = _get_cos_client() - if self._client: - self._enabled = True + try: + self._client = _get_cos_client() + if self._client: + self._enabled = True + except Exception as e: + logger.error(f"COS 初始化失败,截图预签名 URL 将不可用: {e}") + self._enabled = False @property def is_cos_mode(self) -> bool: @@ -278,12 +282,21 @@ class COSStorage: # ======================== 兼容旧接口 ======================== def get_url(self, path: str) -> str: - """获取访问 URL(兼容旧代码调用)""" + """获取访问 URL(兼容旧代码调用) + + 保证返回值要么是 http(s):// 开头的可访问 URL,要么是空字符串。 + 绝不返回裸 object_key,以免前端加载为相对路径导致损坏图片。 + """ if not path: return "" if path.startswith("http"): return path - return self.get_presigned_url(path) + url = self.get_presigned_url(path) + # 兜底:如果未能生成有效 URL(COS 未启用/异常),返回空字符串 + if not url.startswith("http"): + logger.warning(f"无法生成 COS 预签名 URL,返回空: object_key={path}") + return "" + return url # 全局单例