Files
16337 94ad8d8045 fix(aiot): COS 未启用时 get_url 返回空字符串而非裸 object_key
修复告警截图显示为损坏图片的根因:
- get_url() 保证返回值为 http(s) URL 或空字符串
- _init() 捕获 COS 初始化异常,优雅降级而非崩溃

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 14:21:11 +08:00

312 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
对象存储服务(腾讯云 COS
支持两种认证模式:
1. CVM 角色认证通过元数据服务自动获取临时凭证推荐Docker 需配置 iptables 转发)
2. 密钥认证:通过 COS_SECRET_ID / COS_SECRET_KEY 环境变量(备选)
"""
import os
import time
import uuid
from datetime import datetime, timezone
from typing import Optional
from pathlib import Path
from app.config import settings
from app.utils.logger import logger
# 按需导入 COS SDK
_cos_client = None
_cos_available = False
try:
from qcloud_cos import CosConfig, CosS3Client
_cos_available = True
except ImportError:
_cos_available = False
class CosStorageError(Exception):
"""COS 存储操作异常"""
pass
class CvmRoleCredential:
"""CVM 角色凭证提供者,从元数据服务自动获取和刷新临时凭证"""
def __init__(self, role_name: str):
self._role_name = role_name
self._expired_time = 0
self._secret_id = ""
self._secret_key = ""
self._token = ""
self._refresh()
def _refresh(self):
"""从元数据服务获取临时凭证提前5分钟刷新"""
if time.time() < self._expired_time - 300:
return
import requests
url = f"http://metadata.tencentyun.com/latest/meta-data/cam/security-credentials/{self._role_name}"
resp = requests.get(url, timeout=5)
resp.raise_for_status()
data = resp.json()
if data.get("Code") != "Success":
raise CosStorageError(f"获取 CVM 角色凭证失败: {data}")
self._secret_id = data["TmpSecretId"]
self._secret_key = data["TmpSecretKey"]
self._token = data["Token"]
self._expired_time = data["ExpiredTime"]
logger.info(f"CVM 角色凭证已刷新,过期时间: {datetime.fromtimestamp(self._expired_time)}")
@property
def secret_id(self):
self._refresh()
return self._secret_id
@property
def secret_key(self):
self._refresh()
return self._secret_key
@property
def token(self):
self._refresh()
return self._token
def _discover_cvm_role() -> Optional[str]:
"""自动发现 CVM 绑定的角色名"""
import requests
try:
resp = requests.get(
"http://metadata.tencentyun.com/latest/meta-data/cam/security-credentials/",
timeout=3,
)
if resp.status_code == 200 and resp.text.strip():
return resp.text.strip()
except Exception:
pass
return None
def _get_cos_client():
"""懒加载 COS 客户端单例"""
global _cos_client
if _cos_client is not None:
return _cos_client
if not _cos_available:
raise CosStorageError("qcloud_cos 未安装,请运行: pip install cos-python-sdk-v5")
cfg = settings.cos
if not cfg.enabled:
return None
if not cfg.bucket:
raise CosStorageError("COS_BUCKET 未配置")
secret_id = os.getenv("COS_SECRET_ID", "")
secret_key = os.getenv("COS_SECRET_KEY", "")
try:
if secret_id and secret_key:
# 模式1密钥认证
cos_config = CosConfig(
Region=cfg.region,
SecretId=secret_id,
SecretKey=secret_key,
Scheme="https",
)
logger.info(f"COS 客户端初始化成功 (密钥模式): bucket={cfg.bucket}")
else:
# 模式2CVM 角色认证(通过元数据服务获取临时凭证)
role_name = _discover_cvm_role()
if not role_name:
raise CosStorageError(
"COS 认证失败: 未设置 COS_SECRET_ID/COS_SECRET_KEY"
"且无法访问 CVM 元数据服务。请检查 iptables 转发规则或配置密钥。"
)
credential = CvmRoleCredential(role_name)
cos_config = CosConfig(
Region=cfg.region,
CredentialInstance=credential,
Scheme="https",
)
logger.info(f"COS 客户端初始化成功 (CVM角色: {role_name}): bucket={cfg.bucket}")
_cos_client = CosS3Client(cos_config)
return _cos_client
except CosStorageError:
raise
except Exception as e:
raise CosStorageError(f"COS 客户端初始化失败: {e}")
def _generate_object_key(prefix: str = "", ext: str = ".jpg") -> str:
"""
生成对象存储 Key
格式: {prefix}/{YYYY}/{MM}/{DD}/{YYYYMMDDHHmmss}_{uuid8}{ext}
"""
now = datetime.now(timezone.utc)
date_path = now.strftime("%Y/%m/%d")
timestamp = now.strftime("%Y%m%d%H%M%S")
unique_id = uuid.uuid4().hex[:8].upper()
prefix = prefix.strip("/") if prefix else settings.cos.upload_prefix
return f"{prefix}/{date_path}/{timestamp}_{unique_id}{ext}"
class COSStorage:
"""
对象存储接口(纯 COS 模式)
COS_ENABLED=true: 使用腾讯云 COS上传失败直接报错
COS_ENABLED=false: 未启用,调用上传方法时报错
"""
def __init__(self):
self._client = None
self._enabled = False
self._init()
def _init(self):
"""初始化 COS 客户端"""
if not settings.cos.enabled:
logger.info("COS 未启用 (COS_ENABLED=false)")
return
try:
self._client = _get_cos_client()
if self._client:
self._enabled = True
except Exception as e:
logger.error(f"COS 初始化失败,截图预签名 URL 将不可用: {e}")
self._enabled = False
@property
def is_cos_mode(self) -> bool:
return self._enabled and self._client is not None
def _require_cos(self):
"""检查 COS 是否可用,不可用则抛出异常"""
if not self._enabled or self._client is None:
raise CosStorageError("COS 未启用或初始化失败,请检查 COS_ENABLED 和 CVM 角色绑定")
# ======================== 上传 ========================
def upload_image(self, image_data: bytes, filename: Optional[str] = None) -> str:
"""上传图片到 COS返回 object_key"""
self._require_cos()
object_key = filename or _generate_object_key(ext=".jpg")
try:
self._client.put_object(
Bucket=settings.cos.bucket,
Body=image_data,
Key=object_key,
ContentType="image/jpeg",
)
logger.info(f"COS 上传成功: {object_key}")
return object_key
except Exception as e:
raise CosStorageError(f"COS 图片上传失败: {e}")
def upload_file(self, file_data: bytes, object_key: str, content_type: str = "application/octet-stream") -> str:
"""上传任意文件到 COS"""
self._require_cos()
try:
self._client.put_object(
Bucket=settings.cos.bucket,
Body=file_data,
Key=object_key,
ContentType=content_type,
)
logger.info(f"COS 文件上传成功: {object_key}")
return object_key
except Exception as e:
raise CosStorageError(f"COS 文件上传失败: {e}")
# ======================== 下载(预签名 URL ========================
def get_presigned_url(self, object_key: str, expire: Optional[int] = None) -> str:
"""获取预签名下载 URL"""
if object_key.startswith("http"):
return object_key
if not self._enabled:
return object_key
expire = expire or settings.cos.presign_expire
try:
url = self._client.get_presigned_download_url(
Bucket=settings.cos.bucket,
Key=object_key,
Expired=expire,
)
return url
except Exception as e:
logger.error(f"生成预签名 URL 失败: {e}")
return object_key
def get_presigned_upload_url(self, object_key: str, expire: Optional[int] = None) -> str:
"""获取预签名上传 URL供前端直传"""
self._require_cos()
expire = expire or settings.cos.presign_expire
try:
url = self._client.get_presigned_url(
Method="PUT",
Bucket=settings.cos.bucket,
Key=object_key,
Expired=expire,
)
return url
except Exception as e:
raise CosStorageError(f"生成预签名上传 URL 失败: {e}")
# ======================== 删除 ========================
def delete_object(self, object_key: str) -> bool:
"""删除 COS 对象"""
if not self._enabled:
return False
try:
self._client.delete_object(
Bucket=settings.cos.bucket,
Key=object_key,
)
logger.info(f"COS 对象已删除: {object_key}")
return True
except Exception as e:
logger.error(f"COS 删除失败: {e}")
return False
# ======================== 兼容旧接口 ========================
def get_url(self, path: str) -> str:
"""获取访问 URL兼容旧代码调用
保证返回值要么是 http(s):// 开头的可访问 URL要么是空字符串。
绝不返回裸 object_key以免前端加载为相对路径导致损坏图片。
"""
if not path:
return ""
if path.startswith("http"):
return path
url = self.get_presigned_url(path)
# 兜底:如果未能生成有效 URLCOS 未启用/异常),返回空字符串
if not url.startswith("http"):
logger.warning(f"无法生成 COS 预签名 URL返回空: object_key={path}")
return ""
return url
# 全局单例
_cos_storage: Optional[COSStorage] = None
def get_oss_storage() -> COSStorage:
"""获取存储服务单例"""
global _cos_storage
if _cos_storage is None:
_cos_storage = COSStorage()
return _cos_storage