"""
Storage manager module.

Handles saving capture images, asynchronous writes and offline
(network-loss) fallback.

Features:
- Asynchronous saving of capture images on a background thread
- Local on-disk caching of alerts while the network is down
- Access to the pending alerts so a caller can sync them in batches
  (the upload itself is not implemented in this module)
"""

import os
import cv2
import json
import uuid
import logging
import threading
import queue
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)


@dataclass
class CaptureConfig:
    """Settings that control where and how capture images are written."""

    image_dir: str = "./data/captures"  # root directory; one sub-directory per day
    quality: int = 85                   # passed to OpenCV as IMWRITE_JPEG_QUALITY
    max_width: int = 1920               # images wider than this are scaled down
    max_height: int = 1080              # images taller than this are scaled down
    save_format: str = ".jpg"           # file extension, also selects the encoder


@dataclass
class PendingCapture:
    """A capture waiting in the save queue."""

    image: Any                          # read via ``.shape``; presumably a numpy image array
    camera_id: str
    alert_id: str
    timestamp: datetime = field(default_factory=datetime.now)


class ImageStorageManager:
    """Process-wide singleton that saves capture images on a background thread.

    ``__new__`` always returns the same instance; ``__init__`` only runs its
    body the first time, so a ``config`` passed on later constructions is
    ignored.
    """

    _instance = None
    _lock = threading.Lock()

    @staticmethod
    def _sanitize_filename(name: str) -> str:
        """Return *name* with path separators (``/`` and ``\\``) replaced by ``_``.

        The value is coerced with ``str()`` first so that non-string ids
        (which the filename f-string has always accepted) keep working.
        """
        return str(name).replace("/", "_").replace("\\", "_")

    def __new__(cls, config: Optional[CaptureConfig] = None):
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance while this one was waiting.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self, config: Optional[CaptureConfig] = None):
        """Set up the queue, counters, directories and worker (first call only).

        Args:
            config: Capture settings; defaults to ``CaptureConfig()``.
        """
        if self._initialized:
            return

        if config is None:
            config = CaptureConfig()

        self.config = config
        self._save_queue: queue.Queue = queue.Queue()
        self._running = False
        self._save_thread: Optional[threading.Thread] = None
        self._saved_count = 0
        self._failed_count = 0

        self._init_directories()
        self._start_save_thread()

        self._initialized = True
        logger.info(f"图片存储管理器初始化: {config.image_dir}")

    def _init_directories(self):
        """Create the image root directory if it does not exist."""
        Path(self.config.image_dir).mkdir(parents=True, exist_ok=True)

    def _start_save_thread(self):
        """Start the daemon thread that consumes the save queue."""
        self._running = True
        self._save_thread = threading.Thread(
            target=self._save_worker,
            name="ImageSave",
            daemon=True
        )
        self._save_thread.start()

    def _build_filepath(self, camera_id: Any, alert_id: Any, timestamp: datetime) -> Path:
        """Return ``<image_dir>/<YYYYMMDD>/<camera_id>_<alert_id><save_format>``.

        Single source of truth for the naming rule used by ``_save_image``,
        ``save_capture`` and ``get_image_path``. Both ids are sanitized so
        that neither can introduce a path separator into the filename.
        """
        date_dir = timestamp.strftime("%Y%m%d")
        safe_camera_id = self._sanitize_filename(camera_id)
        safe_alert_id = self._sanitize_filename(alert_id)
        filename = f"{safe_camera_id}_{safe_alert_id}{self.config.save_format}"
        return Path(self.config.image_dir) / date_dir / filename

    def _save_worker(self):
        """Worker loop: pop captures from the queue and save them.

        Keeps running after ``close()`` has cleared ``_running`` until the
        queue is empty, so captures that were already queued are not lost.
        """
        while self._running or not self._save_queue.empty():
            try:
                capture: PendingCapture = self._save_queue.get(timeout=1.0)
            except queue.Empty:
                # Nothing arrived within the timeout; re-check the loop condition.
                continue
            try:
                self._save_image(capture)
            except Exception as e:
                logger.error(f"图片保存异常: {e}")

    def _save_image(self, capture: PendingCapture) -> Optional[str]:
        """Encode one capture and write it to disk.

        The image is scaled down (aspect ratio preserved) when it exceeds
        ``max_width`` x ``max_height``. Updates ``_saved_count`` or
        ``_failed_count``.

        Returns:
            The written file path as a string, or ``None`` when the image is
            ``None``, encoding fails, or any exception is raised.
        """
        try:
            image = capture.image
            if image is None:
                self._failed_count += 1
                return None

            height, width = image.shape[0], image.shape[1]
            if width > self.config.max_width or height > self.config.max_height:
                scale = min(
                    self.config.max_width / width,
                    self.config.max_height / height
                )
                # Clamp to 1 px: int() truncation could otherwise yield a
                # zero-sized target for extreme aspect ratios.
                new_size = (
                    max(1, int(width * scale)),
                    max(1, int(height * scale))
                )
                image = cv2.resize(image, new_size, interpolation=cv2.INTER_AREA)

            filepath = self._build_filepath(
                capture.camera_id, capture.alert_id, capture.timestamp
            )
            filepath.parent.mkdir(parents=True, exist_ok=True)

            # Use imencode + write_bytes instead of imwrite, because
            # cv2.imwrite cannot handle Chinese paths on Windows.
            success, buffer = cv2.imencode(
                self.config.save_format,
                image,
                [cv2.IMWRITE_JPEG_QUALITY, self.config.quality]
            )

            if success:
                filepath.write_bytes(buffer.tobytes())
                self._saved_count += 1
                logger.info(f"图片已保存: {filepath}")
                return str(filepath)
            else:
                logger.warning(f"图片编码失败: {filepath}")
                self._failed_count += 1
                return None

        except Exception as e:
            logger.error(f"保存图片失败: {e}")
            self._failed_count += 1
            return None

    def save_capture(
        self,
        image: Any,
        camera_id: str,
        alert_id: str,
        timestamp: Optional[datetime] = None
    ) -> Optional[str]:
        """Queue a capture for asynchronous saving.

        Args:
            image: Image to save.
            camera_id: Camera identifier (used in the filename).
            alert_id: Alert identifier (used in the filename).
            timestamp: Capture time; defaults to ``datetime.now()``.

        Returns:
            The path the image is expected to be written to. The file may
            not exist yet, and never will if the save fails.
        """
        ts = timestamp or datetime.now()
        capture = PendingCapture(
            image=image,
            camera_id=camera_id,
            alert_id=alert_id,
            timestamp=ts,
        )
        self._save_queue.put(capture)

        # Deterministic path, built with the same rule _save_image uses.
        return str(self._build_filepath(camera_id, alert_id, ts))

    def get_image_path(
        self,
        camera_id: str,
        alert_id: str,
        timestamp: Optional[datetime] = None
    ) -> Optional[str]:
        """Return the path of a saved image, or ``None`` if it is not on disk.

        Args:
            camera_id: Camera identifier used when the image was saved.
            alert_id: Alert identifier used when the image was saved.
            timestamp: Capture time, used to pick the date directory.
                Defaults to today, so images saved on earlier days are only
                found when their timestamp is supplied.
        """
        filepath = self._build_filepath(
            camera_id, alert_id, timestamp or datetime.now()
        )
        if filepath.exists():
            return str(filepath)
        return None

    def cleanup_old_images(self, days: int = 7) -> int:
        """Delete images whose modification time is older than *days* days.

        A file that cannot be inspected or removed is logged and skipped, so
        one failure does not stop the rest of the cleanup.

        Returns:
            The number of files actually deleted.
        """
        cutoff = datetime.now().timestamp() - (days * 24 * 60 * 60)
        cleaned = 0

        try:
            for filepath in Path(self.config.image_dir).rglob(f"*{self.config.save_format}"):
                try:
                    if filepath.stat().st_mtime < cutoff:
                        filepath.unlink()
                        cleaned += 1
                except OSError as e:
                    logger.warning(f"清理图片失败: {filepath}: {e}")
        except Exception as e:
            # Directory traversal itself failed; report what was deleted so far.
            logger.error(f"清理图片失败: {e}")
            return cleaned

        logger.info(f"清理完成: 删除 {cleaned} 张过期图片")
        return cleaned

    def get_statistics(self) -> Dict[str, Any]:
        """Return saved/failed counters, current queue size and image root."""
        return {
            "saved_count": self._saved_count,
            "failed_count": self._failed_count,
            "queue_size": self._save_queue.qsize(),
            "image_dir": self.config.image_dir,
        }

    def close(self):
        """Stop the worker, waiting up to 10 seconds for it to drain the queue.

        NOTE(review): the singleton stays initialized after this call, so a
        later ``ImageStorageManager()`` returns this closed instance and the
        worker is not restarted; captures queued afterwards are not saved.
        """
        self._running = False

        if self._save_thread and self._save_thread.is_alive():
            self._save_thread.join(timeout=10)

        logger.info(f"图片存储已关闭: 成功 {self._saved_count}, 失败 {self._failed_count}")


