Files
iot-device-management-service/app/services/oss_storage.py

80 lines
2.5 KiB
Python
Raw Normal View History

import uuid
from datetime import datetime, timezone
from typing import Optional
import oss2
from app.config import settings
from app.utils.logger import logger
class OSSStorage:
def __init__(self):
self.bucket = None
self._init_bucket()
def _init_bucket(self):
if not settings.oss.access_key_id or not settings.oss.bucket_name:
logger.warning("OSS配置不完整将使用本地存储")
return
try:
auth = oss2.Auth(settings.oss.access_key_id, settings.oss.access_key_secret)
self.bucket = oss2.Bucket(auth, settings.oss.endpoint, settings.oss.bucket_name)
logger.info(f"OSS连接成功: {settings.oss.bucket_name}")
except Exception as e:
logger.error(f"OSS连接失败: {e}")
self.bucket = None
def upload_image(self, image_data: bytes, filename: Optional[str] = None) -> str:
if not self.bucket:
return self._upload_local(image_data, filename)
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
unique_id = uuid.uuid4().hex[:8]
ext = ".jpg"
filename = f"alerts/{timestamp}_{unique_id}{ext}"
try:
self.bucket.put_object(filename, image_data)
url = f"{settings.oss.url_prefix}/{filename}"
logger.info(f"图片上传OSS成功: {url}")
return url
except Exception as e:
logger.error(f"图片上传OSS失败: {e}")
return self._upload_local(image_data, filename)
def _upload_local(self, image_data: bytes, filename: Optional[str] = None) -> str:
upload_dir = Path("uploads")
upload_dir.mkdir(exist_ok=True)
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
unique_id = uuid.uuid4().hex[:8]
filename = f"alerts/{timestamp}_{unique_id}.jpg"
file_path = upload_dir / filename
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, "wb") as f:
f.write(image_data)
local_url = f"/uploads/{filename}"
logger.info(f"图片保存本地: {local_url}")
return local_url
def get_url(self, path: str) -> str:
if path.startswith("http"):
return path
if self.bucket:
return f"{settings.oss.url_prefix}/{path}"
return f"/uploads/{path}" if not path.startswith("/") else path
oss_storage = OSSStorage()
def get_oss_storage() -> OSSStorage:
return oss_storage