"""
Object storage service (Tencent Cloud COS).

Two authentication modes are supported:

1. CVM role authentication: temporary credentials are fetched automatically
   from the instance metadata service (recommended; inside Docker this needs
   an iptables forwarding rule so the metadata endpoint is reachable).
2. Key authentication: the COS_SECRET_ID / COS_SECRET_KEY environment
   variables (fallback).

When both environment variables are set, key authentication is used and the
metadata service is not contacted.
"""
import mimetypes
import os
import time
import uuid
from datetime import datetime, timezone
from typing import Optional
from pathlib import Path

from app.config import settings
from app.utils.logger import logger

# Import the COS SDK on demand so this module stays importable without it.
_cos_client = None
_cos_available = False
try:
    from qcloud_cos import CosConfig, CosS3Client
    _cos_available = True
except ImportError:
    _cos_available = False

# Metadata endpoint that lists the bound role (bare URL) or returns the
# temporary credentials of a role (URL + role name).
_METADATA_CREDENTIALS_URL = (
    "http://metadata.tencentyun.com/latest/meta-data/cam/security-credentials/"
)
# Credentials are renewed this many seconds before they expire.
_REFRESH_MARGIN_SECONDS = 300
# Only these prefixes mark a value as an already-resolved URL.
_URL_PREFIXES = ("http://", "https://")


def _is_remote_url(value: str) -> bool:
    """Return True when *value* is an http(s) URL rather than an object key."""
    return value.startswith(_URL_PREFIXES)


class CosStorageError(Exception):
    """Raised when a COS storage operation fails."""


class CvmRoleCredential:
    """Credential provider backed by the CVM metadata service.

    Fetches temporary credentials for ``role_name`` on construction and
    re-fetches them lazily whenever one of the properties is read within
    five minutes of the expiry time.
    """

    def __init__(self, role_name: str):
        self._role_name = role_name
        self._expired_time = 0
        self._secret_id = ""
        self._secret_key = ""
        self._token = ""
        self._refresh()

    def _refresh(self):
        """Fetch fresh temporary credentials if the cached ones are near expiry.

        Raises:
            CosStorageError: the metadata service answered with a non-success
                code and no usable cached credentials remain.
            Exception: any request/parsing error, again only when the cached
                credentials have already expired (or were never fetched).
        """
        if time.time() < self._expired_time - _REFRESH_MARGIN_SECONDS:
            return

        import requests

        url = f"{_METADATA_CREDENTIALS_URL}{self._role_name}"
        try:
            resp = requests.get(url, timeout=5)
            resp.raise_for_status()
            data = resp.json()
            if data.get("Code") != "Success":
                raise CosStorageError(f"获取 CVM 角色凭证失败: {data}")
            # Read every field before touching state so a malformed response
            # can never leave a half-updated credential set behind.
            secret_id = data["TmpSecretId"]
            secret_key = data["TmpSecretKey"]
            token = data["Token"]
            expired_time = data["ExpiredTime"]
        except Exception as e:
            # The early-refresh margin exists precisely so that a transient
            # metadata outage does not break requests: keep using the cached
            # credentials for as long as they are still valid.
            if time.time() < self._expired_time:
                logger.error(f"CVM 角色凭证刷新失败,继续使用未过期的缓存凭证: {e}")
                return
            raise

        self._secret_id = secret_id
        self._secret_key = secret_key
        self._token = token
        self._expired_time = expired_time
        logger.info(f"CVM 角色凭证已刷新,过期时间: {datetime.fromtimestamp(self._expired_time)}")

    @property
    def secret_id(self):
        """Temporary secret id (refreshed first when close to expiry)."""
        self._refresh()
        return self._secret_id

    @property
    def secret_key(self):
        """Temporary secret key (refreshed first when close to expiry)."""
        self._refresh()
        return self._secret_key

    @property
    def token(self):
        """Session token (refreshed first when close to expiry)."""
        self._refresh()
        return self._token


def _discover_cvm_role() -> Optional[str]:
    """Return the role name reported by the CVM metadata service, if any.

    This is a best-effort probe: any failure (not running on a CVM, endpoint
    unreachable, empty answer) yields ``None`` instead of an exception. The
    failure reason is logged so a misconfigured network can be diagnosed.
    """
    import requests

    try:
        resp = requests.get(_METADATA_CREDENTIALS_URL, timeout=3)
        role_name = resp.text.strip()
        if resp.status_code == 200 and role_name:
            return role_name
    except Exception as e:
        logger.info(f"无法访问 CVM 元数据服务: {e}")
    return None