class LocalCacheManager:
    """On-disk cache for alerts that could not be sent (offline fallback).

    Each alert is stored as ``<cache_dir>/pending/<uuid>.json``.
    """

    def __init__(self, cache_dir: str = "./data/cache"):
        """Create the cache directory and its ``pending`` sub-directory."""
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        self._pending_dir = self.cache_dir / "pending"
        self._pending_dir.mkdir(exist_ok=True)

        self._sync_lock = threading.Lock()

    def cache_alert(self, alert_data: Dict[str, Any]) -> str:
        """Persist one alert to the pending directory.

        The JSON is written to a temporary file and then renamed into place,
        so readers never see a partially written ``.json`` file.

        Returns:
            The generated cache id, or ``""`` if the alert could not be written.
        """
        cache_id = str(uuid.uuid4())
        cache_path = self._pending_dir / f"{cache_id}.json"
        # The ".tmp" suffix keeps the in-progress file out of "*.json" globs.
        tmp_path = self._pending_dir / f"{cache_id}.json.tmp"

        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(alert_data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, cache_path)
            return cache_id
        except Exception as e:
            logger.error(f"缓存告警失败: {e}")
            try:
                tmp_path.unlink()
            except OSError:
                # Best-effort cleanup: the temp file may never have been created.
                pass
            return ""

    def get_pending_count(self) -> int:
        """Return the number of cached alerts waiting to be synced."""
        return sum(1 for _ in self._pending_dir.glob("*.json"))

    def get_pending_alerts(self) -> List[Dict[str, Any]]:
        """Load every cached alert, adding its cache id under ``_cache_id``.

        A file that cannot be read or parsed is logged and skipped so that
        the remaining alerts are still returned.
        """
        alerts = []

        try:
            for cache_path in self._pending_dir.glob("*.json"):
                try:
                    with open(cache_path, 'r', encoding='utf-8') as f:
                        alert = json.load(f)
                    alert['_cache_id'] = cache_path.stem
                    alerts.append(alert)
                except Exception as e:
                    logger.error(f"读取缓存告警失败: {cache_path}: {e}")
        except Exception as e:
            # Listing the directory failed; return whatever was loaded so far.
            logger.error(f"读取缓存告警失败: {e}")

        return alerts

    def remove_cached(self, cache_id: str):
        """Delete the cache file for *cache_id*; a missing file is not an error."""
        cache_path = self._pending_dir / f"{cache_id}.json"
        try:
            cache_path.unlink()
        except FileNotFoundError:
            # Already removed (possibly by another caller).
            pass

    def clear_cache(self):
        """Delete every cached alert file in the pending directory."""
        for cache_path in self._pending_dir.glob("*.json"):
            try:
                cache_path.unlink()
            except FileNotFoundError:
                # Removed between listing and deletion.
                pass


def get_image_storage() -> ImageStorageManager:
    """Return the shared ``ImageStorageManager`` instance."""
    return ImageStorageManager()


def get_local_cache() -> LocalCacheManager:
    """Return a new ``LocalCacheManager`` using the default cache directory."""
    return LocalCacheManager()