# Source: security-ai-edge/core/config_sync.py (Python, 805 lines, 31 KiB; last changed 2026-01-29 18:33:12 +08:00)
"""
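Configuration synchronisation module.

Implements SQLite-backed local configuration storage, a Redis Pub/Sub
subscription, and configuration caching with dynamic refresh.

Storage strategy:
- SQLite: local configuration storage (cameras, ROIs)
- Redis: real-time push of configuration changes
- In-memory cache: fast access to hot configuration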
"""
import ast
import json
import logging
import threading
import time
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Set

import redis
from redis import Redis
from redis.client import PubSub

from config.settings import get_settings, RedisConfig
from config.database import get_sqlite_manager, SQLiteManager
from config.config_models import CameraInfo as CameraInfoModel, ROIInfo, ConfigVersion, ROIInfoNew, ROIAlgoBind
from utils.version_control import get_version_control
logger = logging.getLogger(__name__)
class ConfigCache:
    """Thread-safe in-memory cache with a sliding TTL and a size bound.

    Every successful ``get`` refreshes the entry's timestamp, so an entry
    expires ``ttl`` seconds after its *last access*, not after it was stored.
    When a new key would exceed ``max_size``, the entries sharing the oldest
    timestamp are evicted (at most ``_EVICT_BATCH`` per pass).
    """

    # Upper bound on the number of entries removed by one eviction pass.
    _EVICT_BATCH = 10

    def __init__(self, max_size: int = 1000, ttl: int = 300):
        """Create an empty cache.

        Args:
            max_size: Number of entries at which inserting a new key evicts.
            ttl: Seconds an entry stays valid after its last access.
        """
        self._cache: Dict[str, Any] = {}
        self._access_times: Dict[str, float] = {}
        self._max_size = max_size
        self._ttl = ttl
        # Re-entrant so public methods may call each other while holding it.
        self._lock = threading.RLock()

    @staticmethod
    def _now() -> float:
        """Return the clock used for TTL bookkeeping.

        A monotonic clock is used so wall-clock adjustments can neither
        expire every entry at once nor keep stale entries alive.
        """
        return time.monotonic()

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or ``None`` if absent/expired.

        An expired entry is removed as a side effect; a hit refreshes the
        entry's access time.
        """
        with self._lock:
            if key not in self._cache:
                return None
            now = self._now()
            if (now - self._access_times.get(key, 0)) < self._ttl:
                self._access_times[key] = now
                return self._cache[key]
            self._delete(key)
            return None

    def set(self, key: str, value: Any):
        """Store ``value`` under ``key``, evicting old entries if needed."""
        with self._lock:
            # Overwriting an existing key does not grow the cache, so it must
            # not push out an unrelated entry.
            if key not in self._cache and len(self._cache) >= self._max_size:
                self._evict_lru()
            self._cache[key] = value
            self._access_times[key] = self._now()

    def delete(self, key: str):
        """Remove ``key`` from the cache; a missing key is ignored."""
        with self._lock:
            self._delete(key)

    def _delete(self, key: str):
        """Remove ``key`` without taking the lock (caller must hold it)."""
        self._cache.pop(key, None)
        self._access_times.pop(key, None)

    def _evict_lru(self):
        """Evict the entries that share the oldest access time.

        Removes at most ``_EVICT_BATCH`` keys; the caller must hold the lock.
        """
        if not self._access_times:
            return
        oldest = min(self._access_times.values())
        stale_keys = [k for k, stamp in self._access_times.items() if stamp == oldest]
        for key in stale_keys[:self._EVICT_BATCH]:
            self._delete(key)

    def clear(self):
        """Drop every cached entry."""
        with self._lock:
            self._cache.clear()
            self._access_times.clear()

    def get_stats(self) -> Dict[str, Any]:
        """Return the current size together with the configured limits."""
        with self._lock:
            return {
                "size": len(self._cache),
                "max_size": self._max_size,
                "ttl": self._ttl,
            }
class ConfigSyncManager:
    """Singleton that keeps local configuration in sync with Redis.

    Responsibilities visible in this class:
    - copy camera / ROI / binding hashes from Redis (``config:*`` keys) into
      SQLite via the database manager;
    - listen on the ``config_update`` Pub/Sub channel in a background thread
      and invalidate the in-memory cache when a change is announced;
    - serve camera and ROI configuration from SQLite through the cache;
    - mirror SQLite configuration back into Redis hashes with a 1 h expiry.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance._initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self):
        """Initialise state once; later constructions are no-ops."""
        if self._initialized:
            return
        settings = get_settings()
        self._config_version = settings.config_version
        self._cache = ConfigCache()
        self._redis_client = None
        self._redis_pubsub_client = None
        self._redis_pubsub = None
        self._pubsub_thread = None
        self._stop_event = threading.Event()
        self._callbacks: Dict[str, Set[Callable]] = {}
        self._db_manager = None
        self._init_redis()
        self._version_control = get_version_control()
        # Only mark the singleton ready once every attribute exists, so a
        # failure above is retried on the next construction instead of
        # leaving a half-initialised instance behind.
        self._initialized = True

    # ------------------------------------------------------------------
    # Small parsing helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_literal(raw: Any, default: Any) -> Any:
        """Parse a Python-literal string read from Redis.

        The ``_cache_*_to_redis`` writers store lists/dicts with ``str()``,
        so the values are Python literals. ``ast.literal_eval`` replaces the
        former ``eval`` call: Redis content is external data and must never
        be executed as code.

        Args:
            raw: The stored string; a falsy value yields ``default``.
            default: Value returned when ``raw`` is empty or missing.

        Raises:
            ValueError, SyntaxError: If ``raw`` is not a plain literal.
        """
        if not raw:
            return default
        return ast.literal_eval(raw)

    @staticmethod
    def _parse_enabled(data: Dict[str, Any]) -> bool:
        """Return the ``enabled`` flag of a Redis hash (missing means True)."""
        return data.get("enabled", "True") == "True"

    # ------------------------------------------------------------------
    # Connections
    # ------------------------------------------------------------------
    def _init_redis(self):
        """Create the command and Pub/Sub Redis clients.

        On any failure both clients are reset to ``None`` and the error is
        logged; nothing is raised.
        """
        try:
            redis_config = get_settings().redis
            common = dict(
                host=redis_config.host,
                port=redis_config.port,
                db=redis_config.db,
                password=redis_config.password,
                decode_responses=redis_config.decode_responses,
                socket_connect_timeout=10,
            )
            self._redis_client = redis.Redis(
                socket_timeout=10,
                retry_on_timeout=True,
                **common,
            )
            # The Pub/Sub connection deliberately has no socket_timeout so
            # that waiting for messages does not time out periodically.
            self._redis_pubsub_client = redis.Redis(**common)
            self._redis_client.ping()
            logger.info(f"Redis连接成功: {redis_config.host}:{redis_config.port}")
        except Exception as e:
            logger.error(f"Redis连接失败: {e}")
            self._redis_client = None
            self._redis_pubsub_client = None

    def _init_database(self):
        """Lazily obtain the SQLite manager on first use."""
        if self._db_manager is None:
            self._db_manager = get_sqlite_manager()

    @property
    def config_version(self) -> str:
        """Current configuration version."""
        return self._config_version

    # ------------------------------------------------------------------
    # Callbacks
    # ------------------------------------------------------------------
    def register_callback(self, topic: str, callback: Callable):
        """Register ``callback(topic, data)`` for changes on ``topic``."""
        self._callbacks.setdefault(topic, set()).add(callback)
        logger.info(f"已注册配置变更回调: {topic}")

    def unregister_callback(self, topic: str, callback: Callable):
        """Remove a previously registered callback; unknown ones are ignored."""
        if topic in self._callbacks:
            self._callbacks[topic].discard(callback)

    def _notify_callbacks(self, topic: str, data: Dict[str, Any]):
        """Invoke every callback registered for ``topic``.

        A failing callback is logged and does not stop the others.
        """
        # Iterate over a snapshot: a callback may (un)register callbacks,
        # which would otherwise change the set during iteration.
        for callback in tuple(self._callbacks.get(topic, ())):
            try:
                callback(topic, data)
            except Exception as e:
                logger.error(f"配置变更回调执行失败: {e}")

    # ------------------------------------------------------------------
    # Subscription
    # ------------------------------------------------------------------
    def _close_pubsub(self):
        """Best-effort close of the current PubSub object, if any."""
        pubsub, self._redis_pubsub = self._redis_pubsub, None
        if pubsub is None:
            return
        try:
            pubsub.close()
        except Exception as e:
            logger.debug(f"关闭PubSub失败: {e}")

    def _subscribe_config_updates(self):
        """Thread body: consume ``config_update`` messages, reconnecting on error.

        Messages are polled with a one second timeout instead of blocking in
        ``listen()`` so that the stop event is noticed promptly.
        """
        if not self._redis_pubsub_client:
            logger.warning("Redis未连接无法订阅配置更新")
            return
        while not self._stop_event.is_set():
            try:
                self._redis_pubsub = self._redis_pubsub_client.pubsub()
                self._redis_pubsub.subscribe("config_update")
                logger.info("已订阅config_update主题")
                while not self._stop_event.is_set():
                    message = self._redis_pubsub.get_message(timeout=1.0)
                    # None means the poll timed out; other types are
                    # subscribe/unsubscribe confirmations.
                    if message is None or message["type"] != "message":
                        continue
                    try:
                        self._handle_config_update(json.loads(message["data"]))
                    except Exception as e:
                        logger.error(f"处理配置更新消息失败: {e}")
            except Exception as e:
                if not self._stop_event.is_set():
                    logger.debug(f"配置更新订阅重连中: {e}")
            finally:
                self._close_pubsub()
            # Back off 3 s before reconnecting; returns immediately once a
            # stop has been requested.
            self._stop_event.wait(3)

    def _handle_config_update(self, data: Dict[str, Any]):
        """Apply one decoded ``config_update`` message.

        Syncs the affected items from Redis into SQLite, invalidates the
        in-memory cache, adopts the announced version, notifies callbacks
        and records the update in version control.

        NOTE(review): messages published by ``notify_config_change`` carry
        ``ids`` but no ``affected_items``, so they sync and invalidate
        nothing here — confirm that this is intended.
        """
        update_type = data.get("type", "full")
        # Tolerate an explicit null as well as a missing key.
        affected_items = data.get("affected_items") or []
        version = data.get("version", self._config_version)
        logger.info(f"收到配置更新通知: type={update_type}, items={affected_items}")

        # The main inference pipeline reads from SQLite, so refresh it first.
        self._sync_redis_to_sqlite(affected_items)

        everything = "all" in affected_items
        if everything or "camera" in affected_items:
            self._cache.delete("cameras")
        if everything or "roi" in affected_items:
            # ROI results are cached under per-camera keys ("rois_<id>",
            # "rois_bindings_<id>", ...), so the whole cache is dropped.
            self._cache.clear()

        self._config_version = version
        self._notify_callbacks("config_update", data)
        self._version_control.record_update(
            version=version,
            update_type="配置更新",
            description=f"云端配置更新,影响范围: {', '.join(affected_items)}",
            updated_by="云端系统",
            affected_items=affected_items,
            details=data
        )

    # ------------------------------------------------------------------
    # Redis -> SQLite
    # ------------------------------------------------------------------
    def _sync_cameras_from_redis(self) -> int:
        """Copy every ``config:camera:*`` hash into SQLite; return the count."""
        saved = 0
        for key in (self._redis_client.keys("config:camera:*") or []):
            try:
                data = self._redis_client.hgetall(key)
                if not data or not data.get("camera_id"):
                    continue
                self._db_manager.save_camera_config(
                    camera_id=data["camera_id"],
                    rtsp_url=data.get("rtsp_url", ""),
                    camera_name=data.get("camera_name", ""),
                    enabled=self._parse_enabled(data),
                    location=data.get("location", ""),
                )
                saved += 1
            except Exception as e:
                logger.error(f"同步摄像头配置到SQLite失败: key={key}, error={e}")
        return saved

    def _sync_rois_from_redis(self) -> int:
        """Copy every ``config:roi:*`` hash into SQLite; return the count."""
        saved = 0
        for key in (self._redis_client.keys("config:roi:*") or []):
            try:
                data = self._redis_client.hgetall(key)
                if not data or not data.get("roi_id"):
                    continue
                raw_coords = self._parse_literal(data.get("coordinates"), [])
                # Accept [{x: .., y: ..}, ...] as well and normalise it to
                # [[x, y], ...].
                if raw_coords and isinstance(raw_coords[0], dict):
                    coordinates = [[p.get("x", 0), p.get("y", 0)] for p in raw_coords]
                else:
                    coordinates = raw_coords
                self._db_manager.save_roi_config(
                    roi_id=data["roi_id"],
                    camera_id=data.get("camera_id", ""),
                    roi_type=data.get("roi_type", "polygon"),
                    coordinates=coordinates,
                    enabled=self._parse_enabled(data),
                    priority=int(data.get("priority", 0)),
                )
                saved += 1
            except Exception as e:
                logger.error(f"同步ROI配置到SQLite失败: key={key}, error={e}")
        return saved

    def _sync_bindings_from_redis(self) -> int:
        """Copy every ``config:bind:*`` hash into SQLite; return the count."""
        saved = 0
        for key in (self._redis_client.keys("config:bind:*") or []):
            try:
                data = self._redis_client.hgetall(key)
                if not data or not data.get("bind_id"):
                    continue
                self._db_manager.save_roi_algo_bind(
                    bind_id=data["bind_id"],
                    roi_id=data.get("roi_id", ""),
                    algo_code=data.get("algo_code", ""),
                    params=self._parse_literal(data.get("params"), {}),
                    priority=int(data.get("priority", 0)),
                    enabled=self._parse_enabled(data),
                )
                saved += 1
            except Exception as e:
                logger.error(f"同步绑定配置到SQLite失败: key={key}, error={e}")
        return saved

    def _sync_redis_to_sqlite(self, affected_items: List[str]):
        """Copy the affected configuration from Redis into SQLite.

        Args:
            affected_items: Any of ``"camera"``, ``"roi"`` or ``"all"``.
                ROI synchronisation also covers the algorithm bindings.
        """
        self._init_database()
        if not self._redis_client or not self._db_manager:
            logger.warning("Redis或SQLite不可用跳过同步")
            return
        try:
            everything = "all" in affected_items
            count = 0
            if everything or "camera" in affected_items:
                count += self._sync_cameras_from_redis()
            if everything or "roi" in affected_items:
                count += self._sync_rois_from_redis()
                count += self._sync_bindings_from_redis()
            logger.info(f"Redis→SQLite同步完成: {count} 条配置已更新")
        except Exception as e:
            logger.error(f"Redis→SQLite同步失败: {e}")

    def start_config_subscription(self):
        """Run a full Redis→SQLite sync, then start the subscriber thread.

        The initial sync keeps configuration from being lost across a
        service restart. A running subscriber thread is left untouched.
        """
        if self._redis_client:
            try:
                logger.info("启动时执行Redis→SQLite全量同步...")
                self._sync_redis_to_sqlite(["all"])
            except Exception as e:
                logger.warning(f"启动时Redis同步失败将使用本地SQLite配置: {e}")
        if self._pubsub_thread is None or not self._pubsub_thread.is_alive():
            self._stop_event.clear()
            self._pubsub_thread = threading.Thread(
                target=self._subscribe_config_updates,
                name="ConfigSubscription",
                daemon=True
            )
            self._pubsub_thread.start()
            logger.info("配置订阅线程已启动")

    def stop_config_subscription(self):
        """Signal the subscriber thread to stop and wait up to 5 s for it."""
        self._stop_event.set()
        if self._pubsub_thread and self._pubsub_thread.is_alive():
            self._pubsub_thread.join(timeout=5)
        logger.info("配置订阅线程已停止")

    # ------------------------------------------------------------------
    # Cached readers (SQLite)
    # ------------------------------------------------------------------
    def get_cameras(self, force_refresh: bool = False) -> "List[CameraInfoModel]":
        """Return all camera configurations.

        Args:
            force_refresh: Skip the cache and reload from SQLite.

        Returns:
            The camera list; on a database error the cached list if one
            exists, otherwise an empty list.
        """
        cache_key = "cameras"
        if not force_refresh:
            cached = self._cache.get(cache_key)
            if cached is not None:
                return cached
        self._init_database()
        if self._db_manager is None:
            logger.warning("数据库管理器不可用,返回空摄像头列表")
            return []
        try:
            rows = self._db_manager.get_all_camera_configs()
            result = [CameraInfoModel.from_dict(row) for row in rows]
            self._cache.set(cache_key, result)
            logger.info(f"已加载摄像头配置: {len(result)}")
            return result
        except Exception as e:
            logger.error(f"获取摄像头配置失败: {e}")
            return self._cache.get(cache_key) or []

    def get_roi_configs(self, camera_id: Optional[str] = None,
                        force_refresh: bool = False) -> "List[ROIInfo]":
        """Return ROI configurations (legacy shape without bindings).

        Args:
            camera_id: Restrict the result to one camera; ``None`` means all.
            force_refresh: Skip the cache and reload from SQLite.
        """
        cache_key = f"rois_{camera_id}" if camera_id else "rois_all"
        if not force_refresh:
            cached = self._cache.get(cache_key)
            if cached is not None:
                return cached
        self._init_database()
        if self._db_manager is None:
            logger.warning("数据库管理器不可用返回空ROI配置列表")
            return []
        try:
            if camera_id:
                rows = self._db_manager.get_rois_by_camera(camera_id)
            else:
                rows = self._db_manager.get_all_roi_configs()
            result = [ROIInfo.from_dict(row) for row in rows]
            self._cache.set(cache_key, result)
            logger.info(f"已加载ROI配置: {len(result)}")
            return result
        except Exception as e:
            logger.error(f"获取ROI配置失败: {e}")
            return self._cache.get(cache_key) or []

    def get_roi_configs_with_bindings(self, camera_id: Optional[str] = None,
                                      force_refresh: bool = False) -> "List[ROIInfoNew]":
        """Return ROI configurations with their algorithm bindings attached.

        The result is sorted by ``priority`` in descending order.

        Args:
            camera_id: Restrict the result to one camera; ``None`` means all.
            force_refresh: Skip the cache and reload from SQLite.
        """
        cache_key = f"rois_bindings_{camera_id}" if camera_id else "rois_bindings_all"
        if not force_refresh:
            cached = self._cache.get(cache_key)
            if cached is not None:
                return cached
        self._init_database()
        if self._db_manager is None:
            logger.warning("数据库管理器不可用返回空ROI配置列表")
            return []
        try:
            if camera_id:
                roi_rows = self._db_manager.get_rois_by_camera(camera_id)
                bind_rows = self._db_manager.get_bindings_by_camera(camera_id)
            else:
                roi_rows = self._db_manager.get_all_roi_configs()
                bind_rows = []
                for row in roi_rows:
                    bind_rows.extend(self._db_manager.get_bindings_by_roi(row['roi_id']))

            # Group bindings by the ROI they belong to.
            bindings_by_roi: Dict[Any, list] = {}
            for bind in bind_rows:
                bindings_by_roi.setdefault(bind['roi_id'], []).append(bind)

            result = []
            # Keying by roi_id keeps one row per ROI (the last one wins).
            for roi_id, roi_row in {r['roi_id']: r for r in roi_rows}.items():
                roi_info = ROIInfoNew.from_dict(roi_row)
                if roi_id in bindings_by_roi:
                    roi_info.bindings = [
                        ROIAlgoBind.from_dict(b) for b in bindings_by_roi[roi_id]
                    ]
                result.append(roi_info)
            result.sort(key=lambda item: item.priority, reverse=True)
            self._cache.set(cache_key, result)
            logger.info(f"已加载ROI配置含绑定: {len(result)}")
            return result
        except Exception as e:
            logger.error(f"获取ROI配置含绑定失败: {e}")
            return self._cache.get(cache_key) or []

    def get_camera_rois(self, camera_id: str) -> "List[ROIInfo]":
        """Return the ROI configurations of one camera."""
        return self.get_roi_configs(camera_id=camera_id)

    def get_config_by_id(self, config_type: str, config_id: str) -> Optional[Dict[str, Any]]:
        """Look up a single ``"camera"`` or ``"roi"`` configuration by id.

        Returns:
            The stored configuration, or ``None`` when it is missing, the
            type is unknown, or the lookup fails.
        """
        self._init_database()
        if self._db_manager is None:
            logger.warning("数据库管理器不可用,无法获取配置")
            return None
        try:
            if config_type == "camera":
                return self._db_manager.get_camera_config(config_id) or None
            if config_type == "roi":
                return self._db_manager.get_roi_config(config_id) or None
        except Exception as e:
            logger.error(f"获取配置失败: {e}")
        return None

    # ------------------------------------------------------------------
    # Publishing / cache control
    # ------------------------------------------------------------------
    def publish_config_update(self, update_data: Dict[str, Any]) -> bool:
        """Publish ``update_data`` on the ``config_update`` channel.

        The current version and a timestamp are added to a copy of the
        payload; the caller's dictionary is left untouched.

        Returns:
            True when the message was published, False otherwise.
        """
        if not self._redis_client:
            logger.warning("Redis未连接无法发布配置更新")
            return False
        try:
            payload = dict(update_data)
            payload["version"] = self._config_version
            payload["timestamp"] = datetime.now().isoformat()
            self._redis_client.publish("config_update", json.dumps(payload))
            logger.info(f"已发布配置更新: {payload}")
            return True
        except Exception as e:
            logger.error(f"发布配置更新失败: {e}")
            return False

    def invalidate_cache(self, cache_key: str):
        """Drop one entry from the in-memory cache."""
        self._cache.delete(cache_key)
        logger.info(f"缓存已失效: {cache_key}")

    def invalidate_all_cache(self):
        """Drop every entry from the in-memory cache."""
        self._cache.clear()
        logger.info("所有缓存已失效")

    def get_cache_stats(self) -> Dict[str, Any]:
        """Return statistics of the in-memory cache."""
        return self._cache.get_stats()

    def get_health_status(self) -> Dict[str, Any]:
        """Report Redis reachability, version, cache stats and thread state."""
        redis_healthy = False
        if self._redis_client:
            try:
                self._redis_client.ping()
                redis_healthy = True
            except Exception:
                # An unreachable server is simply reported as unhealthy.
                pass
        thread = self._pubsub_thread
        return {
            "redis_connected": redis_healthy,
            "config_version": self._config_version,
            "cache_stats": self.get_cache_stats(),
            "subscription_active": thread is not None and thread.is_alive(),
        }

    # ------------------------------------------------------------------
    # SQLite -> Redis
    # ------------------------------------------------------------------
    def _cache_roi_to_redis(self, roi_config: Dict[str, Any]) -> bool:
        """Write one ROI configuration to ``config:roi:<id>`` (1 h expiry)."""
        if not self._redis_client:
            return False
        try:
            roi_id = roi_config.get("roi_id")
            key = f"config:roi:{roi_id}"
            self._redis_client.hset(key, mapping={
                "roi_id": roi_id,
                "camera_id": roi_config.get("camera_id", ""),
                "roi_type": roi_config.get("roi_type", ""),
                "coordinates": str(roi_config.get("coordinates", [])),
                "enabled": str(roi_config.get("enabled", True)),
                "priority": str(roi_config.get("priority", 0)),
            })
            self._redis_client.expire(key, 3600)
            logger.debug(f"ROI配置已缓存到Redis: {key}")
            return True
        except Exception as e:
            logger.error(f"缓存ROI配置到Redis失败: {e}")
            return False

    def _cache_algo_bind_to_redis(self, bind_config: Dict[str, Any]) -> bool:
        """Write one ROI-algorithm binding to ``config:bind:<id>`` (1 h expiry)."""
        if not self._redis_client:
            return False
        try:
            bind_id = bind_config.get("bind_id")
            key = f"config:bind:{bind_id}"
            self._redis_client.hset(key, mapping={
                "bind_id": bind_id,
                "roi_id": bind_config.get("roi_id", ""),
                "algo_code": bind_config.get("algo_code", ""),
                "params": str(bind_config.get("params", {})),
                "priority": str(bind_config.get("priority", 0)),
                "enabled": str(bind_config.get("enabled", True)),
                "algo_name": bind_config.get("algo_name", ""),
                "target_class": bind_config.get("target_class", "person"),
            })
            self._redis_client.expire(key, 3600)
            logger.debug(f"ROI算法绑定配置已缓存到Redis: {key}")
            return True
        except Exception as e:
            logger.error(f"缓存ROI算法绑定配置到Redis失败: {e}")
            return False

    def _cache_camera_to_redis(self, camera_config: Dict[str, Any]) -> bool:
        """Write one camera configuration to ``config:camera:<id>`` (1 h expiry)."""
        if not self._redis_client:
            return False
        try:
            camera_id = camera_config.get("camera_id")
            key = f"config:camera:{camera_id}"
            self._redis_client.hset(key, mapping={
                "camera_id": camera_id,
                "rtsp_url": camera_config.get("rtsp_url", ""),
                "camera_name": camera_config.get("camera_name", ""),
                "enabled": str(camera_config.get("enabled", True)),
                "location": camera_config.get("location", ""),
            })
            self._redis_client.expire(key, 3600)
            logger.debug(f"摄像头配置已缓存到Redis: {key}")
            return True
        except Exception as e:
            logger.error(f"缓存摄像头配置到Redis失败: {e}")
            return False

    def sync_all_to_redis(self) -> int:
        """Mirror every camera, ROI and binding from SQLite into Redis.

        Returns:
            The number of entries written (partial on error).
        """
        if not self._redis_client:
            return 0
        # The database manager is created lazily; make sure it exists even
        # when no reader has been called before.
        self._init_database()
        if self._db_manager is None:
            logger.warning("数据库管理器不可用,无法同步配置到Redis")
            return 0
        count = 0
        try:
            for camera in self._db_manager.get_all_camera_configs():
                if self._cache_camera_to_redis(camera):
                    count += 1
            for roi in self._db_manager.get_all_roi_configs():
                if self._cache_roi_to_redis(roi):
                    count += 1
            # Existing binding hashes are removed before being rewritten.
            self.clear_redis_cache("bind")
            # NOTE(review): an empty camera id is presumably treated by the
            # database manager as "all cameras" — confirm.
            for bind in self._db_manager.get_bindings_by_camera(""):
                if self._cache_algo_bind_to_redis(bind):
                    count += 1
            logger.info(f"已同步 {count} 条配置到Redis缓存")
            return count
        except Exception as e:
            logger.error(f"同步配置到Redis失败: {e}")
            return count

    # ------------------------------------------------------------------
    # Redis readers
    # ------------------------------------------------------------------
    def get_roi_from_redis(self, roi_id: str) -> Optional[Dict[str, Any]]:
        """Read one ROI hash from Redis with typed fields, or ``None``."""
        if not self._redis_client:
            return None
        try:
            data = self._redis_client.hgetall(f"config:roi:{roi_id}")
            if not data:
                return None
            data['coordinates'] = self._parse_literal(data.get('coordinates'), [])
            data['priority'] = int(data.get('priority', 0))
            data['enabled'] = self._parse_enabled(data)
            return data
        except Exception as e:
            logger.error(f"从Redis获取ROI配置失败: {e}")
            return None

    def _decode_bind(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert the string fields of a binding hash in place and return it."""
        data['params'] = self._parse_literal(data.get('params'), {})
        data['priority'] = int(data.get('priority', 0))
        data['enabled'] = self._parse_enabled(data)
        data['target_class'] = data.get('target_class', 'person')
        return data

    def get_algo_bind_from_redis(self, bind_id: str) -> Optional[Dict[str, Any]]:
        """Read one binding hash from Redis with typed fields, or ``None``."""
        if not self._redis_client:
            return None
        try:
            data = self._redis_client.hgetall(f"config:bind:{bind_id}")
            if not data:
                return None
            return self._decode_bind(data)
        except Exception as e:
            logger.error(f"从Redis获取ROI算法绑定配置失败: {e}")
            return None

    def get_bindings_from_redis(self, roi_id: str) -> List[Dict[str, Any]]:
        """Return the bindings stored in Redis, highest priority first.

        Args:
            roi_id: Only bindings of this ROI are returned; a falsy value
                returns every binding.
        """
        if not self._redis_client:
            return []
        try:
            results = []
            for key in self._redis_client.keys("config:bind:*"):
                data = self._redis_client.hgetall(key)
                if data and (not roi_id or data.get('roi_id') == roi_id):
                    results.append(self._decode_bind(data))
            return sorted(results, key=lambda item: item['priority'], reverse=True)
        except Exception as e:
            logger.error(f"从Redis获取ROI算法绑定列表失败: {e}")
            return []

    def get_camera_from_redis(self, camera_id: str) -> Optional[Dict[str, Any]]:
        """Read one camera hash from Redis with ``enabled`` as bool, or ``None``."""
        if not self._redis_client:
            return None
        try:
            data = self._redis_client.hgetall(f"config:camera:{camera_id}")
            if not data:
                return None
            data['enabled'] = self._parse_enabled(data)
            return data
        except Exception as e:
            logger.error(f"从Redis获取摄像头配置失败: {e}")
            return None

    def notify_config_change(self, config_type: str, config_ids: List[str]):
        """Publish a change notice (type, ids, timestamp) on ``config_update``."""
        if not self._redis_client:
            return
        try:
            message = {
                "type": config_type,
                "ids": config_ids,
                "timestamp": datetime.now().isoformat(),
            }
            self._redis_client.publish("config_update", json.dumps(message))
            logger.info(f"已发布配置变更通知: {config_type} - {config_ids}")
        except Exception as e:
            logger.error(f"发布配置变更通知失败: {e}")

    def clear_redis_cache(self, config_type: Optional[str] = None):
        """Delete configuration hashes from Redis.

        Args:
            config_type: ``"roi"``, ``"camera"`` or ``"bind"`` to delete one
                family; anything else deletes every ``config:*`` key.
        """
        if not self._redis_client:
            return
        try:
            if config_type in ("roi", "camera", "bind"):
                pattern = f"config:{config_type}:*"
            else:
                pattern = "config:*"
            keys = self._redis_client.keys(pattern)
            if keys:
                self._redis_client.delete(*keys)
            logger.info(f"已清理Redis缓存: {config_type or 'all'}")
        except Exception as e:
            logger.error(f"清理Redis缓存失败: {e}")

    def reload_algorithms(self):
        """Rebuild the Redis mirror from SQLite and announce the change."""
        self.invalidate_all_cache()
        self.clear_redis_cache()
        count = self.sync_all_to_redis()
        self.notify_config_change("roi", [])
        self.notify_config_change("bind", [])
        logger.info(f"算法配置已重新加载,更新了 {count} 条缓存")

    def close(self):
        """Stop the subscriber and close the PubSub object and both clients."""
        self.stop_config_subscription()
        self._close_pubsub()
        closed_any = False
        # The dedicated Pub/Sub client owns its own connections and must be
        # closed as well as the command client.
        for client in (self._redis_pubsub_client, self._redis_client):
            if client is None:
                continue
            closed_any = True
            try:
                client.close()
            except Exception as e:
                logger.error(f"关闭Redis连接失败: {e}")
        if closed_any:
            logger.info("Redis连接已关闭")
def get_config_sync_manager() -> ConfigSyncManager:
    """Return the shared ``ConfigSyncManager`` instance.

    Calls the ``ConfigSyncManager`` constructor and hands back its result.
    """
    manager = ConfigSyncManager()
    return manager