def _get_cos_client():
    """Return the lazily created, module-wide COS client.

    Returns:
        The shared ``CosS3Client``, or ``None`` when COS is disabled in the
        settings.

    Raises:
        CosStorageError: the SDK is not installed, the bucket is not
            configured, no credentials could be obtained, or the SDK failed
            to initialise.
    """
    global _cos_client
    if _cos_client is not None:
        return _cos_client

    if not _cos_available:
        raise CosStorageError("qcloud_cos 未安装,请运行: pip install cos-python-sdk-v5")

    cfg = settings.cos
    if not cfg.enabled:
        return None
    if not cfg.bucket:
        raise CosStorageError("COS_BUCKET 未配置")

    secret_id = os.getenv("COS_SECRET_ID", "")
    secret_key = os.getenv("COS_SECRET_KEY", "")

    try:
        if secret_id and secret_key:
            # Mode 1: static key authentication.
            cos_config = CosConfig(
                Region=cfg.region,
                SecretId=secret_id,
                SecretKey=secret_key,
                Scheme="https",
            )
            logger.info(f"COS 客户端初始化成功 (密钥模式): bucket={cfg.bucket}")
        else:
            # Mode 2: CVM role authentication (temporary credentials from
            # the metadata service).
            role_name = _discover_cvm_role()
            if not role_name:
                raise CosStorageError(
                    "COS 认证失败: 未设置 COS_SECRET_ID/COS_SECRET_KEY,"
                    "且无法访问 CVM 元数据服务。请检查 iptables 转发规则或配置密钥。"
                )
            credential = CvmRoleCredential(role_name)
            cos_config = CosConfig(
                Region=cfg.region,
                CredentialInstance=credential,
                Scheme="https",
            )
            logger.info(f"COS 客户端初始化成功 (CVM角色: {role_name}): bucket={cfg.bucket}")

        _cos_client = CosS3Client(cos_config)
        return _cos_client
    except CosStorageError:
        raise
    except Exception as e:
        # Keep the original exception as the cause for easier debugging.
        raise CosStorageError(f"COS 客户端初始化失败: {e}") from e


def _generate_object_key(prefix: str = "", ext: str = ".jpg") -> str:
    """Build a unique object key.

    Format: ``{prefix}/{YYYY}/{MM}/{DD}/{YYYYMMDDHHmmss}_{uuid8}{ext}``,
    using the current UTC time. The prefix segment is omitted entirely when
    neither the argument nor the configured default yields a non-empty value,
    so the key never starts with ``/`` or contains ``//``.

    Args:
        prefix: Key prefix; surrounding slashes are ignored. Falls back to
            ``settings.cos.upload_prefix`` when empty.
        ext: File extension; a missing leading dot is added.
    """
    now = datetime.now(timezone.utc)
    date_path = now.strftime("%Y/%m/%d")
    timestamp = now.strftime("%Y%m%d%H%M%S")
    unique_id = uuid.uuid4().hex[:8].upper()

    clean_prefix = prefix.strip("/") if prefix else ""
    if not clean_prefix:
        # Normalise the configured default too; it may be unset or carry
        # stray slashes.
        clean_prefix = (settings.cos.upload_prefix or "").strip("/")

    if ext and not ext.startswith("."):
        ext = f".{ext}"

    segments = [clean_prefix, date_path, f"{timestamp}_{unique_id}{ext}"]
    return "/".join(segment for segment in segments if segment)


