2026-01-30 11:34:51 +08:00
|
|
|
|
"""
|
|
|
|
|
|
存储管理器模块
|
|
|
|
|
|
负责图片保存、异步写入和断网容灾
|
|
|
|
|
|
|
|
|
|
|
|
特性:
|
|
|
|
|
|
- 异步保存抓拍图片
|
|
|
|
|
|
- 断网本地缓存
|
|
|
|
|
|
- 批量同步到云端
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import os
|
|
|
|
|
|
import cv2
|
|
|
|
|
|
import uuid
|
|
|
|
|
|
import logging
|
|
|
|
|
|
import threading
|
|
|
|
|
|
import queue
|
|
|
|
|
|
import time
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
|
|
from dataclasses import dataclass, field
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level logger, named after this module, shared by every class below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class CaptureConfig:
    """Settings that control how capture images are written to disk.

    Field order is part of the positional constructor signature and must not
    be changed.
    """

    # Root directory for saved images; per-day sub-directories are created
    # beneath it by ImageStorageManager.
    image_dir: str = "./data/captures"

    # Passed to OpenCV as the IMWRITE_JPEG_QUALITY encoder parameter.
    quality: int = 85

    # Images wider than this are scaled down (aspect ratio preserved).
    max_width: int = 1920

    # Images taller than this are scaled down (aspect ratio preserved).
    max_height: int = 1080

    # File extension, including the leading dot; it is used both as the
    # filename suffix and as the encoder selector for cv2.imencode.
    save_format: str = ".jpg"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class PendingCapture:
    """One capture waiting in the save queue.

    Instances are created by ``ImageStorageManager.save_capture`` and consumed
    by the background save worker.
    """

    # Image to encode; the save path reads ``.shape`` and hands it to OpenCV,
    # so this is presumably a numpy array (``None`` is counted as a failure).
    image: Any

    # Identifier of the camera that produced the image (used in the filename).
    camera_id: str

    # Identifier of the alert the image belongs to (used in the filename).
    alert_id: str

    # Capture time; selects the per-day directory. Defaults to "now" at the
    # moment the instance is created.
    timestamp: datetime = field(default_factory=datetime.now)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ImageStorageManager:
    """Singleton that saves capture images to disk on a background thread.

    Captures are queued by :meth:`save_capture` and written by one daemon
    worker thread. Files are laid out as
    ``<image_dir>/<YYYYMMDD>/<camera_id>_<alert_id><save_format>``, with path
    separators in both identifiers replaced by underscores.

    Only the first construction applies ``config``; later constructions return
    the same, already-initialized instance and ignore the argument.
    """

    _instance = None
    _lock = threading.Lock()

    @staticmethod
    def _sanitize_filename(name: str) -> str:
        """Replace path separators (``/`` and ``\\``) in *name* with ``_``."""
        return name.replace("/", "_").replace("\\", "_")

    def __new__(cls, config: Optional["CaptureConfig"] = None):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    # Finish setting up the object before publishing it, so no
                    # other thread can observe an instance that lacks the
                    # ``_initialized`` flag.
                    instance = super().__new__(cls)
                    instance._initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self, config: Optional["CaptureConfig"] = None):
        """Initialize the singleton once.

        Args:
            config: Storage settings. Defaults to ``CaptureConfig()``. Ignored
                when the instance has already been initialized.
        """
        if self._initialized:
            return

        # Serialize first-time initialization so two threads constructing the
        # singleton at the same moment cannot start two worker threads.
        with self._lock:
            if self._initialized:
                return

            if config is None:
                config = CaptureConfig()

            self.config = config
            self._save_queue: queue.Queue = queue.Queue()
            self._running = False
            self._save_thread: Optional[threading.Thread] = None
            self._saved_count = 0
            self._failed_count = 0

            self._init_directories()
            self._start_save_thread()

            self._initialized = True

        logger.info(f"图片存储管理器初始化: {config.image_dir}")

    def _init_directories(self):
        """Create the root image directory if it does not exist."""
        Path(self.config.image_dir).mkdir(parents=True, exist_ok=True)

    def _start_save_thread(self):
        """Start the daemon worker thread that consumes the save queue."""
        self._running = True
        self._save_thread = threading.Thread(
            target=self._save_worker,
            name="ImageSave",
            daemon=True
        )
        self._save_thread.start()

    def _save_worker(self):
        """Worker loop: take queued captures and write them to disk.

        The loop keeps going after :meth:`close` clears ``_running`` until the
        queue is empty, so captures accepted before shutdown are not dropped.
        """
        while self._running or not self._save_queue.empty():
            try:
                capture = self._save_queue.get(timeout=1.0)
            except queue.Empty:
                # Nothing to do; loop to re-check the shutdown flag.
                continue

            try:
                self._save_image(capture)
            except Exception as e:
                # _save_image handles its own errors; this is a last line of
                # defence so the worker thread never dies.
                logger.error(f"图片保存异常: {e}")

    def _build_filepath(self, camera_id: str, alert_id: Any, timestamp: datetime) -> Path:
        """Return the on-disk path for a capture.

        This is the single source of truth for the naming rule shared by
        saving, path prediction and lookup. Both identifiers are sanitized so
        neither can introduce extra path components.
        """
        date_dir = timestamp.strftime("%Y%m%d")
        safe_camera_id = self._sanitize_filename(camera_id)
        # str() keeps non-string ids (formatted via f-string before) working.
        safe_alert_id = self._sanitize_filename(str(alert_id))
        filename = f"{safe_camera_id}_{safe_alert_id}{self.config.save_format}"
        return Path(self.config.image_dir) / date_dir / filename

    def _save_image(self, capture: "PendingCapture") -> Optional[str]:
        """Encode one capture and write it to disk.

        Uses ``cv2.imencode`` + ``Path.write_bytes`` instead of
        ``cv2.imwrite`` because ``cv2.imwrite`` cannot handle non-ASCII
        (e.g. Chinese) paths on Windows.

        Returns:
            The saved file path, or ``None`` on failure (the failure counter
            is incremented in that case). Never raises.
        """
        try:
            image = capture.image
            if image is None:
                self._failed_count += 1
                return None

            height, width = image.shape[:2]
            if width > self.config.max_width or height > self.config.max_height:
                # Scale down uniformly so the image fits inside the limits.
                scale = min(
                    self.config.max_width / width,
                    self.config.max_height / height
                )
                # Clamp to 1 px: truncation could otherwise yield a 0-sized
                # dimension for extreme aspect ratios.
                new_size = (
                    max(1, int(width * scale)),
                    max(1, int(height * scale))
                )
                image = cv2.resize(image, new_size, interpolation=cv2.INTER_AREA)

            filepath = self._build_filepath(
                capture.camera_id, capture.alert_id, capture.timestamp
            )
            filepath.parent.mkdir(parents=True, exist_ok=True)

            success, buffer = cv2.imencode(
                self.config.save_format,
                image,
                [cv2.IMWRITE_JPEG_QUALITY, self.config.quality]
            )

            if not success:
                logger.warning(f"图片编码失败: {filepath}")
                self._failed_count += 1
                return None

            filepath.write_bytes(buffer.tobytes())
            self._saved_count += 1
            logger.info(f"图片已保存: {filepath}")
            return str(filepath)

        except Exception as e:
            logger.error(f"保存图片失败: {e}")
            self._failed_count += 1
            return None

    def save_capture(
        self,
        image: Any,
        camera_id: str,
        alert_id: str,
        timestamp: Optional[datetime] = None
    ) -> Optional[str]:
        """Queue a capture for asynchronous saving.

        Args:
            image: Image to save.
            camera_id: Camera identifier (part of the filename).
            alert_id: Alert identifier (part of the filename).
            timestamp: Capture time; defaults to the current time.

        Returns:
            The path the image will be written to. The file may not exist yet
            (or ever, if encoding fails) because writing happens on the worker
            thread.
        """
        ts = datetime.now() if timestamp is None else timestamp
        capture = PendingCapture(
            image=image,
            camera_id=camera_id,
            alert_id=alert_id,
            timestamp=ts,
        )
        self._save_queue.put(capture)

        # Same naming rule as _save_image, so the returned path is exact.
        return str(self._build_filepath(camera_id, alert_id, ts))

    def get_image_path(
        self,
        camera_id: str,
        alert_id: str,
        timestamp: Optional[datetime] = None
    ) -> Optional[str]:
        """Return the path of an already-saved image, or ``None`` if absent.

        Args:
            camera_id: Camera identifier used when saving.
            alert_id: Alert identifier used when saving.
            timestamp: When given, only that day's directory is checked.
                When omitted, today's directory is checked first and then
                every other day directory, newest name first, so images saved
                on earlier days are still found.
        """
        if timestamp is not None:
            candidate = self._build_filepath(camera_id, alert_id, timestamp)
            return str(candidate) if candidate.exists() else None

        today_path = self._build_filepath(camera_id, alert_id, datetime.now())
        if today_path.exists():
            return str(today_path)

        base_dir = Path(self.config.image_dir)
        if not base_dir.is_dir():
            return None

        day_dirs = sorted(
            (entry for entry in base_dir.iterdir() if entry.is_dir()),
            reverse=True
        )
        for day_dir in day_dirs:
            candidate = day_dir / today_path.name
            if candidate.exists():
                return str(candidate)
        return None

    def cleanup_old_images(self, days: int = 7) -> int:
        """Delete saved images whose modification time is older than *days*.

        A failure on one file is logged and skipped; it does not abort the
        run.

        Returns:
            The number of files actually deleted.
        """
        cutoff = datetime.now().timestamp() - (days * 24 * 60 * 60)
        cleaned = 0

        try:
            for filepath in Path(self.config.image_dir).rglob(f"*{self.config.save_format}"):
                try:
                    if filepath.stat().st_mtime < cutoff:
                        filepath.unlink()
                        cleaned += 1
                except OSError as e:
                    logger.warning(f"删除过期图片失败: {filepath}: {e}")
        except Exception as e:
            # Directory traversal itself failed; report what was done so far.
            logger.error(f"清理图片失败: {e}")
            return cleaned

        logger.info(f"清理完成: 删除 {cleaned} 张过期图片")
        return cleaned

    def get_statistics(self) -> Dict[str, Any]:
        """Return save/failure counters, current queue size and image root."""
        return {
            "saved_count": self._saved_count,
            "failed_count": self._failed_count,
            "queue_size": self._save_queue.qsize(),
            "image_dir": self.config.image_dir,
        }

    def close(self):
        """Stop the worker, waiting up to 10 seconds for the queue to drain.

        NOTE(review): the singleton is not re-initialized afterwards, so
        captures submitted once the worker has exited are queued but never
        written.
        """
        self._running = False

        if self._save_thread and self._save_thread.is_alive():
            self._save_thread.join(timeout=10)

        logger.info(f"图片存储已关闭: 成功 {self._saved_count}, 失败 {self._failed_count}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LocalCacheManager:
    """File-backed cache for alerts that could not be sent (network outage).

    Each alert is stored as ``<cache_dir>/pending/<uuid4>.json``. The file
    stem is the cache id, which :meth:`get_pending_alerts` reports back under
    the ``_cache_id`` key.
    """

    def __init__(self, cache_dir: str = "./data/cache"):
        """Create the cache directory and its ``pending`` sub-directory.

        Args:
            cache_dir: Root directory of the cache.
        """
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        self._pending_dir = self.cache_dir / "pending"
        self._pending_dir.mkdir(exist_ok=True)

        # Not acquired by any method of this class; kept for existing users.
        self._sync_lock = threading.Lock()

    def cache_alert(self, alert_data: Dict[str, Any]) -> str:
        """Persist one alert so it can be sent later.

        The payload is serialized before anything touches the disk and is
        written through a temporary file that is then renamed into place, so
        a failed or interrupted call never leaves a truncated ``.json`` file
        for :meth:`get_pending_alerts` to trip over.

        Args:
            alert_data: JSON-serializable alert payload.

        Returns:
            The new cache id, or ``""`` if the alert could not be cached.
        """
        import json

        cache_id = str(uuid.uuid4())
        cache_path = self._pending_dir / f"{cache_id}.json"
        # ".tmp" is not matched by the "*.json" globs used elsewhere.
        tmp_path = self._pending_dir / f"{cache_id}.tmp"

        try:
            payload = json.dumps(alert_data, ensure_ascii=False, indent=2)
            tmp_path.write_text(payload, encoding='utf-8')
            tmp_path.replace(cache_path)
            return cache_id
        except Exception as e:
            logger.error(f"缓存告警失败: {e}")
            try:
                tmp_path.unlink()
            except OSError:
                # Best-effort cleanup; the temp file may never have existed.
                pass
            return ""

    def get_pending_count(self) -> int:
        """Return the number of cached alerts waiting to be synchronized."""
        return sum(1 for _ in self._pending_dir.glob("*.json"))

    def get_pending_alerts(self) -> List[Dict[str, Any]]:
        """Load every cached alert that is waiting to be synchronized.

        Unreadable, corrupt or non-object files are logged and skipped
        individually, so one bad file cannot hide the remaining alerts.

        Returns:
            The alert payloads, each with an added ``_cache_id`` key holding
            the id to pass to :meth:`remove_cached`.
        """
        import json

        alerts: List[Dict[str, Any]] = []
        try:
            cache_paths = sorted(self._pending_dir.glob("*.json"))
        except OSError as e:
            logger.error(f"读取缓存告警失败: {e}")
            return alerts

        for cache_path in cache_paths:
            try:
                with open(cache_path, 'r', encoding='utf-8') as f:
                    alert = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                logger.error(f"读取缓存告警失败 ({cache_path.name}): {e}")
                continue

            if not isinstance(alert, dict):
                logger.error(f"读取缓存告警失败 ({cache_path.name}): 内容不是 JSON 对象")
                continue

            alert['_cache_id'] = cache_path.stem
            alerts.append(alert)
        return alerts

    def remove_cached(self, cache_id: str):
        """Delete the cache file for *cache_id*; a missing file is not an error."""
        cache_path = self._pending_dir / f"{cache_id}.json"
        try:
            cache_path.unlink()
        except FileNotFoundError:
            # Already removed (possibly by a concurrent caller).
            pass

    def clear_cache(self):
        """Delete every pending cache file."""
        for cache_path in self._pending_dir.glob("*.json"):
            try:
                cache_path.unlink()
            except FileNotFoundError:
                # Removed by someone else between listing and deletion.
                pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_image_storage() -> ImageStorageManager:
    """Return the image storage manager.

    ``ImageStorageManager`` is a singleton, so every call yields the same
    instance; the first call creates it with the default configuration.
    """
    storage = ImageStorageManager()
    return storage
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_local_cache() -> LocalCacheManager:
    """Return a local cache manager using the default cache directory.

    A new ``LocalCacheManager`` object is built on every call.
    """
    cache = LocalCacheManager()
    return cache
|