feat: 新增MQTT配置类和服务

- AiMqttConfig: MQTT连接参数配置(broker/认证/topic前缀/QoS等)
- MqttService: MQTT连接管理、消息发布/订阅、自动重连、配置推送封装

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-04 14:44:28 +08:00
parent 5f18a3e165
commit 752048c320
2 changed files with 187 additions and 0 deletions

View File

@@ -0,0 +1,29 @@
package com.genersoft.iot.vmp.aiot.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "ai.mqtt")
public class AiMqttConfig {
private boolean enabled = false;
private String broker = "tcp://127.0.0.1:1883";
private String clientId = "wvp-server";
private String username = "wvp";
private String password = "wvp123";
private String topicPrefix = "ai/config";
private int qos = 1;
private int connectTimeout = 10;
private int keepAlive = 60;
}

View File

@@ -0,0 +1,158 @@
package com.genersoft.iot.vmp.aiot.service;
import com.genersoft.iot.vmp.aiot.config.AiMqttConfig;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;
@Slf4j
@Service
public class MqttService {
@Autowired
private AiMqttConfig mqttConfig;
private MqttAsyncClient client;
@PostConstruct
public void init() {
if (!mqttConfig.isEnabled()) {
log.info("MQTT未启用跳过初始化");
return;
}
try {
connect();
} catch (Exception e) {
log.error("MQTT初始化失败", e);
}
}
private void connect() throws MqttException {
client = new MqttAsyncClient(mqttConfig.getBroker(), mqttConfig.getClientId(), new MemoryPersistence());
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName(mqttConfig.getUsername());
options.setPassword(mqttConfig.getPassword().getBytes(StandardCharsets.UTF_8));
options.setConnectionTimeout(mqttConfig.getConnectTimeout());
options.setKeepAliveInterval(mqttConfig.getKeepAlive());
options.setAutomaticReconnect(true);
options.setCleanStart(true);
client.setCallback(new MqttCallback() {
@Override
public void disconnected(MqttDisconnectResponse response) {
log.warn("MQTT连接断开: {}", response.getReasonString());
}
@Override
public void mqttErrorOccurred(MqttException exception) {
log.error("MQTT错误", exception);
}
@Override
public void messageArrived(String topic, MqttMessage message) {
log.debug("收到MQTT消息: topic={}, payload={}", topic, new String(message.getPayload(), StandardCharsets.UTF_8));
}
@Override
public void deliveryComplete(IMqttToken token) {
log.debug("MQTT消息投递完成");
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("MQTT{}连接成功: {}", reconnect ? "重新" : "", serverURI);
}
@Override
public void authPacketArrived(int reasonCode, MqttProperties properties) {
}
});
client.connect(options).waitForCompletion();
log.info("MQTT连接成功: broker={}, clientId={}", mqttConfig.getBroker(), mqttConfig.getClientId());
}
public void publish(String topic, String payload, int qos) {
if (client == null || !client.isConnected()) {
throw new RuntimeException("MQTT客户端未连接");
}
try {
MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
message.setQos(qos);
client.publish(topic, message).waitForCompletion();
log.info("MQTT消息发布成功: topic={}", topic);
} catch (MqttException e) {
log.error("MQTT消息发布失败: topic={}", topic, e);
throw new RuntimeException("MQTT发布失败: " + e.getMessage());
}
}
public void subscribe(String topic, int qos, BiConsumer<String, String> callback) {
if (client == null || !client.isConnected()) {
throw new RuntimeException("MQTT客户端未连接");
}
try {
MqttSubscription subscription = new MqttSubscription(topic, qos);
client.subscribe(new MqttSubscription[]{subscription}, null, null, new MqttCallback() {
@Override
public void disconnected(MqttDisconnectResponse response) {}
@Override
public void mqttErrorOccurred(MqttException exception) {}
@Override
public void messageArrived(String t, MqttMessage message) {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
callback.accept(t, payload);
}
@Override
public void deliveryComplete(IMqttToken token) {}
@Override
public void connectComplete(boolean reconnect, String serverURI) {}
@Override
public void authPacketArrived(int reasonCode, MqttProperties properties) {}
}, new MqttProperties());
log.info("MQTT订阅成功: topic={}", topic);
} catch (MqttException e) {
log.error("MQTT订阅失败: topic={}", topic, e);
throw new RuntimeException("MQTT订阅失败: " + e.getMessage());
}
}
public void pushConfig(String cameraId, String configJson) {
String topic = mqttConfig.getTopicPrefix() + "/" + cameraId + "/push";
publish(topic, configJson, mqttConfig.getQos());
}
public boolean isConnected() {
return client != null && client.isConnected();
}
@PreDestroy
public void destroy() {
if (client != null && client.isConnected()) {
try {
client.disconnect().waitForCompletion();
client.close();
log.info("MQTT连接已关闭");
} catch (MqttException e) {
log.error("MQTT关闭失败", e);
}
}
}
}