feat(aiot): MqttService支持告警和心跳订阅

- 订阅 edge/alert/# 接收AI告警
- 订阅 edge/alert/heartbeat/# 接收设备心跳
- 告警入库并触发通知
- 心跳更新设备状态

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-05 13:32:16 +08:00
parent 0a6931e093
commit 3848a517b9

View File

@@ -1,5 +1,8 @@
package com.genersoft.iot.vmp.aiot.service;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.aiot.bean.AiAlert;
import com.genersoft.iot.vmp.aiot.config.AiMqttConfig;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@@ -11,6 +14,7 @@ import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
@@ -23,6 +27,14 @@ public class MqttService {
@Autowired
private AiMqttConfig mqttConfig;
@Lazy
@Autowired
private IAiAlertService alertService;
@Lazy
@Autowired
private IAiEdgeDeviceService edgeDeviceService;
private MqttAsyncClient client;
@PostConstruct
@@ -73,6 +85,8 @@ public class MqttService {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT{}连接成功: {}", reconnect ? "重新" : "", serverURI);
// 连接成功后订阅告警和心跳主题
subscribeAlertTopics();
}
@Override
@@ -84,6 +98,100 @@ public class MqttService {
log.info("MQTT连接成功: broker={}, clientId={}", mqttConfig.getBroker(), mqttConfig.getClientId());
}
/**
* 订阅告警和心跳主题
*/
private void subscribeAlertTopics() {
try {
// 订阅告警主题 edge/alert/+/+ (camera_id/roi_id)
subscribe("edge/alert/+/+", mqttConfig.getQos(), this::handleAlert);
// 订阅心跳主题 edge/alert/heartbeat/+
subscribe("edge/alert/heartbeat/+", mqttConfig.getQos(), this::handleHeartbeat);
log.info("MQTT告警和心跳主题订阅成功");
} catch (Exception e) {
log.error("MQTT告警主题订阅失败", e);
}
}
/**
* 处理告警消息
* topic格式: edge/alert/{camera_id}/{roi_id}
*/
private void handleAlert(String topic, String payload) {
try {
// 排除心跳主题
if (topic.contains("heartbeat")) {
return;
}
String[] parts = topic.split("/");
if (parts.length < 4) {
log.warn("[MQTT告警] topic格式异常: {}", topic);
return;
}
String cameraId = parts[2];
String roiId = parts[3];
JSONObject json = JSON.parseObject(payload);
AiAlert alert = new AiAlert();
alert.setAlertId(json.getString("alert_id"));
alert.setCameraId(cameraId);
alert.setRoiId(roiId);
alert.setBindId(json.getString("bind_id"));
alert.setAlertType(json.getString("alert_type"));
alert.setTargetClass(json.getString("target_class"));
alert.setConfidence(json.getDouble("confidence"));
alert.setBbox(json.containsKey("bbox") ? JSON.toJSONString(json.get("bbox")) : null);
alert.setMessage(json.getString("message"));
alert.setImagePath(json.getString("image_path"));
alert.setDurationMinutes(json.getDouble("duration_minutes"));
// timestamp → receivedAt
alert.setReceivedAt(json.getString("timestamp"));
// 其余字段放入extraData
JSONObject extra = new JSONObject();
for (String key : json.keySet()) {
if (!isStandardAlertField(key)) {
extra.put(key, json.get(key));
}
}
if (!extra.isEmpty()) {
alert.setExtraData(extra.toJSONString());
}
alertService.save(alert);
log.info("[MQTT告警] 收到告警: type={}, camera={}, roi={}", alert.getAlertType(), cameraId, roiId);
} catch (Exception e) {
log.error("[MQTT告警] 处理告警消息失败: topic={}", topic, e);
}
}
private boolean isStandardAlertField(String key) {
return "alert_id".equals(key) || "camera_id".equals(key) || "roi_id".equals(key)
|| "bind_id".equals(key) || "alert_type".equals(key) || "target_class".equals(key)
|| "confidence".equals(key) || "bbox".equals(key) || "message".equals(key)
|| "image_path".equals(key) || "duration_minutes".equals(key) || "timestamp".equals(key);
}
/**
* 处理心跳消息
* topic格式: edge/alert/heartbeat/{device_id}
*/
private void handleHeartbeat(String topic, String payload) {
try {
String[] parts = topic.split("/");
if (parts.length < 4) {
log.warn("[MQTT心跳] topic格式异常: {}", topic);
return;
}
String deviceId = parts[3];
edgeDeviceService.saveOrUpdateHeartbeat(deviceId, payload);
log.debug("[MQTT心跳] 收到心跳: deviceId={}", deviceId);
} catch (Exception e) {
log.error("[MQTT心跳] 处理心跳消息失败: topic={}", topic, e);
}
}
public void publish(String topic, String payload, int qos) {
if (client == null || !client.isConnected()) {
throw new RuntimeException("MQTT客户端未连接");
@@ -105,28 +213,12 @@ public class MqttService {
}
try {
MqttSubscription subscription = new MqttSubscription(topic, qos);
client.subscribe(new MqttSubscription[]{subscription}, null, null, new MqttCallback() {
@Override
public void disconnected(MqttDisconnectResponse response) {}
@Override
public void mqttErrorOccurred(MqttException exception) {}
@Override
public void messageArrived(String t, MqttMessage message) {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
callback.accept(t, payload);
}
@Override
public void deliveryComplete(IMqttToken token) {}
@Override
public void connectComplete(boolean reconnect, String serverURI) {}
@Override
public void authPacketArrived(int reasonCode, MqttProperties properties) {}
}, new MqttProperties());
IMqttMessageListener listener = (t, message) -> {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
callback.accept(t, payload);
};
client.subscribe(new MqttSubscription[]{subscription}, null, null,
new IMqttMessageListener[]{listener}, new MqttProperties());
log.info("MQTT订阅成功: topic={}", topic);
} catch (MqttException e) {
log.error("MQTT订阅失败: topic={}", topic, e);