- mqtt_service: MQTT 订阅服务,对接 ai_edge 告警和心跳 - notification_service: WebSocket 实时推送服务 - device_service: 边缘设备心跳管理和离线检测 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
227 lines
7.0 KiB
Python
227 lines
7.0 KiB
Python
"""
|
|
MQTT subscription service.

Subscribes to alert and heartbeat messages published by edge devices.
|
|
"""
|
|
import json
|
|
import threading
|
|
from datetime import datetime, timezone
|
|
from typing import Callable, Dict, Any, Optional, List
|
|
|
|
import paho.mqtt.client as mqtt
|
|
|
|
from app.config import settings
|
|
from app.utils.logger import logger
|
|
|
|
|
|
class MQTTService:
    """MQTT subscription service for edge alerts and heartbeats.

    Wraps a paho-mqtt client: connects to the broker described by
    ``settings.mqtt``, subscribes to the alert topic, decodes each incoming
    message as a JSON object and passes it to the registered alert or
    heartbeat handlers. Simple counters are kept for monitoring.
    """

    def __init__(self):
        self._client: Optional[mqtt.Client] = None
        self._connected = False
        self._running = False
        # Guards self._stats, which is updated from the paho network thread
        # and read by get_statistics() callers.
        self._lock = threading.Lock()

        # Callbacks invoked with the decoded payload dict.
        self._alert_handlers: List[Callable[[Dict[str, Any]], None]] = []
        self._heartbeat_handlers: List[Callable[[Dict[str, Any]], None]] = []

        # Counters exposed through get_statistics().
        self._stats = {
            "messages_received": 0,
            "alerts_received": 0,
            "heartbeats_received": 0,
            "errors": 0,
        }

    @property
    def is_connected(self) -> bool:
        """Whether the last connect/disconnect callback left us connected."""
        return self._connected

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _handler_name(handler: Callable[..., Any]) -> str:
        """Return a printable name for *handler*.

        ``functools.partial`` objects and callable instances have no
        ``__name__``; fall back to ``repr`` instead of raising.
        """
        return getattr(handler, "__name__", None) or repr(handler)

    @staticmethod
    def _alert_fields_from_topic(topic: str) -> Dict[str, Any]:
        """Extract default alert fields from ``edge/alert/{camera_id}/{roi_id}``.

        The ROI segment is optional: a topic that stops after the camera
        segment yields ``roi_id=None``. Topics with fewer than three segments
        yield an empty dict.
        """
        parts = topic.split("/")
        if len(parts) < 3:
            return {}
        return {
            "camera_id": parts[2],
            "roi_id": parts[3] if len(parts) > 3 else None,
        }

    @staticmethod
    def _heartbeat_fields_from_topic(topic: str) -> Dict[str, Any]:
        """Extract default fields from ``edge/alert/heartbeat/{device_id}``.

        Returns an empty dict when the topic has fewer than four segments.
        """
        parts = topic.split("/")
        if len(parts) < 4:
            return {}
        return {"device_id": parts[3]}

    @staticmethod
    def _dispatch(
        handlers: List[Callable[[Dict[str, Any]], None]],
        payload: Dict[str, Any],
        failure_label: str,
    ) -> None:
        """Call every handler with *payload*, logging failures individually.

        One failing handler must not prevent the remaining ones from running.
        """
        for handler in handlers:
            try:
                handler(payload)
            except Exception as e:
                logger.error(f"{failure_label}: {e}")

    def _bump(self, counter: str) -> None:
        """Increment one statistics counter under the lock."""
        with self._lock:
            self._stats[counter] += 1

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def register_alert_handler(self, handler: Callable[[Dict[str, Any]], None]):
        """Register a callback invoked with each alert payload."""
        self._alert_handlers.append(handler)
        logger.info(f"已注册告警处理器: {self._handler_name(handler)}")

    def register_heartbeat_handler(self, handler: Callable[[Dict[str, Any]], None]):
        """Register a callback invoked with each heartbeat payload."""
        self._heartbeat_handlers.append(handler)
        logger.info(f"已注册心跳处理器: {self._handler_name(handler)}")

    def start(self):
        """Connect to the broker and start the background network loop.

        Does nothing when MQTT is disabled in settings or the service is
        already running. Start-up failures are logged, not raised; the
        service is then left stopped with no client attached.
        """
        if not settings.mqtt.enabled:
            logger.info("MQTT 服务已禁用")
            return

        if self._running:
            return

        try:
            client = mqtt.Client(
                client_id=settings.mqtt.client_id,
                protocol=mqtt.MQTTv5,
                callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
            )
            self._client = client

            client.on_connect = self._on_connect
            client.on_disconnect = self._on_disconnect
            client.on_message = self._on_message

            # Credentials are optional.
            if settings.mqtt.username:
                client.username_pw_set(
                    settings.mqtt.username,
                    settings.mqtt.password,
                )

            client.connect(
                settings.mqtt.broker_host,
                settings.mqtt.broker_port,
                keepalive=60,
            )
            client.loop_start()
        except Exception as e:
            logger.error(f"MQTT 服务启动失败: {e}")
            # Do not keep a half-initialised client around.
            self._client = None
            self._connected = False
            self._running = False
            return

        self._running = True
        logger.info(
            f"MQTT 服务启动: {settings.mqtt.broker_host}:{settings.mqtt.broker_port}"
        )

    def stop(self):
        """Disconnect from the broker and stop the network loop.

        Does nothing when the service is not running.
        """
        if not self._running:
            return

        self._running = False
        client, self._client = self._client, None

        if client is not None:
            try:
                # Disconnect while the network thread is still alive so the
                # DISCONNECT packet is sent and on_disconnect can fire.
                client.disconnect()
            except Exception as e:
                logger.error(f"MQTT 断开连接失败: {e}")
            finally:
                client.loop_stop()

        # The disconnect callback may not have run; never report a stopped
        # service as connected.
        self._connected = False
        logger.info("MQTT 服务已停止")

    def get_statistics(self) -> Dict[str, Any]:
        """Return a snapshot of the counters plus connection/running flags."""
        with self._lock:
            return {
                **self._stats,
                "connected": self._connected,
                "running": self._running,
            }

    # ------------------------------------------------------------------
    # paho-mqtt callbacks (invoked on the network thread)
    # ------------------------------------------------------------------

    def _on_connect(self, client, userdata, flags, reason_code, properties):
        """Connection callback: subscribe to the alert topic on success."""
        if reason_code == 0:
            self._connected = True
            logger.info("MQTT 连接成功")

            client.subscribe(settings.mqtt.alert_topic, settings.mqtt.qos)
            logger.info(f"已订阅告警主题: {settings.mqtt.alert_topic}")

            # The heartbeat topic is normally edge/alert/heartbeat/#, which
            # is already covered by edge/alert/#, so it needs no separate
            # subscription.
        else:
            self._connected = False
            logger.error(f"MQTT 连接失败: {reason_code}")

    def _on_disconnect(self, client, userdata, reason_code, properties):
        """Disconnection callback: record the state and log the reason."""
        self._connected = False
        logger.warning(f"MQTT 连接断开: {reason_code}")

    def _on_message(self, client, userdata, msg: "mqtt.MQTTMessage"):
        """Message callback: decode the JSON payload and route it by topic."""
        self._bump("messages_received")

        try:
            topic = msg.topic
            payload = json.loads(msg.payload.decode("utf-8"))
        except ValueError as e:
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError.
            logger.error(f"MQTT 消息解析失败: {e}")
            self._bump("errors")
            return

        if not isinstance(payload, dict):
            # Handlers expect a mapping; reject arrays and scalars up front
            # instead of failing later with an AttributeError.
            logger.error(
                f"MQTT 消息解析失败: payload is not a JSON object "
                f"(topic={topic}, type={type(payload).__name__})"
            )
            self._bump("errors")
            return

        try:
            logger.debug(f"收到 MQTT 消息: topic={topic}")

            if "/heartbeat/" in topic:
                self._handle_heartbeat(topic, payload)
            else:
                self._handle_alert(topic, payload)
        except Exception as e:
            logger.error(f"MQTT 消息处理失败: {e}")
            self._bump("errors")

    def _handle_alert(self, topic: str, payload: Dict[str, Any]):
        """Fill alert fields missing from the payload, then notify handlers."""
        self._bump("alerts_received")

        # Values already present in the payload win over the topic.
        for key, value in self._alert_fields_from_topic(topic).items():
            payload.setdefault(key, value)

        logger.info(
            f"收到告警: camera={payload.get('camera_id')}, "
            f"type={payload.get('alert_type')}, "
            f"confidence={payload.get('confidence')}"
        )

        self._dispatch(self._alert_handlers, payload, "告警处理器执行失败")

    def _handle_heartbeat(self, topic: str, payload: Dict[str, Any]):
        """Fill the device id if missing from the payload, then notify handlers."""
        self._bump("heartbeats_received")

        for key, value in self._heartbeat_fields_from_topic(topic).items():
            payload.setdefault(key, value)

        logger.debug(f"收到心跳: device={payload.get('device_id')}")

        self._dispatch(self._heartbeat_handlers, payload, "心跳处理器执行失败")
|
|
|
|
|
|
# Module-level singleton, created lazily on first access.
_mqtt_service: Optional[MQTTService] = None


def get_mqtt_service() -> MQTTService:
    """Return the shared MQTTService, creating it on the first call."""
    global _mqtt_service
    if _mqtt_service is not None:
        return _mqtt_service
    _mqtt_service = MQTTService()
    return _mqtt_service
|