2026-02-09 17:47:35 +08:00
|
|
|
|
"""
|
2026-03-02 13:33:28 +08:00
|
|
|
|
对象存储服务(腾讯云 COS)
|
|
|
|
|
|
|
|
|
|
|
|
支持两种认证模式:
|
|
|
|
|
|
1. CVM 角色认证:通过元数据服务自动获取临时凭证(推荐,Docker 需配置 iptables 转发)
|
|
|
|
|
|
2. 密钥认证:通过 COS_SECRET_ID / COS_SECRET_KEY 环境变量(备选)
|
2026-02-09 17:47:35 +08:00
|
|
|
|
"""
|
2026-03-02 13:33:28 +08:00
|
|
|
|
import os
|
|
|
|
|
|
import time
|
2026-02-02 09:40:02 +08:00
|
|
|
|
import uuid
|
|
|
|
|
|
from datetime import datetime, timezone
|
2026-03-02 13:33:28 +08:00
|
|
|
|
from typing import Optional
|
2026-02-05 13:57:49 +08:00
|
|
|
|
from pathlib import Path
|
2026-02-02 09:40:02 +08:00
|
|
|
|
|
|
|
|
|
|
from app.config import settings
|
|
|
|
|
|
from app.utils.logger import logger
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-03-02 13:33:28 +08:00
|
|
|
|
# 按需导入 COS SDK
|
2026-02-09 17:47:35 +08:00
|
|
|
|
_cos_client = None
|
|
|
|
|
|
_cos_available = False
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
from qcloud_cos import CosConfig, CosS3Client
|
|
|
|
|
|
_cos_available = True
|
|
|
|
|
|
except ImportError:
|
|
|
|
|
|
_cos_available = False
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-03-02 13:33:28 +08:00
|
|
|
|
class CosStorageError(Exception):
|
|
|
|
|
|
"""COS 存储操作异常"""
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CvmRoleCredential:
|
|
|
|
|
|
"""CVM 角色凭证提供者,从元数据服务自动获取和刷新临时凭证"""
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, role_name: str):
|
|
|
|
|
|
self._role_name = role_name
|
|
|
|
|
|
self._expired_time = 0
|
|
|
|
|
|
self._secret_id = ""
|
|
|
|
|
|
self._secret_key = ""
|
|
|
|
|
|
self._token = ""
|
|
|
|
|
|
self._refresh()
|
|
|
|
|
|
|
|
|
|
|
|
def _refresh(self):
|
|
|
|
|
|
"""从元数据服务获取临时凭证(提前5分钟刷新)"""
|
|
|
|
|
|
if time.time() < self._expired_time - 300:
|
|
|
|
|
|
return
|
|
|
|
|
|
import requests
|
|
|
|
|
|
url = f"http://metadata.tencentyun.com/latest/meta-data/cam/security-credentials/{self._role_name}"
|
|
|
|
|
|
resp = requests.get(url, timeout=5)
|
|
|
|
|
|
resp.raise_for_status()
|
|
|
|
|
|
data = resp.json()
|
|
|
|
|
|
if data.get("Code") != "Success":
|
|
|
|
|
|
raise CosStorageError(f"获取 CVM 角色凭证失败: {data}")
|
|
|
|
|
|
self._secret_id = data["TmpSecretId"]
|
|
|
|
|
|
self._secret_key = data["TmpSecretKey"]
|
|
|
|
|
|
self._token = data["Token"]
|
|
|
|
|
|
self._expired_time = data["ExpiredTime"]
|
|
|
|
|
|
logger.info(f"CVM 角色凭证已刷新,过期时间: {datetime.fromtimestamp(self._expired_time)}")
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
|
def secret_id(self):
|
|
|
|
|
|
self._refresh()
|
|
|
|
|
|
return self._secret_id
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
|
def secret_key(self):
|
|
|
|
|
|
self._refresh()
|
|
|
|
|
|
return self._secret_key
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
|
def token(self):
|
|
|
|
|
|
self._refresh()
|
|
|
|
|
|
return self._token
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _discover_cvm_role() -> Optional[str]:
|
|
|
|
|
|
"""自动发现 CVM 绑定的角色名"""
|
|
|
|
|
|
import requests
|
|
|
|
|
|
try:
|
|
|
|
|
|
resp = requests.get(
|
|
|
|
|
|
"http://metadata.tencentyun.com/latest/meta-data/cam/security-credentials/",
|
|
|
|
|
|
timeout=3,
|
|
|
|
|
|
)
|
|
|
|
|
|
if resp.status_code == 200 and resp.text.strip():
|
|
|
|
|
|
return resp.text.strip()
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
pass
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-02-09 17:47:35 +08:00
|
|
|
|
def _get_cos_client():
|
|
|
|
|
|
"""懒加载 COS 客户端单例"""
|
|
|
|
|
|
global _cos_client
|
|
|
|
|
|
if _cos_client is not None:
|
|
|
|
|
|
return _cos_client
|
|
|
|
|
|
|
|
|
|
|
|
if not _cos_available:
|
2026-03-02 13:33:28 +08:00
|
|
|
|
raise CosStorageError("qcloud_cos 未安装,请运行: pip install cos-python-sdk-v5")
|
2026-02-09 17:47:35 +08:00
|
|
|
|
|
|
|
|
|
|
cfg = settings.cos
|
2026-03-02 13:33:28 +08:00
|
|
|
|
if not cfg.enabled:
|
2026-02-09 17:47:35 +08:00
|
|
|
|
return None
|
2026-03-02 13:33:28 +08:00
|
|
|
|
if not cfg.bucket:
|
|
|
|
|
|
raise CosStorageError("COS_BUCKET 未配置")
|
|
|
|
|
|
|
|
|
|
|
|
secret_id = os.getenv("COS_SECRET_ID", "")
|
|
|
|
|
|
secret_key = os.getenv("COS_SECRET_KEY", "")
|
2026-02-09 17:47:35 +08:00
|
|
|
|
|
|
|
|
|
|
try:
|
2026-03-02 13:33:28 +08:00
|
|
|
|
if secret_id and secret_key:
|
|
|
|
|
|
# 模式1:密钥认证
|
|
|
|
|
|
cos_config = CosConfig(
|
|
|
|
|
|
Region=cfg.region,
|
|
|
|
|
|
SecretId=secret_id,
|
|
|
|
|
|
SecretKey=secret_key,
|
|
|
|
|
|
Scheme="https",
|
|
|
|
|
|
)
|
|
|
|
|
|
logger.info(f"COS 客户端初始化成功 (密钥模式): bucket={cfg.bucket}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
# 模式2:CVM 角色认证(通过元数据服务获取临时凭证)
|
|
|
|
|
|
role_name = _discover_cvm_role()
|
|
|
|
|
|
if not role_name:
|
|
|
|
|
|
raise CosStorageError(
|
|
|
|
|
|
"COS 认证失败: 未设置 COS_SECRET_ID/COS_SECRET_KEY,"
|
|
|
|
|
|
"且无法访问 CVM 元数据服务。请检查 iptables 转发规则或配置密钥。"
|
|
|
|
|
|
)
|
|
|
|
|
|
credential = CvmRoleCredential(role_name)
|
|
|
|
|
|
cos_config = CosConfig(
|
|
|
|
|
|
Region=cfg.region,
|
|
|
|
|
|
CredentialInstance=credential,
|
|
|
|
|
|
Scheme="https",
|
|
|
|
|
|
)
|
|
|
|
|
|
logger.info(f"COS 客户端初始化成功 (CVM角色: {role_name}): bucket={cfg.bucket}")
|
|
|
|
|
|
|
2026-02-09 17:47:35 +08:00
|
|
|
|
_cos_client = CosS3Client(cos_config)
|
|
|
|
|
|
return _cos_client
|
2026-03-02 13:33:28 +08:00
|
|
|
|
except CosStorageError:
|
|
|
|
|
|
raise
|
2026-02-09 17:47:35 +08:00
|
|
|
|
except Exception as e:
|
2026-03-02 13:33:28 +08:00
|
|
|
|
raise CosStorageError(f"COS 客户端初始化失败: {e}")
|
2026-02-09 17:47:35 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _generate_object_key(prefix: str = "", ext: str = ".jpg") -> str:
|
2026-02-05 13:57:49 +08:00
|
|
|
|
"""
|
2026-02-09 17:47:35 +08:00
|
|
|
|
生成对象存储 Key
|
|
|
|
|
|
格式: {prefix}/{YYYY}/{MM}/{DD}/{YYYYMMDDHHmmss}_{uuid8}{ext}
|
2026-02-05 13:57:49 +08:00
|
|
|
|
"""
|
2026-02-09 17:47:35 +08:00
|
|
|
|
now = datetime.now(timezone.utc)
|
|
|
|
|
|
date_path = now.strftime("%Y/%m/%d")
|
|
|
|
|
|
timestamp = now.strftime("%Y%m%d%H%M%S")
|
|
|
|
|
|
unique_id = uuid.uuid4().hex[:8].upper()
|
|
|
|
|
|
prefix = prefix.strip("/") if prefix else settings.cos.upload_prefix
|
|
|
|
|
|
return f"{prefix}/{date_path}/{timestamp}_{unique_id}{ext}"
|
2026-02-05 13:57:49 +08:00
|
|
|
|
|
|
|
|
|
|
|
2026-02-09 17:47:35 +08:00
|
|
|
|
class COSStorage:
|
|
|
|
|
|
"""
|
2026-03-02 13:33:28 +08:00
|
|
|
|
对象存储接口(纯 COS 模式)
|
2026-02-02 09:40:02 +08:00
|
|
|
|
|
2026-03-02 13:33:28 +08:00
|
|
|
|
COS_ENABLED=true: 使用腾讯云 COS,上传失败直接报错
|
|
|
|
|
|
COS_ENABLED=false: 未启用,调用上传方法时报错
|
2026-02-09 17:47:35 +08:00
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
|
self._client = None
|
2026-03-02 13:33:28 +08:00
|
|
|
|
self._enabled = False
|
2026-02-09 17:47:35 +08:00
|
|
|
|
self._init()
|
|
|
|
|
|
|
|
|
|
|
|
def _init(self):
|
2026-03-02 13:33:28 +08:00
|
|
|
|
"""初始化 COS 客户端"""
|
|
|
|
|
|
if not settings.cos.enabled:
|
|
|
|
|
|
logger.info("COS 未启用 (COS_ENABLED=false)")
|
|
|
|
|
|
return
|
2026-02-09 17:47:35 +08:00
|
|
|
|
|
2026-03-02 14:21:11 +08:00
|
|
|
|
try:
|
|
|
|
|
|
self._client = _get_cos_client()
|
|
|
|
|
|
if self._client:
|
|
|
|
|
|
self._enabled = True
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"COS 初始化失败,截图预签名 URL 将不可用: {e}")
|
|
|
|
|
|
self._enabled = False
|
2026-02-09 17:47:35 +08:00
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
|
def is_cos_mode(self) -> bool:
|
2026-03-02 13:33:28 +08:00
|
|
|
|
return self._enabled and self._client is not None
|
|
|
|
|
|
|
|
|
|
|
|
def _require_cos(self):
|
|
|
|
|
|
"""检查 COS 是否可用,不可用则抛出异常"""
|
|
|
|
|
|
if not self._enabled or self._client is None:
|
|
|
|
|
|
raise CosStorageError("COS 未启用或初始化失败,请检查 COS_ENABLED 和 CVM 角色绑定")
|
2026-02-09 17:47:35 +08:00
|
|
|
|
|
|
|
|
|
|
# ======================== 上传 ========================
|
2026-02-02 09:40:02 +08:00
|
|
|
|
|
|
|
|
|
|
def upload_image(self, image_data: bytes, filename: Optional[str] = None) -> str:
|
2026-03-02 13:33:28 +08:00
|
|
|
|
"""上传图片到 COS,返回 object_key"""
|
|
|
|
|
|
self._require_cos()
|
2026-02-09 17:47:35 +08:00
|
|
|
|
object_key = filename or _generate_object_key(ext=".jpg")
|
|
|
|
|
|
try:
|
|
|
|
|
|
self._client.put_object(
|
|
|
|
|
|
Bucket=settings.cos.bucket,
|
|
|
|
|
|
Body=image_data,
|
|
|
|
|
|
Key=object_key,
|
|
|
|
|
|
ContentType="image/jpeg",
|
|
|
|
|
|
)
|
|
|
|
|
|
logger.info(f"COS 上传成功: {object_key}")
|
|
|
|
|
|
return object_key
|
|
|
|
|
|
except Exception as e:
|
2026-03-02 13:33:28 +08:00
|
|
|
|
raise CosStorageError(f"COS 图片上传失败: {e}")
|
2026-02-02 09:40:02 +08:00
|
|
|
|
|
2026-02-09 17:47:35 +08:00
|
|
|
|
def upload_file(self, file_data: bytes, object_key: str, content_type: str = "application/octet-stream") -> str:
|
|
|
|
|
|
"""上传任意文件到 COS"""
|
2026-03-02 13:33:28 +08:00
|
|
|
|
self._require_cos()
|
2026-02-09 17:47:35 +08:00
|
|
|
|
try:
|
|
|
|
|
|
self._client.put_object(
|
|
|
|
|
|
Bucket=settings.cos.bucket,
|
|
|
|
|
|
Body=file_data,
|
|
|
|
|
|
Key=object_key,
|
|
|
|
|
|
ContentType=content_type,
|
|
|
|
|
|
)
|
|
|
|
|
|
logger.info(f"COS 文件上传成功: {object_key}")
|
|
|
|
|
|
return object_key
|
|
|
|
|
|
except Exception as e:
|
2026-03-02 13:33:28 +08:00
|
|
|
|
raise CosStorageError(f"COS 文件上传失败: {e}")
|
2026-02-02 09:40:02 +08:00
|
|
|
|
|
2026-02-09 17:47:35 +08:00
|
|
|
|
# ======================== 下载(预签名 URL) ========================
|
|
|
|
|
|
|
|
|
|
|
|
def get_presigned_url(self, object_key: str, expire: Optional[int] = None) -> str:
|
2026-03-02 13:33:28 +08:00
|
|
|
|
"""获取预签名下载 URL"""
|
|
|
|
|
|
if object_key.startswith("http"):
|
|
|
|
|
|
return object_key
|
2026-02-09 17:47:35 +08:00
|
|
|
|
|
2026-03-02 13:33:28 +08:00
|
|
|
|
if not self._enabled:
|
2026-02-09 17:47:35 +08:00
|
|
|
|
return object_key
|
|
|
|
|
|
|
|
|
|
|
|
expire = expire or settings.cos.presign_expire
|
|
|
|
|
|
try:
|
|
|
|
|
|
url = self._client.get_presigned_download_url(
|
|
|
|
|
|
Bucket=settings.cos.bucket,
|
|
|
|
|
|
Key=object_key,
|
|
|
|
|
|
Expired=expire,
|
|
|
|
|
|
)
|
|
|
|
|
|
return url
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"生成预签名 URL 失败: {e}")
|
|
|
|
|
|
return object_key
|
|
|
|
|
|
|
|
|
|
|
|
def get_presigned_upload_url(self, object_key: str, expire: Optional[int] = None) -> str:
|
2026-03-02 13:33:28 +08:00
|
|
|
|
"""获取预签名上传 URL(供前端直传)"""
|
|
|
|
|
|
self._require_cos()
|
2026-02-09 17:47:35 +08:00
|
|
|
|
expire = expire or settings.cos.presign_expire
|
|
|
|
|
|
try:
|
|
|
|
|
|
url = self._client.get_presigned_url(
|
|
|
|
|
|
Method="PUT",
|
|
|
|
|
|
Bucket=settings.cos.bucket,
|
|
|
|
|
|
Key=object_key,
|
|
|
|
|
|
Expired=expire,
|
|
|
|
|
|
)
|
|
|
|
|
|
return url
|
|
|
|
|
|
except Exception as e:
|
2026-03-02 13:33:28 +08:00
|
|
|
|
raise CosStorageError(f"生成预签名上传 URL 失败: {e}")
|
2026-02-09 17:47:35 +08:00
|
|
|
|
|
|
|
|
|
|
# ======================== 删除 ========================
|
|
|
|
|
|
|
|
|
|
|
|
def delete_object(self, object_key: str) -> bool:
|
2026-03-02 13:33:28 +08:00
|
|
|
|
"""删除 COS 对象"""
|
|
|
|
|
|
if not self._enabled:
|
2026-02-09 17:47:35 +08:00
|
|
|
|
return False
|
|
|
|
|
|
try:
|
|
|
|
|
|
self._client.delete_object(
|
|
|
|
|
|
Bucket=settings.cos.bucket,
|
|
|
|
|
|
Key=object_key,
|
|
|
|
|
|
)
|
|
|
|
|
|
logger.info(f"COS 对象已删除: {object_key}")
|
|
|
|
|
|
return True
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger.error(f"COS 删除失败: {e}")
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
# ======================== 兼容旧接口 ========================
|
|
|
|
|
|
|
|
|
|
|
|
def get_url(self, path: str) -> str:
|
2026-03-02 14:21:11 +08:00
|
|
|
|
"""获取访问 URL(兼容旧代码调用)
|
|
|
|
|
|
|
|
|
|
|
|
保证返回值要么是 http(s):// 开头的可访问 URL,要么是空字符串。
|
|
|
|
|
|
绝不返回裸 object_key,以免前端加载为相对路径导致损坏图片。
|
|
|
|
|
|
"""
|
2026-02-09 17:47:35 +08:00
|
|
|
|
if not path:
|
|
|
|
|
|
return ""
|
2026-02-02 09:40:02 +08:00
|
|
|
|
if path.startswith("http"):
|
|
|
|
|
|
return path
|
2026-03-02 14:21:11 +08:00
|
|
|
|
url = self.get_presigned_url(path)
|
|
|
|
|
|
# 兜底:如果未能生成有效 URL(COS 未启用/异常),返回空字符串
|
|
|
|
|
|
if not url.startswith("http"):
|
|
|
|
|
|
logger.warning(f"无法生成 COS 预签名 URL,返回空: object_key={path}")
|
|
|
|
|
|
return ""
|
|
|
|
|
|
return url
|
2026-02-02 09:40:02 +08:00
|
|
|
|
|
|
|
|
|
|
|
2026-02-09 17:47:35 +08:00
|
|
|
|
# 全局单例
|
|
|
|
|
|
_cos_storage: Optional[COSStorage] = None
|
2026-02-02 09:40:02 +08:00
|
|
|
|
|
|
|
|
|
|
|
2026-02-09 17:47:35 +08:00
|
|
|
|
def get_oss_storage() -> COSStorage:
|
2026-03-02 13:33:28 +08:00
|
|
|
|
"""获取存储服务单例"""
|
2026-02-09 17:47:35 +08:00
|
|
|
|
global _cos_storage
|
|
|
|
|
|
if _cos_storage is None:
|
|
|
|
|
|
_cos_storage = COSStorage()
|
|
|
|
|
|
return _cos_storage
|