From 3266241064fbb3f84a9eee30c4cbfcad5a31f1c1 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Thu, 9 Apr 2026 17:04:11 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=82=E9=85=8D:=20Edge=20=E5=85=A8=E5=B1=80?= =?UTF-8?q?=E5=8F=82=E6=95=B0=E8=A7=A3=E6=9E=90=20+=20AlgorithmManager=20?= =?UTF-8?q?=E4=B8=89=E7=BA=A7=E5=8F=82=E6=95=B0=E5=90=88=E5=B9=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- algorithms.py | 27 +++++++++++++++++++++++++++ config/database.py | 44 +++++++++++++++++++++++++++++++++++++++++++- core/config_sync.py | 13 +++++++++++++ main.py | 21 +++++++++++++++++++++ 4 files changed, 104 insertions(+), 1 deletion(-) diff --git a/algorithms.py b/algorithms.py index 2716bba..bf43bed 100644 --- a/algorithms.py +++ b/algorithms.py @@ -1556,6 +1556,7 @@ class AlgorithmManager: self.working_hours = working_hours or [] self._update_lock = threading.Lock() self._registered_keys: set = set() # 已注册的 (roi_id, bind_id, algo_type) 缓存 + self._global_params: Dict[str, Dict] = {} # 全局参数 {algo_code: params_dict} # Bug fix: 默认参数与算法构造函数一致 self.default_params = { @@ -1598,6 +1599,28 @@ class AlgorithmManager: self._pubsub_thread = None self._running = False + def update_global_params(self, global_params_map: Dict[str, Dict]): + """更新全局参数 + + Args: + global_params_map: {algo_code: params_dict} 格式的全局参数 + """ + with self._update_lock: + self._global_params = global_params_map or {} + logger.info(f"全局参数已更新: {list(self._global_params.keys())}") + + def get_min_alarm_duration(self, algorithm_type: str) -> Optional[int]: + """从全局参数获取最小告警持续时间(秒) + + Args: + algorithm_type: 算法类型(如 leave_post, intrusion) + + Returns: + 最小告警持续时间秒数,未配置返回 None + """ + gp = self._global_params.get(algorithm_type, {}) + return gp.get("min_alarm_duration_sec") + def start_config_subscription(self): """启动配置变更订阅""" try: @@ -2074,6 +2097,10 @@ class AlgorithmManager: self.algorithms[roi_id][key] = {} algo_params = self.default_params.get(algorithm_type, {}).copy() + # 三级合并:默认参数 → 全局参数 → 绑定级参数 + global_p = self._global_params.get(algorithm_type, {}) + if global_p: + algo_params.update(global_p) if params: algo_params.update(params) diff --git a/config/database.py b/config/database.py index 03afb25..5c1597c 100644 --- a/config/database.py +++ b/config/database.py @@ -259,8 +259,17 @@ class SQLiteManager: except Exception: pass # 列已存在,忽略 + # 算法全局参数表 + cursor.execute(""" + CREATE TABLE IF NOT EXISTS algo_global_params ( + algo_code TEXT PRIMARY KEY, + params TEXT NOT NULL DEFAULT '{}', + updated_at TEXT + ) + """) + self._init_default_algorithms() - + def _init_default_algorithms(self): """初始化默认算法配置""" try: @@ -948,6 +957,39 @@ class SQLiteManager: logger.error(f"获取所有绑定ID失败: {e}") return [] + def save_global_params(self, algo_code: str, params_dict: Dict[str, Any]) -> bool: + """保存算法全局参数(INSERT OR REPLACE)""" + try: + cursor = self._conn.cursor() + now = datetime.now().isoformat() + cursor.execute(""" + INSERT OR REPLACE INTO algo_global_params (algo_code, params, updated_at) + VALUES (?, ?, ?) + """, (algo_code, json.dumps(params_dict, ensure_ascii=False), now)) + self._conn.commit() + return True + except Exception as e: + logger.error(f"保存算法全局参数失败: {e}") + return False + + def get_all_global_params(self) -> Dict[str, Dict[str, Any]]: + """获取所有算法全局参数,返回 {algo_code: params_dict}""" + result: Dict[str, Dict[str, Any]] = {} + try: + cursor = self._conn.cursor() + cursor.execute("SELECT algo_code, params FROM algo_global_params") + for row in cursor.fetchall(): + algo_code = row[0] + params_str = row[1] + try: + result[algo_code] = json.loads(params_str) if params_str else {} + except (json.JSONDecodeError, TypeError): + result[algo_code] = {} + return result + except Exception as e: + logger.error(f"获取算法全局参数失败: {e}") + return result + def log_config_update( self, config_type: str, diff --git a/core/config_sync.py b/core/config_sync.py index 6083196..f1bd0e4 100644 --- a/core/config_sync.py +++ b/core/config_sync.py @@ -643,6 +643,19 @@ class ConfigSyncManager: # 清理 SQLite 中不在本次推送列表中的旧数据 self._cleanup_stale_records(incoming_camera_ids, incoming_roi_ids, incoming_bind_ids) + # 同步全局参数 + global_params = config_data.get("global_params") or config_data.get("globalParams") or {} + if global_params and isinstance(global_params, dict): + for algo_code, params_dict in global_params.items(): + if isinstance(params_dict, dict): + self._db_manager.save_global_params(algo_code, params_dict) + logger.info(f"全局参数同步完成: {list(global_params.keys())}") + + # 通知全局参数更新回调 + self._notify_callbacks("global_params_update", { + "global_params": global_params, + }) + except Exception as e: logger.error(f"配置同步到 SQLite 失败: {e}") diff --git a/main.py b/main.py index dfc0a05..b372b97 100644 --- a/main.py +++ b/main.py @@ -132,6 +132,15 @@ class EdgeInferenceService: daemon=True ).start() self._config_manager.register_callback("config_update", _on_config_update) + + def _on_global_params_update(topic, data): + if self._algorithm_manager: + global_params = data.get("global_params", {}) + self._algorithm_manager.update_global_params(global_params) + # 全局参数变更后需要清除注册缓存,使下一帧重新注册算法以应用新参数 + self._algorithm_manager._registered_keys.clear() + self._logger.info(f"全局参数回调已触发,已清除算法注册缓存") + self._config_manager.register_callback("global_params_update", _on_global_params_update) self._logger.info("配置管理器初始化成功") except Exception as e: self._logger.error(f"配置管理器初始化失败: {e}") @@ -198,6 +207,18 @@ class EdgeInferenceService: try: self._algorithm_manager = AlgorithmManager() self._algorithm_manager.start_config_subscription() + + # 启动时从 SQLite 加载已有全局参数 + try: + from config.database import get_sqlite_manager + db = get_sqlite_manager() + saved_global_params = db.get_all_global_params() + if saved_global_params: + self._algorithm_manager.update_global_params(saved_global_params) + self._logger.info(f"从 SQLite 加载全局参数: {list(saved_global_params.keys())}") + except Exception as e: + self._logger.warning(f"从 SQLite 加载全局参数失败: {e}") + self._logger.info("算法管理器初始化成功") except Exception as e: self._logger.error(f"算法管理器初始化失败: {e}")