# Source: iot-device-management-service/app/services/oss_storage.py (Python, 347 lines, 11 KiB)

"""
对象存储服务腾讯云 COS + 本地回退
功能
- COS_ENABLED=true 时使用腾讯云 COS 存储
- COS_ENABLED=false 时回退到本地 uploads/ 目录
- 后端上传服务端直接将图片/文件写入 COS
- 预签名下载生成带鉴权的临时下载 URL
- STS 临时凭证签发前端直传用的临时 AK/SK/Token
"""
import uuid
from datetime import datetime, timezone
from typing import Optional, Dict
from pathlib import Path
from app.config import settings
from app.utils.logger import logger
# The COS SDK is imported on demand: when it is not installed the module
# still loads and the service simply runs in local-storage mode.
_cos_client = None  # module-wide client, created lazily by _get_cos_client()
try:
    from qcloud_cos import CosConfig, CosS3Client
except ImportError:
    _cos_available = False
else:
    _cos_available = True
def _get_cos_client():
    """
    Return the module-wide COS client, creating it on first use.

    Returns None (and leaves the cached client unset) when the SDK is not
    importable, when COS is disabled or missing its secret id / bucket in
    ``settings.cos``, or when constructing the client raises.
    """
    global _cos_client
    if _cos_client is None:
        if not _cos_available:
            logger.warning("qcloud_cos 未安装,使用本地存储模式")
            return None

        cos_settings = settings.cos
        configured = (
            cos_settings.enabled
            and cos_settings.secret_id
            and cos_settings.bucket
        )
        if not configured:
            return None

        try:
            _cos_client = CosS3Client(
                CosConfig(
                    Region=cos_settings.region,
                    SecretId=cos_settings.secret_id,
                    SecretKey=cos_settings.secret_key,
                    Scheme="https",
                )
            )
            logger.info(
                f"COS 客户端初始化成功: bucket={cos_settings.bucket}, region={cos_settings.region}"
            )
        except Exception as exc:
            logger.error(f"COS 客户端初始化失败: {exc}")
            return None
    return _cos_client
