From cea4cb877b41f5a0d557ef53d08e2ca228e8496d Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Thu, 5 Feb 2026 16:02:32 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20MQTT=E8=BF=9E=E6=8E=A5=E5=85=BC=E5=AE=B9?= =?UTF-8?q?paho-mqtt=201.x=E5=92=8C2.x=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加paho-mqtt版本检测,兼容CallbackAPIVersion - 修复回调函数签名,支持1.x的整数返回码和2.x的对象返回码 - 增强错误处理和日志输出 - 确保ResultReporter.initialize()被正确调用 Co-Authored-By: Claude Opus 4.5 --- core/result_reporter.py | 56 ++++++++++++++++++++++++++--------------- main.py | 12 ++++++--- 2 files changed, 45 insertions(+), 23 deletions(-) diff --git a/core/result_reporter.py b/core/result_reporter.py index 1893c74..52b8b95 100644 --- a/core/result_reporter.py +++ b/core/result_reporter.py @@ -108,29 +108,44 @@ class ResultReporter: def _init_mqtt(self): """初始化MQTT客户端""" + self._logger.info(f"正在连接 MQTT: {self._mqtt_broker}:{self._mqtt_port}") try: - self._client = mqtt.Client( - client_id=self._mqtt_client_id, - protocol=mqtt.MQTTv5, - callback_api_version=mqtt.CallbackAPIVersion.VERSION2 - ) - + # 兼容不同版本的 paho-mqtt + try: + # paho-mqtt 2.0+ 版本 + self._client = mqtt.Client( + client_id=self._mqtt_client_id, + protocol=mqtt.MQTTv5, + callback_api_version=mqtt.CallbackAPIVersion.VERSION2 + ) + except (AttributeError, TypeError): + # paho-mqtt 1.x 版本 + self._client = mqtt.Client( + client_id=self._mqtt_client_id, + protocol=mqtt.MQTTv5 + ) + self._client.on_connect = self._on_connect self._client.on_disconnect = self._on_disconnect self._client.on_publish = self._on_publish - + self._client.connect(self._mqtt_broker, self._mqtt_port, 60) self._client.loop_start() - - self._logger.info(f"MQTT 客户端初始化: {self._mqtt_broker}:{self._mqtt_port}") - + + self._logger.info(f"MQTT 客户端初始化完成: {self._mqtt_broker}:{self._mqtt_port}") + except Exception as e: - self._logger.warning(f"MQTT 初始化失败: {e}") + self._logger.error(f"MQTT 初始化失败: {e}") + import traceback + self._logger.error(traceback.format_exc()) self._client = None - def _on_connect(self, client, userdata, flags, reason_code, properties): - """MQTT连接回调""" - if reason_code == 0: + def _on_connect(self, client, userdata, flags, reason_code, properties=None): + """MQTT连接回调(兼容 1.x 和 2.x)""" + # paho-mqtt 1.x: rc 是整数,0 表示成功 + # paho-mqtt 2.x: reason_code 是对象,需要检查 .value 或直接比较 + rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', reason_code) + if rc == 0: self._connected = True self._reconnect_count = 0 self._logger.info("MQTT 连接成功") @@ -142,14 +157,15 @@ class ResultReporter: else: self._logger.warning(f"MQTT 连接失败: {reason_code}") - def _on_disconnect(self, client, userdata, reason_code, properties): - """MQTT断开连接回调""" + def _on_disconnect(self, client, userdata, reason_code, properties=None): + """MQTT断开连接回调(兼容 1.x 和 2.x)""" self._connected = False self._logger.warning(f"MQTT 连接断开: {reason_code}") - - def _on_publish(self, client, userdata, mid, reason_code, properties): - """MQTT发布回调""" - if reason_code == 0: + + def _on_publish(self, client, userdata, mid, reason_code=None, properties=None): + """MQTT发布回调(兼容 1.x 和 2.x)""" + rc = 0 if reason_code is None else (reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', 0)) + if rc == 0: self._logger.debug(f"MQTT 消息发布成功: {mid}") def report_alert( diff --git a/main.py b/main.py index 9c6ec97..de65e17 100644 --- a/main.py +++ b/main.py @@ -122,9 +122,15 @@ class EdgeInferenceService: def _init_reporter(self): """初始化结果上报器""" - self._reporter = ResultReporter() - self._reporter.initialize() # 初始化存储和MQTT连接 - self._logger.info("结果上报器初始化成功") + try: + self._reporter = ResultReporter() + self._logger.info("ResultReporter 对象已创建,准备初始化...") + self._reporter.initialize() # 初始化存储和MQTT连接 + self._logger.info("结果上报器初始化成功") + except Exception as e: + self._logger.error(f"结果上报器初始化失败: {e}") + import traceback + self._logger.error(traceback.format_exc()) def _init_algorithm_manager(self): """初始化算法管理器"""