Files
iot-device-management-service/app/services/mqtt_service.py
16337 b3cf544343 feat: 注册 aiot 路由并更新主程序配置
- main.py:注册 aiot_alarm 和 aiot_edge 路由,保留旧路由兼容
- config.py/alert_service.py/mqtt_service.py:同步更新配置和服务
- 添加 CLAUDE.md 项目说明文档

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-06 16:39:53 +08:00

255 lines
8.2 KiB
Python

"""
MQTT 订阅服务
订阅边缘端告警和心跳消息
"""
import json
import threading
from datetime import datetime, timezone
from typing import Callable, Dict, Any, Optional, List
import paho.mqtt.client as mqtt
from app.config import settings
from app.utils.logger import logger
class MQTTService:
"""MQTT 订阅服务"""
def __init__(self):
self._client: Optional[mqtt.Client] = None
self._connected = False
self._running = False
self._use_v2_callback = False # paho-mqtt 版本标记
self._lock = threading.Lock()
# 回调函数
self._alert_handlers: List[Callable[[Dict[str, Any]], None]] = []
self._heartbeat_handlers: List[Callable[[Dict[str, Any]], None]] = []
# 统计
self._stats = {
"messages_received": 0,
"alerts_received": 0,
"heartbeats_received": 0,
"errors": 0,
}
@property
def is_connected(self) -> bool:
return self._connected
def register_alert_handler(self, handler: Callable[[Dict[str, Any]], None]):
"""注册告警处理回调"""
self._alert_handlers.append(handler)
logger.info(f"已注册告警处理器: {handler.__name__}")
def register_heartbeat_handler(self, handler: Callable[[Dict[str, Any]], None]):
"""注册心跳处理回调"""
self._heartbeat_handlers.append(handler)
logger.info(f"已注册心跳处理器: {handler.__name__}")
def start(self):
"""启动 MQTT 服务"""
if not settings.mqtt.enabled:
logger.info("MQTT 服务已禁用")
return
if self._running:
return
try:
# 兼容 paho-mqtt 1.x 和 2.x 版本
try:
# paho-mqtt 2.0+ 新 API
self._client = mqtt.Client(
client_id=settings.mqtt.client_id,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
self._use_v2_callback = True
except AttributeError:
# paho-mqtt 1.x 旧 API
self._client = mqtt.Client(
client_id=settings.mqtt.client_id,
protocol=mqtt.MQTTv5
)
self._use_v2_callback = False
logger.info("使用 paho-mqtt 1.x 兼容模式")
# 设置回调
self._client.on_connect = self._on_connect
self._client.on_disconnect = self._on_disconnect
self._client.on_message = self._on_message
# 设置认证(如果有)
if settings.mqtt.username:
self._client.username_pw_set(
settings.mqtt.username,
settings.mqtt.password
)
# 连接
self._client.connect(
settings.mqtt.broker_host,
settings.mqtt.broker_port,
60
)
# 启动循环
self._client.loop_start()
self._running = True
logger.info(
f"MQTT 服务启动: {settings.mqtt.broker_host}:{settings.mqtt.broker_port}"
)
except Exception as e:
logger.error(f"MQTT 服务启动失败: {e}")
self._running = False
def stop(self):
"""停止 MQTT 服务"""
if not self._running:
return
self._running = False
if self._client:
self._client.loop_stop()
self._client.disconnect()
logger.info("MQTT 服务已停止")
def _on_connect(self, client, userdata, *args):
"""连接回调 (兼容 1.x 和 2.x)"""
# 1.x: (client, userdata, flags, rc)
# 2.x: (client, userdata, connect_flags, reason_code, properties)
if args:
reason_code = args[-2] if len(args) >= 2 else args[-1]
rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', reason_code)
else:
rc = -1
if rc == 0:
self._connected = True
logger.info("MQTT 连接成功")
# 订阅告警主题
client.subscribe(settings.mqtt.alert_topic, settings.mqtt.qos)
logger.info(f"已订阅告警主题: {settings.mqtt.alert_topic}")
# 心跳主题通常是 edge/alert/heartbeat/# 但它已经被 edge/alert/# 包含
# 所以不需要单独订阅
else:
self._connected = False
logger.error(f"MQTT 连接失败: {reason_code}")
def _on_disconnect(self, client, userdata, *args):
"""断开连接回调 (兼容 1.x 和 2.x)"""
# 1.x: (client, userdata, rc)
# 2.x: (client, userdata, disconnect_flags, reason_code, properties)
self._connected = False
if args:
reason_code = args[-2] if len(args) >= 2 else args[0]
rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', reason_code)
logger.warning(f"MQTT 连接断开: {rc}")
else:
logger.warning("MQTT 连接断开")
def _on_message(self, client, userdata, msg: mqtt.MQTTMessage):
"""消息回调"""
with self._lock:
self._stats["messages_received"] += 1
try:
topic = msg.topic
payload = json.loads(msg.payload.decode("utf-8"))
logger.debug(f"收到 MQTT 消息: topic={topic}")
# 判断消息类型
if "/heartbeat/" in topic:
self._handle_heartbeat(topic, payload)
else:
self._handle_alert(topic, payload)
except json.JSONDecodeError as e:
logger.error(f"MQTT 消息解析失败: {e}")
with self._lock:
self._stats["errors"] += 1
except Exception as e:
logger.error(f"MQTT 消息处理失败: {e}")
with self._lock:
self._stats["errors"] += 1
def _handle_alert(self, topic: str, payload: Dict[str, Any]):
"""处理告警消息"""
with self._lock:
self._stats["alerts_received"] += 1
# 从 topic 解析 camera_id 和 roi_id
# topic 格式: edge/alert/{camera_id}/{roi_id}
parts = topic.split("/")
if len(parts) >= 4:
camera_id = parts[2]
roi_id = parts[3] if len(parts) > 3 else None
# 确保 payload 中有这些字段
payload.setdefault("camera_id", camera_id)
payload.setdefault("roi_id", roi_id)
logger.info(
f"收到告警: camera={payload.get('camera_id')}, "
f"type={payload.get('alert_type')}, "
f"confidence={payload.get('confidence')}"
)
# 调用所有注册的处理器
for handler in self._alert_handlers:
try:
handler(payload)
except Exception as e:
logger.error(f"告警处理器执行失败: {e}")
def _handle_heartbeat(self, topic: str, payload: Dict[str, Any]):
"""处理心跳消息"""
with self._lock:
self._stats["heartbeats_received"] += 1
# 从 topic 解析 device_id
# topic 格式: edge/alert/heartbeat/{device_id}
parts = topic.split("/")
if len(parts) >= 4:
device_id = parts[3]
payload.setdefault("device_id", device_id)
logger.debug(f"收到心跳: device={payload.get('device_id')}")
# 调用所有注册的处理器
for handler in self._heartbeat_handlers:
try:
handler(payload)
except Exception as e:
logger.error(f"心跳处理器执行失败: {e}")
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
with self._lock:
return {
**self._stats,
"connected": self._connected,
"running": self._running,
}
# 全局单例
_mqtt_service: Optional[MQTTService] = None
def get_mqtt_service() -> MQTTService:
"""获取 MQTT 服务单例"""
global _mqtt_service
if _mqtt_service is None:
_mqtt_service = MQTTService()
return _mqtt_service