diff --git a/src/main/java/com/genersoft/iot/vmp/aiot/config/AiMqttConfig.java b/src/main/java/com/genersoft/iot/vmp/aiot/config/AiMqttConfig.java new file mode 100644 index 000000000..0021b5107 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/aiot/config/AiMqttConfig.java @@ -0,0 +1,29 @@ +package com.genersoft.iot.vmp.aiot.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Data +@Component +@ConfigurationProperties(prefix = "ai.mqtt") +public class AiMqttConfig { + + private boolean enabled = false; + + private String broker = "tcp://127.0.0.1:1883"; + + private String clientId = "wvp-server"; + + private String username = "wvp"; + + private String password = "wvp123"; + + private String topicPrefix = "ai/config"; + + private int qos = 1; + + private int connectTimeout = 10; + + private int keepAlive = 60; +} diff --git a/src/main/java/com/genersoft/iot/vmp/aiot/service/MqttService.java b/src/main/java/com/genersoft/iot/vmp/aiot/service/MqttService.java new file mode 100644 index 000000000..bcf3c6296 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/aiot/service/MqttService.java @@ -0,0 +1,158 @@ +package com.genersoft.iot.vmp.aiot.service; + +import com.genersoft.iot.vmp.aiot.config.AiMqttConfig; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.mqttv5.client.*; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; +import org.eclipse.paho.mqttv5.common.MqttSubscription; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.nio.charset.StandardCharsets; +import java.util.function.BiConsumer; + +@Slf4j +@Service +public class MqttService { + + @Autowired + private AiMqttConfig mqttConfig; + + private MqttAsyncClient client; + + @PostConstruct + public void init() { + if (!mqttConfig.isEnabled()) { + log.info("MQTT未启用,跳过初始化"); + return; + } + try { + connect(); + } catch (Exception e) { + log.error("MQTT初始化失败", e); + } + } + + private void connect() throws MqttException { + client = new MqttAsyncClient(mqttConfig.getBroker(), mqttConfig.getClientId(), new MemoryPersistence()); + + MqttConnectionOptions options = new MqttConnectionOptions(); + options.setUserName(mqttConfig.getUsername()); + options.setPassword(mqttConfig.getPassword().getBytes(StandardCharsets.UTF_8)); + options.setConnectionTimeout(mqttConfig.getConnectTimeout()); + options.setKeepAliveInterval(mqttConfig.getKeepAlive()); + options.setAutomaticReconnect(true); + options.setCleanStart(true); + + client.setCallback(new MqttCallback() { + @Override + public void disconnected(MqttDisconnectResponse response) { + log.warn("MQTT连接断开: {}", response.getReasonString()); + } + + @Override + public void mqttErrorOccurred(MqttException exception) { + log.error("MQTT错误", exception); + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + log.debug("收到MQTT消息: topic={}, payload={}", topic, new String(message.getPayload(), StandardCharsets.UTF_8)); + } + + @Override + public void deliveryComplete(IMqttToken token) { + log.debug("MQTT消息投递完成"); + } + + @Override + public void connectComplete(boolean reconnect, String serverURI) { + log.info("MQTT{}连接成功: {}", reconnect ? "重新" : "", serverURI); + } + + @Override + public void authPacketArrived(int reasonCode, MqttProperties properties) { + } + }); + + client.connect(options).waitForCompletion(); + log.info("MQTT连接成功: broker={}, clientId={}", mqttConfig.getBroker(), mqttConfig.getClientId()); + } + + public void publish(String topic, String payload, int qos) { + if (client == null || !client.isConnected()) { + throw new RuntimeException("MQTT客户端未连接"); + } + try { + MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8)); + message.setQos(qos); + client.publish(topic, message).waitForCompletion(); + log.info("MQTT消息发布成功: topic={}", topic); + } catch (MqttException e) { + log.error("MQTT消息发布失败: topic={}", topic, e); + throw new RuntimeException("MQTT发布失败: " + e.getMessage()); + } + } + + public void subscribe(String topic, int qos, BiConsumer callback) { + if (client == null || !client.isConnected()) { + throw new RuntimeException("MQTT客户端未连接"); + } + try { + MqttSubscription subscription = new MqttSubscription(topic, qos); + client.subscribe(new MqttSubscription[]{subscription}, null, null, new MqttCallback() { + @Override + public void disconnected(MqttDisconnectResponse response) {} + + @Override + public void mqttErrorOccurred(MqttException exception) {} + + @Override + public void messageArrived(String t, MqttMessage message) { + String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + callback.accept(t, payload); + } + + @Override + public void deliveryComplete(IMqttToken token) {} + + @Override + public void connectComplete(boolean reconnect, String serverURI) {} + + @Override + public void authPacketArrived(int reasonCode, MqttProperties properties) {} + }, new MqttProperties()); + log.info("MQTT订阅成功: topic={}", topic); + } catch (MqttException e) { + log.error("MQTT订阅失败: topic={}", topic, e); + throw new RuntimeException("MQTT订阅失败: " + e.getMessage()); + } + } + + public void pushConfig(String cameraId, String configJson) { + String topic = mqttConfig.getTopicPrefix() + "/" + cameraId + "/push"; + publish(topic, configJson, mqttConfig.getQos()); + } + + public boolean isConnected() { + return client != null && client.isConnected(); + } + + @PreDestroy + public void destroy() { + if (client != null && client.isConnected()) { + try { + client.disconnect().waitForCompletion(); + client.close(); + log.info("MQTT连接已关闭"); + } catch (MqttException e) { + log.error("MQTT关闭失败", e); + } + } + } +}