import json import threading import time from typing import Dict, List, Optional, Callable from collections import deque class ROICacheManager: _instance = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self._initialized = True self._cache: Dict[int, List[Dict]] = {} self._cache_timestamps: Dict[int, float] = {} self._refresh_interval = 10.0 self._db_session_factory = None self._refresh_thread: Optional[threading.Thread] = None self._stop_event = threading.Event() self._last_refresh_time = 0 self._on_cache_update: Optional[Callable[[int], None]] = None self._update_callbacks: Dict[int, List[Callable]] = {} def initialize(self, session_factory, refresh_interval: float = 10.0): self._db_session_factory = session_factory self._refresh_interval = refresh_interval def start_background_refresh(self): if self._refresh_thread is not None and self._refresh_thread.is_alive(): return self._stop_event.clear() self._refresh_thread = threading.Thread(target=self._background_refresh_loop, daemon=True) self._refresh_thread.start() def stop_background_refresh(self): self._stop_event.set() if self._refresh_thread is not None: self._refresh_thread.join(timeout=2) self._refresh_thread = None def _background_refresh_loop(self): while not self._stop_event.is_set(): try: self.refresh_all() except Exception: pass self._stop_event.wait(self._refresh_interval) def _load_rois_from_db(self, camera_id: int) -> List[Dict]: if self._db_session_factory is None: return [] session = self._db_session_factory() try: from db.crud import get_all_rois rois = get_all_rois(session, camera_id) roi_configs = [] for roi in rois: try: points = json.loads(roi.points) if isinstance(roi.points, str) else roi.points except (json.JSONDecodeError, TypeError): points = [] roi_config = { "id": roi.id, "roi_id": roi.roi_id, "name": roi.name, "type": roi.roi_type, "points": points, "rule": roi.rule_type, "direction": roi.direction, "enabled": roi.enabled, "threshold_sec": roi.threshold_sec, "confirm_sec": roi.confirm_sec, "return_sec": roi.return_sec, "working_hours": json.loads(roi.working_hours) if roi.working_hours else None, } roi_configs.append(roi_config) return roi_configs finally: session.close() def refresh_all(self): if self._db_session_factory is None: return current_time = time.time() if current_time - self._last_refresh_time < 1.0: return self._last_refresh_time = current_time camera_ids = list(self._cache.keys()) for camera_id in camera_ids: try: new_rois = self._load_rois_from_db(camera_id) old_rois_str = str(self._cache.get(camera_id, [])) new_rois_str = str(new_rois) if old_rois_str != new_rois_str: self._cache[camera_id] = new_rois self._cache_timestamps[camera_id] = current_time self._notify_update(camera_id) except Exception: pass def get_rois(self, camera_id: int, force_refresh: bool = False) -> List[Dict]: if force_refresh or camera_id not in self._cache: self._cache[camera_id] = self._load_rois_from_db(camera_id) self._cache_timestamps[camera_id] = time.time() return self._cache.get(camera_id, []) def get_rois_by_rule(self, camera_id: int, rule_type: str) -> List[Dict]: rois = self.get_rois(camera_id) return [roi for roi in rois if roi.get("rule") == rule_type and roi.get("enabled", True)] def invalidate(self, camera_id: Optional[int] = None): if camera_id is None: self._cache.clear() self._cache_timestamps.clear() elif camera_id in self._cache: del self._cache[camera_id] if camera_id in self._cache_timestamps: del self._cache_timestamps[camera_id] def register_update_callback(self, camera_id: int, callback: Callable): if camera_id not in self._update_callbacks: self._update_callbacks[camera_id] = [] self._update_callbacks[camera_id].append(callback) def _notify_update(self, camera_id: int): if camera_id in self._update_callbacks: for callback in self._update_callbacks[camera_id]: try: callback(camera_id) except Exception: pass def get_cache_info(self) -> Dict: return { "camera_count": len(self._cache), "refresh_interval": self._refresh_interval, "cameras": { cam_id: { "roi_count": len(rois), "last_update": self._cache_timestamps.get(cam_id, 0), } for cam_id, rois in self._cache.items() }, } def get_roi_cache() -> ROICacheManager: return ROICacheManager()