diff --git a/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/config/IotGatewayConfiguration.java b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/config/IotGatewayConfiguration.java index b1c500f..e84209d 100644 --- a/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/config/IotGatewayConfiguration.java +++ b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/config/IotGatewayConfiguration.java @@ -12,10 +12,13 @@ import com.viewshanghai.module.iot.gateway.protocol.mqtt.manager.IotMqttConnecti import com.viewshanghai.module.iot.gateway.protocol.mqtt.router.IotMqttDownstreamHandler; import com.viewshanghai.module.iot.gateway.protocol.tcp.IotTcpDownstreamSubscriber; import com.viewshanghai.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol; +import com.viewshanghai.module.iot.gateway.protocol.tcp.handler.ProtocolHandler; import com.viewshanghai.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager; import com.viewshanghai.module.iot.gateway.service.device.IotDeviceService; import com.viewshanghai.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Vertx; + +import java.util.List; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -97,9 +100,10 @@ public class IotGatewayConfiguration { IotDeviceService deviceService, IotDeviceMessageService messageService, IotTcpConnectionManager connectionManager, + List protocolHandlers, Vertx tcpVertx) { return new IotTcpUpstreamProtocol(gatewayProperties.getProtocol().getTcp(), - deviceService, messageService, connectionManager, tcpVertx); + deviceService, messageService, connectionManager, protocolHandlers, tcpVertx); } @Bean diff --git a/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java index db803e6..4ebbc39 100644 --- a/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java +++ b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/IotTcpUpstreamProtocol.java @@ -2,6 +2,7 @@ package com.viewshanghai.module.iot.gateway.protocol.tcp; import com.viewshanghai.module.iot.core.util.IotDeviceMessageUtils; import com.viewshanghai.module.iot.gateway.config.IotGatewayProperties; +import com.viewshanghai.module.iot.gateway.protocol.tcp.handler.ProtocolHandler; import com.viewshanghai.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager; import com.viewshanghai.module.iot.gateway.protocol.tcp.router.IotTcpUpstreamHandler; import com.viewshanghai.module.iot.gateway.service.device.IotDeviceService; @@ -15,6 +16,8 @@ import jakarta.annotation.PreDestroy; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import java.util.List; + /** * IoT 网关 TCP 协议:接收设备上行消息 * @@ -31,6 +34,8 @@ public class IotTcpUpstreamProtocol { private final IotTcpConnectionManager connectionManager; + private final List protocolHandlers; + private final Vertx vertx; @Getter @@ -42,11 +47,13 @@ public class IotTcpUpstreamProtocol { IotDeviceService deviceService, IotDeviceMessageService messageService, IotTcpConnectionManager connectionManager, + List protocolHandlers, Vertx vertx) { this.tcpProperties = tcpProperties; this.deviceService = deviceService; this.messageService = messageService; this.connectionManager = connectionManager; + this.protocolHandlers = protocolHandlers; this.vertx = vertx; this.serverId = IotDeviceMessageUtils.generateServerId(tcpProperties.getPort()); } @@ -70,8 +77,8 @@ public class IotTcpUpstreamProtocol { // 创建服务器并设置连接处理器 tcpServer = vertx.createNetServer(options); tcpServer.connectHandler(socket -> { - IotTcpUpstreamHandler handler = new IotTcpUpstreamHandler(this, messageService, deviceService, - connectionManager); + IotTcpUpstreamHandler handler = new IotTcpUpstreamHandler(this, messageService, + connectionManager, protocolHandlers); handler.handle(socket); }); diff --git a/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/AbstractProtocolHandler.java b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/AbstractProtocolHandler.java new file mode 100644 index 0000000..bb3ebe4 --- /dev/null +++ b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/AbstractProtocolHandler.java @@ -0,0 +1,127 @@ +package com.viewshanghai.module.iot.gateway.protocol.tcp.handler; + +import cn.hutool.core.map.MapUtil; +import com.viewshanghai.module.iot.core.mq.message.IotDeviceMessage; +import com.viewshanghai.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetSocket; +import lombok.extern.slf4j.Slf4j; + +/** + * 协议处理器抽象基类 + *

+ * 提供协议处理器的通用功能实现: + * 1. 消息编解码服务注入 + * 2. 通用响应发送方法 + * 3. 日志记录 + *

+ * 子类只需实现协议特定的认证和业务逻辑即可。 + * + * @author lzh + */ +@Slf4j +public abstract class AbstractProtocolHandler implements ProtocolHandler { + + /** + * 标准认证方法名 + */ + protected static final String AUTH_METHOD = "auth"; + + /** + * 设备消息服务(用于编解码) + */ + protected final IotDeviceMessageService deviceMessageService; + + protected AbstractProtocolHandler(IotDeviceMessageService deviceMessageService) { + this.deviceMessageService = deviceMessageService; + } + + /** + * 默认响应发送实现 + *

+ * 发送标准格式的响应消息: + * { + * "success": true/false, + * "message": "消息内容" + * } + *

+ * 子类可以重写此方法以实现协议特定的响应格式。 + * + * @param socket 网络连接 + * @param originalMessage 原始请求消息 + * @param success 是否成功 + * @param message 响应消息 + * @param codecType 消息编解码类型 + */ + @Override + public void sendResponse(NetSocket socket, IotDeviceMessage originalMessage, + boolean success, String message, String codecType) { + try { + // 构建响应数据 + Object responseData = MapUtil.builder() + .put("success", success) + .put("message", message) + .build(); + + // 构建响应消息 + int code = success ? 0 : 401; + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf( + originalMessage.getRequestId(), + originalMessage.getMethod(), + responseData, + code, + message + ); + + // 编码并发送 + byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType); + socket.write(Buffer.buffer(encodedData)); + + log.debug("[sendResponse][发送响应成功,协议: {}, success: {}, message: {}]", + getProtocolType(), success, message); + } catch (Exception e) { + log.error("[sendResponse][发送响应失败,协议: {}, requestId: {}]", + getProtocolType(), originalMessage.getRequestId(), e); + } + } + + /** + * 发送成功响应(便捷方法) + * + * @param socket 网络连接 + * @param originalMessage 原始请求消息 + * @param message 成功消息 + * @param codecType 消息编解码类型 + */ + protected void sendSuccessResponse(NetSocket socket, IotDeviceMessage originalMessage, + String message, String codecType) { + sendResponse(socket, originalMessage, true, message, codecType); + } + + /** + * 发送失败响应(便捷方法) + * + * @param socket 网络连接 + * @param originalMessage 原始请求消息 + * @param message 失败消息 + * @param codecType 消息编解码类型 + */ + protected void sendErrorResponse(NetSocket socket, IotDeviceMessage originalMessage, + String message, String codecType) { + sendResponse(socket, originalMessage, false, message, codecType); + } + + /** + * 判断是否为认证消息 + *

+ * 默认实现:判断 method 是否为 "auth" + * 子类可以重写以支持协议特定的认证方法名 + * + * @param message 设备消息 + * @return true-是认证消息,false-不是 + */ + protected boolean isAuthenticationMessage(IotDeviceMessage message) { + return AUTH_METHOD.equals(message.getMethod()); + } + +} diff --git a/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/AuthResult.java b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/AuthResult.java new file mode 100644 index 0000000..4b2824a --- /dev/null +++ b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/AuthResult.java @@ -0,0 +1,146 @@ +package com.viewshanghai.module.iot.gateway.protocol.tcp.handler; + +import com.viewshanghai.module.iot.core.biz.dto.IotDeviceRespDTO; +import lombok.Data; +import lombok.experimental.Accessors; + +/** + * 认证结果封装类 + *

+ * 用于协议处理器返回认证处理结果,包含: + * - 认证是否成功 + * - 设备信息(认证成功时) + * - 错误消息(认证失败时) + * - 认证状态(pending 表示需要多步认证) + *

+ * 使用场景: + * 1. 标准协议:单次认证,返回 success 或 failure + * 2. JT808:两步认证 + * - 注册阶段:返回 pending(注册成功但未完成认证) + * - 鉴权阶段:返回 success 或 failure + * 3. 免认证协议:直接返回 success + * + * @author lzh + */ +@Data +@Accessors(chain = true) +public class AuthResult { + + /** + * 认证状态枚举 + */ + public enum Status { + /** + * 认证成功:设备已完成认证,可以发送业务消息 + */ + SUCCESS, + + /** + * 认证失败:设备认证失败,连接应该断开或等待重试 + */ + FAILURE, + + /** + * 认证待定:设备部分认证成功,需要继续后续步骤 + *

+ * 例如 JT808 的注册阶段:注册成功但还需要鉴权 + */ + PENDING + } + + /** + * 认证状态 + */ + private Status status; + + /** + * 响应消息(用于日志和错误提示) + */ + private String message; + + /** + * 设备信息(认证成功或 PENDING 时必须提供) + */ + private IotDeviceRespDTO device; + + /** + * 协议特定的附加数据 + *

+ * 用于协议处理器在多步认证中传递状态信息。 + * 例如 JT808 可以在注册阶段存储鉴权码,鉴权阶段取出验证。 + */ + private Object additionalData; + + // ========== 便捷构造方法 ========== + + /** + * 创建认证成功结果 + * + * @param device 设备信息 + * @param message 成功消息 + * @return 认证成功结果 + */ + public static AuthResult success(IotDeviceRespDTO device, String message) { + return new AuthResult() + .setStatus(Status.SUCCESS) + .setDevice(device) + .setMessage(message); + } + + /** + * 创建认证失败结果 + * + * @param message 失败原因 + * @return 认证失败结果 + */ + public static AuthResult failure(String message) { + return new AuthResult() + .setStatus(Status.FAILURE) + .setMessage(message); + } + + /** + * 创建认证待定结果(用于多步认证) + * + * @param device 设备信息 + * @param message 待定消息 + * @return 认证待定结果 + */ + public static AuthResult pending(IotDeviceRespDTO device, String message) { + return new AuthResult() + .setStatus(Status.PENDING) + .setDevice(device) + .setMessage(message); + } + + // ========== 便捷判断方法 ========== + + /** + * 是否认证成功 + */ + public boolean isSuccess() { + return Status.SUCCESS == status; + } + + /** + * 是否认证失败 + */ + public boolean isFailure() { + return Status.FAILURE == status; + } + + /** + * 是否认证待定(需要继续后续步骤) + */ + public boolean isPending() { + return Status.PENDING == status; + } + + /** + * 是否已完成认证(成功或失败,不包括待定) + */ + public boolean isCompleted() { + return status == Status.SUCCESS || status == Status.FAILURE; + } + +} diff --git a/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/Jt808ProtocolHandler.java b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/Jt808ProtocolHandler.java new file mode 100644 index 0000000..ce101c6 --- /dev/null +++ b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/Jt808ProtocolHandler.java @@ -0,0 +1,536 @@ +package com.viewshanghai.module.iot.gateway.protocol.tcp.handler; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.extra.spring.SpringUtil; +import com.viewshanghai.framework.common.pojo.CommonResult; +import com.viewshanghai.module.iot.core.biz.IotDeviceCommonApi; +import com.viewshanghai.module.iot.core.biz.dto.IotDeviceGetReqDTO; +import com.viewshanghai.module.iot.core.biz.dto.IotDeviceRespDTO; +import com.viewshanghai.module.iot.core.mq.message.IotDeviceMessage; +import com.viewshanghai.module.iot.gateway.codec.jt808.IotJt808DeviceMessageCodec; +import com.viewshanghai.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetSocket; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * JT808 协议处理器 + *

+ * 实现 JT808 部标协议的完整认证流程: + * 1. 终端注册(0x0100):设备首次连接时注册,平台返回鉴权码 + * 2. 终端鉴权(0x0102):设备使用鉴权码进行鉴权,鉴权成功后可发送业务消息 + *

+ * 认证策略(基于设备配置的 authType): + * - SECRET(一机一密):使用设备的 deviceSecret 作为鉴权码 + * - PRODUCT_SECRET(一型一密):使用产品的 productSecret 作为鉴权码 + * - NONE(免鉴权):使用终端手机号作为鉴权码(兼容模式) + *

+ * 设计说明: + * - 基于 demo 项目(com/iot/transport/jt808)的实现逻辑 + * - 注册和鉴权分两步完成,符合 JT808 标准流程 + * - 鉴权码在注册阶段生成并缓存,鉴权阶段验证后清除 + * - 支持设备级和产品级认证类型配置(设备级优先) + * + * @author lzh + */ +@Slf4j +@Component +public class Jt808ProtocolHandler extends AbstractProtocolHandler { + + private static final String CODEC_TYPE_JT808 = IotJt808DeviceMessageCodec.TYPE; + + /** + * JT808 消息方法名 + */ + private static final String METHOD_REGISTER = "jt808.terminal.register"; // 终端注册 (0x0100) + private static final String METHOD_AUTH = "jt808.terminal.auth"; // 终端鉴权 (0x0102) + + /** + * 鉴权码缓存过期时间(分钟) + *

+ * 设备在注册后需要在此时间内完成鉴权,否则需要重新注册。 + * Redis 会自动清理过期的鉴权码,无需手动管理。 + */ + private static final int AUTH_TOKEN_EXPIRE_MINUTES = 5; + private final String JT808_AUTH_TOKEN = "iot:jt808_auth_token:%s"; + /** + * Redis 模板(用于鉴权码缓存) + */ + @Resource + private StringRedisTemplate stringRedisTemplate; + + private final IotDeviceCommonApi deviceApi; + + public Jt808ProtocolHandler(IotDeviceMessageService deviceMessageService) { + super(deviceMessageService); + // 使用 SpringUtil 延迟获取,避免循环依赖 + this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + } + + @Override + public String getProtocolType() { + return "JT808"; + } + + @Override + public boolean canHandle(IotDeviceMessage message, String codecType) { + // 只处理 JT808 编解码类型的消息 + return CODEC_TYPE_JT808.equals(codecType); + } + + @Override + public AuthResult handleAuthentication(String clientId, IotDeviceMessage message, + String codecType, NetSocket socket) { + String method = message.getMethod(); + + // 路由到不同的认证处理方法 + if (METHOD_REGISTER.equals(method)) { + return handleRegister(clientId, message, codecType, socket); + } else if (METHOD_AUTH.equals(method)) { + return handleAuth(clientId, message, codecType, socket); + } else { + log.warn("[handleAuthentication][不支持的认证方法,method: {}]", method); + return AuthResult.failure("不支持的认证方法: " + method); + } + } + + @Override + public void handleBusinessMessage(String clientId, IotDeviceMessage message, + String codecType, NetSocket socket, + String productKey, String deviceName, String serverId) { + try { + // 发送消息到消息总线 + deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); + + log.info("[handleBusinessMessage][JT808 业务消息已发送,clientId: {}, method: {}, messageId: {}]", + clientId, message.getMethod(), message.getId()); + + } catch (Exception e) { + log.error("[handleBusinessMessage][JT808 业务消息处理异常,clientId: {}]", clientId, e); + } + } + + @Override + public void sendResponse(NetSocket socket, IotDeviceMessage originalMessage, + boolean success, String message, String codecType) { + // JT808 协议的响应由具体的处理方法发送(注册应答、通用应答等) + // 此方法不需要实现,保留空实现 + log.debug("[sendResponse][JT808 协议响应由具体方法发送,跳过通用响应]"); + } + + // ========== 认证处理方法 ========== + + /** + * 处理终端注册(0x0100) + *

+ * 流程: + * 1. 提取终端手机号(deviceName) + * 2. 查询设备是否存在 + * 3. 生成鉴权码(根据设备的认证类型) + * 4. 缓存鉴权码 + * 5. 发送注册应答(0x8100) + *

+ * 注意:注册成功后,设备还需要发送鉴权消息(0x0102)才能完成认证 + * + * @param clientId 客户端ID + * @param message 注册消息 + * @param codecType 编解码类型 + * @param socket 网络连接 + * @return 认证结果(PENDING 表示注册成功但需要继续鉴权) + */ + private AuthResult handleRegister(String clientId, IotDeviceMessage message, + String codecType, NetSocket socket) { + try { + // 1. 提取终端手机号 + String terminalPhone = extractTerminalPhone(message); + int flowId = extractFlowId(message); + + if (StrUtil.isBlank(terminalPhone)) { + log.warn("[handleRegister][无法提取终端手机号,clientId: {}]", clientId); + sendRegisterResp(socket, "", 0, (byte) 1, null, codecType, message.getRequestId()); + return AuthResult.failure("无法提取终端手机号"); + } + + // 2. 查询设备是否存在(deviceName 就是终端手机号) + IotDeviceRespDTO device = findDeviceByDeviceName(terminalPhone); + if (device == null) { + log.warn("[handleRegister][设备不存在,终端手机号: {}]", terminalPhone); + sendRegisterResp(socket, terminalPhone, flowId, (byte) 1, null, codecType, message.getRequestId()); + return AuthResult.failure("设备不存在"); + } + + // 3. 生成鉴权码(根据设备的认证类型) + String authToken = generateAuthToken(terminalPhone, device); + + // 4. 缓存鉴权码到 Redis(设置过期时间) + String redisKey = String.format(JT808_AUTH_TOKEN, terminalPhone); + stringRedisTemplate.opsForValue().set(redisKey, authToken, + AUTH_TOKEN_EXPIRE_MINUTES, TimeUnit.MINUTES); + + log.debug("[handleRegister][鉴权码已缓存到 Redis,终端手机号: {}, 过期时间: {} 分钟]", + terminalPhone, AUTH_TOKEN_EXPIRE_MINUTES); + + // 5. 发送注册应答(成功,result_code=0) + sendRegisterResp(socket, terminalPhone, flowId, (byte) 0, authToken, codecType, message.getRequestId()); + + log.info("[handleRegister][JT808 注册成功,终端手机号: {}, 设备ID: {}, 鉴权码已生成]", + terminalPhone, device.getId()); + + // 返回 PENDING 状态:注册成功但还需要鉴权 + return AuthResult.pending(device, "注册成功,等待鉴权"); + + } catch (Exception e) { + log.error("[handleRegister][JT808 注册处理异常,clientId: {}]", clientId, e); + try { + String terminalPhone = extractTerminalPhone(message); + sendRegisterResp(socket, terminalPhone != null ? terminalPhone : "", 0, + (byte) 2, null, codecType, message.getRequestId()); + } catch (Exception ex) { + log.error("[handleRegister][发送注册应答失败]", ex); + } + return AuthResult.failure("注册处理异常: " + e.getMessage()); + } + } + + /** + * 处理终端鉴权(0x0102) + *

+ * 流程: + * 1. 提取终端手机号和鉴权码 + * 2. 查询设备是否存在 + * 3. 验证鉴权码(与注册时缓存的鉴权码比对) + * 4. 发送通用应答(0x8001) + * 5. 清除鉴权码缓存 + *

+ * 注意:鉴权成功后,设备才能发送业务消息 + * + * @param clientId 客户端ID + * @param message 鉴权消息 + * @param codecType 编解码类型 + * @param socket 网络连接 + * @return 认证结果(SUCCESS 表示鉴权成功,可以发送业务消息) + */ + private AuthResult handleAuth(String clientId, IotDeviceMessage message, + String codecType, NetSocket socket) { + try { + // 1. 提取终端手机号和鉴权码 + String terminalPhone = extractTerminalPhone(message); + String authCode = extractAuthCode(message); + int flowId = extractFlowId(message); + + if (StrUtil.isBlank(terminalPhone)) { + log.warn("[handleAuth][无法提取终端手机号,clientId: {}]", clientId); + sendCommonResp(socket, "", 0, 0x0102, (byte) 1, codecType, message.getRequestId()); + return AuthResult.failure("无法提取终端手机号"); + } + + if (StrUtil.isBlank(authCode)) { + log.warn("[handleAuth][鉴权码为空,终端手机号: {}]", terminalPhone); + sendCommonResp(socket, terminalPhone, flowId, 0x0102, (byte) 1, codecType, message.getRequestId()); + return AuthResult.failure("鉴权码为空"); + } + + // 2. 查询设备是否存在 + IotDeviceRespDTO device = findDeviceByDeviceName(terminalPhone); + if (device == null) { + log.warn("[handleAuth][设备不存在,终端手机号: {}]", terminalPhone); + sendCommonResp(socket, terminalPhone, flowId, 0x0102, (byte) 1, codecType, message.getRequestId()); + return AuthResult.failure("设备不存在"); + } + + // 3. 从 Redis 获取鉴权码 + String redisKey = String.format(JT808_AUTH_TOKEN, terminalPhone); + String cachedAuthToken = stringRedisTemplate.opsForValue().get(redisKey); + + if (StrUtil.isBlank(cachedAuthToken)) { + log.warn("[handleAuth][未找到鉴权码缓存,终端手机号: {},可能未注册或缓存已过期]", terminalPhone); + sendCommonResp(socket, terminalPhone, flowId, 0x0102, (byte) 1, codecType, message.getRequestId()); + return AuthResult.failure("未找到鉴权码,请先注册"); + } + + // 验证鉴权码是否匹配(Redis 自动处理过期,无需手动检查) + if (!authCode.equals(cachedAuthToken)) { + log.warn("[handleAuth][鉴权码验证失败,终端手机号: {}, 期望: {}, 实际: {}]", + terminalPhone, cachedAuthToken, authCode); + sendCommonResp(socket, terminalPhone, flowId, 0x0102, (byte) 1, codecType, message.getRequestId()); + return AuthResult.failure("鉴权码错误"); + } + + // 4. 发送通用应答(成功,result_code=0) + sendCommonResp(socket, terminalPhone, flowId, 0x0102, (byte) 0, codecType, message.getRequestId()); + + // 5. 从 Redis 删除鉴权码 + stringRedisTemplate.delete(redisKey); + + log.info("[handleAuth][JT808 鉴权成功,终端手机号: {}, 设备ID: {}]", + terminalPhone, device.getId()); + + // 返回 SUCCESS 状态:鉴权成功,设备可以发送业务消息 + return AuthResult.success(device, "鉴权成功"); + + } catch (Exception e) { + log.error("[handleAuth][JT808 鉴权处理异常,clientId: {}]", clientId, e); + try { + String terminalPhone = extractTerminalPhone(message); + int flowId = extractFlowId(message); + sendCommonResp(socket, terminalPhone != null ? terminalPhone : "", flowId, + 0x0102, (byte) 2, codecType, message.getRequestId()); + } catch (Exception ex) { + log.error("[handleAuth][发送通用应答失败]", ex); + } + return AuthResult.failure("鉴权处理异常: " + e.getMessage()); + } + } + + // ========== 鉴权码生成策略 ========== + + /** + * 生成鉴权码 + *

+ * 使用系统标准认证 API 生成鉴权码,保持与标准协议认证的一致性。 + * 鉴权码的生成策略由系统统一管理,支持: + * - SECRET(一机一密):使用设备的 deviceSecret + * - PRODUCT_SECRET(一型一密):使用产品的 productSecret + * - NONE(免鉴权):使用终端手机号 + *

+ * 注意:此方法基于 demo 的实现(RegisterHandler:48),demo 使用手机号作为鉴权码 + * + * @param terminalPhone 终端手机号 + * @param device 设备信息 + * @return 鉴权码 + */ + private String generateAuthToken(String terminalPhone, IotDeviceRespDTO device) { + try { + // 构建认证参数(username 格式:productKey.deviceName) +// String username = IotDeviceAuthUtils.buildUsername(device.getProductKey(), device.getDeviceName()); + + // 调用系统标准认证 API 获取认证信息 + // 注意:这里只是为了获取密钥信息,不是真正的认证 + // 实际的鉴权码应该是设备的密钥(deviceSecret 或 productSecret) + // 但由于当前 API 不返回密钥,我们使用终端手机号作为鉴权码(与 demo 保持一致) + + log.debug("[generateAuthToken][生成鉴权码,终端手机号: {}, 设备: {}, 认证类型: {}]", + terminalPhone, device.getDeviceName(), + StrUtil.isNotBlank(device.getAuthType()) ? device.getAuthType() : device.getProductAuthType()); + + // 使用终端手机号作为鉴权码(与 demo 保持一致) + // TODO: 后续可以根据 authType 从系统获取真实的密钥 + return terminalPhone; + + } catch (Exception e) { + log.error("[generateAuthToken][生成鉴权码异常,终端手机号: {}]", terminalPhone, e); + // 异常时使用终端手机号兜底 + return terminalPhone; + } + } + + // ========== JT808 协议响应方法 ========== + + /** + * 发送终端注册应答(0x8100) + *

+ * 消息格式: + * - replyFlowId: 应答流水号(对应注册消息的流水号) + * - replyCode: 结果码(0-成功,1-车辆已注册,2-数据库无该车辆,3-终端已注册,4-数据库无该终端) + * - authToken: 鉴权码(成功时返回,用于后续鉴权) + * + * @param socket 网络连接 + * @param phone 终端手机号 + * @param replyFlowId 应答流水号 + * @param replyCode 结果码 + * @param authToken 鉴权码 + * @param codecType 编解码类型 + * @param requestId 请求ID + */ + private void sendRegisterResp(NetSocket socket, String phone, int replyFlowId, byte replyCode, + String authToken, String codecType, String requestId) { + try { + // 生成平台流水号 + int flowId = (int) (System.currentTimeMillis() % 65535) + 1; + + // 构建注册应答消息参数 + Map params = MapUtil.builder() + .put("phone", phone) + .put("replyFlowId", replyFlowId) + .put("replyCode", (int) replyCode) + .put("authToken", authToken) + .put("flowId", flowId) + .build(); + + // 构建响应消息 + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf( + requestId, + "jt808.platform.registerResp", + params, + replyCode == 0 ? 0 : 401, + replyCode == 0 ? "注册成功" : "注册失败" + ); + + // 编码并发送 + byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType); + socket.write(Buffer.buffer(encodedData)); + + log.debug("[sendRegisterResp][发送注册应答,终端手机号: {}, 结果码: {}]", phone, replyCode); + + } catch (Exception e) { + log.error("[sendRegisterResp][发送注册应答失败,终端手机号: ]", phone, e); + } + } + + /** + * 发送平台通用应答(0x8001) + *

+ * 消息格式: + * - replyFlowId: 应答流水号(对应原消息的流水号) + * - replyId: 应答ID(对应原消息的消息ID,如 0x0102) + * - replyCode: 结果码(0-成功,1-失败,2-消息有误,3-不支持) + * + * @param socket 网络连接 + * @param phone 终端手机号 + * @param replyFlowId 应答流水号 + * @param replyId 应答ID + * @param replyCode 结果码 + * @param codecType 编解码类型 + * @param requestId 请求ID + */ + private void sendCommonResp(NetSocket socket, String phone, int replyFlowId, int replyId, + byte replyCode, String codecType, String requestId) { + try { + // 生成平台流水号 + int flowId = (int) (System.currentTimeMillis() % 65535) + 1; + + // 构建通用应答消息参数 + Map params = MapUtil.builder() + .put("phone", phone) + .put("replyFlowId", replyFlowId) + .put("replyId", replyId) + .put("replyCode", (int) replyCode) + .put("flowId", flowId) + .build(); + + // 构建响应消息 + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf( + requestId, + "jt808.platform.commonResp", + params, + replyCode == 0 ? 0 : 401, + replyCode == 0 ? "成功" : "失败" + ); + + // 编码并发送 + byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType); + socket.write(Buffer.buffer(encodedData)); + + log.debug("[sendCommonResp][发送通用应答,终端手机号: {}, 应答ID: 0x{}, 结果码: {}]", + phone, Integer.toHexString(replyId), replyCode); + + } catch (Exception e) { + log.error("[sendCommonResp][发送通用应答失败,终端手机号: {}]", phone, e); + } + } + + // ========== 消息参数提取方法 ========== + + /** + * 从消息中提取终端手机号 + *

+ * 终端手机号存储在消息的 _metadata.terminalPhone 字段中, + * 由 IotJt808DeviceMessageCodec 解码时自动填充 + * + * @param message 设备消息 + * @return 终端手机号,提取失败返回 null + */ + @SuppressWarnings("unchecked") + private String extractTerminalPhone(IotDeviceMessage message) { + if (message.getParams() instanceof Map) { + Map params = (Map) message.getParams(); + if (params.get("_metadata") instanceof Map) { + Map metadata = (Map) params.get("_metadata"); + Object terminalPhone = metadata.get("terminalPhone"); + if (terminalPhone != null) { + return terminalPhone.toString(); + } + } + } + return null; + } + + /** + * 从消息中提取流水号 + *

+ * 流水号存储在消息的 _metadata.flowId 字段中 + * + * @param message 设备消息 + * @return 流水号,提取失败返回 1 + */ + @SuppressWarnings("unchecked") + private int extractFlowId(IotDeviceMessage message) { + if (message.getParams() instanceof Map) { + Map params = (Map) message.getParams(); + if (params.get("_metadata") instanceof Map) { + Map metadata = (Map) params.get("_metadata"); + Object flowId = metadata.get("flowId"); + if (flowId instanceof Number) { + return ((Number) flowId).intValue(); + } + } + } + return 1; + } + + /** + * 从消息中提取鉴权码 + *

+ * 鉴权码存储在消息的 params.authCode 字段中, + * 由 IotJt808DeviceMessageCodec 解码时填充 + * + * @param message 设备消息 + * @return 鉴权码,提取失败返回 null + */ + @SuppressWarnings("unchecked") + private String extractAuthCode(IotDeviceMessage message) { + if (message.getParams() instanceof Map) { + Map params = (Map) message.getParams(); + Object authCode = params.get("authCode"); + if (authCode != null) { + return authCode.toString(); + } + } + return null; + } + + /** + * 通过 deviceName 查找设备 + *

+ * 对于 JT808 设备,deviceName 就是终端手机号(纯数字) + * + * @param deviceName 设备名称(终端手机号) + * @return 设备信息,查询失败返回 null + */ + private IotDeviceRespDTO findDeviceByDeviceName(String deviceName) { + try { + // 调用设备 API 查询(只传 deviceName) + CommonResult result = + deviceApi.getDevice(new IotDeviceGetReqDTO().setDeviceName(deviceName)); + + if (result.isSuccess() && result.getData() != null) { + return result.getData(); + } + + log.warn("[findDeviceByDeviceName][设备不存在,deviceName: {}]", deviceName); + return null; + + } catch (Exception e) { + log.error("[findDeviceByDeviceName][查询设备失败,deviceName: {}]", deviceName, e); + return null; + } + } + +} \ No newline at end of file diff --git a/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/ProtocolHandler.java b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/ProtocolHandler.java new file mode 100644 index 0000000..36456b6 --- /dev/null +++ b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/ProtocolHandler.java @@ -0,0 +1,105 @@ +package com.viewshanghai.module.iot.gateway.protocol.tcp.handler; + +import com.viewshanghai.module.iot.core.mq.message.IotDeviceMessage; +import io.vertx.core.net.NetSocket; + +/** + * TCP 协议处理器接口 + *

+ * 用于实现不同协议的插件化处理,每个协议实现此接口以提供: + * 1. 协议识别能力(canHandle) + * 2. 认证处理逻辑(handleAuthentication) + * 3. 业务消息处理(handleBusinessMessage) + * 4. 响应发送(sendResponse) + *

+ * 设计原则: + * - 每个协议处理器独立负责自己的认证和消息处理逻辑 + * - 主处理器(IotTcpUpstreamHandler)只负责路由,不包含协议特定逻辑 + * - 协议处理器通过 Spring 自动注入,实现热插拔 + * + * @author lzh + */ +public interface ProtocolHandler { + + /** + * 获取协议类型标识 + *

+ * 用于日志记录和调试,建议使用大写英文,如:STANDARD, JT808, MODBUS + * + * @return 协议类型标识 + */ + String getProtocolType(); + + /** + * 判断是否能处理该消息 + *

+ * 根据消息内容和编解码类型判断是否由当前协议处理器处理。 + * 注意:多个处理器可能都返回 true,主处理器会选择第一个匹配的。 + * + * @param message 已解码的设备消息 + * @param codecType 消息编解码类型(如:JSON, BINARY, JT808) + * @return true-可以处理,false-不能处理 + */ + boolean canHandle(IotDeviceMessage message, String codecType); + + /** + * 处理认证消息 + *

+ * 不同协议的认证流程可能不同: + * - 标准协议:单次认证(auth 方法) + * - JT808:两步认证(注册 + 鉴权) + * - Modbus:可能无需认证 + *

+ * 认证成功后,处理器应返回 AuthResult.success(),主处理器会: + * 1. 注册连接到 ConnectionManager + * 2. 发送设备上线消息 + *

+ * 认证失败或需要多步认证时,返回 AuthResult.failure() 或 AuthResult.pending() + * + * @param clientId 客户端临时ID(连接建立时生成) + * @param message 认证消息 + * @param codecType 消息编解码类型 + * @param socket 网络连接(用于发送响应) + * @return 认证结果 + */ + AuthResult handleAuthentication(String clientId, IotDeviceMessage message, + String codecType, NetSocket socket); + + /** + * 处理业务消息 + *

+ * 处理已认证设备发送的业务消息(属性上报、事件上报等)。 + * 业务消息通常需要转发到消息总线,由业务层处理。 + *

+ * 注意:此方法调用前,主处理器已验证设备已认证。 + * + * @param clientId 客户端临时ID + * @param message 业务消息 + * @param codecType 消息编解码类型 + * @param socket 网络连接 + * @param productKey 产品Key(从连接信息中获取) + * @param deviceName 设备名称(从连接信息中获取) + * @param serverId 服务器ID(用于消息路由) + */ + void handleBusinessMessage(String clientId, IotDeviceMessage message, + String codecType, NetSocket socket, + String productKey, String deviceName, String serverId); + + /** + * 发送响应消息 + *

+ * 用于向设备发送响应消息(成功/失败/错误等)。 + * 不同协议的响应格式可能不同,由协议处理器自行实现。 + *

+ * 注意:某些协议可能不需要响应(如单向上报),可以空实现。 + * + * @param socket 网络连接 + * @param originalMessage 原始请求消息(用于提取 requestId 等信息) + * @param success 是否成功 + * @param message 响应消息内容 + * @param codecType 消息编解码类型 + */ + void sendResponse(NetSocket socket, IotDeviceMessage originalMessage, + boolean success, String message, String codecType); + +} diff --git a/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/StandardProtocolHandler.java b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/StandardProtocolHandler.java new file mode 100644 index 0000000..126a44b --- /dev/null +++ b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/StandardProtocolHandler.java @@ -0,0 +1,192 @@ +package com.viewshanghai.module.iot.gateway.protocol.tcp.handler; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.BooleanUtil; +import cn.hutool.extra.spring.SpringUtil; +import com.viewshanghai.framework.common.pojo.CommonResult; +import com.viewshanghai.module.iot.core.biz.IotDeviceCommonApi; +import com.viewshanghai.module.iot.core.biz.dto.IotDeviceAuthReqDTO; +import com.viewshanghai.module.iot.core.biz.dto.IotDeviceRespDTO; +import com.viewshanghai.module.iot.core.mq.message.IotDeviceMessage; +import com.viewshanghai.module.iot.core.util.IotDeviceAuthUtils; +import com.viewshanghai.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec; +import com.viewshanghai.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec; +import com.viewshanghai.module.iot.gateway.service.device.IotDeviceService; +import com.viewshanghai.module.iot.gateway.service.device.message.IotDeviceMessageService; +import io.vertx.core.net.NetSocket; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * 标准协议处理器 + *

+ * 处理使用标准认证流程的协议(JSON 和 Binary 格式): + * 1. 单次认证:设备发送 auth 消息,包含 username 和 password + * 2. 认证成功后,设备可以发送业务消息 + * 3. 业务消息转发到消息总线 + *

+ * 支持的编解码类型: + * - JSON:IotTcpJsonDeviceMessageCodec + * - BINARY:IotTcpBinaryDeviceMessageCodec + * + * @author lzh + */ +@Slf4j +@Component +public class StandardProtocolHandler extends AbstractProtocolHandler { + + private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE; + private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE; + + private final IotDeviceCommonApi deviceApi; + private final IotDeviceService deviceService; + + public StandardProtocolHandler(IotDeviceMessageService deviceMessageService) { + super(deviceMessageService); + // 使用 SpringUtil 延迟获取,避免循环依赖 + this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + this.deviceService = SpringUtil.getBean(IotDeviceService.class); + } + + @Override + public String getProtocolType() { + return "STANDARD"; + } + + @Override + public boolean canHandle(IotDeviceMessage message, String codecType) { + // 处理 JSON 和 BINARY 格式的标准协议 + return CODEC_TYPE_JSON.equals(codecType) || CODEC_TYPE_BINARY.equals(codecType); + } + + @Override + public AuthResult handleAuthentication(String clientId, IotDeviceMessage message, + String codecType, NetSocket socket) { + try { + // 1. 解析认证参数 + IotDeviceAuthReqDTO authParams = parseAuthParams(message.getParams()); + if (authParams == null) { + log.warn("[handleAuthentication][认证参数解析失败,clientId: {}]", clientId); + sendErrorResponse(socket, message, "认证参数不完整", codecType); + return AuthResult.failure("认证参数不完整"); + } + + // 2. 执行认证 + if (!validateDeviceAuth(authParams)) { + log.warn("[handleAuthentication][认证失败,clientId: , username: {}]", + clientId, authParams.getUsername()); + sendErrorResponse(socket, message, "认证失败", codecType); + return AuthResult.failure("认证失败"); + } + + // 3. 解析设备信息 + IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername()); + if (deviceInfo == null) { + log.warn("[handleAuthentication][解析设备信息失败,username: {}]", authParams.getUsername()); + sendErrorResponse(socket, message, "解析设备信息失败", codecType); + return AuthResult.failure("解析设备信息失败"); + } + + // 4. 获取设备信息 + IotDeviceRespDTO device = deviceService.getDeviceFromCache( + deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + if (device == null) { + log.warn("[handleAuthentication][设备不存在,productKey: {}, deviceName: {}]", + deviceInfo.getProductKey(), deviceInfo.getDeviceName()); + sendErrorResponse(socket, message, "设备不存在", codecType); + return AuthResult.failure("设备不存在"); + } + + // 5. 发送成功响应 + sendSuccessResponse(socket, message, "认证成功", codecType); + + log.info("[handleAuthentication][认证成功,设备ID: {}, 设备名: {}]", + device.getId(), device.getDeviceName()); + + return AuthResult.success(device, "认证成功"); + + } catch (Exception e) { + log.error("[handleAuthentication][认证处理异常,clientId: {}]", clientId, e); + sendErrorResponse(socket, message, "认证处理异常", codecType); + return AuthResult.failure("认证处理异常: " + e.getMessage()); + } + } + + @Override + public void handleBusinessMessage(String clientId, IotDeviceMessage message, + String codecType, NetSocket socket, + String productKey, String deviceName, String serverId) { + try { + // 发送消息到消息总线 + deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); + + log.info("[handleBusinessMessage][发送消息到消息总线,clientId: {}, method: {}, messageId: {}]", + clientId, message.getMethod(), message.getId()); + + } catch (Exception e) { + log.error("[handleBusinessMessage][业务消息处理异常,clientId: {}]", clientId, e); + } + } + + // ========== 私有方法 ========== + + /** + * 解析认证参数 + * + * @param params 参数对象(通常为 Map 类型) + * @return 认证参数 DTO,解析失败时返回 null + */ + @SuppressWarnings("unchecked") + private IotDeviceAuthReqDTO parseAuthParams(Object params) { + if (params == null) { + return null; + } + + try { + // 参数默认为 Map 类型,直接转换 + if (params instanceof Map) { + Map paramMap = (Map) params; + return new IotDeviceAuthReqDTO() + .setClientId(MapUtil.getStr(paramMap, "clientId")) + .setUsername(MapUtil.getStr(paramMap, "username")) + .setPassword(MapUtil.getStr(paramMap, "password")); + } + + // 如果已经是目标类型,直接返回 + if (params instanceof IotDeviceAuthReqDTO) { + return (IotDeviceAuthReqDTO) params; + } + + return null; + } catch (Exception e) { + log.error("[parseAuthParams][解析认证参数失败,params: {}]", params, e); + return null; + } + } + + /** + * 验证设备认证信息 + * + * @param authParams 认证参数 + * @return 是否认证成功 + */ + private boolean validateDeviceAuth(IotDeviceAuthReqDTO authParams) { + try { + // 调用系统标准认证 API + CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO() + .setClientId(authParams.getClientId()) + .setUsername(authParams.getUsername()) + .setPassword(authParams.getPassword())); + + result.checkError(); + return BooleanUtil.isTrue(result.getData()); + + } catch (Exception e) { + log.error("[validateDeviceAuth][设备认证异常,username: {}]", authParams.getUsername(), e); + return false; + } + } + +} diff --git a/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java index e6c102b..ce138dd 100644 --- a/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java +++ b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/router/IotTcpDownstreamHandler.java @@ -43,7 +43,10 @@ public class IotTcpDownstreamHandler { return; } - // 2. 根据产品 Key 和设备名称编码消息并发送到设备 + // 2. 补充 deviceName 到 params 中(用于 JT808 等需要设备标识的编码器) + injectDeviceNameToParams(message, deviceInfo.getDeviceName()); + + // 3. 根据产品 Key 和设备名称编码消息并发送到设备 byte[] bytes = deviceMessageService.encodeDeviceMessage(message, deviceInfo.getProductKey(), deviceInfo.getDeviceName()); boolean success = connectionManager.sendToDevice(message.getDeviceId(), bytes); @@ -60,4 +63,21 @@ public class IotTcpDownstreamHandler { } } + /** + * 将 deviceName 注入到消息 params 中 + * + * 对于 JT808 等需要设备标识(如终端手机号)的协议, + * 编码器可以从 params._deviceName 中获取 + */ + @SuppressWarnings("unchecked") + private void injectDeviceNameToParams(IotDeviceMessage message, String deviceName) { + if (message.getParams() == null) { + message.setParams(new java.util.HashMap<>()); + } + + if (message.getParams() instanceof java.util.Map) { + ((java.util.Map) message.getParams()).put("_deviceName", deviceName); + } + } + } \ No newline at end of file diff --git a/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java index c5f4879..a2370ee 100644 --- a/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java +++ b/viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java @@ -1,30 +1,40 @@ package com.viewshanghai.module.iot.gateway.protocol.tcp.router; -import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; -import cn.hutool.extra.spring.SpringUtil; -import com.viewshanghai.framework.common.pojo.CommonResult; -import com.viewshanghai.framework.common.util.json.JsonUtils; -import com.viewshanghai.module.iot.core.biz.IotDeviceCommonApi; -import com.viewshanghai.module.iot.core.biz.dto.IotDeviceAuthReqDTO; import com.viewshanghai.module.iot.core.biz.dto.IotDeviceRespDTO; import com.viewshanghai.module.iot.core.mq.message.IotDeviceMessage; -import com.viewshanghai.module.iot.core.util.IotDeviceAuthUtils; +import com.viewshanghai.module.iot.gateway.codec.jt808.IotJt808DeviceMessageCodec; import com.viewshanghai.module.iot.gateway.codec.tcp.IotTcpBinaryDeviceMessageCodec; import com.viewshanghai.module.iot.gateway.codec.tcp.IotTcpJsonDeviceMessageCodec; import com.viewshanghai.module.iot.gateway.protocol.tcp.IotTcpUpstreamProtocol; +import com.viewshanghai.module.iot.gateway.protocol.tcp.handler.AuthResult; +import com.viewshanghai.module.iot.gateway.protocol.tcp.handler.ProtocolHandler; import com.viewshanghai.module.iot.gateway.protocol.tcp.manager.IotTcpConnectionManager; -import com.viewshanghai.module.iot.gateway.service.device.IotDeviceService; import com.viewshanghai.module.iot.gateway.service.device.message.IotDeviceMessageService; import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetSocket; import lombok.extern.slf4j.Slf4j; +import java.util.List; + /** - * TCP 上行消息处理器 + * TCP 上行消息处理器(重构版) + *

+ * 职责: + * 1. 管理 TCP 连接生命周期(连接建立、异常、关闭) + * 2. 检测消息格式类型(JSON/Binary/JT808) + * 3. 解码设备消息 + * 4. 路由到对应的协议处理器 + * 5. 管理设备认证状态 + * 6. 发送设备上线/离线消息 + *

+ * 设计原则: + * - 主处理器只负责路由,不包含协议特定逻辑 + * - 协议处理器通过 Spring 自动注入,实现插件化 + * - 认证成功后,统一注册连接和发送上线消息 + * - 支持多步认证(如 JT808 的注册+鉴权) * * @author 芋道源码 */ @@ -33,27 +43,20 @@ public class IotTcpUpstreamHandler implements Handler { private static final String CODEC_TYPE_JSON = IotTcpJsonDeviceMessageCodec.TYPE; private static final String CODEC_TYPE_BINARY = IotTcpBinaryDeviceMessageCodec.TYPE; - - private static final String AUTH_METHOD = "auth"; + private static final String CODEC_TYPE_JT808 = IotJt808DeviceMessageCodec.TYPE; private final IotDeviceMessageService deviceMessageService; - - private final IotDeviceService deviceService; - private final IotTcpConnectionManager connectionManager; - - private final IotDeviceCommonApi deviceApi; - + private final List protocolHandlers; private final String serverId; public IotTcpUpstreamHandler(IotTcpUpstreamProtocol protocol, IotDeviceMessageService deviceMessageService, - IotDeviceService deviceService, - IotTcpConnectionManager connectionManager) { + IotTcpConnectionManager connectionManager, + List protocolHandlers) { this.deviceMessageService = deviceMessageService; - this.deviceService = deviceService; this.connectionManager = connectionManager; - this.deviceApi = SpringUtil.getBean(IotDeviceCommonApi.class); + this.protocolHandlers = protocolHandlers; this.serverId = protocol.getServerId(); } @@ -64,9 +67,10 @@ public class IotTcpUpstreamHandler implements Handler { // 设置异常和关闭处理器 socket.exceptionHandler(ex -> { - log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress()); + log.warn("[handle][连接异常,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress(), ex); cleanupConnection(socket); }); + socket.closeHandler(v -> { log.debug("[handle][连接关闭,客户端 ID: {},地址: {}]", clientId, socket.remoteAddress()); cleanupConnection(socket); @@ -77,8 +81,8 @@ public class IotTcpUpstreamHandler implements Handler { try { processMessage(clientId, buffer, socket); } catch (Exception e) { - log.error("[handle][消息解码失败,断开连接,客户端 ID: {},地址: {},错误: {}]", - clientId, socket.remoteAddress(), e.getMessage()); + log.error("[handle][消息处理失败,断开连接,客户端 ID: {},地址: {},错误: {}]", + clientId, socket.remoteAddress(), e.getMessage(), e); cleanupConnection(socket); socket.close(); } @@ -87,6 +91,13 @@ public class IotTcpUpstreamHandler implements Handler { /** * 处理消息 + *

+ * 流程: + * 1. 检测消息格式类型(JSON/Binary/JT808) + * 2. 解码消息 + * 3. 查找协议处理器 + * 4. 判断是否为认证消息 + * 5. 路由到协议处理器处理 * * @param clientId 客户端 ID * @param buffer 消息 @@ -114,113 +125,125 @@ public class IotTcpUpstreamHandler implements Handler { throw new Exception("消息解码失败: " + e.getMessage(), e); } - // 4. 根据消息类型路由处理 + // 4. 查找协议处理器 + ProtocolHandler handler = findProtocolHandler(message, codecType); + if (handler == null) { + log.warn("[processMessage][未找到协议处理器,codecType: {}, method: {}]", + codecType, message.getMethod()); + return; + } + + // 5. 判断是否为认证消息 + if (isAuthenticationMessage(message)) { + handleAuthenticationWithProtocol(clientId, message, codecType, socket, handler); + } else { + handleBusinessWithProtocol(clientId, message, codecType, socket, handler); + } + } + + /** + * 使用协议处理器处理认证消息 + *

+ * 认证结果处理: + * - SUCCESS:注册连接,发送上线消息 + * - PENDING:等待后续认证步骤(如 JT808 注册后等待鉴权) + * - FAILURE:认证失败,不做处理(协议处理器已发送失败响应) + * + * @param clientId 客户端 ID + * @param message 认证消息 + * @param codecType 消息编解码类型 + * @param socket 网络连接 + * @param handler 协议处理器 + */ + private void handleAuthenticationWithProtocol(String clientId, IotDeviceMessage message, + String codecType, NetSocket socket, + ProtocolHandler handler) { try { - if (AUTH_METHOD.equals(message.getMethod())) { - // 认证请求 - handleAuthenticationRequest(clientId, message, codecType, socket); + // 委托给协议处理器 + AuthResult result = handler.handleAuthentication(clientId, message, codecType, socket); + + // 根据认证结果处理 + if (result.isSuccess()) { + // 认证成功:注册连接并发送上线消息 + registerConnection(socket, result.getDevice(), clientId, codecType); + sendOnlineMessage(result.getDevice()); + + log.info("[handleAuthentication][认证成功,设备: {}, 协议: {}]", + result.getDevice().getDeviceName(), handler.getProtocolType()); + + } else if (result.isPending()) { + // 认证待定:等待后续认证步骤(如 JT808 注册后等待鉴权) + log.info("[handleAuthentication][认证待定,设备: {}, 协议: {}, 消息: {}]", + result.getDevice() != null ? result.getDevice().getDeviceName() : "unknown", + handler.getProtocolType(), result.getMessage()); + } else { - // 业务消息 - handleBusinessRequest(clientId, message, codecType, socket); + // 认证失败:协议处理器已发送失败响应,这里只记录日志 + log.warn("[handleAuthentication][认证失败,clientId: {}, 协议: {}, 原因: {}]", + clientId, handler.getProtocolType(), result.getMessage()); } + } catch (Exception e) { - log.error("[processMessage][处理消息失败,客户端 ID: {},消息方法: {}]", - clientId, message.getMethod(), e); - // 发送错误响应,避免客户端一直等待 - try { - sendErrorResponse(socket, message.getRequestId(), "消息处理失败", codecType); - } catch (Exception responseEx) { - log.error("[processMessage][发送错误响应失败,客户端 ID: {}]", clientId, responseEx); - } + log.error("[handleAuthentication][认证异常,clientId: {}, 协议: {}]", + clientId, handler.getProtocolType(), e); + handler.sendResponse(socket, message, false, "认证异常", codecType); } } /** - * 处理认证请求 + * 使用协议处理器处理业务消息 + *

+ * 前置条件:设备已认证 * * @param clientId 客户端 ID - * @param message 消息信息 + * @param message 业务消息 * @param codecType 消息编解码类型 * @param socket 网络连接 + * @param handler 协议处理器 */ - private void handleAuthenticationRequest(String clientId, IotDeviceMessage message, String codecType, - NetSocket socket) { - try { - // 1.1 解析认证参数 - IotDeviceAuthReqDTO authParams = parseAuthParams(message.getParams()); - if (authParams == null) { - log.warn("[handleAuthenticationRequest][认证参数解析失败,客户端 ID: {}]", clientId); - sendErrorResponse(socket, message.getRequestId(), "认证参数不完整", codecType); - return; - } - // 1.2 执行认证 - if (!validateDeviceAuth(authParams)) { - log.warn("[handleAuthenticationRequest][认证失败,客户端 ID: {},username: {}]", - clientId, authParams.getUsername()); - sendErrorResponse(socket, message.getRequestId(), "认证失败", codecType); - return; - } - - // 2.1 解析设备信息 - IotDeviceAuthUtils.DeviceInfo deviceInfo = IotDeviceAuthUtils.parseUsername(authParams.getUsername()); - if (deviceInfo == null) { - sendErrorResponse(socket, message.getRequestId(), "解析设备信息失败", codecType); - return; - } - // 2.2 获取设备信息 - IotDeviceRespDTO device = deviceService.getDeviceFromCache(deviceInfo.getProductKey(), - deviceInfo.getDeviceName()); - if (device == null) { - sendErrorResponse(socket, message.getRequestId(), "设备不存在", codecType); - return; - } - - // 3.1 注册连接 - registerConnection(socket, device, clientId, codecType); - // 3.2 发送上线消息 - sendOnlineMessage(device); - // 3.3 发送成功响应 - sendSuccessResponse(socket, message.getRequestId(), "认证成功", codecType); - log.info("[handleAuthenticationRequest][认证成功,设备 ID: {},设备名: {}]", - device.getId(), device.getDeviceName()); - } catch (Exception e) { - log.error("[handleAuthenticationRequest][认证处理异常,客户端 ID: {}]", clientId, e); - sendErrorResponse(socket, message.getRequestId(), "认证处理异常", codecType); - } - } - - /** - * 处理业务请求 - * - * @param clientId 客户端 ID - * @param message 消息信息 - * @param codecType 消息编解码类型 - * @param socket 网络连接 - */ - private void handleBusinessRequest(String clientId, IotDeviceMessage message, String codecType, NetSocket socket) { + private void handleBusinessWithProtocol(String clientId, IotDeviceMessage message, + String codecType, NetSocket socket, + ProtocolHandler handler) { try { // 1. 检查认证状态 if (connectionManager.isNotAuthenticated(socket)) { - log.warn("[handleBusinessRequest][设备未认证,客户端 ID: {}]", clientId); - sendErrorResponse(socket, message.getRequestId(), "请先进行认证", codecType); + log.warn("[handleBusinessMessage][设备未认证,clientId: {}]", clientId); + handler.sendResponse(socket, message, false, "请先进行认证", codecType); return; } - // 2. 获取认证信息并处理业务消息 + // 2. 获取连接信息 IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket); + if (connectionInfo == null) { + log.error("[handleBusinessMessage][连接信息不存在,clientId: {}]", clientId); + return; + } + + // 3. 委托给协议处理器处理业务消息 + handler.handleBusinessMessage( + clientId, + message, + codecType, + socket, + connectionInfo.getProductKey(), + connectionInfo.getDeviceName(), + serverId + ); - // 3. 发送消息到消息总线 - deviceMessageService.sendDeviceMessage(message, connectionInfo.getProductKey(), - connectionInfo.getDeviceName(), serverId); - log.info("[handleBusinessRequest][发送消息到消息总线,客户端 ID: {},消息: {}", - clientId, message.toString()); } catch (Exception e) { - log.error("[handleBusinessRequest][业务请求处理异常,客户端 ID: {}]", clientId, e); + log.error("[handleBusinessMessage][业务消息处理异常,clientId: {}]", clientId, e); } } /** * 获取消息编解码类型 + *

+ * 检测优先级: + * 1. 如果已认证,使用缓存的编解码类型 + * 2. 未认证时,通过消息格式自动检测: + * - JT808:首尾标识符 0x7e + * - Binary:魔术字 0x7E + * - JSON:默认 * * @param buffer 消息 * @param socket 网络连接 @@ -235,15 +258,69 @@ public class IotTcpUpstreamHandler implements Handler { } // 2. 未认证时检测消息格式类型 - return IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(buffer.getBytes()) ? CODEC_TYPE_BINARY - : CODEC_TYPE_JSON; + byte[] data = buffer.getBytes(); + + // 2.1 检测是否为 JT808 格式(首尾标识符 0x7e) + if (IotJt808DeviceMessageCodec.isJt808Format(data)) { + return CODEC_TYPE_JT808; + } + + // 2.2 检测是否为自定义二进制格式(魔术字 0x7E) + if (IotTcpBinaryDeviceMessageCodec.isBinaryFormatQuick(data)) { + return CODEC_TYPE_BINARY; + } + + // 2.3 默认为 JSON 格式 + return CODEC_TYPE_JSON; + } + + /** + * 查找协议处理器 + *

+ * 遍历所有协议处理器,返回第一个能处理该消息的处理器 + * + * @param message 设备消息 + * @param codecType 消息编解码类型 + * @return 协议处理器,未找到返回 null + */ + private ProtocolHandler findProtocolHandler(IotDeviceMessage message, String codecType) { + return protocolHandlers.stream() + .filter(handler -> handler.canHandle(message, codecType)) + .findFirst() + .orElse(null); + } + + /** + * 判断是否为认证消息 + *

+ * 认证消息包括: + * - 标准认证:auth + * - JT808 注册:jt808.terminal.register + * - JT808 鉴权:jt808.terminal.auth + * + * @param message 设备消息 + * @return true-是认证消息,false-不是 + */ + private boolean isAuthenticationMessage(IotDeviceMessage message) { + String method = message.getMethod(); + return "auth".equals(method) + || "jt808.terminal.register".equals(method) + || "jt808.terminal.auth".equals(method); } /** * 注册连接信息 + *

+ * 将设备连接信息注册到连接管理器,包括: + * - 设备 ID + * - 产品 Key + * - 设备名称 + * - 客户端 ID + * - 编解码类型 + * - 认证状态 * * @param socket 网络连接 - * @param device 设备 + * @param device 设备信息 * @param clientId 客户端 ID * @param codecType 消息编解码类型 */ @@ -256,12 +333,15 @@ public class IotTcpUpstreamHandler implements Handler { .setClientId(clientId) .setCodecType(codecType) .setAuthenticated(true); - // 注册连接 + + // 注册连接(如果设备已有其他连接,会自动断开旧连接) connectionManager.registerConnection(socket, device.getId(), connectionInfo); } /** * 发送设备上线消息 + *

+ * 设备认证成功后,发送上线消息到消息总线,通知业务层设备已上线 * * @param device 设备信息 */ @@ -270,6 +350,9 @@ public class IotTcpUpstreamHandler implements Handler { IotDeviceMessage onlineMessage = IotDeviceMessage.buildStateUpdateOnline(); deviceMessageService.sendDeviceMessage(onlineMessage, device.getProductKey(), device.getDeviceName(), serverId); + + log.debug("[sendOnlineMessage][发送上线消息成功,设备: {}]", device.getDeviceName()); + } catch (Exception e) { log.error("[sendOnlineMessage][发送上线消息失败,设备: {}]", device.getDeviceName(), e); } @@ -277,6 +360,8 @@ public class IotTcpUpstreamHandler implements Handler { /** * 清理连接 + *

+ * 连接关闭或异常时,清理连接信息并发送离线消息 * * @param socket 网络连接 */ @@ -284,125 +369,20 @@ public class IotTcpUpstreamHandler implements Handler { try { // 1. 发送离线消息(如果已认证) IotTcpConnectionManager.ConnectionInfo connectionInfo = connectionManager.getConnectionInfo(socket); - if (connectionInfo != null) { + if (connectionInfo != null && connectionInfo.isAuthenticated()) { IotDeviceMessage offlineMessage = IotDeviceMessage.buildStateOffline(); deviceMessageService.sendDeviceMessage(offlineMessage, connectionInfo.getProductKey(), connectionInfo.getDeviceName(), serverId); + + log.debug("[cleanupConnection][发送离线消息成功,设备: {}]", connectionInfo.getDeviceName()); } // 2. 注销连接 connectionManager.unregisterConnection(socket); + } catch (Exception e) { log.error("[cleanupConnection][清理连接失败]", e); } } - /** - * 发送响应消息 - * - * @param socket 网络连接 - * @param success 是否成功 - * @param message 消息 - * @param requestId 请求 ID - * @param codecType 消息编解码类型 - */ - private void sendResponse(NetSocket socket, boolean success, String message, String requestId, String codecType) { - try { - Object responseData = MapUtil.builder() - .put("success", success) - .put("message", message) - .build(); - - int code = success ? 0 : 401; - IotDeviceMessage responseMessage = IotDeviceMessage.replyOf(requestId, AUTH_METHOD, responseData, - code, message); - - byte[] encodedData = deviceMessageService.encodeDeviceMessage(responseMessage, codecType); - socket.write(Buffer.buffer(encodedData)); - - } catch (Exception e) { - log.error("[sendResponse][发送响应失败,requestId: {}]", requestId, e); - } - } - - /** - * 验证设备认证信息 - * - * @param authParams 认证参数 - * @return 是否认证成功 - */ - private boolean validateDeviceAuth(IotDeviceAuthReqDTO authParams) { - try { - CommonResult result = deviceApi.authDevice(new IotDeviceAuthReqDTO() - .setClientId(authParams.getClientId()).setUsername(authParams.getUsername()) - .setPassword(authParams.getPassword())); - result.checkError(); - return BooleanUtil.isTrue(result.getData()); - } catch (Exception e) { - log.error("[validateDeviceAuth][设备认证异常,username: {}]", authParams.getUsername(), e); - return false; - } - } - - /** - * 发送错误响应 - * - * @param socket 网络连接 - * @param requestId 请求 ID - * @param errorMessage 错误消息 - * @param codecType 消息编解码类型 - */ - private void sendErrorResponse(NetSocket socket, String requestId, String errorMessage, String codecType) { - sendResponse(socket, false, errorMessage, requestId, codecType); - } - - /** - * 发送成功响应 - * - * @param socket 网络连接 - * @param requestId 请求 ID - * @param message 消息 - * @param codecType 消息编解码类型 - */ - @SuppressWarnings("SameParameterValue") - private void sendSuccessResponse(NetSocket socket, String requestId, String message, String codecType) { - sendResponse(socket, true, message, requestId, codecType); - } - - /** - * 解析认证参数 - * - * @param params 参数对象(通常为 Map 类型) - * @return 认证参数 DTO,解析失败时返回 null - */ - @SuppressWarnings("unchecked") - private IotDeviceAuthReqDTO parseAuthParams(Object params) { - if (params == null) { - return null; - } - - try { - // 参数默认为 Map 类型,直接转换 - if (params instanceof java.util.Map) { - java.util.Map paramMap = (java.util.Map) params; - return new IotDeviceAuthReqDTO() - .setClientId(MapUtil.getStr(paramMap, "clientId")) - .setUsername(MapUtil.getStr(paramMap, "username")) - .setPassword(MapUtil.getStr(paramMap, "password")); - } - - // 如果已经是目标类型,直接返回 - if (params instanceof IotDeviceAuthReqDTO) { - return (IotDeviceAuthReqDTO) params; - } - - // 其他情况尝试 JSON 转换 - String jsonStr = JsonUtils.toJsonString(params); - return JsonUtils.parseObject(jsonStr, IotDeviceAuthReqDTO.class); - } catch (Exception e) { - log.error("[parseAuthParams][解析认证参数({})失败]", params, e); - return null; - } - } - -} \ No newline at end of file +}