diff --git a/core/config_sync.py b/core/config_sync.py index 542625e..f88ee4c 100644 --- a/core/config_sync.py +++ b/core/config_sync.py @@ -116,6 +116,7 @@ class ConfigSyncManager: self._config_version = settings.config_version self._cache = ConfigCache() self._redis_client = None + self._redis_pubsub_client = None self._redis_pubsub = None self._pubsub_thread = None self._stop_event = threading.Event() @@ -131,7 +132,7 @@ class ConfigSyncManager: try: settings = get_settings() redis_config = settings.redis - + self._redis_client = redis.Redis( host=redis_config.host, port=redis_config.port, @@ -142,13 +143,24 @@ class ConfigSyncManager: socket_timeout=10, retry_on_timeout=True, ) - + + # Pub/Sub专用连接,不设socket_timeout,避免listen()周期性超时 + self._redis_pubsub_client = redis.Redis( + host=redis_config.host, + port=redis_config.port, + db=redis_config.db, + password=redis_config.password, + decode_responses=redis_config.decode_responses, + socket_connect_timeout=10, + ) + self._redis_client.ping() logger.info(f"Redis连接成功: {redis_config.host}:{redis_config.port}") except Exception as e: logger.error(f"Redis连接失败: {e}") self._redis_client = None + self._redis_pubsub_client = None def _init_database(self): """初始化数据库连接""" @@ -182,30 +194,39 @@ class ConfigSyncManager: logger.error(f"配置变更回调执行失败: {e}") def _subscribe_config_updates(self): - """订阅配置更新主题""" - if not self._redis_client: + """订阅配置更新主题(带自动重连)""" + if not self._redis_pubsub_client: logger.warning("Redis未连接,无法订阅配置更新") return - - try: - self._redis_pubsub = self._redis_client.pubsub() - self._redis_pubsub.subscribe("config_update") - - logger.info("已订阅config_update主题") - - for message in self._redis_pubsub.listen(): + + while not self._stop_event.is_set(): + try: + self._redis_pubsub = self._redis_pubsub_client.pubsub() + self._redis_pubsub.subscribe("config_update") + + logger.info("已订阅config_update主题") + + for message in self._redis_pubsub.listen(): + if self._stop_event.is_set(): + return + + if message["type"] == "message": + try: + data = json.loads(message["data"]) + self._handle_config_update(data) + except Exception as e: + logger.error(f"处理配置更新消息失败: {e}") + + except Exception as e: if self._stop_event.is_set(): - break - - if message["type"] == "message": - try: - data = json.loads(message["data"]) - self._handle_config_update(data) - except Exception as e: - logger.error(f"处理配置更新消息失败: {e}") - - except Exception as e: - logger.error(f"配置更新订阅异常: {e}") + return + logger.debug(f"配置更新订阅重连中: {e}") + try: + if self._redis_pubsub: + self._redis_pubsub.close() + except Exception: + pass + self._stop_event.wait(3) # 等3秒后重连 def _handle_config_update(self, data: Dict[str, Any]): """处理配置更新消息""" @@ -279,7 +300,12 @@ class ConfigSyncManager: data = self._redis_client.hgetall(key) if not data or not data.get("roi_id"): continue - coordinates = eval(data["coordinates"]) if data.get("coordinates") else [] + raw_coords = eval(data["coordinates"]) if data.get("coordinates") else [] + # 兼容 [{x:0.1, y:0.2},...] 格式,转为 [[x,y],...] + if raw_coords and isinstance(raw_coords[0], dict): + coordinates = [[p.get("x", 0), p.get("y", 0)] for p in raw_coords] + else: + coordinates = raw_coords enabled = data.get("enabled", "True") == "True" priority = int(data.get("priority", 0)) self._db_manager.save_roi_config( @@ -323,6 +349,14 @@ class ConfigSyncManager: def start_config_subscription(self): """启动配置订阅线程""" + # 启动时先从Redis做一次全量同步(防止服务重启时丢失配置) + if self._redis_client: + try: + logger.info("启动时执行Redis→SQLite全量同步...") + self._sync_redis_to_sqlite(["all"]) + except Exception as e: + logger.warning(f"启动时Redis同步失败(将使用本地SQLite配置): {e}") + if self._pubsub_thread is None or not self._pubsub_thread.is_alive(): self._stop_event.clear() self._pubsub_thread = threading.Thread(