fix: MQTT连接兼容paho-mqtt 1.x和2.x版本

- 添加paho-mqtt版本检测,兼容CallbackAPIVersion
- 修复回调函数签名,支持1.x的整数返回码和2.x的对象返回码
- 增强错误处理和日志输出
- 确保ResultReporter.initialize()被正确调用

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-05 16:02:32 +08:00
parent 6c6c9b4c03
commit cea4cb877b
2 changed files with 45 additions and 23 deletions

View File

@@ -108,12 +108,22 @@ class ResultReporter:
def _init_mqtt(self):
"""初始化MQTT客户端"""
self._logger.info(f"正在连接 MQTT: {self._mqtt_broker}:{self._mqtt_port}")
try:
# 兼容不同版本的 paho-mqtt
try:
# paho-mqtt 2.0+ 版本
self._client = mqtt.Client(
client_id=self._mqtt_client_id,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
except (AttributeError, TypeError):
# paho-mqtt 1.x 版本
self._client = mqtt.Client(
client_id=self._mqtt_client_id,
protocol=mqtt.MQTTv5
)
self._client.on_connect = self._on_connect
self._client.on_disconnect = self._on_disconnect
@@ -122,15 +132,20 @@ class ResultReporter:
self._client.connect(self._mqtt_broker, self._mqtt_port, 60)
self._client.loop_start()
self._logger.info(f"MQTT 客户端初始化: {self._mqtt_broker}:{self._mqtt_port}")
self._logger.info(f"MQTT 客户端初始化完成: {self._mqtt_broker}:{self._mqtt_port}")
except Exception as e:
self._logger.warning(f"MQTT 初始化失败: {e}")
self._logger.error(f"MQTT 初始化失败: {e}")
import traceback
self._logger.error(traceback.format_exc())
self._client = None
def _on_connect(self, client, userdata, flags, reason_code, properties):
"""MQTT连接回调"""
if reason_code == 0:
def _on_connect(self, client, userdata, flags, reason_code, properties=None):
"""MQTT连接回调(兼容 1.x 和 2.x"""
# paho-mqtt 1.x: rc 是整数0 表示成功
# paho-mqtt 2.x: reason_code 是对象,需要检查 .value 或直接比较
rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', reason_code)
if rc == 0:
self._connected = True
self._reconnect_count = 0
self._logger.info("MQTT 连接成功")
@@ -142,14 +157,15 @@ class ResultReporter:
else:
self._logger.warning(f"MQTT 连接失败: {reason_code}")
def _on_disconnect(self, client, userdata, reason_code, properties):
"""MQTT断开连接回调"""
def _on_disconnect(self, client, userdata, reason_code, properties=None):
"""MQTT断开连接回调(兼容 1.x 和 2.x"""
self._connected = False
self._logger.warning(f"MQTT 连接断开: {reason_code}")
def _on_publish(self, client, userdata, mid, reason_code, properties):
"""MQTT发布回调"""
if reason_code == 0:
def _on_publish(self, client, userdata, mid, reason_code=None, properties=None):
"""MQTT发布回调(兼容 1.x 和 2.x"""
rc = 0 if reason_code is None else (reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', 0))
if rc == 0:
self._logger.debug(f"MQTT 消息发布成功: {mid}")
def report_alert(

View File

@@ -122,9 +122,15 @@ class EdgeInferenceService:
def _init_reporter(self):
"""初始化结果上报器"""
try:
self._reporter = ResultReporter()
self._logger.info("ResultReporter 对象已创建,准备初始化...")
self._reporter.initialize() # 初始化存储和MQTT连接
self._logger.info("结果上报器初始化成功")
except Exception as e:
self._logger.error(f"结果上报器初始化失败: {e}")
import traceback
self._logger.error(traceback.format_exc())
def _init_algorithm_manager(self):
"""初始化算法管理器"""