Files
security-ai-edge/core/config_sync.py

561 lines
20 KiB
Python
Raw Normal View History

2026-01-29 18:33:12 +08:00
"""
配置同步模块
实现 SQLite 本地配置存储Redis Pub/Sub订阅配置缓存与动态刷新
存储策略
- SQLite: 本地配置存储摄像头ROI
- Redis: 配置变更实时推送
- 内存缓存: 热点配置快速访问
2026-01-29 18:33:12 +08:00
"""
import json
import logging
import threading
import time
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Set
import redis
from redis import Redis
from redis.client import PubSub
from config.settings import get_settings, RedisConfig
from config.database import get_sqlite_manager, SQLiteManager
2026-01-29 18:33:12 +08:00
from config.config_models import CameraInfo as CameraInfoModel, ROIInfo, ConfigVersion
from utils.version_control import get_version_control
logger = logging.getLogger(__name__)
class ConfigCache:
"""配置缓存管理类"""
def __init__(self, max_size: int = 1000, ttl: int = 300):
self._cache: Dict[str, Any] = {}
self._access_times: Dict[str, float] = {}
self._max_size = max_size
self._ttl = ttl
self._lock = threading.RLock()
def get(self, key: str) -> Optional[Any]:
"""从缓存获取配置"""
with self._lock:
if key in self._cache:
access_time = self._access_times.get(key, 0)
if (time.time() - access_time) < self._ttl:
self._access_times[key] = time.time()
return self._cache[key]
else:
self._delete(key)
return None
def set(self, key: str, value: Any):
"""设置配置到缓存"""
with self._lock:
if len(self._cache) >= self._max_size:
self._evict_lru()
self._cache[key] = value
self._access_times[key] = time.time()
def delete(self, key: str):
"""删除缓存项"""
with self._lock:
self._delete(key)
def _delete(self, key: str):
"""内部删除方法(不获取锁)"""
self._cache.pop(key, None)
self._access_times.pop(key, None)
def _evict_lru(self):
"""淘汰最少使用的缓存项"""
if not self._access_times:
return
min_access_time = min(self._access_times.values())
lru_keys = [k for k, v in self._access_times.items() if v == min_access_time]
for key in lru_keys[:10]:
self._delete(key)
def clear(self):
"""清空缓存"""
with self._lock:
self._cache.clear()
self._access_times.clear()
def get_stats(self) -> Dict[str, Any]:
"""获取缓存统计信息"""
with self._lock:
return {
"size": len(self._cache),
"max_size": self._max_size,
"ttl": self._ttl,
}
class ConfigSyncManager:
"""配置同步管理器类"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
settings = get_settings()
self._config_version = settings.config_version
self._cache = ConfigCache()
self._redis_client = None
self._redis_pubsub = None
self._pubsub_thread = None
self._stop_event = threading.Event()
self._callbacks: Dict[str, Set[Callable]] = {}
self._db_manager = None
self._initialized = True
self._init_redis()
self._version_control = get_version_control()
def _init_redis(self):
"""初始化Redis连接"""
try:
settings = get_settings()
redis_config = settings.redis
self._redis_client = redis.Redis(
host=redis_config.host,
port=redis_config.port,
db=redis_config.db,
password=redis_config.password,
decode_responses=redis_config.decode_responses,
socket_connect_timeout=10,
socket_timeout=10,
retry_on_timeout=True,
)
self._redis_client.ping()
logger.info(f"Redis连接成功: {redis_config.host}:{redis_config.port}")
except Exception as e:
logger.error(f"Redis连接失败: {e}")
self._redis_client = None
def _init_database(self):
"""初始化数据库连接"""
if self._db_manager is None:
self._db_manager = get_sqlite_manager()
2026-01-29 18:33:12 +08:00
@property
def config_version(self) -> str:
"""获取当前配置版本"""
return self._config_version
def register_callback(self, topic: str, callback: Callable):
"""注册配置变更回调函数"""
if topic not in self._callbacks:
self._callbacks[topic] = set()
self._callbacks[topic].add(callback)
logger.info(f"已注册配置变更回调: {topic}")
def unregister_callback(self, topic: str, callback: Callable):
"""注销配置变更回调函数"""
if topic in self._callbacks:
self._callbacks[topic].discard(callback)
def _notify_callbacks(self, topic: str, data: Dict[str, Any]):
"""触发配置变更回调"""
if topic in self._callbacks:
for callback in self._callbacks[topic]:
try:
callback(topic, data)
except Exception as e:
logger.error(f"配置变更回调执行失败: {e}")
def _subscribe_config_updates(self):
"""订阅配置更新主题"""
if not self._redis_client:
logger.warning("Redis未连接无法订阅配置更新")
return
try:
self._redis_pubsub = self._redis_client.pubsub()
self._redis_pubsub.subscribe("config_update")
logger.info("已订阅config_update主题")
for message in self._redis_pubsub.listen():
if self._stop_event.is_set():
break
if message["type"] == "message":
try:
data = json.loads(message["data"])
self._handle_config_update(data)
except Exception as e:
logger.error(f"处理配置更新消息失败: {e}")
except Exception as e:
logger.error(f"配置更新订阅异常: {e}")
def _handle_config_update(self, data: Dict[str, Any]):
"""处理配置更新消息"""
update_type = data.get("type", "full")
affected_items = data.get("affected_items", [])
version = data.get("version", self._config_version)
logger.info(f"收到配置更新通知: type={update_type}, items={affected_items}")
if "camera" in affected_items or "all" in affected_items:
self._cache.delete("cameras")
if "roi" in affected_items or "all" in affected_items:
self._cache.delete("rois")
self._config_version = version
self._notify_callbacks("config_update", data)
self._version_control.record_update(
version=version,
update_type="配置更新",
description=f"云端配置更新,影响范围: {', '.join(affected_items)}",
updated_by="云端系统",
affected_items=affected_items,
details=data
)
def start_config_subscription(self):
"""启动配置订阅线程"""
if self._pubsub_thread is None or not self._pubsub_thread.is_alive():
self._stop_event.clear()
self._pubsub_thread = threading.Thread(
target=self._subscribe_config_updates,
name="ConfigSubscription",
daemon=True
)
self._pubsub_thread.start()
logger.info("配置订阅线程已启动")
def stop_config_subscription(self):
"""停止配置订阅线程"""
self._stop_event.set()
if self._pubsub_thread and self._pubsub_thread.is_alive():
self._pubsub_thread.join(timeout=5)
logger.info("配置订阅线程已停止")
def get_cameras(self, force_refresh: bool = False) -> List[CameraInfoModel]:
"""获取摄像头配置列表"""
cache_key = "cameras"
if not force_refresh:
cached = self._cache.get(cache_key)
if cached is not None:
return cached
self._init_database()
if self._db_manager is None:
logger.warning("数据库管理器不可用,返回空摄像头列表")
return []
try:
cameras = self._db_manager.get_all_camera_configs()
2026-01-29 18:33:12 +08:00
result = [CameraInfoModel.from_dict(c) for c in cameras]
self._cache.set(cache_key, result)
logger.info(f"已加载摄像头配置: {len(result)}")
return result
except Exception as e:
logger.error(f"获取摄像头配置失败: {e}")
cached = self._cache.get(cache_key)
return cached or []
def get_roi_configs(self, camera_id: Optional[str] = None,
force_refresh: bool = False) -> List[ROIInfo]:
"""获取ROI配置列表"""
cache_key = f"rois_{camera_id}" if camera_id else "rois_all"
if not force_refresh:
cached = self._cache.get(cache_key)
if cached is not None:
return cached
self._init_database()
if self._db_manager is None:
logger.warning("数据库管理器不可用返回空ROI配置列表")
return []
try:
if camera_id:
roi_configs = self._db_manager.get_rois_by_camera(camera_id)
else:
roi_configs = self._db_manager.get_all_roi_configs()
2026-01-29 18:33:12 +08:00
result = [ROIInfo.from_dict(r) for r in roi_configs]
self._cache.set(cache_key, result)
logger.info(f"已加载ROI配置: {len(result)}")
return result
except Exception as e:
logger.error(f"获取ROI配置失败: {e}")
cached = self._cache.get(cache_key)
return cached or []
def get_camera_rois(self, camera_id: str) -> List[ROIInfo]:
"""获取指定摄像头的ROI配置"""
return self.get_roi_configs(camera_id=camera_id)
def get_config_by_id(self, config_type: str, config_id: str) -> Optional[Dict[str, Any]]:
"""根据ID获取配置"""
self._init_database()
try:
if config_type == "camera":
camera = self._db_manager.get_camera_config(config_id)
return camera if camera else None
2026-01-29 18:33:12 +08:00
elif config_type == "roi":
roi = self._db_manager.get_roi_config(config_id)
return roi if roi else None
2026-01-29 18:33:12 +08:00
except Exception as e:
logger.error(f"获取配置失败: {e}")
return None
def publish_config_update(self, update_data: Dict[str, Any]) -> bool:
"""发布配置更新通知"""
if not self._redis_client:
logger.warning("Redis未连接无法发布配置更新")
return False
try:
update_data["version"] = self._config_version
update_data["timestamp"] = datetime.now().isoformat()
self._redis_client.publish("config_update", json.dumps(update_data))
logger.info(f"已发布配置更新: {update_data}")
return True
except Exception as e:
logger.error(f"发布配置更新失败: {e}")
return False
def invalidate_cache(self, cache_key: str):
"""使指定缓存失效"""
self._cache.delete(cache_key)
logger.info(f"缓存已失效: {cache_key}")
def invalidate_all_cache(self):
"""使所有缓存失效"""
self._cache.clear()
logger.info("所有缓存已失效")
def get_cache_stats(self) -> Dict[str, Any]:
"""获取缓存统计信息"""
return self._cache.get_stats()
def get_health_status(self) -> Dict[str, Any]:
"""获取健康状态"""
redis_healthy = False
if self._redis_client:
try:
self._redis_client.ping()
redis_healthy = True
except Exception:
pass
return {
"redis_connected": redis_healthy,
"config_version": self._config_version,
"cache_stats": self.get_cache_stats(),
"subscription_active": (
self._pubsub_thread is not None and
self._pubsub_thread.is_alive()
),
}
def _cache_roi_to_redis(self, roi_config: Dict[str, Any]) -> bool:
"""将ROI配置缓存到Redis"""
if not self._redis_client:
return False
try:
roi_id = roi_config.get("roi_id")
key = f"config:roi:{roi_id}"
self._redis_client.hset(key, mapping={
"roi_id": roi_id,
"camera_id": roi_config.get("camera_id", ""),
"roi_type": roi_config.get("roi_type", ""),
"coordinates": str(roi_config.get("coordinates", [])),
"algorithm_type": roi_config.get("algorithm_type", ""),
"working_hours": str(roi_config.get("working_hours", [])),
"confirm_on_duty_sec": str(roi_config.get("confirm_on_duty_sec", 10)),
"confirm_leave_sec": str(roi_config.get("confirm_leave_sec", 10)),
"cooldown_sec": str(roi_config.get("cooldown_sec", 300)),
"target_class": roi_config.get("target_class", "person"),
"enabled": str(roi_config.get("enabled", True)),
})
self._redis_client.expire(key, 3600)
logger.debug(f"ROI配置已缓存到Redis: {key}")
return True
except Exception as e:
logger.error(f"缓存ROI配置到Redis失败: {e}")
return False
def _cache_camera_to_redis(self, camera_config: Dict[str, Any]) -> bool:
"""将摄像头配置缓存到Redis"""
if not self._redis_client:
return False
try:
camera_id = camera_config.get("camera_id")
key = f"config:camera:{camera_id}"
self._redis_client.hset(key, mapping={
"camera_id": camera_id,
"rtsp_url": camera_config.get("rtsp_url", ""),
"camera_name": camera_config.get("camera_name", ""),
"enabled": str(camera_config.get("enabled", True)),
"location": camera_config.get("location", ""),
})
self._redis_client.expire(key, 3600)
logger.debug(f"摄像头配置已缓存到Redis: {key}")
return True
except Exception as e:
logger.error(f"缓存摄像头配置到Redis失败: {e}")
return False
def sync_all_to_redis(self) -> int:
"""同步所有配置到Redis缓存"""
if not self._redis_client:
return 0
count = 0
try:
cameras = self._db_manager.get_all_camera_configs()
for camera in cameras:
if self._cache_camera_to_redis(camera):
count += 1
rois = self._db_manager.get_all_roi_configs()
for roi in rois:
if self._cache_roi_to_redis(roi):
count += 1
logger.info(f"已同步 {count} 条配置到Redis缓存")
return count
except Exception as e:
logger.error(f"同步配置到Redis失败: {e}")
return count
def get_roi_from_redis(self, roi_id: str) -> Optional[Dict[str, Any]]:
"""从Redis获取ROI配置"""
if not self._redis_client:
return None
try:
key = f"config:roi:{roi_id}"
data = self._redis_client.hgetall(key)
if data:
if data.get('coordinates'):
data['coordinates'] = eval(data['coordinates'])
if data.get('working_hours'):
data['working_hours'] = eval(data['working_hours'])
data['confirm_on_duty_sec'] = int(data.get('confirm_on_duty_sec', 10))
data['confirm_leave_sec'] = int(data.get('confirm_leave_sec', 10))
data['cooldown_sec'] = int(data.get('cooldown_sec', 300))
data['enabled'] = data.get('enabled', 'True') == 'True'
return data
return None
except Exception as e:
logger.error(f"从Redis获取ROI配置失败: {e}")
return None
def get_camera_from_redis(self, camera_id: str) -> Optional[Dict[str, Any]]:
"""从Redis获取摄像头配置"""
if not self._redis_client:
return None
try:
key = f"config:camera:{camera_id}"
data = self._redis_client.hgetall(key)
if data:
data['enabled'] = data.get('enabled', 'True') == 'True'
return data
return None
except Exception as e:
logger.error(f"从Redis获取摄像头配置失败: {e}")
return None
def notify_config_change(self, config_type: str, config_ids: List[str]):
"""通知配置变更发布到Redis频道"""
if not self._redis_client:
return
try:
message = {
"type": config_type,
"ids": config_ids,
"timestamp": datetime.now().isoformat(),
}
self._redis_client.publish("config_update", json.dumps(message))
logger.info(f"已发布配置变更通知: {config_type} - {config_ids}")
except Exception as e:
logger.error(f"发布配置变更通知失败: {e}")
def clear_redis_cache(self, config_type: Optional[str] = None):
"""清理Redis缓存"""
if not self._redis_client:
return
try:
if config_type == "roi":
keys = self._redis_client.keys("config:roi:*")
if keys:
self._redis_client.delete(*keys)
elif config_type == "camera":
keys = self._redis_client.keys("config:camera:*")
if keys:
self._redis_client.delete(*keys)
else:
keys = self._redis_client.keys("config:*")
if keys:
self._redis_client.delete(*keys)
logger.info(f"已清理Redis缓存: {config_type or 'all'}")
except Exception as e:
logger.error(f"清理Redis缓存失败: {e}")
def reload_algorithms(self):
"""重新加载所有算法配置"""
self.invalidate_all_cache()
self.clear_redis_cache()
count = self.sync_all_to_redis()
self.notify_config_change("roi", [])
logger.info(f"算法配置已重新加载,更新了 {count} 条缓存")
2026-01-29 18:33:12 +08:00
def close(self):
"""关闭管理器"""
self.stop_config_subscription()
if self._redis_client:
if self._redis_pubsub:
self._redis_pubsub.close()
self._redis_client.close()
logger.info("Redis连接已关闭")
def get_config_sync_manager() -> ConfigSyncManager:
"""获取配置同步管理器单例"""
return ConfigSyncManager()