Files
iot-device-management-service/app/services/mqtt_service.py

267 lines
8.6 KiB
Python
Raw Normal View History

"""
MQTT 订阅服务
订阅边缘端告警和心跳消息
"""
import json
import threading
import uuid
from datetime import datetime, timezone
from typing import Callable, Dict, Any, Optional, List
import paho.mqtt.client as mqtt
from app.config import settings
from app.utils.logger import logger
class MQTTService:
"""MQTT 订阅服务"""
def __init__(self):
self._client: Optional[mqtt.Client] = None
self._connected = False
self._running = False
self._use_v2_callback = False # paho-mqtt 版本标记
self._lock = threading.Lock()
# 回调函数
self._alert_handlers: List[Callable[[Dict[str, Any]], None]] = []
self._heartbeat_handlers: List[Callable[[Dict[str, Any]], None]] = []
# 统计
self._stats = {
"messages_received": 0,
"alerts_received": 0,
"heartbeats_received": 0,
"errors": 0,
}
@property
def is_connected(self) -> bool:
return self._connected
def register_alert_handler(self, handler: Callable[[Dict[str, Any]], None]):
"""注册告警处理回调"""
self._alert_handlers.append(handler)
logger.info(f"已注册告警处理器: {handler.__name__}")
def register_heartbeat_handler(self, handler: Callable[[Dict[str, Any]], None]):
"""注册心跳处理回调"""
self._heartbeat_handlers.append(handler)
logger.info(f"已注册心跳处理器: {handler.__name__}")
def start(self):
"""启动 MQTT 服务"""
if not settings.mqtt.enabled:
logger.info("MQTT 服务已禁用")
return
if self._running:
return
try:
# 给 client_id 添加随机后缀,防止多实例 client_id 冲突导致反复踢连接
unique_client_id = f"{settings.mqtt.client_id}_{uuid.uuid4().hex[:8]}"
# 兼容 paho-mqtt 1.x 和 2.x 版本
try:
# paho-mqtt 2.0+ 新 API
self._client = mqtt.Client(
client_id=unique_client_id,
protocol=mqtt.MQTTv311,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
self._use_v2_callback = True
except AttributeError:
# paho-mqtt 1.x 旧 API
self._client = mqtt.Client(
client_id=unique_client_id,
protocol=mqtt.MQTTv311
)
self._use_v2_callback = False
logger.info("使用 paho-mqtt 1.x 兼容模式")
# 设置回调
self._client.on_connect = self._on_connect
self._client.on_disconnect = self._on_disconnect
self._client.on_message = self._on_message
# 设置认证(如果有)
if settings.mqtt.username:
self._client.username_pw_set(
settings.mqtt.username,
settings.mqtt.password
)
# 连接
self._client.connect(
settings.mqtt.broker_host,
settings.mqtt.broker_port,
60
)
# 启动循环
self._client.loop_start()
self._running = True
logger.info(
f"MQTT 服务启动: {settings.mqtt.broker_host}:{settings.mqtt.broker_port}"
)
except Exception as e:
logger.error(f"MQTT 服务启动失败: {e}")
self._running = False
def stop(self):
"""停止 MQTT 服务"""
if not self._running:
return
self._running = False
if self._client:
self._client.loop_stop()
self._client.disconnect()
logger.info("MQTT 服务已停止")
def _on_connect(self, client, userdata, *args):
"""连接回调 (兼容 1.x 和 2.x)"""
# 1.x: (client, userdata, flags, rc)
# 2.x: (client, userdata, connect_flags, reason_code, properties)
rc = 0
if args:
# 取倒数第二个参数1.x 的 rc 或 2.x 的 reason_code
if len(args) >= 2:
reason_code = args[-2]
else:
reason_code = args[0]
rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', 0)
if rc == 0:
self._connected = True
logger.info("MQTT 连接成功")
# 订阅告警主题
client.subscribe(settings.mqtt.alert_topic, settings.mqtt.qos)
logger.info(f"已订阅告警主题: {settings.mqtt.alert_topic}")
# 心跳主题通常是 edge/alert/heartbeat/# 但它已经被 edge/alert/# 包含
# 所以不需要单独订阅
else:
self._connected = False
logger.error(f"MQTT 连接失败: {reason_code}")
def _on_disconnect(self, client, userdata, *args):
"""断开连接回调 (兼容 1.x 和 2.x)"""
# 1.x: (client, userdata, rc)
# 2.x: (client, userdata, disconnect_flags, reason_code, properties)
self._connected = False
rc = 0
if args:
if len(args) >= 2:
reason_code = args[-2]
else:
reason_code = args[0]
rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', 0)
if rc != 0:
logger.warning(f"MQTT 连接异常断开: rc={rc}")
else:
logger.info("MQTT 连接断开")
def _on_message(self, client, userdata, msg: mqtt.MQTTMessage):
"""消息回调"""
with self._lock:
self._stats["messages_received"] += 1
try:
topic = msg.topic
payload = json.loads(msg.payload.decode("utf-8"))
logger.debug(f"收到 MQTT 消息: topic={topic}")
# 判断消息类型
if "/heartbeat/" in topic:
self._handle_heartbeat(topic, payload)
else:
self._handle_alert(topic, payload)
except json.JSONDecodeError as e:
logger.error(f"MQTT 消息解析失败: {e}")
with self._lock:
self._stats["errors"] += 1
except Exception as e:
logger.error(f"MQTT 消息处理失败: {e}")
with self._lock:
self._stats["errors"] += 1
def _handle_alert(self, topic: str, payload: Dict[str, Any]):
"""处理告警消息"""
with self._lock:
self._stats["alerts_received"] += 1
# 从 topic 解析 camera_id 和 roi_id
# topic 格式: edge/alert/{camera_id}/{roi_id}
parts = topic.split("/")
if len(parts) >= 4:
camera_id = parts[2]
roi_id = parts[3] if len(parts) > 3 else None
# 确保 payload 中有这些字段
payload.setdefault("camera_id", camera_id)
payload.setdefault("roi_id", roi_id)
logger.info(
f"收到告警: camera={payload.get('camera_id')}, "
f"type={payload.get('alert_type')}, "
f"confidence={payload.get('confidence')}"
)
# 调用所有注册的处理器
for handler in self._alert_handlers:
try:
handler(payload)
except Exception as e:
logger.error(f"告警处理器执行失败: {e}")
def _handle_heartbeat(self, topic: str, payload: Dict[str, Any]):
"""处理心跳消息"""
with self._lock:
self._stats["heartbeats_received"] += 1
# 从 topic 解析 device_id
# topic 格式: edge/alert/heartbeat/{device_id}
parts = topic.split("/")
if len(parts) >= 4:
device_id = parts[3]
payload.setdefault("device_id", device_id)
logger.debug(f"收到心跳: device={payload.get('device_id')}")
# 调用所有注册的处理器
for handler in self._heartbeat_handlers:
try:
handler(payload)
except Exception as e:
logger.error(f"心跳处理器执行失败: {e}")
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
with self._lock:
return {
**self._stats,
"connected": self._connected,
"running": self._running,
}
# 全局单例
_mqtt_service: Optional[MQTTService] = None
def get_mqtt_service() -> MQTTService:
"""获取 MQTT 服务单例"""
global _mqtt_service
if _mqtt_service is None:
_mqtt_service = MQTTService()
return _mqtt_service