# Source: security-ai-edge/core/storage_manager.py
"""
存储管理器模块
负责图片保存异步写入和断网容灾
特性
- 异步保存抓拍图片
- 断网本地缓存
- 批量同步到云端
"""
import os
import cv2
import uuid
import logging
import threading
import queue
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@dataclass
class CaptureConfig:
    """Configuration for capture-image storage: target directory, size limits, and encoding."""
    image_dir: str = "./data/captures"  # root directory for saved captures
    quality: int = 85                   # JPEG encode quality passed to cv2.imwrite
    max_width: int = 1920               # frames wider than this are downscaled before saving
    max_height: int = 1080              # frames taller than this are downscaled before saving
    save_format: str = ".jpg"           # file extension appended to generated filenames
@dataclass
class PendingCapture:
    """A capture waiting in the asynchronous save queue."""
    image: Any       # image array (presumably a numpy/OpenCV BGR frame — confirm with producers)
    camera_id: str   # source camera identifier; becomes part of the saved filename
    alert_id: str    # alert identifier; becomes part of the saved filename
    timestamp: datetime = field(default_factory=datetime.now)  # enqueue time; selects the YYYYMMDD date directory
class ImageStorageManager:
    """Singleton manager for asynchronous capture-image storage.

    Captures are enqueued by ``save_capture()`` and written to disk by a
    background daemon thread, so callers never block on disk I/O.  Files
    are laid out as ``<image_dir>/<YYYYMMDD>/<camera_id>_<alert_id><ext>``.
    """
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, config: Optional[CaptureConfig] = None):
        # Double-checked locking: concurrent first constructions yield one instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self, config: Optional[CaptureConfig] = None):
        """Configure the singleton on first construction; later calls are no-ops.

        Args:
            config: storage configuration; defaults to ``CaptureConfig()``.
        """
        # __init__ runs on every ImageStorageManager() call; the flag set in
        # __new__ ensures we only initialize once.
        if self._initialized:
            return
        self.config = config if config is not None else CaptureConfig()
        self._save_queue: queue.Queue = queue.Queue()
        self._running = False
        self._save_thread: Optional[threading.Thread] = None
        self._saved_count = 0
        self._failed_count = 0
        self._init_directories()
        self._start_save_thread()
        self._initialized = True
        logger.info(f"图片存储管理器初始化: {self.config.image_dir}")

    def _init_directories(self):
        """Create the root image directory if it does not exist."""
        Path(self.config.image_dir).mkdir(parents=True, exist_ok=True)

    def _start_save_thread(self):
        """Start the daemon thread that drains the save queue."""
        self._running = True
        self._save_thread = threading.Thread(
            target=self._save_worker,
            name="ImageSave",
            daemon=True
        )
        self._save_thread.start()

    def _save_worker(self):
        """Worker loop: pop pending captures and persist them until stopped."""
        while self._running:
            try:
                try:
                    # Timeout so the loop can observe _running going False.
                    capture: PendingCapture = self._save_queue.get(timeout=1.0)
                except queue.Empty:
                    continue
                self._save_image(capture)
            except Exception as e:
                logger.error(f"图片保存异常: {e}")

    def _save_image(self, capture: PendingCapture) -> Optional[str]:
        """Encode and write one capture to disk.

        Returns:
            The saved file path on success, ``None`` on failure.
        """
        try:
            image = capture.image
            if image is None:
                self._failed_count += 1
                return None
            # BUGFIX: the previous BGR->RGB conversion before cv2.imwrite
            # produced color-swapped files on disk, because cv2.imwrite
            # interprets 3-channel arrays as BGR and frames coming from
            # cv2.VideoCapture are already BGR.  Frames are written as-is.
            # (Assumes producers supply BGR frames — confirm at call sites.)
            h, w = image.shape[:2]
            if w > self.config.max_width or h > self.config.max_height:
                # Downscale oversized frames, preserving aspect ratio.
                scale = min(
                    self.config.max_width / w,
                    self.config.max_height / h
                )
                image = cv2.resize(
                    image,
                    (int(w * scale), int(h * scale)),
                    interpolation=cv2.INTER_AREA
                )
            date_dir = capture.timestamp.strftime("%Y%m%d")
            save_dir = Path(self.config.image_dir) / date_dir
            save_dir.mkdir(parents=True, exist_ok=True)
            filename = f"{capture.camera_id}_{capture.alert_id}{self.config.save_format}"
            filepath = save_dir / filename
            # Only JPEG understands the quality flag; other formats get defaults.
            if self.config.save_format.lower() in (".jpg", ".jpeg"):
                params = [cv2.IMWRITE_JPEG_QUALITY, self.config.quality]
            else:
                params = []
            success = cv2.imwrite(str(filepath), image, params)
            if success:
                self._saved_count += 1
                logger.debug(f"图片已保存: {filepath}")
                return str(filepath)
            self._failed_count += 1
            return None
        except Exception as e:
            logger.error(f"保存图片失败: {e}")
            self._failed_count += 1
            return None

    def save_capture(
        self,
        image: Any,
        camera_id: str,
        alert_id: str,
        timestamp: Optional[datetime] = None
    ) -> Optional[str]:
        """Queue a capture for asynchronous saving.

        Returns:
            A ``"<queued: ...>"`` marker, not the final file path — the
            actual write happens later on the worker thread.
        """
        capture = PendingCapture(
            image=image,
            camera_id=camera_id,
            alert_id=alert_id,
            timestamp=timestamp or datetime.now()
        )
        self._save_queue.put(capture)
        return f"<queued: {alert_id}>"

    def get_image_path(
        self,
        camera_id: str,
        alert_id: str,
        timestamp: Optional[datetime] = None
    ) -> Optional[str]:
        """Return the path of an already-saved image, or ``None`` if absent.

        Args:
            timestamp: selects the ``YYYYMMDD`` date directory to search.
                Defaults to today (the original behavior); pass the alert's
                timestamp to find images saved on earlier days.
        """
        date_str = (timestamp or datetime.now()).strftime("%Y%m%d")
        filename = f"{camera_id}_{alert_id}{self.config.save_format}"
        filepath = Path(self.config.image_dir) / date_str / filename
        if filepath.exists():
            return str(filepath)
        return None

    def cleanup_old_images(self, days: int = 7):
        """Delete saved images whose mtime is older than ``days`` days.

        Returns:
            The number of files deleted (0 on error).
        """
        cutoff = datetime.now().timestamp() - (days * 24 * 60 * 60)
        cleaned = 0
        try:
            for filepath in Path(self.config.image_dir).rglob(f"*{self.config.save_format}"):
                if filepath.stat().st_mtime < cutoff:
                    filepath.unlink()
                    cleaned += 1
            logger.info(f"清理完成: 删除 {cleaned} 张过期图片")
            return cleaned
        except Exception as e:
            logger.error(f"清理图片失败: {e}")
            return 0

    def get_statistics(self) -> Dict[str, Any]:
        """Return save/failure counters, queue depth, and the image directory."""
        return {
            "saved_count": self._saved_count,
            "failed_count": self._failed_count,
            "queue_size": self._save_queue.qsize(),
            "image_dir": self.config.image_dir,
        }

    def close(self):
        """Stop the worker thread (bounded wait) and log final counters."""
        self._running = False
        if self._save_thread and self._save_thread.is_alive():
            self._save_thread.join(timeout=10)
        logger.info(f"图片存储已关闭: 成功 {self._saved_count}, 失败 {self._failed_count}")
