683 lines
25 KiB
Python
683 lines
25 KiB
Python
"""
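Configuration synchronization module.

Implements local configuration storage in SQLite, Redis Pub/Sub
subscription, configuration caching, and dynamic refresh.

Storage strategy:
- SQLite: local configuration storage (cameras, ROIs)
- Redis: real-time push of configuration changes
- In-memory cache: fast access to hot configuration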
"""
|
||
|
||
import json
|
||
import logging
|
||
import threading
|
||
import time
|
||
from datetime import datetime
|
||
from typing import Any, Callable, Dict, List, Optional, Set
|
||
|
||
import redis
|
||
from redis import Redis
|
||
from redis.client import PubSub
|
||
|
||
from config.settings import get_settings, RedisConfig
|
||
from config.database import get_sqlite_manager, SQLiteManager
|
||
from config.config_models import CameraInfo as CameraInfoModel, ROIInfo, ConfigVersion, ROIInfoNew, ROIAlgoBind
|
||
from utils.version_control import get_version_control
|
||
|
||
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class ConfigCache:
    """Thread-safe in-memory cache for configuration objects.

    Every entry remembers when it was last written or successfully read.
    An entry is served only while that moment is less than ``ttl`` seconds
    in the past, so a successful read extends the entry's life. When the
    cache holds ``max_size`` entries and a *new* key is stored, the entries
    with the oldest access time are evicted first.

    Args:
        max_size: Number of entries at which inserting a new key triggers
            eviction.
        ttl: Seconds an entry stays valid after its last write/read.
    """

    def __init__(self, max_size: int = 1000, ttl: int = 300):
        self._cache: Dict[str, Any] = {}
        # key -> time.monotonic() of the last write or successful read
        self._access_times: Dict[str, float] = {}
        self._max_size = max_size
        self._ttl = ttl
        # Re-entrant so public methods may call each other under the lock.
        self._lock = threading.RLock()

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or ``None`` if absent/expired.

        A hit refreshes the entry's access time; an expired entry is removed.
        """
        with self._lock:
            if key not in self._cache:
                return None
            now = time.monotonic()
            last_access = self._access_times.get(key)
            if last_access is not None and (now - last_access) < self._ttl:
                self._access_times[key] = now
                return self._cache[key]
            self._delete(key)
            return None

    def set(self, key: str, value: Any):
        """Store ``value`` under ``key`` and mark it as just accessed.

        Eviction only runs when ``key`` is new: overwriting an existing key
        does not grow the cache, so no other entry has to make room for it.
        """
        with self._lock:
            if key not in self._cache and len(self._cache) >= self._max_size:
                self._evict_lru()
            self._cache[key] = value
            self._access_times[key] = time.monotonic()

    def delete(self, key: str):
        """Remove ``key`` from the cache; a missing key is ignored."""
        with self._lock:
            self._delete(key)

    def _delete(self, key: str):
        """Remove ``key`` without taking the lock (caller must hold it)."""
        self._cache.pop(key, None)
        self._access_times.pop(key, None)

    def _evict_lru(self):
        """Evict the entries sharing the oldest access time (at most 10)."""
        if not self._access_times:
            return

        oldest = min(self._access_times.values())
        victims = [k for k, stamp in self._access_times.items() if stamp == oldest]
        for key in victims[:10]:
            self._delete(key)

    def clear(self):
        """Drop every cached entry."""
        with self._lock:
            self._cache.clear()
            self._access_times.clear()

    def get_stats(self) -> Dict[str, Any]:
        """Return the current entry count plus the configured limits."""
        with self._lock:
            return {
                "size": len(self._cache),
                "max_size": self._max_size,
                "ttl": self._ttl,
            }
|
||
|
||
|
||
class ConfigSyncManager:
    """Singleton that serves camera/ROI configuration and keeps it in sync.

    Configuration is read through the SQLite manager, kept in an in-memory
    ``ConfigCache``, optionally mirrored into Redis hashes
    (``config:camera:*``, ``config:roi:*``, ``config:bind:*``) and
    invalidated when a message arrives on the ``config_update`` Redis
    Pub/Sub channel.
    """

    _instance = None
    _lock = threading.Lock()

    # Lifetime, in seconds, of every hash mirrored into Redis.
    _REDIS_HASH_TTL = 3600
    # Key pattern per config type; any other type means "every config key".
    _REDIS_KEY_PATTERNS = {
        "roi": "config:roi:*",
        "camera": "config:camera:*",
        "bind": "config:bind:*",
    }
    # Seconds the subscriber waits for a message before re-checking the
    # stop flag.
    _POLL_INTERVAL = 1.0

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    # Finish setting up the object before publishing it, so
                    # another thread can never observe an instance that
                    # lacks the ``_initialized`` attribute.
                    instance = super().__new__(cls)
                    instance._initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self):
        # __init__ runs on every ConfigSyncManager() call; only the first
        # call may set up state.
        if self._initialized:
            return

        settings = get_settings()
        self._config_version = settings.config_version
        self._cache = ConfigCache()
        self._redis_client = None
        self._redis_pubsub = None
        self._pubsub_thread = None
        self._stop_event = threading.Event()
        self._callbacks: Dict[str, Set[Callable]] = {}
        self._db_manager = None
        self._initialized = True

        self._init_redis()
        self._version_control = get_version_control()

    def _init_redis(self):
        """Create the Redis client and verify it with a PING.

        On any failure the error is logged and ``_redis_client`` is left as
        ``None``; the Redis-backed methods check for that and degrade.
        """
        try:
            settings = get_settings()
            redis_config = settings.redis

            self._redis_client = redis.Redis(
                host=redis_config.host,
                port=redis_config.port,
                db=redis_config.db,
                password=redis_config.password,
                decode_responses=redis_config.decode_responses,
                socket_connect_timeout=10,
                socket_timeout=10,
                retry_on_timeout=True,
            )

            self._redis_client.ping()
            logger.info(f"Redis连接成功: {redis_config.host}:{redis_config.port}")

        except Exception as e:
            logger.error(f"Redis连接失败: {e}")
            self._redis_client = None

    def _init_database(self):
        """Fetch the SQLite manager on first use."""
        if self._db_manager is None:
            self._db_manager = get_sqlite_manager()

    @property
    def config_version(self) -> str:
        """Current configuration version."""
        return self._config_version

    def register_callback(self, topic: str, callback: Callable):
        """Register ``callback`` to be invoked as ``callback(topic, data)``."""
        self._callbacks.setdefault(topic, set()).add(callback)
        logger.info(f"已注册配置变更回调: {topic}")

    def unregister_callback(self, topic: str, callback: Callable):
        """Remove ``callback`` from ``topic``; unknown pairs are ignored."""
        if topic in self._callbacks:
            self._callbacks[topic].discard(callback)

    def _notify_callbacks(self, topic: str, data: Dict[str, Any]):
        """Invoke every callback registered for ``topic``.

        A failing callback is logged and does not stop the others.
        """
        # Iterate over a snapshot: a callback may register/unregister
        # callbacks, and mutating a set while iterating it raises
        # RuntimeError.
        for callback in tuple(self._callbacks.get(topic, ())):
            try:
                callback(topic, data)
            except Exception as e:
                logger.error(f"配置变更回调执行失败: {e}")

    def _subscribe_config_updates(self):
        """Subscriber thread body: consume ``config_update`` messages.

        Polls with a timeout instead of blocking in ``listen()`` so the
        stop event is honoured even when no message ever arrives.
        """
        if not self._redis_client:
            logger.warning("Redis未连接,无法订阅配置更新")
            return

        try:
            self._redis_pubsub = self._redis_client.pubsub()
            self._redis_pubsub.subscribe("config_update")

            logger.info("已订阅config_update主题")

            while not self._stop_event.is_set():
                message = self._redis_pubsub.get_message(timeout=self._POLL_INTERVAL)
                # None means the poll timed out; other types are
                # subscribe/unsubscribe confirmations.
                if message is None or message["type"] != "message":
                    continue

                try:
                    data = json.loads(message["data"])
                    self._handle_config_update(data)
                except Exception as e:
                    logger.error(f"处理配置更新消息失败: {e}")

        except Exception as e:
            logger.error(f"配置更新订阅异常: {e}")

    def _handle_config_update(self, data: Dict[str, Any]):
        """Apply one configuration-update message.

        Invalidates the affected in-memory caches, adopts the message's
        version, notifies ``config_update`` callbacks and records the
        update through the version-control helper.

        Args:
            data: Decoded message. Reads ``type``, ``affected_items`` and
                ``version``; messages built by ``notify_config_change``
                carry the changed kind in ``type`` only.
        """
        update_type = data.get("type", "full")
        affected_items = data.get("affected_items") or []
        version = data.get("version", self._config_version)

        logger.info(f"收到配置更新通知: type={update_type}, items={affected_items}")

        everything = "all" in affected_items
        camera_changed = (
            everything or "camera" in affected_items or update_type == "camera"
        )
        roi_changed = (
            everything
            or update_type in ("roi", "bind")
            or "roi" in affected_items
            or "bind" in affected_items
        )

        if roi_changed:
            # ROI results are cached under per-camera keys ("rois_all",
            # "rois_<id>", "rois_bindings_<id>"), so no single key covers
            # them; clearing the whole cache is the safe invalidation.
            self._cache.clear()
        elif camera_changed:
            self._cache.delete("cameras")

        self._config_version = version
        self._notify_callbacks("config_update", data)

        self._version_control.record_update(
            version=version,
            update_type="配置更新",
            description=f"云端配置更新,影响范围: {', '.join(affected_items)}",
            updated_by="云端系统",
            affected_items=affected_items,
            details=data
        )

    def start_config_subscription(self):
        """Start the subscriber thread unless one is already running."""
        if self._pubsub_thread is None or not self._pubsub_thread.is_alive():
            self._stop_event.clear()
            self._pubsub_thread = threading.Thread(
                target=self._subscribe_config_updates,
                name="ConfigSubscription",
                daemon=True
            )
            self._pubsub_thread.start()
            logger.info("配置订阅线程已启动")

    def stop_config_subscription(self):
        """Signal the subscriber thread to stop and wait up to 5 seconds."""
        self._stop_event.set()
        if self._pubsub_thread and self._pubsub_thread.is_alive():
            self._pubsub_thread.join(timeout=5)
        logger.info("配置订阅线程已停止")

    def get_cameras(self, force_refresh: bool = False) -> "List[CameraInfoModel]":
        """Return all camera configurations.

        Args:
            force_refresh: Skip the in-memory cache and reload from SQLite.

        Returns:
            Camera models; an empty list when the database is unavailable,
            or the cached list (if any) when loading fails.
        """
        cache_key = "cameras"

        if not force_refresh:
            cached = self._cache.get(cache_key)
            if cached is not None:
                return cached

        self._init_database()

        if self._db_manager is None:
            logger.warning("数据库管理器不可用,返回空摄像头列表")
            return []

        try:
            cameras = self._db_manager.get_all_camera_configs()
            result = [CameraInfoModel.from_dict(c) for c in cameras]

            self._cache.set(cache_key, result)
            logger.info(f"已加载摄像头配置: {len(result)} 个")
            return result

        except Exception as e:
            logger.error(f"获取摄像头配置失败: {e}")
            cached = self._cache.get(cache_key)
            return cached or []

    def get_roi_configs(self, camera_id: Optional[str] = None,
                        force_refresh: bool = False) -> "List[ROIInfo]":
        """Return ROI configurations (legacy shape, without bindings).

        Args:
            camera_id: Restrict to one camera; ``None``/empty loads all.
            force_refresh: Skip the in-memory cache and reload from SQLite.

        Returns:
            ROI models; an empty list when the database is unavailable, or
            the cached list (if any) when loading fails.
        """
        cache_key = f"rois_{camera_id}" if camera_id else "rois_all"

        if not force_refresh:
            cached = self._cache.get(cache_key)
            if cached is not None:
                return cached

        self._init_database()

        if self._db_manager is None:
            logger.warning("数据库管理器不可用,返回空ROI配置列表")
            return []

        try:
            if camera_id:
                roi_configs = self._db_manager.get_rois_by_camera(camera_id)
            else:
                roi_configs = self._db_manager.get_all_roi_configs()
            result = [ROIInfo.from_dict(r) for r in roi_configs]

            self._cache.set(cache_key, result)
            logger.info(f"已加载ROI配置: {len(result)} 个")
            return result

        except Exception as e:
            logger.error(f"获取ROI配置失败: {e}")
            cached = self._cache.get(cache_key)
            return cached or []

    def get_roi_configs_with_bindings(self, camera_id: Optional[str] = None,
                                      force_refresh: bool = False) -> "List[ROIInfoNew]":
        """Return ROI configurations with their algorithm bindings attached.

        Args:
            camera_id: Restrict to one camera; ``None``/empty loads all.
            force_refresh: Skip the in-memory cache and reload from SQLite.

        Returns:
            ROI models sorted by descending ``priority``; an empty list when
            the database is unavailable, or the cached list (if any) when
            loading fails.
        """
        cache_key = f"rois_bindings_{camera_id}" if camera_id else "rois_bindings_all"

        if not force_refresh:
            cached = self._cache.get(cache_key)
            if cached is not None:
                return cached

        self._init_database()

        if self._db_manager is None:
            logger.warning("数据库管理器不可用,返回空ROI配置列表")
            return []

        try:
            if camera_id:
                roi_configs = self._db_manager.get_rois_by_camera(camera_id)
                bindings_list = self._db_manager.get_bindings_by_camera(camera_id)
            else:
                roi_configs = self._db_manager.get_all_roi_configs()
                bindings_list = []
                for roi in roi_configs:
                    bindings_list.extend(
                        self._db_manager.get_bindings_by_roi(roi['roi_id'])
                    )

            # Index ROIs by id (a repeated id keeps its last row) and group
            # the binding rows under the ROI they belong to.
            roi_dict = {r['roi_id']: r for r in roi_configs}
            bindings_dict: Dict[Any, List[Any]] = {}
            for b in bindings_list:
                bindings_dict.setdefault(b['roi_id'], []).append(b)

            result = []
            for roi_id, roi_data in roi_dict.items():
                roi_info = ROIInfoNew.from_dict(roi_data)
                if roi_id in bindings_dict:
                    roi_info.bindings = [
                        ROIAlgoBind.from_dict(b) for b in bindings_dict[roi_id]
                    ]
                result.append(roi_info)

            result.sort(key=lambda x: x.priority, reverse=True)

            self._cache.set(cache_key, result)
            logger.info(f"已加载ROI配置(含绑定): {len(result)} 个")
            return result

        except Exception as e:
            logger.error(f"获取ROI配置(含绑定)失败: {e}")
            cached = self._cache.get(cache_key)
            return cached or []

    def get_camera_rois(self, camera_id: str) -> "List[ROIInfo]":
        """Return the ROI configurations of one camera."""
        return self.get_roi_configs(camera_id=camera_id)

    def get_config_by_id(self, config_type: str, config_id: str) -> Optional[Dict[str, Any]]:
        """Look up a single configuration row by id.

        Args:
            config_type: ``"camera"`` or ``"roi"``; anything else yields
                ``None``.
            config_id: Identifier passed to the database manager.

        Returns:
            The row returned by the database manager, or ``None`` when it
            is missing/empty, the type is unknown, the database is
            unavailable, or the lookup raises.
        """
        self._init_database()

        if self._db_manager is None:
            logger.warning("数据库管理器不可用,无法获取配置")
            return None

        try:
            if config_type == "camera":
                camera = self._db_manager.get_camera_config(config_id)
                return camera if camera else None
            elif config_type == "roi":
                roi = self._db_manager.get_roi_config(config_id)
                return roi if roi else None
        except Exception as e:
            logger.error(f"获取配置失败: {e}")
        return None

    def publish_config_update(self, update_data: Dict[str, Any]) -> bool:
        """Publish ``update_data`` on the ``config_update`` channel.

        ``update_data`` is modified in place: ``version`` and ``timestamp``
        are set before it is serialized.

        Returns:
            ``True`` on success, ``False`` when Redis is not connected or
            publishing fails.
        """
        if not self._redis_client:
            logger.warning("Redis未连接,无法发布配置更新")
            return False

        try:
            update_data["version"] = self._config_version
            update_data["timestamp"] = datetime.now().isoformat()

            self._redis_client.publish("config_update", json.dumps(update_data))
            logger.info(f"已发布配置更新: {update_data}")
            return True

        except Exception as e:
            logger.error(f"发布配置更新失败: {e}")
            return False

    def invalidate_cache(self, cache_key: str):
        """Drop one entry from the in-memory cache."""
        self._cache.delete(cache_key)
        logger.info(f"缓存已失效: {cache_key}")

    def invalidate_all_cache(self):
        """Drop every entry from the in-memory cache."""
        self._cache.clear()
        logger.info("所有缓存已失效")

    def get_cache_stats(self) -> Dict[str, Any]:
        """Return the in-memory cache statistics."""
        return self._cache.get_stats()

    def get_health_status(self) -> Dict[str, Any]:
        """Report Redis reachability, config version, cache stats and
        whether the subscriber thread is alive."""
        redis_healthy = False
        if self._redis_client:
            try:
                self._redis_client.ping()
                redis_healthy = True
            except Exception:
                # A failed PING is the "unhealthy" answer itself, not an
                # error to propagate from a health probe.
                redis_healthy = False

        return {
            "redis_connected": redis_healthy,
            "config_version": self._config_version,
            "cache_stats": self.get_cache_stats(),
            "subscription_active": (
                self._pubsub_thread is not None and
                self._pubsub_thread.is_alive()
            ),
        }

    def _store_redis_hash(self, key: str, mapping: Dict[str, Any]):
        """Write ``mapping`` as a Redis hash and (re)arm its expiry."""
        self._redis_client.hset(key, mapping=mapping)
        self._redis_client.expire(key, self._REDIS_HASH_TTL)

    @staticmethod
    def _parse_literal(text: Any, default: Any) -> Any:
        """Parse a value that was stored in Redis via ``str(obj)``.

        SECURITY: this used to be ``eval()``, which would execute arbitrary
        code placed in Redis. ``ast.literal_eval`` accepts only Python
        literals (lists, dicts, numbers, strings, ...) and raises on
        anything else; callers already handle exceptions.

        Returns ``default`` when ``text`` is missing or empty.
        """
        if not text:
            return default
        # Imported locally so the helper does not rely on a module-level
        # import of ast.
        import ast
        return ast.literal_eval(text)

    @classmethod
    def _decode_bind_hash(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert the string fields of a binding hash back to Python
        values (in place) and return the same dict."""
        data['params'] = cls._parse_literal(data.get('params'), {})
        data['priority'] = int(data.get('priority', 0))
        data['enabled'] = data.get('enabled', 'True') == 'True'
        data['target_class'] = data.get('target_class', 'person')
        return data

    def _cache_roi_to_redis(self, roi_config: Dict[str, Any]) -> bool:
        """Mirror one ROI row into the hash ``config:roi:<roi_id>``.

        Returns ``True`` on success, ``False`` when Redis is not connected
        or the write fails.
        """
        if not self._redis_client:
            return False

        try:
            roi_id = roi_config.get("roi_id")
            key = f"config:roi:{roi_id}"

            self._store_redis_hash(key, {
                "roi_id": roi_id,
                "camera_id": roi_config.get("camera_id", ""),
                "roi_type": roi_config.get("roi_type", ""),
                "coordinates": str(roi_config.get("coordinates", [])),
                "enabled": str(roi_config.get("enabled", True)),
                "priority": str(roi_config.get("priority", 0)),
            })
            logger.debug(f"ROI配置已缓存到Redis: {key}")
            return True
        except Exception as e:
            logger.error(f"缓存ROI配置到Redis失败: {e}")
            return False

    def _cache_algo_bind_to_redis(self, bind_config: Dict[str, Any]) -> bool:
        """Mirror one ROI/algorithm binding into ``config:bind:<bind_id>``.

        Returns ``True`` on success, ``False`` when Redis is not connected
        or the write fails.
        """
        if not self._redis_client:
            return False

        try:
            bind_id = bind_config.get("bind_id")
            key = f"config:bind:{bind_id}"

            self._store_redis_hash(key, {
                "bind_id": bind_id,
                "roi_id": bind_config.get("roi_id", ""),
                "algo_code": bind_config.get("algo_code", ""),
                "params": str(bind_config.get("params", {})),
                "priority": str(bind_config.get("priority", 0)),
                "enabled": str(bind_config.get("enabled", True)),
                "algo_name": bind_config.get("algo_name", ""),
                "target_class": bind_config.get("target_class", "person"),
            })
            logger.debug(f"ROI算法绑定配置已缓存到Redis: {key}")
            return True
        except Exception as e:
            logger.error(f"缓存ROI算法绑定配置到Redis失败: {e}")
            return False

    def _cache_camera_to_redis(self, camera_config: Dict[str, Any]) -> bool:
        """Mirror one camera row into ``config:camera:<camera_id>``.

        Returns ``True`` on success, ``False`` when Redis is not connected
        or the write fails.
        """
        if not self._redis_client:
            return False

        try:
            camera_id = camera_config.get("camera_id")
            key = f"config:camera:{camera_id}"

            self._store_redis_hash(key, {
                "camera_id": camera_id,
                "rtsp_url": camera_config.get("rtsp_url", ""),
                "camera_name": camera_config.get("camera_name", ""),
                "enabled": str(camera_config.get("enabled", True)),
                "location": camera_config.get("location", ""),
            })
            logger.debug(f"摄像头配置已缓存到Redis: {key}")
            return True
        except Exception as e:
            logger.error(f"缓存摄像头配置到Redis失败: {e}")
            return False

    def sync_all_to_redis(self) -> int:
        """Mirror all cameras, ROIs and bindings from SQLite into Redis.

        Returns:
            Number of hashes written; the partial count if an error
            interrupts the sync, ``0`` when Redis or the database is
            unavailable.
        """
        if not self._redis_client:
            return 0

        # This may be the first database access (e.g. when called through
        # reload_algorithms), so the manager must be fetched here too.
        self._init_database()
        if self._db_manager is None:
            logger.warning("数据库管理器不可用,无法同步配置到Redis")
            return 0

        count = 0
        try:
            for camera in self._db_manager.get_all_camera_configs():
                if self._cache_camera_to_redis(camera):
                    count += 1

            for roi in self._db_manager.get_all_roi_configs():
                if self._cache_roi_to_redis(roi):
                    count += 1

            self.clear_redis_cache("bind")
            # NOTE(review): an empty camera id is presumably meant to fetch
            # every binding; get_roi_configs_with_bindings instead iterates
            # get_bindings_by_roi per ROI. Confirm against the SQLite manager.
            bindings = self._db_manager.get_bindings_by_camera("")
            for bind in bindings:
                if self._cache_algo_bind_to_redis(bind):
                    count += 1

            logger.info(f"已同步 {count} 条配置到Redis缓存")
            return count
        except Exception as e:
            logger.error(f"同步配置到Redis失败: {e}")
            return count

    def get_roi_from_redis(self, roi_id: str) -> Optional[Dict[str, Any]]:
        """Read one ROI hash from Redis with its fields converted back.

        Returns ``None`` when Redis is not connected, the hash is missing,
        or reading/parsing fails.
        """
        if not self._redis_client:
            return None

        try:
            data = self._redis_client.hgetall(f"config:roi:{roi_id}")
            if not data:
                return None
            data['coordinates'] = self._parse_literal(data.get('coordinates'), [])
            data['priority'] = int(data.get('priority', 0))
            data['enabled'] = data.get('enabled', 'True') == 'True'
            return data
        except Exception as e:
            logger.error(f"从Redis获取ROI配置失败: {e}")
            return None

    def get_algo_bind_from_redis(self, bind_id: str) -> Optional[Dict[str, Any]]:
        """Read one binding hash from Redis with its fields converted back.

        Returns ``None`` when Redis is not connected, the hash is missing,
        or reading/parsing fails.
        """
        if not self._redis_client:
            return None

        try:
            data = self._redis_client.hgetall(f"config:bind:{bind_id}")
            if not data:
                return None
            return self._decode_bind_hash(data)
        except Exception as e:
            logger.error(f"从Redis获取ROI算法绑定配置失败: {e}")
            return None

    def get_bindings_from_redis(self, roi_id: str) -> List[Dict[str, Any]]:
        """Return every binding stored in Redis for ``roi_id``.

        Returns:
            Binding dicts sorted by descending ``priority``; an empty list
            when Redis is not connected or reading fails.
        """
        if not self._redis_client:
            return []

        try:
            results = []
            seen = set()
            # SCAN walks the keyspace incrementally instead of blocking the
            # server like KEYS; it may yield a key more than once, hence
            # the de-duplication.
            for key in self._redis_client.scan_iter(match="config:bind:*"):
                if key in seen:
                    continue
                seen.add(key)
                data = self._redis_client.hgetall(key)
                if data and data.get('roi_id') == roi_id:
                    results.append(self._decode_bind_hash(data))
            return sorted(results, key=lambda x: x['priority'], reverse=True)
        except Exception as e:
            logger.error(f"从Redis获取ROI算法绑定列表失败: {e}")
            return []

    def get_camera_from_redis(self, camera_id: str) -> Optional[Dict[str, Any]]:
        """Read one camera hash from Redis with ``enabled`` as a bool.

        Returns ``None`` when Redis is not connected, the hash is missing,
        or reading fails.
        """
        if not self._redis_client:
            return None

        try:
            data = self._redis_client.hgetall(f"config:camera:{camera_id}")
            if not data:
                return None
            data['enabled'] = data.get('enabled', 'True') == 'True'
            return data
        except Exception as e:
            logger.error(f"从Redis获取摄像头配置失败: {e}")
            return None

    def notify_config_change(self, config_type: str, config_ids: List[str]):
        """Publish a change notice on the ``config_update`` channel.

        The message carries ``type`` (``config_type``), ``ids`` and a
        timestamp. Does nothing when Redis is not connected; publish
        failures are logged.
        """
        if not self._redis_client:
            return

        try:
            message = {
                "type": config_type,
                "ids": config_ids,
                "timestamp": datetime.now().isoformat(),
            }
            self._redis_client.publish("config_update", json.dumps(message))
            logger.info(f"已发布配置变更通知: {config_type} - {config_ids}")
        except Exception as e:
            logger.error(f"发布配置变更通知失败: {e}")

    def clear_redis_cache(self, config_type: Optional[str] = None):
        """Delete mirrored configuration hashes from Redis.

        Args:
            config_type: ``"roi"``, ``"camera"`` or ``"bind"`` to delete
                only that kind; any other value (including ``None``)
                deletes every ``config:*`` key.
        """
        if not self._redis_client:
            return

        try:
            pattern = self._REDIS_KEY_PATTERNS.get(config_type, "config:*")
            # SCAN instead of KEYS so a large keyspace does not block
            # Redis; the set drops keys that SCAN reported more than once.
            keys = list(set(self._redis_client.scan_iter(match=pattern)))
            if keys:
                self._redis_client.delete(*keys)
            logger.info(f"已清理Redis缓存: {config_type or 'all'}")
        except Exception as e:
            logger.error(f"清理Redis缓存失败: {e}")

    def reload_algorithms(self):
        """Rebuild every cache layer and announce the change.

        Clears the in-memory cache and the Redis mirror, re-syncs from
        SQLite, then publishes ``roi`` and ``bind`` change notices.
        """
        self.invalidate_all_cache()
        self.clear_redis_cache()
        count = self.sync_all_to_redis()
        self.notify_config_change("roi", [])
        self.notify_config_change("bind", [])
        logger.info(f"算法配置已重新加载,更新了 {count} 条缓存")

    def close(self):
        """Stop the subscriber thread and close the Redis connections."""
        self.stop_config_subscription()
        if self._redis_client:
            if self._redis_pubsub:
                self._redis_pubsub.close()
            self._redis_client.close()
            logger.info("Redis连接已关闭")
|
||
|
||
|
||
def get_config_sync_manager() -> ConfigSyncManager:
    """Return the shared ConfigSyncManager.

    Constructing ``ConfigSyncManager`` always yields the same singleton
    object, so this is simply the module's accessor for it.
    """
    manager = ConfigSyncManager()
    return manager
|