protocolHandlers) {
- this.deviceMessageService = deviceMessageService;
- this.connectionManager = connectionManager;
- this.protocolHandlers = protocolHandlers;
- this.serverId = protocol.getServerId();
- }
-
- @Override
- public void handle(NetSocket socket) {
- String clientId = IdUtil.simpleUUID();
- log.debug("[handle][设备连接,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
-
- // 设置异常和关闭处理器
- socket.exceptionHandler(ex -> {
- log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress(), ex);
- cleanupConnection(socket);
- });
-
- socket.closeHandler(v -> {
- log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
- cleanupConnection(socket);
- });
-
- // 设置消息处理器
- socket.handler(buffer -> {
- try {
- processMessage(clientId, buffer, socket);
- } catch (Exception e) {
- log.error("[handle][消息处理失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
- clientId, socket.remoteAddress(), e.getMessage(), e);
- cleanupConnection(socket);
- socket.close();
- }
- });
- }
-
- /**
- * 处理消息
- *
- * 流程:
- * 1. 检测消息格式类型(JSON/Binary/JT808)
- * 2. 解码消息
- * 3. 查找协议处理器
- * 4. 判断是否为认证消息
- * 5. 路由到协议处理器处理
- *
- * @param clientId 客户端 ID
- * @param buffer 消息
- * @param socket 网络连接
- * @throws Exception 消息解码失败时抛出异常
- */
- private void processMessage(String clientId, Buffer buffer, NetSocket socket) throws Exception {
- // 1. 基础检查
- if (buffer == null || buffer.length() == 0) {
- return;
- }
-
- // 2. 获取消息格式类型
- String codecType = getMessageCodecType(buffer, socket);
-
- // 3. 解码消息
- IotDeviceMessage message;
- try {
- message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
- if (message == null) {
- throw new Exception("解码后消息为空");
- }
- } catch (Exception e) {
- // 消息格式错误时抛出异常,由上层处理连接断开
- throw new Exception("消息解码失败: " + e.getMessage(), e);
- }
-
- // 4. 查找协议处理器
- ProtocolHandler handler = findProtocolHandler(message, codecType);
- if (handler == null) {
- log.warn("[processMessage][未找到协议处理器,codecType: {}, method: {}]",
- codecType, message.getMethod());
- return;
- }
-
- // 5. 判断是否为认证消息
- if (isAuthenticationMessage(message)) {
- handleAuthenticationWithProtocol(clientId, message, codecType, socket, handler);
- } else {
- handleBusinessWithProtocol(clientId, message, codecType, socket, handler);
- }
- }
-
- /**
- * 使用协议处理器处理认证消息
- *
- * 认证结果处理:
- * - SUCCESS:注册连接,发送上线消息
- * - PENDING:等待后续认证步骤(如 JT808 注册后等待鉴权)
- * - FAILURE:认证失败,不做处理(协议处理器已发送失败响应)
- *
- * @param clientId 客户端 ID
- * @param message 认证消息
- * @param codecType 消息编解码类型
- * @param socket 网络连接
- * @param handler 协议处理器
- */
- private void handleAuthenticationWithProtocol(String clientId, IotDeviceMessage message,
- String codecType, NetSocket socket,
- ProtocolHandler handler) {
- try {
- // 委托给协议处理器
- AuthResult result = handler.handleAuthentication(clientId, message, codecType, socket);
-
- // 根据认证结果处理
- if (result.isSuccess()) {
- // 认证成功:注册连接并发送上线消息
- registerConnection(socket, result.getDevice(), clientId, codecType);
- sendOnlineMessage(result.getDevice());
-
- log.info("[handleAuthentication][认证成功,设备: {}, 协议: {}]",
- result.getDevice().getDeviceName(), handler.getProtocolType());
-
- } else if (result.isPending()) {
- // 认证待定:等待后续认证步骤(如 JT808 注册后等待鉴权)
- log.info("[handleAuthentication][认证待定,设备: {}, 协议: {}, 消息: {}]",
- result.getDevice() != null ? result.getDevice().getDeviceName() : "unknown",
- handler.getProtocolType(), result.getMessage());
-
- } else {
- // 认证失败:协议处理器已发送失败响应,这里只记录日志
- log.warn("[handleAuthentication][认证失败,clientId: {}, 协议: {}, 原因: {}]",
- clientId, handler.getProtocolType(), result.getMessage());
- }
-
- } catch (Exception e) {
- log.error("[handleAuthentication][认证异常,clientId: {}, 协议: {}]",
- clientId, handler.getProtocolType(), e);
- handler.sendResponse(socket, message, false, "认证异常", codecType);
- }
- }
-
- /**
- * 使用协议处理器处理业务消息
- *
- * 前置条件:设备已认证
- *
- * @param clientId 客户端 ID
- * @param message 业务消息
- * @param codecType 消息编解码类型
- * @param socket 网络连接
- * @param handler 协议处理器
- */
- private void handleBusinessWithProtocol(String clientId, IotDeviceMessage message,
- String codecType, NetSocket socket,
- ProtocolHandler handler) {
- try {
- // 1. 检查认证状态
- if (connectionManager.isNotAuthenticated(socket)) {
- log.warn("[handleBusinessMessage][设备未认证,clientId: {}]", clientId);
- handler.sendResponse(socket, message, false, "请先进行认证", codecType);
- return;
- }
-
- // 2. 获取连接信息
- IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
- if (connectionInfo == null) {
- log.error("[handleBusinessMessage][连接信息不存在,clientId: {}]", clientId);
- return;
- }
-
- // 3. 委托给协议处理器处理业务消息
- handler.handleBusinessMessage(
- clientId,
- message,
- codecType,
- socket,
- connectionInfo.getProductKey(),
- connectionInfo.getDeviceName(),
- serverId
- );
-
- } catch (Exception e) {
- log.error("[handleBusinessMessage][业务消息处理异常,clientId: {}]", clientId, e);
- }
- }
-
- /**
- * 获取消息编解码类型
- *
- * 检测优先级:
- * 1. 如果已认证,使用缓存的编解码类型
- * 2. 未认证时,通过消息格式自动检测:
- * - JT808:首尾标识符 0x7e
- * - Binary:魔术字 0x7E
- * - JSON:默认
- *
- * @param buffer 消息
- * @param socket 网络连接
- * @return 消息编解码类型
- */
- private String getMessageCodecType(Buffer buffer, NetSocket socket) {
- // 1. 如果已认证,优先使用缓存的编解码类型
- IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
- if (connectionInfo != null && connectionInfo.isAuthenticated() &&
- StrUtil.isNotBlank(connectionInfo.getCodecType())) {
- return connectionInfo.getCodecType();
- }
-
- // 2. 未认证时检测消息格式类型
- byte[] data = buffer.getBytes();
-
- // 2.1 检测是否为 JT808 格式(首尾标识符 0x7e)
- if (IotJt808DeviceMessageCodec.isJt808Format(data)) {
- return CODEC_TYPE_JT808;
- }
-
- // 2.2 检测是否为自定义二进制格式(魔术字 0x7E)
- if (IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(data)) {
- return CODEC_TYPE_BINARY;
- }
-
- // 2.3 默认为 JSON 格式
- return CODEC_TYPE_JSON;
- }
-
- /**
- * 查找协议处理器
- *
- * 遍历所有协议处理器,返回第一个能处理该消息的处理器
- *
- * @param message 设备消息
- * @param codecType 消息编解码类型
- * @return 协议处理器,未找到返回 null
- */
- private ProtocolHandler findProtocolHandler(IotDeviceMessage message, String codecType) {
- return protocolHandlers.stream()
- .filter(handler -> handler.canHandle(message, codecType))
- .findFirst()
- .orElse(null);
- }
-
- /**
- * 判断是否为认证消息
- *
- * 认证消息包括:
- * - 标准认证:auth
- * - JT808 注册:jt808.terminal.register
- * - JT808 鉴权:jt808.terminal.auth
- *
- * @param message 设备消息
- * @return true-是认证消息,false-不是
- */
- private boolean isAuthenticationMessage(IotDeviceMessage message) {
- String method = message.getMethod();
- return "auth".equals(method)
- || "jt808.terminal.register".equals(method)
- || "jt808.terminal.auth".equals(method);
- }
-
- /**
- * 注册连接信息
- *
- * 将设备连接信息注册到连接管理器,包括:
- * - 设备 ID
- * - 产品 Key
- * - 设备名称
- * - 客户端 ID
- * - 编解码类型
- * - 认证状态
- *
- * @param socket 网络连接
- * @param device 设备信息
- * @param clientId 客户端 ID
- * @param codecType 消息编解码类型
- */
- private void registerConnection(NetSocket socket, IotDeviceRespDTO device,
- String clientId, String codecType) {
- IotTcpConnectionManager.ConnectionInfo connectionInfo = new IotTcpConnectionManager.ConnectionInfo()
- .setDeviceId(device.getId())
- .setProductKey(device.getProductKey())
- .setDeviceName(device.getDeviceName())
- .setClientId(clientId)
- .setCodecType(codecType)
- .setAuthenticated(true);
-
- // 注册连接(如果设备已有其他连接,会自动断开旧连接)
- connectionManager.registerConnection(socket, device.getId(), connectionInfo);
- }
-
- /**
- * 发送设备上线消息
- *
- * 设备认证成功后,发送上线消息到消息总线,通知业务层设备已上线
- *
- * @param device 设备信息
- */
- private void sendOnlineMessage(IotDeviceRespDTO device) {
- try {
- IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
- deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
- device.getDeviceName(), serverId);
-
- log.debug("[sendOnlineMessage][发送上线消息成功,设备: {}]", device.getDeviceName());
-
- } catch (Exception e) {
- log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", device.getDeviceName(), e);
- }
- }
-
- /**
- * 清理连接
- *
- * 连接关闭或异常时,清理连接信息并发送离线消息
- *
- * @param socket 网络连接
- */
- private void cleanupConnection(NetSocket socket) {
- try {
- // 1. 发送离线消息(如果已认证)
- IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
- if (connectionInfo != null && connectionInfo.isAuthenticated()) {
- IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
- deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(),
- connectionInfo.getDeviceName(), serverId);
-
- log.debug("[cleanupConnection][发送离线消息成功,设备: {}]", connectionInfo.getDeviceName());
- }
-
- // 2. 注销连接
- connectionManager.unregisterConnection(socket);
-
- } catch (Exception e) {
- log.error("[cleanupConnection][清理连接失败]", e);
- }
- }
-
-}
+package com.viewsh.module.iot.gateway.protocol.tcp.router;
+
+import cn.hutool.core.util.IdUtil;
+import cn.hutool.core.util.StrUtil;
+import com.viewsh.module.iot.core.biz.dto.IotDeviceRespDTO;
+import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
+import com.viewsh.module.iot.gateway.codec.jt808.IotJt808DeviceMessageCodec;
+import com.viewsh.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec;
+import com.viewsh.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec;
+import com.viewsh.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol;
+import com.viewsh.module.iot.gateway.protocol.tcp.handler.AuthResult;
+import com.viewsh.module.iot.gateway.protocol.tcp.handler.ProtocolHandler;
+import com.viewsh.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager;
+import com.viewsh.module.iot.gateway.service.device.message.IotDeviceMessageService;
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.net.NetSocket;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+/**
+ * TCP 上行消息处理器(重构版)
+ *
+ * 职责:
+ * 1. 管理 TCP 连接生命周期(连接建立、异常、关闭)
+ * 2. 检测消息格式类型(JSON/Binary/JT808)
+ * 3. 解码设备消息
+ * 4. 路由到对应的协议处理器
+ * 5. 管理设备认证状态
+ * 6. 发送设备上线/离线消息
+ *
+ * 设计原则:
+ * - 主处理器只负责路由,不包含协议特定逻辑
+ * - 协议处理器通过 Spring 自动注入,实现插件化
+ * - 认证成功后,统一注册连接和发送上线消息
+ * - 支持多步认证(如 JT808 的注册+鉴权)
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public class IotTcpUpstreamHandler implements Handler {
+
+ private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE;
+ private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE;
+ private static final String CODEC_TYPE_JT808 = IotJt808DeviceMessageCodec.TYPE;
+
+ private final IotDeviceMessageService deviceMessageService;
+ private final IotTcpConnectionManager connectionManager;
+ private final List protocolHandlers;
+ private final String serverId;
+
+ public IotTcpUpstreamHandler(IotTcpUpstreamProtocol protocol,
+ IotDeviceMessageService deviceMessageService,
+ IotTcpConnectionManager connectionManager,
+ List protocolHandlers) {
+ this.deviceMessageService = deviceMessageService;
+ this.connectionManager = connectionManager;
+ this.protocolHandlers = protocolHandlers;
+ this.serverId = protocol.getServerId();
+ }
+
+ @Override
+ public void handle(NetSocket socket) {
+ String clientId = IdUtil.simpleUUID();
+ log.debug("[handle][设备连接,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
+
+ // 设置异常和关闭处理器
+ socket.exceptionHandler(ex -> {
+ log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress(), ex);
+ cleanupConnection(socket);
+ });
+
+ socket.closeHandler(v -> {
+ log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress());
+ cleanupConnection(socket);
+ });
+
+ // 设置消息处理器
+ socket.handler(buffer -> {
+ try {
+ processMessage(clientId, buffer, socket);
+ } catch (Exception e) {
+ log.error("[handle][消息处理失败,断开连接,客户端 ID: {},地址: {},错误: {}]",
+ clientId, socket.remoteAddress(), e.getMessage(), e);
+ cleanupConnection(socket);
+ socket.close();
+ }
+ });
+ }
+
+ /**
+ * 处理消息
+ *
+ * 流程:
+ * 1. 检测消息格式类型(JSON/Binary/JT808)
+ * 2. 解码消息
+ * 3. 查找协议处理器
+ * 4. 判断是否为认证消息
+ * 5. 路由到协议处理器处理
+ *
+ * @param clientId 客户端 ID
+ * @param buffer 消息
+ * @param socket 网络连接
+ * @throws Exception 消息解码失败时抛出异常
+ */
+ private void processMessage(String clientId, Buffer buffer, NetSocket socket) throws Exception {
+ // 1. 基础检查
+ if (buffer == null || buffer.length() == 0) {
+ return;
+ }
+
+ // 2. 获取消息格式类型
+ String codecType = getMessageCodecType(buffer, socket);
+ if (codecType == null) {
+ log.warn("[processMessage][未知消息格式,断开连接,clientId: {},数据开头: {}]",
+ clientId, buffer.length() > 20 ? buffer.getString(0, 20) : buffer.toString());
+ socket.close();
+ return;
+ }
+
+ // 3. 解码消息
+ IotDeviceMessage message;
+ try {
+ message = deviceMessageService.decodeDeviceMessage(buffer.getBytes(), codecType);
+ if (message == null) {
+ throw new Exception("解码后消息为空");
+ }
+ } catch (Exception e) {
+ // 消息格式错误时抛出异常,由上层处理连接断开
+ throw new Exception("消息解码失败: " + e.getMessage(), e);
+ }
+
+ // 4. 查找协议处理器
+ ProtocolHandler handler = findProtocolHandler(message, codecType);
+ if (handler == null) {
+ log.warn("[processMessage][未找到协议处理器,codecType: {}, method: {}]",
+ codecType, message.getMethod());
+ return;
+ }
+
+ // 5. 判断是否为认证消息
+ if (isAuthenticationMessage(message)) {
+ handleAuthenticationWithProtocol(clientId, message, codecType, socket, handler);
+ } else {
+ handleBusinessWithProtocol(clientId, message, codecType, socket, handler);
+ }
+ }
+
+ /**
+ * 使用协议处理器处理认证消息
+ *
+ * 认证结果处理:
+ * - SUCCESS:注册连接,发送上线消息
+ * - PENDING:等待后续认证步骤(如 JT808 注册后等待鉴权)
+ * - FAILURE:认证失败,不做处理(协议处理器已发送失败响应)
+ *
+ * @param clientId 客户端 ID
+ * @param message 认证消息
+ * @param codecType 消息编解码类型
+ * @param socket 网络连接
+ * @param handler 协议处理器
+ */
+ private void handleAuthenticationWithProtocol(String clientId, IotDeviceMessage message,
+ String codecType, NetSocket socket,
+ ProtocolHandler handler) {
+ try {
+ // 委托给协议处理器
+ AuthResult result = handler.handleAuthentication(clientId, message, codecType, socket);
+
+ // 根据认证结果处理
+ if (result.isSuccess()) {
+ // 认证成功:注册连接并发送上线消息
+ registerConnection(socket, result.getDevice(), clientId, codecType);
+ sendOnlineMessage(result.getDevice());
+
+ log.info("[handleAuthentication][认证成功,设备: {}, 协议: {}]",
+ result.getDevice().getDeviceName(), handler.getProtocolType());
+
+ } else if (result.isPending()) {
+ // 认证待定:等待后续认证步骤(如 JT808 注册后等待鉴权)
+ log.info("[handleAuthentication][认证待定,设备: {}, 协议: {}, 消息: {}]",
+ result.getDevice() != null ? result.getDevice().getDeviceName() : "unknown",
+ handler.getProtocolType(), result.getMessage());
+
+ } else {
+ // 认证失败:协议处理器已发送失败响应,这里只记录日志
+ log.warn("[handleAuthentication][认证失败,clientId: {}, 协议: {}, 原因: {}]",
+ clientId, handler.getProtocolType(), result.getMessage());
+ }
+
+ } catch (Exception e) {
+ log.error("[handleAuthentication][认证异常,clientId: {}, 协议: {}]",
+ clientId, handler.getProtocolType(), e);
+ handler.sendResponse(socket, message, false, "认证异常", codecType);
+ }
+ }
+
+ /**
+ * 使用协议处理器处理业务消息
+ *
+ * 前置条件:设备已认证
+ *
+ * @param clientId 客户端 ID
+ * @param message 业务消息
+ * @param codecType 消息编解码类型
+ * @param socket 网络连接
+ * @param handler 协议处理器
+ */
+ private void handleBusinessWithProtocol(String clientId, IotDeviceMessage message,
+ String codecType, NetSocket socket,
+ ProtocolHandler handler) {
+ try {
+ // 1. 检查认证状态
+ if (connectionManager.isNotAuthenticated(socket)) {
+ log.warn("[handleBusinessMessage][设备未认证,clientId: {}]", clientId);
+ handler.sendResponse(socket, message, false, "请先进行认证", codecType);
+ return;
+ }
+
+ // 2. 获取连接信息
+ IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
+ if (connectionInfo == null) {
+ log.error("[handleBusinessMessage][连接信息不存在,clientId: {}]", clientId);
+ return;
+ }
+
+ // 3. 委托给协议处理器处理业务消息
+ handler.handleBusinessMessage(
+ clientId,
+ message,
+ codecType,
+ socket,
+ connectionInfo.getProductKey(),
+ connectionInfo.getDeviceName(),
+ serverId);
+
+ } catch (Exception e) {
+ log.error("[handleBusinessMessage][业务消息处理异常,clientId: {}]", clientId, e);
+ }
+ }
+
+ /**
+ * 获取消息编解码类型
+ *
+ * 检测优先级:
+ * 1. 如果已认证,使用缓存的编解码类型
+ * 2. 未认证时,通过消息格式自动检测:
+ * - JT808:首尾标识符 0x7e
+ * - Binary:魔术字 0x7E
+ * - JSON:默认
+ *
+ * @param buffer 消息
+ * @param socket 网络连接
+ * @return 消息编解码类型
+ */
+ private String getMessageCodecType(Buffer buffer, NetSocket socket) {
+ // 1. 如果已认证,优先使用缓存的编解码类型
+ IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
+ if (connectionInfo != null && connectionInfo.isAuthenticated() &&
+ StrUtil.isNotBlank(connectionInfo.getCodecType())) {
+ return connectionInfo.getCodecType();
+ }
+
+ // 2. 未认证时检测消息格式类型
+ byte[] data = buffer.getBytes();
+
+ // 2.1 检测是否为 JT808 格式(首尾标识符 0x7e)
+ if (IotJt808DeviceMessageCodec.isJt808Format(data)) {
+ return CODEC_TYPE_JT808;
+ }
+
+ // 2.2 检测是否为自定义二进制格式(魔术字 0x7E)
+ if (IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(data)) {
+ return CODEC_TYPE_BINARY;
+ }
+
+ // 2.3 检测是否为 JSON 格式(以 { 或 [ 开头)
+ String jsonStr = StrUtil.utf8Str(data).trim();
+ if (StrUtil.startWithAny(jsonStr, "{", "[")) {
+ return CODEC_TYPE_JSON;
+ }
+
+ // 2.4 未知格式
+ return null;
+ }
+
+ /**
+ * 查找协议处理器
+ *
+ * 遍历所有协议处理器,返回第一个能处理该消息的处理器
+ *
+ * @param message 设备消息
+ * @param codecType 消息编解码类型
+ * @return 协议处理器,未找到返回 null
+ */
+ private ProtocolHandler findProtocolHandler(IotDeviceMessage message, String codecType) {
+ return protocolHandlers.stream()
+ .filter(handler -> handler.canHandle(message, codecType))
+ .findFirst()
+ .orElse(null);
+ }
+
+ /**
+ * 判断是否为认证消息
+ *
+ * 认证消息包括:
+ * - 标准认证:auth
+ * - JT808 注册:jt808.terminal.register
+ * - JT808 鉴权:jt808.terminal.auth
+ *
+ * @param message 设备消息
+ * @return true-是认证消息,false-不是
+ */
+ private boolean isAuthenticationMessage(IotDeviceMessage message) {
+ String method = message.getMethod();
+ return "auth".equals(method)
+ || "jt808.terminal.register".equals(method)
+ || "jt808.terminal.auth".equals(method);
+ }
+
+ /**
+ * 注册连接信息
+ *
+ * 将设备连接信息注册到连接管理器,包括:
+ * - 设备 ID
+ * - 产品 Key
+ * - 设备名称
+ * - 客户端 ID
+ * - 编解码类型
+ * - 认证状态
+ *
+ * @param socket 网络连接
+ * @param device 设备信息
+ * @param clientId 客户端 ID
+ * @param codecType 消息编解码类型
+ */
+ private void registerConnection(NetSocket socket, IotDeviceRespDTO device,
+ String clientId, String codecType) {
+ IotTcpConnectionManager.ConnectionInfo connectionInfo = new IotTcpConnectionManager.ConnectionInfo()
+ .setDeviceId(device.getId())
+ .setProductKey(device.getProductKey())
+ .setDeviceName(device.getDeviceName())
+ .setClientId(clientId)
+ .setCodecType(codecType)
+ .setAuthenticated(true);
+
+ // 注册连接(如果设备已有其他连接,会自动断开旧连接)
+ connectionManager.registerConnection(socket, device.getId(), connectionInfo);
+ }
+
+ /**
+ * 发送设备上线消息
+ *
+ * 设备认证成功后,发送上线消息到消息总线,通知业务层设备已上线
+ *
+ * @param device 设备信息
+ */
+ private void sendOnlineMessage(IotDeviceRespDTO device) {
+ try {
+ IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline();
+ deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(),
+ device.getDeviceName(), serverId);
+
+ log.debug("[sendOnlineMessage][发送上线消息成功,设备: {}]", device.getDeviceName());
+
+ } catch (Exception e) {
+ log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", device.getDeviceName(), e);
+ }
+ }
+
+ /**
+ * 清理连接
+ *
+ * 连接关闭或异常时,清理连接信息并发送离线消息
+ *
+ * @param socket 网络连接
+ */
+ private void cleanupConnection(NetSocket socket) {
+ try {
+ // 1. 发送离线消息(如果已认证)
+ IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket);
+ if (connectionInfo != null && connectionInfo.isAuthenticated()) {
+ IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline();
+ deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(),
+ connectionInfo.getDeviceName(), serverId);
+
+ log.debug("[cleanupConnection][发送离线消息成功,设备: {}]", connectionInfo.getDeviceName());
+ }
+
+ // 2. 注销连接
+ connectionManager.unregisterConnection(socket);
+
+ } catch (Exception e) {
+ log.error("[cleanupConnection][清理连接失败]", e);
+ }
+ }
+
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IoTDeviceApiImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IoTDeviceApiImpl.java
index 58fed2d..19e2798 100644
--- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IoTDeviceApiImpl.java
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IoTDeviceApiImpl.java
@@ -1,75 +1,75 @@
-package com.viewsh.module.iot.api.device;
-
-import com.viewsh.framework.common.enums.RpcConstants;
-import com.viewsh.framework.common.pojo.CommonResult;
-import com.viewsh.framework.common.util.object.BeanUtils;
-import com.viewsh.module.iot.core.biz.IotDeviceCommonApi;
-import com.viewsh.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
-import com.viewsh.module.iot.core.biz.dto.IotDeviceGetReqDTO;
-import com.viewsh.module.iot.core.biz.dto.IotDeviceRespDTO;
-import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
-import com.viewsh.module.iot.dal.dataobject.product.IotProductDO;
-import com.viewsh.module.iot.service.device.IotDeviceService;
-import com.viewsh.module.iot.service.product.IotProductService;
-import jakarta.annotation.Resource;
-import jakarta.annotation.security.PermitAll;
-import org.springframework.context.annotation.Primary;
-import org.springframework.validation.annotation.Validated;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RestController;
-
-import static com.viewsh.framework.common.pojo.CommonResult.success;
-
-/**
- * IoT 设备 API 实现类
- *
- * @author haohao
- */
-@RestController
-@Validated
-@Primary // 保证优先匹配,因为 viewsh-iot-gateway 也有 IotDeviceCommonApi 的实现,并且也可能会被 biz 引入
-public class IoTDeviceApiImpl implements IotDeviceCommonApi {
-
- @Resource
- private IotDeviceService deviceService;
- @Resource
- private IotProductService productService;
-
- @Override
- @PostMapping(RpcConstants.RPC_API_PREFIX + "/iot/device/auth")
- @PermitAll
- public CommonResult authDevice(@RequestBody IotDeviceAuthReqDTO authReqDTO) {
- return success(deviceService.authDevice(authReqDTO));
- }
-
- @Override
- @PostMapping(RpcConstants.RPC_API_PREFIX + "/iot/device/get") // 特殊:方便调用,暂时使用 POST,实际更推荐 GET
- @PermitAll
- public CommonResult getDevice(@RequestBody IotDeviceGetReqDTO getReqDTO) {
- IotDeviceDO device;
-
- // 查询优先级:id > (productKey + deviceName) > deviceName
- if (getReqDTO.getId() != null) {
- // 通过设备 ID 查询
- device = deviceService.getDeviceFromCache(getReqDTO.getId());
- } else if (getReqDTO.getProductKey() != null && getReqDTO.getDeviceName() != null) {
- // 通过 productKey + deviceName 查询
- device = deviceService.getDeviceFromCache(getReqDTO.getProductKey(), getReqDTO.getDeviceName());
- } else if (getReqDTO.getDeviceName() != null) {
- // 仅通过 deviceName 查询(用于 JT808 等协议,终端手机号应该是全局唯一的)
- device = deviceService.getDeviceFromCacheByDeviceName(getReqDTO.getDeviceName());
- } else {
- device = null;
- }
-
- return success(BeanUtils.toBean(device, IotDeviceRespDTO.class, deviceDTO -> {
- IotProductDO product = productService.getProductFromCache(deviceDTO.getProductId());
- if (product != null) {
- deviceDTO.setCodecType(product.getCodecType());
- deviceDTO.setProductAuthType(product.getAuthType());
- }
- }));
- }
-
+package com.viewsh.module.iot.api.device;
+
+import com.viewsh.framework.common.enums.RpcConstants;
+import com.viewsh.framework.common.pojo.CommonResult;
+import com.viewsh.framework.common.util.object.BeanUtils;
+import com.viewsh.module.iot.core.biz.IotDeviceCommonApi;
+import com.viewsh.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
+import com.viewsh.module.iot.core.biz.dto.IotDeviceGetReqDTO;
+import com.viewsh.module.iot.core.biz.dto.IotDeviceRespDTO;
+import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
+import com.viewsh.module.iot.dal.dataobject.product.IotProductDO;
+import com.viewsh.module.iot.service.device.IotDeviceService;
+import com.viewsh.module.iot.service.product.IotProductService;
+import jakarta.annotation.Resource;
+import jakarta.annotation.security.PermitAll;
+import org.springframework.context.annotation.Primary;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import static com.viewsh.framework.common.pojo.CommonResult.success;
+
+/**
+ * IoT 设备 API 实现类(供IOT GATEWAY使用)
+ *
+ * @author haohao
+ */
+@RestController
+@Validated
+@Primary // 保证优先匹配,因为 viewsh-iot-gateway 也有 IotDeviceCommonApi 的实现,并且也可能会被 biz 引入
+public class IoTDeviceApiImpl implements IotDeviceCommonApi {
+
+ @Resource
+ private IotDeviceService deviceService;
+ @Resource
+ private IotProductService productService;
+
+ @Override
+ @PostMapping(RpcConstants.RPC_API_PREFIX + "/iot/device/auth")
+ @PermitAll
+ public CommonResult authDevice(@RequestBody IotDeviceAuthReqDTO authReqDTO) {
+ return success(deviceService.authDevice(authReqDTO));
+ }
+
+ @Override
+ @PostMapping(RpcConstants.RPC_API_PREFIX + "/iot/device/get") // 特殊:方便调用,暂时使用 POST,实际更推荐 GET
+ @PermitAll
+ public CommonResult getDevice(@RequestBody IotDeviceGetReqDTO getReqDTO) {
+ IotDeviceDO device;
+
+ // 查询优先级:id > (productKey + deviceName) > deviceName
+ if (getReqDTO.getId() != null) {
+ // 通过设备 ID 查询
+ device = deviceService.getDeviceFromCache(getReqDTO.getId());
+ } else if (getReqDTO.getProductKey() != null && getReqDTO.getDeviceName() != null) {
+ // 通过 productKey + deviceName 查询
+ device = deviceService.getDeviceFromCache(getReqDTO.getProductKey(), getReqDTO.getDeviceName());
+ } else if (getReqDTO.getDeviceName() != null) {
+ // 仅通过 deviceName 查询(用于 JT808 等协议,终端手机号应该是全局唯一的)
+ device = deviceService.getDeviceFromCacheByDeviceName(getReqDTO.getDeviceName());
+ } else {
+ device = null;
+ }
+
+ return success(BeanUtils.toBean(device, IotDeviceRespDTO.class, deviceDTO -> {
+ IotProductDO product = productService.getProductFromCache(deviceDTO.getProductId());
+ if (product != null) {
+ deviceDTO.setCodecType(product.getCodecType());
+ deviceDTO.setProductAuthType(product.getAuthType());
+ }
+ }));
+ }
+
}
\ No newline at end of file
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IotDeviceControlApiImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IotDeviceControlApiImpl.java
new file mode 100644
index 0000000..c72af38
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IotDeviceControlApiImpl.java
@@ -0,0 +1,123 @@
+package com.viewsh.module.iot.api.device;
+
+import cn.hutool.core.map.MapUtil;
+import com.viewsh.framework.common.pojo.CommonResult;
+import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeReqDTO;
+import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeRespDTO;
+import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
+import com.viewsh.module.iot.core.enums.IotDeviceStateEnum;
+import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
+import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
+import com.viewsh.module.iot.service.device.IotDeviceService;
+import com.viewsh.module.iot.service.device.message.IotDeviceMessageService;
+import io.swagger.v3.oas.annotations.Operation;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Primary;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.viewsh.framework.common.pojo.CommonResult.success;
+
+/**
+ * IoT 设备控制 API 实现类
+ *
+ * 提供 RPC 接口供其他模块调用 IoT 设备服务
+ *
+ * @author lzh
+ */
+@RestController
+@Validated
+@Primary
+@Slf4j
+public class IotDeviceControlApiImpl implements IotDeviceControlApi {
+
+ @Resource
+ private IotDeviceService deviceService;
+
+ @Resource
+ private IotDeviceMessageService deviceMessageService;
+
+ @Override
+ @PostMapping(PREFIX + "/invoke-service")
+ @Operation(summary = "调用设备服务")
+ public CommonResult invokeService(@RequestBody IotDeviceServiceInvokeReqDTO reqDTO) {
+ try {
+ // 1. 获取设备信息
+ IotDeviceDO device = deviceService.getDeviceFromCache(reqDTO.getDeviceId());
+ if (device == null) {
+ return success(IotDeviceServiceInvokeRespDTO.builder()
+ .success(false)
+ .code(404)
+ .errorMsg("设备不存在")
+ .responseTime(LocalDateTime.now())
+ .build());
+ }
+
+ // 2. 检查设备是否在线
+ if (!IotDeviceStateEnum.ONLINE.getState().equals(device.getState())) {
+ return success(IotDeviceServiceInvokeRespDTO.builder()
+ .success(false)
+ .code(400)
+ .errorMsg("设备不在线,当前状态: " + device.getState())
+ .responseTime(LocalDateTime.now())
+ .build());
+ }
+
+ // 3. 构建服务调用消息
+ @SuppressWarnings("unchecked")
+ Map params = (Map) (Map, ?>) MapUtil.builder()
+ .put("identifier", reqDTO.getIdentifier())
+ .put("params", reqDTO.getParams() != null ? reqDTO.getParams() : new HashMap<>())
+ .build();
+
+ IotDeviceMessage message = IotDeviceMessage.requestOf(
+ IotDeviceMessageMethodEnum.SERVICE_INVOKE.getMethod(), params);
+
+ // 4. 发送消息
+ IotDeviceMessage result = deviceMessageService.sendDeviceMessage(message, device);
+
+ // 5. 返回结果(异步模式,立即返回消息ID)
+ return success(IotDeviceServiceInvokeRespDTO.builder()
+ .messageId(result.getId())
+ .success(true)
+ .data(null)
+ .responseTime(LocalDateTime.now())
+ .build());
+
+ } catch (Exception e) {
+ log.error("[invokeService] 设备服务调用失败: deviceId={}, identifier={}",
+ reqDTO.getDeviceId(), reqDTO.getIdentifier(), e);
+ return success(IotDeviceServiceInvokeRespDTO.builder()
+ .success(false)
+ .code(500)
+ .errorMsg(e.getMessage())
+ .responseTime(LocalDateTime.now())
+ .build());
+ }
+ }
+
+ @Override
+ @PostMapping(PREFIX + "/invoke-service-batch")
+ @Operation(summary = "批量调用设备服务")
+ public CommonResult> invokeServiceBatch(
+ @RequestBody List reqDTOList) {
+
+ List results = new java.util.ArrayList<>();
+ for (IotDeviceServiceInvokeReqDTO reqDTO : reqDTOList) {
+ CommonResult result = invokeService(reqDTO);
+ results.add(result.getData());
+ }
+
+ return success(results);
+ }
+
+}
+
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IotDevicePropertyQueryApiImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IotDevicePropertyQueryApiImpl.java
new file mode 100644
index 0000000..f5c9047
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/api/device/IotDevicePropertyQueryApiImpl.java
@@ -0,0 +1,175 @@
+package com.viewsh.module.iot.api.device;
+
+import com.viewsh.framework.common.pojo.CommonResult;
+import com.viewsh.module.iot.api.device.dto.property.DevicePropertyBatchQueryReqDTO;
+import com.viewsh.module.iot.api.device.dto.property.DevicePropertyHistoryQueryReqDTO;
+import com.viewsh.module.iot.api.device.dto.property.DevicePropertyHistoryRespDTO;
+import com.viewsh.module.iot.api.device.dto.property.DevicePropertyRespDTO;
+import com.viewsh.module.iot.controller.admin.device.vo.property.IotDevicePropertyHistoryListReqVO;
+import com.viewsh.module.iot.controller.admin.device.vo.property.IotDevicePropertyRespVO;
+import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
+import com.viewsh.module.iot.dal.dataobject.device.IotDevicePropertyDO;
+import com.viewsh.module.iot.service.device.IotDeviceService;
+import com.viewsh.module.iot.service.device.property.IotDevicePropertyService;
+import io.swagger.v3.oas.annotations.Operation;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Primary;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static com.viewsh.framework.common.pojo.CommonResult.success;
+
+/**
+ * IoT 设备属性查询 API 实现类
+ *
+ * 提供 RPC 接口供其他模块(如 Ops 模块)查询设备属性
+ *
+ * @author lzh
+ */
+@RestController
+@Validated
+@Primary
+@Slf4j
+public class IotDevicePropertyQueryApiImpl implements IotDevicePropertyQueryApi {
+
+ @Resource
+ private IotDeviceService deviceService;
+
+ @Resource
+ private IotDevicePropertyService devicePropertyService;
+
+ @Override
+ @GetMapping(PREFIX + "/get")
+ @Operation(summary = "获取设备单个属性")
+ public CommonResult getProperty(@RequestParam("deviceId") Long deviceId,
+ @RequestParam("identifier") String identifier) {
+ try {
+ Map properties = devicePropertyService.getLatestDeviceProperties(deviceId);
+ IotDevicePropertyDO property = properties.get(identifier);
+
+ if (property == null) {
+ return success(DevicePropertyRespDTO.builder()
+ .identifier(identifier)
+ .value(null)
+ .updateTime(null)
+ .build());
+ }
+
+ return success(DevicePropertyRespDTO.builder()
+ .identifier(identifier)
+ .value(property.getValue())
+ .updateTime(property.getUpdateTime() != null ?
+ property.getUpdateTime().atZone(java.time.ZoneId.systemDefault()).toInstant().toEpochMilli() : null)
+ .build());
+ } catch (Exception e) {
+ log.error("[getProperty] 获取设备属性失败: deviceId={}, identifier={}", deviceId, identifier, e);
+ return success(DevicePropertyRespDTO.builder()
+ .identifier(identifier)
+ .value(null)
+ .updateTime(null)
+ .build());
+ }
+ }
+
+ @Override
+ @GetMapping(PREFIX + "/get-latest")
+ @Operation(summary = "获取设备最新属性")
+ public CommonResult