class LocalCacheManager:
    """Local alert cache for network-outage resilience.

    Alerts are written as individual JSON files under ``<cache_dir>/pending``
    and removed once they have been synced to the cloud.
    """

    def __init__(self, cache_dir: str = "./data/cache"):
        """Create the cache directories if they do not exist."""
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self._pending_dir = self.cache_dir / "pending"
        self._pending_dir.mkdir(exist_ok=True)
        self._sync_lock = threading.Lock()

    def cache_alert(self, alert_data: Dict[str, Any]) -> str:
        """Persist one alert to the pending cache.

        Returns:
            The generated cache id, or ``""`` on failure (best-effort —
            callers treat an empty id as "not cached").
        """
        cache_id = str(uuid.uuid4())
        cache_path = self._pending_dir / f"{cache_id}.json"
        try:
            import json
            with open(cache_path, 'w', encoding='utf-8') as f:
                json.dump(alert_data, f, ensure_ascii=False, indent=2)
            return cache_id
        except Exception as e:
            logger.error(f"缓存告警失败: {e}")
            return ""

    def get_pending_count(self) -> int:
        """Return the number of alerts awaiting sync."""
        return len(list(self._pending_dir.glob("*.json")))

    def get_pending_alerts(self) -> List[Dict[str, Any]]:
        """Load all pending alerts, skipping corrupt entries.

        Each returned dict carries its cache file stem under ``'_cache_id'``
        so callers can remove it after a successful sync.
        """
        import json
        alerts = []
        for cache_path in self._pending_dir.glob("*.json"):
            # BUGFIX: the try/except used to wrap the whole loop, so one
            # corrupt or unreadable file aborted the scan and silently
            # dropped every remaining pending alert.  Handle per file.
            try:
                with open(cache_path, 'r', encoding='utf-8') as f:
                    alert = json.load(f)
                alert['_cache_id'] = cache_path.stem
                alerts.append(alert)
            except Exception as e:
                logger.error(f"读取缓存告警失败: {e}")
        return alerts

    def remove_cached(self, cache_id: str):
        """Remove one cached alert after it has been synced."""
        cache_path = self._pending_dir / f"{cache_id}.json"
        if cache_path.exists():
            cache_path.unlink()

    def clear_cache(self):
        """Delete every pending cache entry."""
        for cache_path in self._pending_dir.glob("*.json"):
            cache_path.unlink()
def get_image_storage() -> ImageStorageManager:
    """Return the process-wide ImageStorageManager singleton."""
    manager = ImageStorageManager()
    return manager
def get_local_cache() -> LocalCacheManager:
    """Return a LocalCacheManager using the default cache directory.

    NOTE(review): unlike get_image_storage, this constructs a fresh manager
    on every call (LocalCacheManager is not a singleton).
    """
    cache = LocalCacheManager()
    return cache