diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/enums/IotDeviceMessageMethodEnum.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/enums/IotDeviceMessageMethodEnum.java index 57991078..3bfa15c2 100644 --- a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/enums/IotDeviceMessageMethodEnum.java +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/enums/IotDeviceMessageMethodEnum.java @@ -1,86 +1,86 @@ -package com.viewsh.module.iot.core.enums; - -import cn.hutool.core.util.ArrayUtil; -import com.viewsh.framework.common.core.ArrayValuable; -import com.viewsh.framework.common.util.collection.SetUtils; -import lombok.AllArgsConstructor; -import lombok.Getter; - -import java.util.Arrays; -import java.util.Set; - -/** - * IoT 设备消息的方法枚举 - * - * @author haohao - */ -@Getter -@AllArgsConstructor -public enum IotDeviceMessageMethodEnum implements ArrayValuable { - - // ========== 设备状态 ========== - - STATE_UPDATE("thing.state.update", "设备状态更新", true), - - // TODO 芋艿:要不要加个 ping 消息; - - // ========== 设备属性 ========== - // 可参考:https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services - - PROPERTY_POST("thing.property.post", "属性上报", true), - PROPERTY_SET("thing.property.set", "属性设置", false), - - // ========== 设备事件 ========== - // 可参考:https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services - - EVENT_POST("thing.event.post", "事件上报", true), - - // ========== 设备服务调用 ========== - // 可参考:https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services - - SERVICE_INVOKE("thing.service.invoke", "服务调用", false), - - // ========== 设备配置 ========== - // 可参考:https://help.aliyun.com/zh/iot/user-guide/remote-configuration-1 - - CONFIG_PUSH("thing.config.push", "配置推送", false), - - // ========== OTA 固件 ========== - // 可参考:https://help.aliyun.com/zh/iot/user-guide/perform-ota-updates - - OTA_UPGRADE("thing.ota.upgrade", "OTA 固定信息推送", false), - OTA_PROGRESS("thing.ota.progress", "OTA 升级进度上报", true), - ; - - public static final String[] ARRAYS = Arrays.stream(values()).map(IotDeviceMessageMethodEnum::getMethod) - .toArray(String[]::new); - - /** - * 不进行 reply 回复的方法集合 - */ - public static final Set REPLY_DISABLED = SetUtils.asSet( - STATE_UPDATE.getMethod(), - OTA_PROGRESS.getMethod() // 参考阿里云,OTA 升级进度上报,不进行回复 - ); - - private final String method; - - private final String name; - - private final Boolean upstream; - - @Override - public String[] array() { - return ARRAYS; - } - - public static IotDeviceMessageMethodEnum of(String method) { - return ArrayUtil.firstMatch(item -> item.getMethod().equals(method), - IotDeviceMessageMethodEnum.values()); - } - - public static boolean isReplyDisabled(String method) { - return REPLY_DISABLED.contains(method); - } - -} +package com.viewsh.module.iot.core.enums; + +import cn.hutool.core.util.ArrayUtil; +import com.viewsh.framework.common.core.ArrayValuable; +import com.viewsh.framework.common.util.collection.SetUtils; +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.Arrays; +import java.util.Set; + +/** + * IoT 设备消息的方法枚举 + * + * @author haohao + */ +@Getter +@AllArgsConstructor +public enum IotDeviceMessageMethodEnum implements ArrayValuable { + + // ========== 设备状态 ========== + + STATE_UPDATE("thing.state.update", "设备状态更新", true), + + // TODO 芋艿:要不要加个 ping 消息; + + // ========== 设备属性 ========== + // 可参考:https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services + + PROPERTY_POST("thing.property.post", "属性上报", true), + PROPERTY_SET("thing.property.set", "属性设置", false), + + // ========== 设备事件 ========== + // 可参考:https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services + + EVENT_POST("thing.event.post", "事件上报", true), + + // ========== 设备服务调用 ========== + // 可参考:https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services + + SERVICE_INVOKE("thing.service.invoke", "服务调用", false), + + // ========== 设备配置 ========== + // 可参考:https://help.aliyun.com/zh/iot/user-guide/remote-configuration-1 + + CONFIG_PUSH("thing.config.push", "配置推送", false), + + // ========== OTA 固件 ========== + // 可参考:https://help.aliyun.com/zh/iot/user-guide/perform-ota-updates + + OTA_UPGRADE("thing.ota.upgrade", "OTA 固定信息推送", false), + OTA_PROGRESS("thing.ota.progress", "OTA 升级进度上报", true), + ; + + public static final String[] ARRAYS = Arrays.stream(values()).map(IotDeviceMessageMethodEnum::getMethod) + .toArray(String[]::new); + + /** + * 不进行 reply 回复的方法集合 + */ + public static final Set REPLY_DISABLED = SetUtils.asSet( + STATE_UPDATE.getMethod(), + OTA_PROGRESS.getMethod() // 参考阿里云,OTA 升级进度上报,不进行回复 + ); + + private final String method; + + private final String name; + + private final Boolean upstream; + + @Override + public String[] array() { + return ARRAYS; + } + + public static IotDeviceMessageMethodEnum of(String method) { + return ArrayUtil.firstMatch(item -> item.getMethod().equals(method), + IotDeviceMessageMethodEnum.values()); + } + + public static boolean isReplyDisabled(String method) { + return REPLY_DISABLED.contains(method); + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/jt808/IotJt808DeviceMessageCodec.java b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/jt808/IotJt808DeviceMessageCodec.java index e72b1f6d..31bf9db1 100644 --- a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/jt808/IotJt808DeviceMessageCodec.java +++ b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/jt808/IotJt808DeviceMessageCodec.java @@ -327,8 +327,8 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { Map result = new HashMap<>(); try { // 解析注册消息体 - com.viewsh.module.iot.gateway.codec.jt808.entity.Jt808RegisterInfo registerInfo = - decoder.toRegisterMsg(dataPack); + Jt808RegisterInfo registerInfo = + decoder.toRegisterMsg(dataPack); result.put("provinceId", registerInfo.getProvinceId()); result.put("cityId", registerInfo.getCityId()); @@ -538,7 +538,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { if (paramId != null) { jt808Params.put(paramId, value); log.debug("[mapPropertiesToJt808Params][属性映射] {} -> 0x{} = {}", - identifier, Integer.toHexString(paramId), value); + identifier, Integer.toHexString(paramId), value); } else { log.warn("[mapPropertiesToJt808Params][未知的属性标识符: {}]", identifier); } @@ -596,7 +596,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { // 4. 如果都获取不到,抛出异常 log.error("[extractPhoneNumber][无法提取终端手机号,params: {}]", params); throw new IllegalArgumentException( - "消息中缺少终端手机号。请确保设备的 deviceName 为终端手机号(纯数字),例如: \"13800138000\""); + "消息中缺少终端手机号。请确保设备的 deviceName 为终端手机号(纯数字),例如: \"13800138000\""); } /** @@ -671,5 +671,4 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { && data[data.length - 1] == (byte) Jt808Constants.PKG_DELIMITER; } -} - +} \ No newline at end of file diff --git a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/tcp/handler/Jt808ProtocolHandler.java b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/tcp/handler/Jt808ProtocolHandler.java index 0694936a..c1da7396 100644 --- a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/tcp/handler/Jt808ProtocolHandler.java +++ b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/tcp/handler/Jt808ProtocolHandler.java @@ -7,6 +7,7 @@ import com.viewsh.framework.common.pojo.CommonResult; import com.viewsh.module.iot.core.biz.IotDeviceCommonApi; import com.viewsh.module.iot.core.biz.dto.IotDeviceGetReqDTO; import com.viewsh.module.iot.core.biz.dto.IotDeviceRespDTO; +import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum; import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; import com.viewsh.module.iot.gateway.codec.jt808.IotJt808DeviceMessageCodec; import com.viewsh.module.iot.gateway.service.device.message.IotDeviceMessageService; @@ -106,12 +107,37 @@ public class Jt808ProtocolHandler extends AbstractProtocolHandler { String codecType, NetSocket socket, String productKey, String deviceName, String serverId) { try { - // 发送消息到消息总线 + // 1. 发送消息到消息总线 deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId); log.info("[handleBusinessMessage][JT808 业务消息已发送,clientId: {}, method: {}, messageId: {}]", clientId, message.getMethod(), message.getId()); + // 2. 发送平台通用应答 (0x8001) + // 根据 JT808 协议规范,平台需要对终端上报的业务消息进行应答,包括: + // - 0x0200 位置信息汇报 + // - 0x0002 终端心跳 + // - 0x0704 批量位置上报 + // - 等其他业务消息 + if (!IotDeviceMessageMethodEnum.isReplyDisabled(message.getMethod())) { + // 提取终端手机号、流水号和原始消息ID + String terminalPhone = extractTerminalPhone(message); + int flowId = extractFlowId(message); + int msgId = extractMessageId(message); + + if (terminalPhone != null && msgId != 0) { + // 发送通用应答,结果码为 0(成功/确认) + sendCommonResp(socket, terminalPhone, flowId, msgId, + (byte) 0, codecType, message.getRequestId()); + + log.debug("[handleBusinessMessage][已发送通用应答,msgId: 0x{}, flowId: {}, phone: {}]", + Integer.toHexString(msgId), flowId, terminalPhone); + } else { + log.warn("[handleBusinessMessage][无法发送应答,缺少必要参数,phone: {}, msgId: {}]", + terminalPhone, msgId); + } + } + } catch (Exception e) { log.error("[handleBusinessMessage][JT808 业务消息处理异常,clientId: {}]", clientId, e); } @@ -365,12 +391,11 @@ public class Jt808ProtocolHandler extends AbstractProtocolHandler { .put("flowId", flowId) .build(); - // 构建响应消息(使用 of 方法确保 params 正确传递给编码器) - IotDeviceMessage responseMessage = IotDeviceMessage.of( + // 构建响应消息 + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf( requestId, "jt808.platform.registerResp", - params, // params 必须在第3个参数位置 - null, // data + params, replyCode == 0 ? 0 : 401, replyCode == 0 ? "注册成功" : "注册失败"); @@ -416,12 +441,11 @@ public class Jt808ProtocolHandler extends AbstractProtocolHandler { .put("flowId", flowId) .build(); - // 构建响应消息(使用 of 方法确保 params 正确传递给编码器) - IotDeviceMessage responseMessage = IotDeviceMessage.of( + // 构建响应消息 + IotDeviceMessage responseMessage = IotDeviceMessage.replyOf( requestId, "jt808.platform.commonResp", - params, // params 必须在第3个参数位置 - null, // data + params, replyCode == 0 ? 0 : 401, replyCode == 0 ? "成功" : "失败"); @@ -507,6 +531,40 @@ public class Jt808ProtocolHandler extends AbstractProtocolHandler { return null; } + /** + * 从消息中提取原始 JT808 消息ID + *

+ * 消息ID存储在消息的 _metadata.jt808MsgId 字段中, + * 由 IotJt808DeviceMessageCodec 解码时自动填充,格式为十六进制字符串(如 "0x0200") + * + * @param message 设备消息 + * @return 消息ID(十进制整数),提取失败返回 0 + */ + @SuppressWarnings("unchecked") + private int extractMessageId(IotDeviceMessage message) { + if (message.getParams() instanceof Map) { + Map params = (Map) message.getParams(); + if (params.get("_metadata") instanceof Map) { + Map metadata = (Map) params.get("_metadata"); + Object jt808MsgId = metadata.get("jt808MsgId"); + if (jt808MsgId != null) { + String msgIdStr = jt808MsgId.toString(); + try { + // 解析十六进制字符串,如 "0x0200" -> 512 + if (msgIdStr.startsWith("0x") || msgIdStr.startsWith("0X")) { + return Integer.parseInt(msgIdStr.substring(2), 16); + } + // 如果不是十六进制格式,尝试直接解析为整数 + return Integer.parseInt(msgIdStr); + } catch (NumberFormatException e) { + log.warn("[extractMessageId][无法解析消息ID: {}]", msgIdStr); + } + } + } + } + return 0; + } + /** * 通过 deviceName 查找设备 *

diff --git a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java index 39656e15..0d3b89f8 100644 --- a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java +++ b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/protocol/tcp/router/IotTcpUpstreamHandler.java @@ -51,9 +51,9 @@ public class IotTcpUpstreamHandler implements Handler { private final String serverId; public IotTcpUpstreamHandler(IotTcpUpstreamProtocol protocol, - IotDeviceMessageService deviceMessageService, - IotTcpConnectionManager connectionManager, - List protocolHandlers) { + IotDeviceMessageService deviceMessageService, + IotTcpConnectionManager connectionManager, + List protocolHandlers) { this.deviceMessageService = deviceMessageService; this.connectionManager = connectionManager; this.protocolHandlers = protocolHandlers; @@ -112,12 +112,6 @@ public class IotTcpUpstreamHandler implements Handler { // 2. 获取消息格式类型 String codecType = getMessageCodecType(buffer, socket); - if (codecType == null) { - log.warn("[processMessage][未知消息格式,断开连接,clientId: {},数据开头: {}]", - clientId, buffer.length() > 20 ? buffer.getString(0, 20) : buffer.toString()); - socket.close(); - return; - } // 3. 解码消息 IotDeviceMessage message; @@ -139,50 +133,13 @@ public class IotTcpUpstreamHandler implements Handler { return; } - // 5. 判断是否为认证消息 - if (isAuthenticationMessage(message)) { - handleAuthenticationWithProtocol(clientId, message, codecType, socket, handler); - } else { - // 针对 JT808 协议的特殊处理:如果是位置上报(thing.property.post)且包含身份信息(手机号),尝试自动注册 - // 这可以解决设备重启后未注册就发送数据的问题,或者简化认证流程 - if (CODEC_TYPE_JT808.equals(codecType)) { - tryAutoRegisterJt808(clientId, message, codecType, socket, handler); - } else { - handleBusinessWithProtocol(clientId, message, codecType, socket, handler); - } - } - } - - /** - * 尝试自动注册 JT808 设备 - *

- * JT808 协议中,消息头包含终端手机号。如果设备未认证但发送了有效数据, - * 我们可以尝试提取手机号并进行隐式认证,而不是直接断开连接。 - */ - private void tryAutoRegisterJt808(String clientId, IotDeviceMessage message, - String codecType, NetSocket socket, - ProtocolHandler handler) { - // 如果已认证,直接处理业务 - if (!connectionManager.isNotAuthenticated(socket)) { - handleBusinessWithProtocol(clientId, message, codecType, socket, handler); - return; - } - - log.info("[tryAutoRegisterJt808][尝试自动认证 JT808 设备,clientId: {}]", clientId); - - // 使用认证逻辑处理(JT808 协议处理器会提取手机号进行认证) - // 这里把业务消息当作认证消息处理,如果协议处理器支持从业务消息提取身份,就能成功 - // 注意:这依赖于 Jt808ProtocolHandler 的 handleAuthentication 实现能够处理非注册消息 - handleAuthenticationWithProtocol(clientId, message, codecType, socket, handler); - - // 如果认证成功(handleAuthenticationWithProtocol 内部会注册连接),则继续处理业务消息 - if (!connectionManager.isNotAuthenticated(socket)) { - log.info("[tryAutoRegisterJt808][自动认证成功,继续处理业务消息,clientId: {}]", clientId); - handleBusinessWithProtocol(clientId, message, codecType, socket, handler); - } else { - // 认证依然失败,提示未认证(日志已在 handleAuthenticationWithProtocol 中打印) - } - } + // 5. 判断是否为认证消息 + if (isAuthenticationMessage(message)) { + handleAuthenticationWithProtocol(clientId, message, codecType, socket, handler); + } else { + handleBusinessWithProtocol(clientId, message, codecType, socket, handler); + } + } /** * 使用协议处理器处理认证消息 @@ -199,8 +156,8 @@ public class IotTcpUpstreamHandler implements Handler { * @param handler 协议处理器 */ private void handleAuthenticationWithProtocol(String clientId, IotDeviceMessage message, - String codecType, NetSocket socket, - ProtocolHandler handler) { + String codecType, NetSocket socket, + ProtocolHandler handler) { try { // 委托给协议处理器 AuthResult result = handler.handleAuthentication(clientId, message, codecType, socket); @@ -245,8 +202,8 @@ public class IotTcpUpstreamHandler implements Handler { * @param handler 协议处理器 */ private void handleBusinessWithProtocol(String clientId, IotDeviceMessage message, - String codecType, NetSocket socket, - ProtocolHandler handler) { + String codecType, NetSocket socket, + ProtocolHandler handler) { try { // 1. 检查认证状态 if (connectionManager.isNotAuthenticated(socket)) { @@ -270,7 +227,8 @@ public class IotTcpUpstreamHandler implements Handler { socket, connectionInfo.getProductKey(), connectionInfo.getDeviceName(), - serverId); + serverId + ); } catch (Exception e) { log.error("[handleBusinessMessage][业务消息处理异常,clientId: {}]", clientId, e); @@ -283,9 +241,9 @@ public class IotTcpUpstreamHandler implements Handler { * 检测优先级: * 1. 如果已认证,使用缓存的编解码类型 * 2. 未认证时,通过消息格式自动检测: - * - JT808:首尾标识符 0x7e - * - Binary:魔术字 0x7E - * - JSON:默认 + * - JT808:首尾标识符 0x7e + * - Binary:魔术字 0x7E + * - JSON:默认 * * @param buffer 消息 * @param socket 网络连接 @@ -373,7 +331,7 @@ public class IotTcpUpstreamHandler implements Handler { * @param codecType 消息编解码类型 */ private void registerConnection(NetSocket socket, IotDeviceRespDTO device, - String clientId, String codecType) { + String clientId, String codecType) { IotTcpConnectionManager.ConnectionInfo connectionInfo = new IotTcpConnectionManager.ConnectionInfo() .setDeviceId(device.getId()) .setProductKey(device.getProductKey())