feat: 实现配置热更新机制
数据库扩展: - roi_configs 新增算法参数字段(working_hours, confirm_on_duty_sec等) - 新增 config_update_log 表记录配置变更日志 Redis缓存: - ROI/摄像头配置缓存到 Redis(TTL 1小时) - sync_all_to_redis() 批量同步配置 - notify_config_change() 发布配置变更通知 热更新: - AlgorithmManager 订阅 Redis config_update 频道 - load_from_redis() 从 Redis 加载算法参数 - reload_algorithm() 热更新单个算法 - reload_all_algorithms() 重新加载所有算法 配置模型: - ROIInfo 添加算法参数字段
This commit is contained in:
138
algorithms.py
138
algorithms.py
@@ -264,6 +264,7 @@ class AlgorithmManager:
|
||||
def __init__(self, working_hours: Optional[List[Dict]] = None):
|
||||
self.algorithms: Dict[str, Dict[str, Any]] = {}
|
||||
self.working_hours = working_hours or []
|
||||
self._update_lock = threading.Lock()
|
||||
|
||||
self.default_params = {
|
||||
"leave_post": {
|
||||
@@ -278,7 +279,142 @@ class AlgorithmManager:
|
||||
"target_class": None,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
self._pubsub = None
|
||||
self._pubsub_thread = None
|
||||
self._running = False
|
||||
|
||||
def start_config_subscription(self):
|
||||
"""启动配置变更订阅"""
|
||||
try:
|
||||
from config.settings import get_settings
|
||||
settings = get_settings()
|
||||
redis_client = redis.Redis(
|
||||
host=settings.redis.host,
|
||||
port=settings.redis.port,
|
||||
db=settings.redis.db,
|
||||
password=settings.redis.password,
|
||||
decode_responses=True,
|
||||
)
|
||||
|
||||
self._pubsub = redis_client.pubsub()
|
||||
self._pubsub.subscribe("config_update")
|
||||
|
||||
self._running = True
|
||||
self._pubsub_thread = threading.Thread(
|
||||
target=self._config_update_worker,
|
||||
name="ConfigUpdateSub",
|
||||
daemon=True
|
||||
)
|
||||
self._pubsub_thread.start()
|
||||
logger.info("已启动配置变更订阅")
|
||||
except Exception as e:
|
||||
logger.error(f"启动配置订阅失败: {e}")
|
||||
|
||||
def _config_update_worker(self):
|
||||
"""配置更新订阅工作线程"""
|
||||
try:
|
||||
for message in self._pubsub.listen():
|
||||
if not self._running:
|
||||
break
|
||||
if message["type"] == "message":
|
||||
try:
|
||||
import json
|
||||
data = json.loads(message["data"])
|
||||
if data.get("type") == "roi":
|
||||
roi_ids = data.get("ids", [])
|
||||
if roi_ids:
|
||||
for roi_id in roi_ids:
|
||||
self.reload_algorithm(roi_id)
|
||||
else:
|
||||
self.reload_all_algorithms()
|
||||
except Exception as e:
|
||||
logger.error(f"处理配置更新消息失败: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"配置订阅线程异常: {e}")
|
||||
|
||||
def stop_config_subscription(self):
|
||||
"""停止配置变更订阅"""
|
||||
self._running = False
|
||||
if self._pubsub:
|
||||
self._pubsub.close()
|
||||
if self._pubsub_thread and self._pubsub_thread.is_alive():
|
||||
self._pubsub_thread.join(timeout=5)
|
||||
logger.info("配置订阅已停止")
|
||||
|
||||
def load_from_redis(self, roi_id: str) -> bool:
|
||||
"""从Redis加载单个ROI的算法配置"""
|
||||
try:
|
||||
from core.config_sync import get_config_sync_manager
|
||||
config_manager = get_config_sync_manager()
|
||||
roi_config = config_manager.get_roi_from_redis(roi_id)
|
||||
|
||||
if not roi_config:
|
||||
return False
|
||||
|
||||
with self._update_lock:
|
||||
algorithm_type = roi_config.get("algorithm_type", "leave_post")
|
||||
|
||||
if algorithm_type == "leave_post":
|
||||
params = {
|
||||
"working_hours": roi_config.get("working_hours"),
|
||||
"confirm_on_duty_sec": roi_config.get("confirm_on_duty_sec", 10),
|
||||
"confirm_leave_sec": roi_config.get("confirm_leave_sec", 10),
|
||||
"cooldown_sec": roi_config.get("cooldown_sec", 300),
|
||||
"target_class": roi_config.get("target_class", "person"),
|
||||
}
|
||||
if roi_id in self.algorithms and "leave_post" in self.algorithms[roi_id]:
|
||||
algo = self.algorithms[roi_id]["leave_post"]
|
||||
algo.confirm_on_duty_sec = params["confirm_on_duty_sec"]
|
||||
algo.confirm_leave_sec = params["confirm_leave_sec"]
|
||||
algo.cooldown_sec = params["cooldown_sec"]
|
||||
algo.target_class = params["target_class"]
|
||||
if params["working_hours"]:
|
||||
algo.working_hours = params["working_hours"]
|
||||
logger.info(f"已热更新算法参数: {roi_id}")
|
||||
else:
|
||||
self.register_algorithm(roi_id, "leave_post", params)
|
||||
logger.info(f"已从Redis加载算法: {roi_id}")
|
||||
else:
|
||||
params = {}
|
||||
if roi_id in self.algorithms and algorithm_type in self.algorithms[roi_id]:
|
||||
pass
|
||||
else:
|
||||
self.register_algorithm(roi_id, algorithm_type, params)
|
||||
logger.info(f"已从Redis加载算法: {roi_id}")
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"从Redis加载算法配置失败: {e}")
|
||||
return False
|
||||
|
||||
def reload_algorithm(self, roi_id: str) -> bool:
|
||||
"""重新加载单个ROI的算法配置"""
|
||||
if roi_id not in self.algorithms:
|
||||
return self.load_from_redis(roi_id)
|
||||
|
||||
self.reset_algorithm(roi_id)
|
||||
return self.load_from_redis(roi_id)
|
||||
|
||||
def reload_all_algorithms(self) -> int:
|
||||
"""重新加载所有算法配置"""
|
||||
count = 0
|
||||
try:
|
||||
from core.config_sync import get_config_sync_manager
|
||||
config_manager = get_config_sync_manager()
|
||||
roi_configs = config_manager.get_all_roi_configs()
|
||||
|
||||
for roi_config in roi_configs:
|
||||
roi_id = roi_config.get("roi_id")
|
||||
if self.reload_algorithm(roi_id):
|
||||
count += 1
|
||||
|
||||
logger.info(f"已重新加载 {count} 个算法配置")
|
||||
return count
|
||||
except Exception as e:
|
||||
logger.error(f"重新加载所有算法配置失败: {e}")
|
||||
return count
|
||||
|
||||
def register_algorithm(
|
||||
self,
|
||||
roi_id: str,
|
||||
|
||||
Reference in New Issue
Block a user