def _generate_object_key(prefix: str = "", ext: str = ".jpg") -> str:
"""
生成对象存储 Key
格式: {prefix}/{YYYY}/{MM}/{DD}/{YYYYMMDDHHmmss}_{uuid8}{ext}
示例: alerts/2026/02/09/20260209153000_A1B2C3D4.jpg
"""
now = datetime.now(timezone.utc)
date_path = now.strftime("%Y/%m/%d")
timestamp = now.strftime("%Y%m%d%H%M%S")
unique_id = uuid.uuid4().hex[:8].upper()
prefix = prefix.strip("/") if prefix else settings.cos.upload_prefix
return f"{prefix}/{date_path}/{timestamp}_{unique_id}{ext}"
class COSStorage:
    """
    Unified object-storage interface.

    - COS mode: calls the Tencent Cloud COS SDK client.
    - Local mode: writes beneath the ``uploads/`` directory and returns a
      ``/uploads/...`` path.

    A failed COS upload falls back to local disk, so values that start with
    ``/uploads/`` are treated as local files even while COS mode is active.
    Local reads/writes/deletes are confined to ``uploads/``.
    """

    def __init__(self):
        # Start in local mode; _init() switches to COS when a client exists.
        self._client = None
        self._use_local = True
        self._init()

    def _init(self):
        """Pick the backend: COS when enabled and a client is available, else local."""
        if settings.cos.enabled:
            client = _get_cos_client()
            if client:
                self._client = client
                self._use_local = False
                return
        logger.info("使用本地文件存储模式")
        self._use_local = True

    @property
    def is_cos_mode(self) -> bool:
        """True when local mode is off and a COS client is attached."""
        return not self._use_local and self._client is not None

    # ======================== Upload ========================
    def upload_image(self, image_data: bytes, filename: Optional[str] = None) -> str:
        """
        Upload an image and return its storage reference.

        Returns the object key (COS mode) or a ``/uploads/...`` path (local
        mode). Store this value and pass it to get_presigned_url() later to
        obtain a temporary access URL.
        """
        if self._use_local:
            return self._upload_local(image_data, filename)
        return self._upload_cos(image_data, filename)

    def _upload_cos(self, image_data: bytes, filename: Optional[str] = None) -> str:
        """
        Upload image bytes to COS as ``image/jpeg``.

        Uses ``filename`` as the object key, or a generated ``.jpg`` key when
        it is omitted. On any COS error the data is saved locally instead and
        the ``/uploads/...`` path is returned.
        """
        object_key = filename or _generate_object_key(ext=".jpg")
        try:
            self._client.put_object(
                Bucket=settings.cos.bucket,
                Body=image_data,
                Key=object_key,
                ContentType="image/jpeg",
            )
            logger.info(f"COS 上传成功: {object_key}")
            return object_key
        except Exception as e:
            logger.error(f"COS 上传失败,回退本地: {e}")
            return self._upload_local(image_data, filename)

    def upload_file(self, file_data: bytes, object_key: str, content_type: str = "application/octet-stream") -> str:
        """
        Upload arbitrary bytes under ``object_key``.

        Returns ``object_key`` on COS success, or a ``/uploads/...`` path in
        local mode or when the COS call fails.
        """
        if self._use_local:
            return self._upload_local(file_data, object_key)
        try:
            self._client.put_object(
                Bucket=settings.cos.bucket,
                Body=file_data,
                Key=object_key,
                ContentType=content_type,
            )
            logger.info(f"COS 文件上传成功: {object_key}")
            return object_key
        except Exception as e:
            logger.error(f"COS 文件上传失败: {e}")
            return self._upload_local(file_data, object_key)

    @staticmethod
    def _is_inside_uploads(path: Path) -> bool:
        """Return True when ``path`` resolves to a location strictly beneath ``uploads/``."""
        root = Path("uploads").resolve()
        return root in path.resolve().parents

    def _upload_local(self, data: bytes, filename: Optional[str] = None) -> str:
        """
        Save ``data`` beneath ``uploads/`` and return its ``/uploads/...`` path.

        Args:
            data: Bytes to write.
            filename: Path relative to ``uploads/``; a ``.jpg`` key is
                generated when omitted. Leading slashes are ignored.

        Raises:
            ValueError: if ``filename`` would resolve outside ``uploads/``
                (for example through ``..``) or names no file at all.
        """
        upload_dir = Path("uploads")
        if filename is None:
            filename = _generate_object_key(ext=".jpg")
        # Joining an absolute path onto upload_dir would discard upload_dir
        # entirely, so the name is made relative before it is joined.
        relative_name = filename.lstrip("/")
        file_path = upload_dir / relative_name
        if not self._is_inside_uploads(file_path):
            raise ValueError(f"refusing to write outside uploads/: {filename!r}")
        file_path.parent.mkdir(parents=True, exist_ok=True)
        with open(file_path, "wb") as f:
            f.write(data)
        local_url = f"/uploads/{relative_name}"
        logger.info(f"本地保存: {local_url}")
        return local_url

    # ======================== Download (presigned URL) ========================
    def get_presigned_url(self, object_key: str, expire: Optional[int] = None) -> str:
        """
        Return a download URL for ``object_key``.

        - COS mode: a signed temporary URL valid for ``expire`` seconds
          (default ``settings.cos.presign_expire``).
        - Local mode, empty keys, ``/uploads/...`` paths and ``http...`` URLs:
          the input is returned unchanged.

        If signing fails the error is logged and ``object_key`` is returned.
        """
        if self._use_local or not object_key or object_key.startswith(("/uploads/", "http")):
            return object_key
        expire = expire or settings.cos.presign_expire
        try:
            url = self._client.get_presigned_download_url(
                Bucket=settings.cos.bucket,
                Key=object_key,
                Expired=expire,
            )
            return url
        except Exception as e:
            logger.error(f"生成预签名 URL 失败: {e}")
            return object_key

    def get_presigned_upload_url(self, object_key: str, expire: Optional[int] = None) -> str:
        """
        Return a presigned PUT URL so a frontend can upload directly.

        The frontend PUTs the file to this URL without passing through the
        backend. Returns "" in local mode or when signing fails.
        """
        if self._use_local:
            return ""
        expire = expire or settings.cos.presign_expire
        try:
            url = self._client.get_presigned_url(
                Method="PUT",
                Bucket=settings.cos.bucket,
                Key=object_key,
                Expired=expire,
            )
            return url
        except Exception as e:
            logger.error(f"生成预签名上传 URL 失败: {e}")
            return ""

    # ======================== STS temporary credentials ========================
    def get_sts_credential(self, allow_prefix: Optional[str] = None) -> Optional[Dict]:
        """
        Issue STS temporary credentials for direct upload from a frontend SDK.

        Requires: pip install qcloud-python-sts

        Args:
            allow_prefix: Resource prefix the credentials may write to;
                defaults to ``{settings.cos.upload_prefix}/*``.

        Returns:
            None in local mode, when the STS package is missing, or when the
            request fails; otherwise::

                {
                    "credentials": {"tmpSecretId": ..., "tmpSecretKey": ..., "sessionToken": ...},
                    "expiredTime": ...,
                    "startTime": ...,
                    "bucket": ...,
                    "region": ...,
                    "allowPrefix": ...
                }
        """
        if self._use_local:
            return None
        try:
            from sts.sts import Sts
            cfg = settings.cos
            prefix = allow_prefix or f"{cfg.upload_prefix}/*"
            # Extract the appid from the bucket name (format: name-appid).
            parts = cfg.bucket.rsplit("-", 1)
            appid = parts[1] if len(parts) == 2 else ""
            # Upload-only permissions (simple, form and multipart upload).
            upload_actions = [
                "name/cos:PutObject",
                "name/cos:PostObject",
                "name/cos:InitiateMultipartUpload",
                "name/cos:ListMultipartUploads",
                "name/cos:ListParts",
                "name/cos:UploadPart",
                "name/cos:CompleteMultipartUpload",
            ]
            sts_config = {
                "duration_seconds": cfg.sts_expire,
                "secret_id": cfg.secret_id,
                "secret_key": cfg.secret_key,
                "bucket": cfg.bucket,
                "region": cfg.region,
                "allow_prefix": prefix,
                "allow_actions": list(upload_actions),
                "policy": {
                    "version": "2.0",
                    "statement": [
                        {
                            "action": list(upload_actions),
                            "effect": "allow",
                            "resource": [
                                f"qcs::cos:{cfg.region}:uid/{appid}:{cfg.bucket}/{prefix}",
                            ],
                        }
                    ],
                },
            }
            sts = Sts(sts_config)
            response = sts.get_credential()
            result = {
                "credentials": response["credentials"],
                "expiredTime": response["expiredTime"],
                "startTime": response.get("startTime"),
                "bucket": cfg.bucket,
                "region": cfg.region,
                "allowPrefix": prefix,
            }
            logger.info(f"STS 凭证签发成功, prefix={prefix}")
            return result
        except ImportError:
            logger.error("qcloud-python-sts 未安装,无法签发 STS 凭证。请运行: pip install qcloud-python-sts")
            return None
        except Exception as e:
            logger.error(f"STS 凭证签发失败: {e}")
            return None

    # ======================== Delete ========================
    def delete_object(self, object_key: str) -> bool:
        """
        Delete a stored object.

        Local mode, and any ``/uploads/...`` key (a file written by the local
        fallback), removes the file from disk; only regular files beneath
        ``uploads/`` are eligible. Other keys are deleted from COS.

        Returns:
            True when the object was deleted; False when the key is empty,
            the local file does not exist or lies outside ``uploads/``, or
            the delete operation fails.
        """
        if not object_key:
            return False
        if self._use_local or object_key.startswith("/uploads/"):
            return self._delete_local(object_key)
        try:
            self._client.delete_object(
                Bucket=settings.cos.bucket,
                Key=object_key,
            )
            logger.info(f"COS 对象已删除: {object_key}")
            return True
        except Exception as e:
            logger.error(f"COS 删除失败: {e}")
            return False

    def _delete_local(self, object_key: str) -> bool:
        """Remove a local file referenced by ``object_key``; never touches paths outside ``uploads/``."""
        local_path = Path(object_key.lstrip("/"))
        try:
            # The containment check keeps "../" or unrelated relative paths
            # from deleting files elsewhere in the working directory.
            if not self._is_inside_uploads(local_path) or not local_path.is_file():
                return False
            local_path.unlink()
            return True
        except OSError as e:
            logger.error(f"本地删除失败: {e}")
            return False

    # ======================== Legacy-compatible API ========================
    def get_url(self, path: str) -> str:
        """
        Return an access URL for ``path`` (kept for older callers).

        Empty input yields ""; ``http...`` URLs and ``/uploads/...`` paths are
        returned unchanged; anything else is treated as a COS object key and
        passed to get_presigned_url().
        """
        if not path:
            return ""
        if path.startswith(("http", "/uploads/")):
            return path
        # COS object key -> presigned URL
        return self.get_presigned_url(path)
# Process-wide singleton, created on first request.
_cos_storage: Optional[COSStorage] = None


def get_oss_storage() -> COSStorage:
    """Return the shared COSStorage instance, building it on the first call.

    The function keeps its older "oss" name so existing callers continue to work.
    """
    global _cos_storage
    if _cos_storage is not None:
        return _cos_storage
    _cos_storage = COSStorage()
    return _cos_storage