Files
iot-device-management-service/app/services/oss_storage.py

312 lines
10 KiB
Python
Raw Normal View History

"""
对象存储服务腾讯云 COS
支持两种认证模式
1. CVM 角色认证通过元数据服务自动获取临时凭证推荐Docker 需配置 iptables 转发
2. 密钥认证通过 COS_SECRET_ID / COS_SECRET_KEY 环境变量备选
"""
import os
import time
import uuid
from datetime import datetime, timezone
from typing import Optional
from pathlib import Path
from app.config import settings
from app.utils.logger import logger
# 按需导入 COS SDK
_cos_client = None
_cos_available = False
try:
from qcloud_cos import CosConfig, CosS3Client
_cos_available = True
except ImportError:
_cos_available = False
class CosStorageError(Exception):
"""COS 存储操作异常"""
pass
class CvmRoleCredential:
"""CVM 角色凭证提供者,从元数据服务自动获取和刷新临时凭证"""
def __init__(self, role_name: str):
self._role_name = role_name
self._expired_time = 0
self._secret_id = ""
self._secret_key = ""
self._token = ""
self._refresh()
def _refresh(self):
"""从元数据服务获取临时凭证提前5分钟刷新"""
if time.time() < self._expired_time - 300:
return
import requests
url = f"http://metadata.tencentyun.com/latest/meta-data/cam/security-credentials/{self._role_name}"
resp = requests.get(url, timeout=5)
resp.raise_for_status()
data = resp.json()
if data.get("Code") != "Success":
raise CosStorageError(f"获取 CVM 角色凭证失败: {data}")
self._secret_id = data["TmpSecretId"]
self._secret_key = data["TmpSecretKey"]
self._token = data["Token"]
self._expired_time = data["ExpiredTime"]
logger.info(f"CVM 角色凭证已刷新,过期时间: {datetime.fromtimestamp(self._expired_time)}")
@property
def secret_id(self):
self._refresh()
return self._secret_id
@property
def secret_key(self):
self._refresh()
return self._secret_key
@property
def token(self):
self._refresh()
return self._token
def _discover_cvm_role() -> Optional[str]:
"""自动发现 CVM 绑定的角色名"""
import requests
try:
resp = requests.get(
"http://metadata.tencentyun.com/latest/meta-data/cam/security-credentials/",
timeout=3,
)
if resp.status_code == 200 and resp.text.strip():
return resp.text.strip()
except Exception:
pass
return None
def _get_cos_client():
"""懒加载 COS 客户端单例"""
global _cos_client
if _cos_client is not None:
return _cos_client
if not _cos_available:
raise CosStorageError("qcloud_cos 未安装,请运行: pip install cos-python-sdk-v5")
cfg = settings.cos
if not cfg.enabled:
return None
if not cfg.bucket:
raise CosStorageError("COS_BUCKET 未配置")
secret_id = os.getenv("COS_SECRET_ID", "")
secret_key = os.getenv("COS_SECRET_KEY", "")
try:
if secret_id and secret_key:
# 模式1密钥认证
cos_config = CosConfig(
Region=cfg.region,
SecretId=secret_id,
SecretKey=secret_key,
Scheme="https",
)
logger.info(f"COS 客户端初始化成功 (密钥模式): bucket={cfg.bucket}")
else:
# 模式2CVM 角色认证(通过元数据服务获取临时凭证)
role_name = _discover_cvm_role()
if not role_name:
raise CosStorageError(
"COS 认证失败: 未设置 COS_SECRET_ID/COS_SECRET_KEY"
"且无法访问 CVM 元数据服务。请检查 iptables 转发规则或配置密钥。"
)
credential = CvmRoleCredential(role_name)
cos_config = CosConfig(
Region=cfg.region,
CredentialInstance=credential,
Scheme="https",
)
logger.info(f"COS 客户端初始化成功 (CVM角色: {role_name}): bucket={cfg.bucket}")
_cos_client = CosS3Client(cos_config)
return _cos_client
except CosStorageError:
raise
except Exception as e:
raise CosStorageError(f"COS 客户端初始化失败: {e}")
def _generate_object_key(prefix: str = "", ext: str = ".jpg") -> str:
"""
生成对象存储 Key
格式: {prefix}/{YYYY}/{MM}/{DD}/{YYYYMMDDHHmmss}_{uuid8}{ext}
"""
now = datetime.now(timezone.utc)
date_path = now.strftime("%Y/%m/%d")
timestamp = now.strftime("%Y%m%d%H%M%S")
unique_id = uuid.uuid4().hex[:8].upper()
prefix = prefix.strip("/") if prefix else settings.cos.upload_prefix
return f"{prefix}/{date_path}/{timestamp}_{unique_id}{ext}"
class COSStorage:
"""
对象存储接口 COS 模式
COS_ENABLED=true: 使用腾讯云 COS上传失败直接报错
COS_ENABLED=false: 未启用调用上传方法时报错
"""
def __init__(self):
self._client = None
self._enabled = False
self._init()
def _init(self):
"""初始化 COS 客户端"""
if not settings.cos.enabled:
logger.info("COS 未启用 (COS_ENABLED=false)")
return
try:
self._client = _get_cos_client()
if self._client:
self._enabled = True
except Exception as e:
logger.error(f"COS 初始化失败,截图预签名 URL 将不可用: {e}")
self._enabled = False
@property
def is_cos_mode(self) -> bool:
return self._enabled and self._client is not None
def _require_cos(self):
"""检查 COS 是否可用,不可用则抛出异常"""
if not self._enabled or self._client is None:
raise CosStorageError("COS 未启用或初始化失败,请检查 COS_ENABLED 和 CVM 角色绑定")
# ======================== 上传 ========================
def upload_image(self, image_data: bytes, filename: Optional[str] = None) -> str:
"""上传图片到 COS返回 object_key"""
self._require_cos()
object_key = filename or _generate_object_key(ext=".jpg")
try:
self._client.put_object(
Bucket=settings.cos.bucket,
Body=image_data,
Key=object_key,
ContentType="image/jpeg",
)
logger.info(f"COS 上传成功: {object_key}")
return object_key
except Exception as e:
raise CosStorageError(f"COS 图片上传失败: {e}")
def upload_file(self, file_data: bytes, object_key: str, content_type: str = "application/octet-stream") -> str:
"""上传任意文件到 COS"""
self._require_cos()
try:
self._client.put_object(
Bucket=settings.cos.bucket,
Body=file_data,
Key=object_key,
ContentType=content_type,
)
logger.info(f"COS 文件上传成功: {object_key}")
return object_key
except Exception as e:
raise CosStorageError(f"COS 文件上传失败: {e}")
# ======================== 下载(预签名 URL ========================
def get_presigned_url(self, object_key: str, expire: Optional[int] = None) -> str:
"""获取预签名下载 URL"""
if object_key.startswith("http"):
return object_key
if not self._enabled:
return object_key
expire = expire or settings.cos.presign_expire
try:
url = self._client.get_presigned_download_url(
Bucket=settings.cos.bucket,
Key=object_key,
Expired=expire,
)
return url
except Exception as e:
logger.error(f"生成预签名 URL 失败: {e}")
return object_key
def get_presigned_upload_url(self, object_key: str, expire: Optional[int] = None) -> str:
"""获取预签名上传 URL供前端直传"""
self._require_cos()
expire = expire or settings.cos.presign_expire
try:
url = self._client.get_presigned_url(
Method="PUT",
Bucket=settings.cos.bucket,
Key=object_key,
Expired=expire,
)
return url
except Exception as e:
raise CosStorageError(f"生成预签名上传 URL 失败: {e}")
# ======================== 删除 ========================
def delete_object(self, object_key: str) -> bool:
"""删除 COS 对象"""
if not self._enabled:
return False
try:
self._client.delete_object(
Bucket=settings.cos.bucket,
Key=object_key,
)
logger.info(f"COS 对象已删除: {object_key}")
return True
except Exception as e:
logger.error(f"COS 删除失败: {e}")
return False
# ======================== 兼容旧接口 ========================
def get_url(self, path: str) -> str:
"""获取访问 URL兼容旧代码调用
保证返回值要么是 http(s):// 开头的可访问 URL要么是空字符串
绝不返回裸 object_key以免前端加载为相对路径导致损坏图片
"""
if not path:
return ""
if path.startswith("http"):
return path
url = self.get_presigned_url(path)
# 兜底:如果未能生成有效 URLCOS 未启用/异常),返回空字符串
if not url.startswith("http"):
logger.warning(f"无法生成 COS 预签名 URL返回空: object_key={path}")
return ""
return url
# 全局单例
_cos_storage: Optional[COSStorage] = None
def get_oss_storage() -> COSStorage:
"""获取存储服务单例"""
global _cos_storage
if _cos_storage is None:
_cos_storage = COSStorage()
return _cos_storage