""" 对象存储服务(腾讯云 COS + 本地回退) 功能: - COS_ENABLED=true 时使用腾讯云 COS 存储 - COS_ENABLED=false 时回退到本地 uploads/ 目录 - 后端上传:服务端直接将图片/文件写入 COS - 预签名下载:生成带鉴权的临时下载 URL - STS 临时凭证:签发前端直传用的临时 AK/SK/Token """ import uuid from datetime import datetime, timezone from typing import Optional, Dict from pathlib import Path from app.config import settings from app.utils.logger import logger # 按需导入 COS SDK,未安装时不影响本地模式 _cos_client = None _cos_available = False try: from qcloud_cos import CosConfig, CosS3Client _cos_available = True except ImportError: _cos_available = False def _get_cos_client(): """懒加载 COS 客户端单例""" global _cos_client if _cos_client is not None: return _cos_client if not _cos_available: logger.warning("qcloud_cos 未安装,使用本地存储模式") return None cfg = settings.cos if not cfg.enabled or not cfg.secret_id or not cfg.bucket: return None try: cos_config = CosConfig( Region=cfg.region, SecretId=cfg.secret_id, SecretKey=cfg.secret_key, Scheme="https", ) _cos_client = CosS3Client(cos_config) logger.info(f"COS 客户端初始化成功: bucket={cfg.bucket}, region={cfg.region}") return _cos_client except Exception as e: logger.error(f"COS 客户端初始化失败: {e}") return None def _generate_object_key(prefix: str = "", ext: str = ".jpg") -> str: """ 生成对象存储 Key 格式: {prefix}/{YYYY}/{MM}/{DD}/{YYYYMMDDHHmmss}_{uuid8}{ext} 示例: alerts/2026/02/09/20260209153000_A1B2C3D4.jpg """ now = datetime.now(timezone.utc) date_path = now.strftime("%Y/%m/%d") timestamp = now.strftime("%Y%m%d%H%M%S") unique_id = uuid.uuid4().hex[:8].upper() prefix = prefix.strip("/") if prefix else settings.cos.upload_prefix return f"{prefix}/{date_path}/{timestamp}_{unique_id}{ext}" class COSStorage: """ 对象存储统一接口 - COS 模式:调用腾讯云 COS SDK - 本地模式:写入 uploads/ 目录,返回相对路径 """ def __init__(self): self._client = None self._use_local = True self._init() def _init(self): """初始化存储后端""" if settings.cos.enabled: client = _get_cos_client() if client: self._client = client self._use_local = False return logger.info("使用本地文件存储模式") self._use_local = True @property def is_cos_mode(self) -> bool: return not self._use_local and self._client is not None # ======================== 上传 ======================== def upload_image(self, image_data: bytes, filename: Optional[str] = None) -> str: """ 上传图片,返回 object_key(COS 模式)或本地路径(本地模式) 数据库中存储此返回值,下载时通过 get_presigned_url() 获取临时访问地址。 """ if self._use_local: return self._upload_local(image_data, filename) return self._upload_cos(image_data, filename) def _upload_cos(self, image_data: bytes, filename: Optional[str] = None) -> str: """上传到 COS""" object_key = filename or _generate_object_key(ext=".jpg") try: self._client.put_object( Bucket=settings.cos.bucket, Body=image_data, Key=object_key, ContentType="image/jpeg", ) logger.info(f"COS 上传成功: {object_key}") return object_key except Exception as e: logger.error(f"COS 上传失败,回退本地: {e}") return self._upload_local(image_data, filename) def upload_file(self, file_data: bytes, object_key: str, content_type: str = "application/octet-stream") -> str: """上传任意文件到 COS""" if self._use_local: return self._upload_local(file_data, object_key) try: self._client.put_object( Bucket=settings.cos.bucket, Body=file_data, Key=object_key, ContentType=content_type, ) logger.info(f"COS 文件上传成功: {object_key}") return object_key except Exception as e: logger.error(f"COS 文件上传失败: {e}") return self._upload_local(file_data, object_key) def _upload_local(self, data: bytes, filename: Optional[str] = None) -> str: """本地存储回退""" upload_dir = Path("uploads") if filename is None: filename = _generate_object_key(ext=".jpg") file_path = upload_dir / filename file_path.parent.mkdir(parents=True, exist_ok=True) with open(file_path, "wb") as f: f.write(data) local_url = f"/uploads/{filename}" logger.info(f"本地保存: {local_url}") return local_url # ======================== 下载(预签名 URL) ======================== def get_presigned_url(self, object_key: str, expire: Optional[int] = None) -> str: """ 获取预签名下载 URL - COS 模式:生成带签名的临时 URL,过期后失效 - 本地模式:直接返回本地路径 """ if self._use_local or object_key.startswith("/uploads/") or object_key.startswith("http"): return object_key expire = expire or settings.cos.presign_expire try: url = self._client.get_presigned_download_url( Bucket=settings.cos.bucket, Key=object_key, Expired=expire, ) return url except Exception as e: logger.error(f"生成预签名 URL 失败: {e}") return object_key def get_presigned_upload_url(self, object_key: str, expire: Optional[int] = None) -> str: """ 获取预签名上传 URL(供前端直传) 前端拿到此 URL 后,直接 PUT 文件即可,无需经过后端中转。 """ if self._use_local: return "" expire = expire or settings.cos.presign_expire try: url = self._client.get_presigned_url( Method="PUT", Bucket=settings.cos.bucket, Key=object_key, Expired=expire, ) return url except Exception as e: logger.error(f"生成预签名上传 URL 失败: {e}") return "" # ======================== STS 临时凭证 ======================== def get_sts_credential(self, allow_prefix: Optional[str] = None) -> Optional[Dict]: """ 获取 STS 临时凭证(供前端 SDK 直传) 需安装: pip install qcloud-python-sts 返回: { "credentials": {"tmpSecretId": ..., "tmpSecretKey": ..., "sessionToken": ...}, "expiredTime": ..., "startTime": ..., "bucket": ..., "region": ..., "allowPrefix": ... } """ if self._use_local: return None try: from sts.sts import Sts cfg = settings.cos prefix = allow_prefix or f"{cfg.upload_prefix}/*" # 提取 appid from bucket (格式: name-appid) parts = cfg.bucket.rsplit("-", 1) appid = parts[1] if len(parts) == 2 else "" sts_config = { "duration_seconds": cfg.sts_expire, "secret_id": cfg.secret_id, "secret_key": cfg.secret_key, "bucket": cfg.bucket, "region": cfg.region, "allow_prefix": prefix, "allow_actions": [ "name/cos:PutObject", "name/cos:PostObject", "name/cos:InitiateMultipartUpload", "name/cos:ListMultipartUploads", "name/cos:ListParts", "name/cos:UploadPart", "name/cos:CompleteMultipartUpload", ], "policy": { "version": "2.0", "statement": [ { "action": [ "name/cos:PutObject", "name/cos:PostObject", "name/cos:InitiateMultipartUpload", "name/cos:ListMultipartUploads", "name/cos:ListParts", "name/cos:UploadPart", "name/cos:CompleteMultipartUpload", ], "effect": "allow", "resource": [ f"qcs::cos:{cfg.region}:uid/{appid}:{cfg.bucket}/{prefix}", ], } ], }, } sts = Sts(sts_config) response = sts.get_credential() result = { "credentials": response["credentials"], "expiredTime": response["expiredTime"], "startTime": response.get("startTime"), "bucket": cfg.bucket, "region": cfg.region, "allowPrefix": prefix, } logger.info(f"STS 凭证签发成功, prefix={prefix}") return result except ImportError: logger.error("qcloud-python-sts 未安装,无法签发 STS 凭证。请运行: pip install qcloud-python-sts") return None except Exception as e: logger.error(f"STS 凭证签发失败: {e}") return None # ======================== 删除 ======================== def delete_object(self, object_key: str) -> bool: """删除对象""" if self._use_local: local_path = Path(object_key.lstrip("/")) if local_path.exists(): local_path.unlink() return True return False try: self._client.delete_object( Bucket=settings.cos.bucket, Key=object_key, ) logger.info(f"COS 对象已删除: {object_key}") return True except Exception as e: logger.error(f"COS 删除失败: {e}") return False # ======================== 兼容旧接口 ======================== def get_url(self, path: str) -> str: """获取访问 URL(兼容旧代码调用)""" if not path: return "" if path.startswith("http"): return path if path.startswith("/uploads/"): return path # COS object_key → 预签名 URL return self.get_presigned_url(path) # 全局单例 _cos_storage: Optional[COSStorage] = None def get_oss_storage() -> COSStorage: """获取存储服务单例(保持旧函数名兼容)""" global _cos_storage if _cos_storage is None: _cos_storage = COSStorage() return _cos_storage