diff --git a/app/services/mqtt_service.py b/app/services/mqtt_service.py index 3154dce..5de890c 100644 --- a/app/services/mqtt_service.py +++ b/app/services/mqtt_service.py @@ -4,6 +4,7 @@ MQTT 订阅服务 """ import json import threading +import uuid from datetime import datetime, timezone from typing import Callable, Dict, Any, Optional, List @@ -59,20 +60,23 @@ class MQTTService: return try: + # 给 client_id 添加随机后缀,防止多实例 client_id 冲突导致反复踢连接 + unique_client_id = f"{settings.mqtt.client_id}_{uuid.uuid4().hex[:8]}" + # 兼容 paho-mqtt 1.x 和 2.x 版本 try: # paho-mqtt 2.0+ 新 API self._client = mqtt.Client( - client_id=settings.mqtt.client_id, - protocol=mqtt.MQTTv5, + client_id=unique_client_id, + protocol=mqtt.MQTTv311, callback_api_version=mqtt.CallbackAPIVersion.VERSION2 ) self._use_v2_callback = True except AttributeError: # paho-mqtt 1.x 旧 API self._client = mqtt.Client( - client_id=settings.mqtt.client_id, - protocol=mqtt.MQTTv5 + client_id=unique_client_id, + protocol=mqtt.MQTTv311 ) self._use_v2_callback = False logger.info("使用 paho-mqtt 1.x 兼容模式") @@ -124,11 +128,14 @@ class MQTTService: """连接回调 (兼容 1.x 和 2.x)""" # 1.x: (client, userdata, flags, rc) # 2.x: (client, userdata, connect_flags, reason_code, properties) + rc = 0 if args: - reason_code = args[-2] if len(args) >= 2 else args[-1] - rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', reason_code) - else: - rc = -1 + # 取倒数第二个参数(1.x 的 rc 或 2.x 的 reason_code) + if len(args) >= 2: + reason_code = args[-2] + else: + reason_code = args[0] + rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', 0) if rc == 0: self._connected = True @@ -149,12 +156,17 @@ class MQTTService: # 1.x: (client, userdata, rc) # 2.x: (client, userdata, disconnect_flags, reason_code, properties) self._connected = False + rc = 0 if args: - reason_code = args[-2] if len(args) >= 2 else args[0] - rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', reason_code) - logger.warning(f"MQTT 连接断开: {rc}") + if len(args) >= 2: + reason_code = args[-2] + else: + reason_code = args[0] + rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', 0) + if rc != 0: + logger.warning(f"MQTT 连接异常断开: rc={rc}") else: - logger.warning("MQTT 连接断开") + logger.info("MQTT 连接断开") def _on_message(self, client, userdata, msg: mqtt.MQTTMessage): """消息回调"""