class COSStorage:
    """Object storage facade (COS only).

    COS_ENABLED=true:  Tencent Cloud COS is used; upload failures raise.
    COS_ENABLED=false: storage is disabled; upload methods raise when called.
    """

    def __init__(self):
        self._client = None
        self._enabled = False
        self._init()

    def _init(self):
        """Create the COS client when COS is enabled in the settings."""
        if not settings.cos.enabled:
            logger.info("COS 未启用 (COS_ENABLED=false)")
            return
        self._client = _get_cos_client()
        if self._client:
            self._enabled = True

    @property
    def is_cos_mode(self) -> bool:
        """True when COS is enabled and a client is available."""
        return self._enabled and self._client is not None

    def _require_cos(self):
        """Raise ``CosStorageError`` unless COS is enabled and initialised."""
        if not self._enabled or self._client is None:
            raise CosStorageError("COS 未启用或初始化失败,请检查 COS_ENABLED 和 CVM 角色绑定")

    # ======================== Upload ========================

    def upload_image(self, image_data: bytes, filename: Optional[str] = None) -> str:
        """Upload an image to COS and return its object key.

        Args:
            image_data: Raw image bytes.
            filename: Object key to store under; a ``.jpg`` key is generated
                when omitted.

        Raises:
            CosStorageError: COS is unavailable or the upload failed.
        """
        self._require_cos()
        object_key = filename or _generate_object_key(ext=".jpg")

        # Honour the extension of an explicit key (e.g. .png, .webp) instead
        # of labelling every upload as JPEG; JPEG remains the fallback.
        content_type = "image/jpeg"
        if filename:
            guessed_type, _ = mimetypes.guess_type(filename)
            if guessed_type and guessed_type.startswith("image/"):
                content_type = guessed_type

        try:
            self._client.put_object(
                Bucket=settings.cos.bucket,
                Body=image_data,
                Key=object_key,
                ContentType=content_type,
            )
            logger.info(f"COS 上传成功: {object_key}")
            return object_key
        except Exception as e:
            raise CosStorageError(f"COS 图片上传失败: {e}") from e

    def upload_file(self, file_data: bytes, object_key: str,
                    content_type: str = "application/octet-stream") -> str:
        """Upload arbitrary bytes to COS and return the object key.

        Raises:
            CosStorageError: COS is unavailable or the upload failed.
        """
        self._require_cos()
        try:
            self._client.put_object(
                Bucket=settings.cos.bucket,
                Body=file_data,
                Key=object_key,
                ContentType=content_type,
            )
            logger.info(f"COS 文件上传成功: {object_key}")
            return object_key
        except Exception as e:
            raise CosStorageError(f"COS 文件上传失败: {e}") from e

    # ======================== Download (presigned URL) ========================

    def get_presigned_url(self, object_key: str, expire: Optional[int] = None) -> str:
        """Return a presigned download URL for ``object_key``.

        The input is returned unchanged when it already is an http(s) URL,
        when COS is not enabled, or when signing fails (the failure is
        logged). ``expire`` falls back to ``settings.cos.presign_expire``.
        """
        if _is_remote_url(object_key):
            return object_key
        if not self._enabled:
            return object_key

        expire = expire or settings.cos.presign_expire
        try:
            url = self._client.get_presigned_download_url(
                Bucket=settings.cos.bucket,
                Key=object_key,
                Expired=expire,
            )
            return url
        except Exception as e:
            logger.error(f"生成预签名 URL 失败: {e}")
            return object_key

    def get_presigned_upload_url(self, object_key: str, expire: Optional[int] = None) -> str:
        """Return a presigned PUT URL so a front end can upload directly.

        Raises:
            CosStorageError: COS is unavailable or signing failed.
        """
        self._require_cos()
        expire = expire or settings.cos.presign_expire
        try:
            url = self._client.get_presigned_url(
                Method="PUT",
                Bucket=settings.cos.bucket,
                Key=object_key,
                Expired=expire,
            )
            return url
        except Exception as e:
            raise CosStorageError(f"生成预签名上传 URL 失败: {e}") from e

    # ======================== Delete ========================

    def delete_object(self, object_key: str) -> bool:
        """Delete a COS object.

        Returns:
            True on success; False when COS is not enabled or the delete call
            failed (the failure is logged, not raised).
        """
        if not self._enabled:
            return False
        try:
            self._client.delete_object(
                Bucket=settings.cos.bucket,
                Key=object_key,
            )
            logger.info(f"COS 对象已删除: {object_key}")
            return True
        except Exception as e:
            logger.error(f"COS 删除失败: {e}")
            return False

    # ======================== Legacy interface ========================

    def get_url(self, path: str) -> str:
        """Return an access URL for ``path`` (kept for legacy callers).

        An empty path yields ``""``; an http(s) URL is returned as is;
        anything else is resolved through ``get_presigned_url``.
        """
        if not path:
            return ""
        if _is_remote_url(path):
            return path
        return self.get_presigned_url(path)


# Module-wide singleton
_cos_storage: Optional[COSStorage] = None


def get_oss_storage() -> COSStorage:
    """Return the shared ``COSStorage`` instance, creating it on first use."""
    global _cos_storage
    if _cos_storage is None:
        _cos_storage = COSStorage()
    return _cos_storage