From 1529322ca94fac0579afda58b3a0f032e7d952b5 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Thu, 5 Feb 2026 09:58:04 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=B6=E5=88=B0config=5Fupdate?= =?UTF-8?q?=E6=97=B6=E5=90=8C=E6=AD=A5Redis=E9=85=8D=E7=BD=AE=E5=88=B0SQLi?= =?UTF-8?q?te?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 主推理管线从SQLite读取摄像头/ROI/绑定配置,而云端pushConfig 只写入Redis。新增_sync_redis_to_sqlite方法,在收到config_update 通知后将Redis中的camera/roi/bind keys同步写入SQLite, 并清除全部内存缓存确保下次读取获得最新数据。 Co-Authored-By: Claude Opus 4.5 --- core/config_sync.py | 98 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 93 insertions(+), 5 deletions(-) diff --git a/core/config_sync.py b/core/config_sync.py index efc3a77..542625e 100644 --- a/core/config_sync.py +++ b/core/config_sync.py @@ -212,18 +212,24 @@ class ConfigSyncManager: update_type = data.get("type", "full") affected_items = data.get("affected_items", []) version = data.get("version", self._config_version) - + logger.info(f"收到配置更新通知: type={update_type}, items={affected_items}") - + + # 从Redis同步最新配置到SQLite(主推理管线从SQLite读取) + self._sync_redis_to_sqlite(affected_items) + if "camera" in affected_items or "all" in affected_items: self._cache.delete("cameras") - + if "roi" in affected_items or "all" in affected_items: self._cache.delete("rois") - + + # 清除所有带 camera_id 前缀的 rois_bindings 缓存 + self._cache.clear() + self._config_version = version self._notify_callbacks("config_update", data) - + self._version_control.record_update( version=version, update_type="配置更新", @@ -232,6 +238,88 @@ class ConfigSyncManager: affected_items=affected_items, details=data ) + + def _sync_redis_to_sqlite(self, affected_items: List[str]): + """从Redis同步配置到SQLite,使主推理管线能读取最新配置""" + self._init_database() + if not self._redis_client or not self._db_manager: + logger.warning("Redis或SQLite不可用,跳过同步") + return + + try: + sync_cameras = "camera" in affected_items or "all" in affected_items + sync_rois = "roi" in affected_items or "all" in affected_items + + count = 0 + + if sync_cameras: + camera_keys = self._redis_client.keys("config:camera:*") + for key in (camera_keys or []): + try: + data = self._redis_client.hgetall(key) + if not data or not data.get("camera_id"): + continue + enabled = data.get("enabled", "True") == "True" + self._db_manager.save_camera_config( + camera_id=data["camera_id"], + rtsp_url=data.get("rtsp_url", ""), + camera_name=data.get("camera_name", ""), + enabled=enabled, + location=data.get("location", ""), + ) + count += 1 + except Exception as e: + logger.error(f"同步摄像头配置到SQLite失败: key={key}, error={e}") + + if sync_rois: + # 同步ROI配置 + roi_keys = self._redis_client.keys("config:roi:*") + for key in (roi_keys or []): + try: + data = self._redis_client.hgetall(key) + if not data or not data.get("roi_id"): + continue + coordinates = eval(data["coordinates"]) if data.get("coordinates") else [] + enabled = data.get("enabled", "True") == "True" + priority = int(data.get("priority", 0)) + self._db_manager.save_roi_config( + roi_id=data["roi_id"], + camera_id=data.get("camera_id", ""), + roi_type=data.get("roi_type", "polygon"), + coordinates=coordinates, + enabled=enabled, + priority=priority, + ) + count += 1 + except Exception as e: + logger.error(f"同步ROI配置到SQLite失败: key={key}, error={e}") + + # 同步绑定配置 + bind_keys = self._redis_client.keys("config:bind:*") + for key in (bind_keys or []): + try: + data = self._redis_client.hgetall(key) + if not data or not data.get("bind_id"): + continue + params = eval(data["params"]) if data.get("params") else {} + enabled = data.get("enabled", "True") == "True" + priority = int(data.get("priority", 0)) + self._db_manager.save_roi_algo_bind( + bind_id=data["bind_id"], + roi_id=data.get("roi_id", ""), + algo_code=data.get("algo_code", ""), + params=params, + priority=priority, + enabled=enabled, + ) + count += 1 + except Exception as e: + logger.error(f"同步绑定配置到SQLite失败: key={key}, error={e}") + + logger.info(f"Redis→SQLite同步完成: {count} 条配置已更新") + + except Exception as e: + logger.error(f"Redis→SQLite同步失败: {e}") def start_config_subscription(self): """启动配置订阅线程"""