diff --git a/src/main/java/com/genersoft/iot/vmp/aiot/service/MqttService.java b/src/main/java/com/genersoft/iot/vmp/aiot/service/MqttService.java index bcf3c6296..c539a8761 100644 --- a/src/main/java/com/genersoft/iot/vmp/aiot/service/MqttService.java +++ b/src/main/java/com/genersoft/iot/vmp/aiot/service/MqttService.java @@ -1,5 +1,8 @@ package com.genersoft.iot.vmp.aiot.service; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.aiot.bean.AiAlert; import com.genersoft.iot.vmp.aiot.config.AiMqttConfig; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -11,6 +14,7 @@ import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttSubscription; import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; @@ -23,6 +27,14 @@ public class MqttService { @Autowired private AiMqttConfig mqttConfig; + @Lazy + @Autowired + private IAiAlertService alertService; + + @Lazy + @Autowired + private IAiEdgeDeviceService edgeDeviceService; + private MqttAsyncClient client; @PostConstruct @@ -73,6 +85,8 @@ public class MqttService { @Override public void connectComplete(boolean reconnect, String serverURI) { log.info("MQTT{}连接成功: {}", reconnect ? "重新" : "", serverURI); + // 连接成功后订阅告警和心跳主题 + subscribeAlertTopics(); } @Override @@ -84,6 +98,100 @@ public class MqttService { log.info("MQTT连接成功: broker={}, clientId={}", mqttConfig.getBroker(), mqttConfig.getClientId()); } + /** + * 订阅告警和心跳主题 + */ + private void subscribeAlertTopics() { + try { + // 订阅告警主题 edge/alert/+/+ (camera_id/roi_id) + subscribe("edge/alert/+/+", mqttConfig.getQos(), this::handleAlert); + // 订阅心跳主题 edge/alert/heartbeat/+ + subscribe("edge/alert/heartbeat/+", mqttConfig.getQos(), this::handleHeartbeat); + log.info("MQTT告警和心跳主题订阅成功"); + } catch (Exception e) { + log.error("MQTT告警主题订阅失败", e); + } + } + + /** + * 处理告警消息 + * topic格式: edge/alert/{camera_id}/{roi_id} + */ + private void handleAlert(String topic, String payload) { + try { + // 排除心跳主题 + if (topic.contains("heartbeat")) { + return; + } + + String[] parts = topic.split("/"); + if (parts.length < 4) { + log.warn("[MQTT告警] topic格式异常: {}", topic); + return; + } + String cameraId = parts[2]; + String roiId = parts[3]; + + JSONObject json = JSON.parseObject(payload); + AiAlert alert = new AiAlert(); + alert.setAlertId(json.getString("alert_id")); + alert.setCameraId(cameraId); + alert.setRoiId(roiId); + alert.setBindId(json.getString("bind_id")); + alert.setAlertType(json.getString("alert_type")); + alert.setTargetClass(json.getString("target_class")); + alert.setConfidence(json.getDouble("confidence")); + alert.setBbox(json.containsKey("bbox") ? JSON.toJSONString(json.get("bbox")) : null); + alert.setMessage(json.getString("message")); + alert.setImagePath(json.getString("image_path")); + alert.setDurationMinutes(json.getDouble("duration_minutes")); + // timestamp → receivedAt + alert.setReceivedAt(json.getString("timestamp")); + + // 其余字段放入extraData + JSONObject extra = new JSONObject(); + for (String key : json.keySet()) { + if (!isStandardAlertField(key)) { + extra.put(key, json.get(key)); + } + } + if (!extra.isEmpty()) { + alert.setExtraData(extra.toJSONString()); + } + + alertService.save(alert); + log.info("[MQTT告警] 收到告警: type={}, camera={}, roi={}", alert.getAlertType(), cameraId, roiId); + } catch (Exception e) { + log.error("[MQTT告警] 处理告警消息失败: topic={}", topic, e); + } + } + + private boolean isStandardAlertField(String key) { + return "alert_id".equals(key) || "camera_id".equals(key) || "roi_id".equals(key) + || "bind_id".equals(key) || "alert_type".equals(key) || "target_class".equals(key) + || "confidence".equals(key) || "bbox".equals(key) || "message".equals(key) + || "image_path".equals(key) || "duration_minutes".equals(key) || "timestamp".equals(key); + } + + /** + * 处理心跳消息 + * topic格式: edge/alert/heartbeat/{device_id} + */ + private void handleHeartbeat(String topic, String payload) { + try { + String[] parts = topic.split("/"); + if (parts.length < 4) { + log.warn("[MQTT心跳] topic格式异常: {}", topic); + return; + } + String deviceId = parts[3]; + edgeDeviceService.saveOrUpdateHeartbeat(deviceId, payload); + log.debug("[MQTT心跳] 收到心跳: deviceId={}", deviceId); + } catch (Exception e) { + log.error("[MQTT心跳] 处理心跳消息失败: topic={}", topic, e); + } + } + public void publish(String topic, String payload, int qos) { if (client == null || !client.isConnected()) { throw new RuntimeException("MQTT客户端未连接"); @@ -105,28 +213,12 @@ public class MqttService { } try { MqttSubscription subscription = new MqttSubscription(topic, qos); - client.subscribe(new MqttSubscription[]{subscription}, null, null, new MqttCallback() { - @Override - public void disconnected(MqttDisconnectResponse response) {} - - @Override - public void mqttErrorOccurred(MqttException exception) {} - - @Override - public void messageArrived(String t, MqttMessage message) { - String payload = new String(message.getPayload(), StandardCharsets.UTF_8); - callback.accept(t, payload); - } - - @Override - public void deliveryComplete(IMqttToken token) {} - - @Override - public void connectComplete(boolean reconnect, String serverURI) {} - - @Override - public void authPacketArrived(int reasonCode, MqttProperties properties) {} - }, new MqttProperties()); + IMqttMessageListener listener = (t, message) -> { + String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + callback.accept(t, payload); + }; + client.subscribe(new MqttSubscription[]{subscription}, null, null, + new IMqttMessageListener[]{listener}, new MqttProperties()); log.info("MQTT订阅成功: topic={}", topic); } catch (MqttException e) { log.error("MQTT订阅失败: topic={}", topic, e);