Compare commits

4 Commits

Author SHA1 Message Date
lzh
4e387e410c refactor(iot-gateway): JT808编解码器代码重构
Some checks failed
Java CI with Maven / build (11) (push) Has been cancelled
Java CI with Maven / build (17) (push) Has been cancelled
Java CI with Maven / build (8) (push) Has been cancelled
- 提取 enrichParamsWithMetadata 方法统一处理元数据
- 提取 extractPhoneFromMap 和 extractFlowIdFromMap 方法
- 优化 getParamsAsMap 方法,同时支持 params 和 data 字段
- 优化代码注释格式

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 22:15:12 +08:00
lzh
110cfc8cf8 fix(iot-server): 消费者优先判断skipReply字段
- handleUpstreamDeviceMessage 优先检查 skipReply 字段
- 协议层已处理回复的消息跳过业务层回复逻辑

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 22:12:56 +08:00
lzh
0c2cd85915 fix(iot-gateway): JT808协议使用skipReply标记协议层已应答
- Jt808ProtocolHandler 使用 setSkipReply(true) 替代 setCode(0)
- 避免业务层重复发送回复消息

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 22:12:03 +08:00
lzh
529171aca6 feat(iot-core): 添加skipReply字段支持协议层回复标记
- IotDeviceMessage 新增 skipReply 字段,用于标记协议层已处理回复
- IotDeviceMessageUtils 新增 isSkipReply() 方法
- isUpstreamMessage() 支持 JT808 协议方法判断
  - jt808.terminal.* 为上行消息
  - jt808.platform.* 为下行消息

解决 JT808 等协议在协议层已发送应答时,业务层无需再回复��场景

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 22:10:29 +08:00
5 changed files with 527 additions and 406 deletions

View File

@@ -1,151 +1,160 @@
package com.viewsh.module.iot.core.mq.message;
import cn.hutool.core.map.MapUtil;
import com.viewsh.framework.common.exception.enums.GlobalErrorCodeConstants;
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
import com.viewsh.module.iot.core.enums.IotDeviceStateEnum;
import com.viewsh.module.iot.core.util.IotDeviceMessageUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* IoT 设备消息
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class IotDeviceMessage {
/**
* 【消息总线】应用的设备消息 Topic由 iot-gateway 发给 iot-biz 进行消费
*/
public static final String MESSAGE_BUS_DEVICE_MESSAGE_TOPIC = "iot_device_message";
/**
* 【消息总线】设备消息 Topic由 iot-biz 发送给 iot-gateway 的某个 "server"(protocol) 进行消费
*
* 其中,%s 就是该"server"(protocol) 的标识
*/
public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "_%s";
/**
* 消息编号
*
* 由后端生成,通过 {@link IotDeviceMessageUtils#generateMessageId()}
*/
private String id;
/**
* 上报时间
*
* 由后端生成,当前时间
*/
private LocalDateTime reportTime;
/**
* 设备编号
*/
private Long deviceId;
/**
* 租户编号
*/
private Long tenantId;
/**
* 服务编号,该消息由哪个 server 发送
*/
private String serverId;
// ========== codec编解码字段 ==========
/**
* 请求编号
*
* 由设备生成,对应阿里云 IoT 的 Alink 协议中的 id、华为云 IoTDA 协议的 request_id
*/
private String requestId;
/**
* 请求方法
*
* 枚举 {@link IotDeviceMessageMethodEnum}
* 例如说thing.property.report 属性上报
*/
private String method;
/**
* 请求参数
*
* 例如说:属性上报的 properties、事件上报的 params
*/
private Object params;
/**
* 响应结果
*/
private Object data;
/**
* 响应错误码
*/
private Integer code;
/**
* 返回结果信息
*/
private String msg;
// ========== 基础方法:只传递"codec编解码字段" ==========
public static IotDeviceMessage requestOf(String method) {
return requestOf(null, method, null);
}
public static IotDeviceMessage requestOf(String method, Object params) {
return requestOf(null, method, params);
}
public static IotDeviceMessage requestOf(String requestId, String method, Object params) {
return of(requestId, method, params, null, null, null);
}
public static IotDeviceMessage replyOf(String requestId, String method,
Object data, Integer code, String msg) {
if (code == null) {
code = GlobalErrorCodeConstants.SUCCESS.getCode();
msg = GlobalErrorCodeConstants.SUCCESS.getMsg();
}
return of(requestId, method, null, data, code, msg);
}
public static IotDeviceMessage of(String requestId, String method,
Object params, Object data, Integer code, String msg) {
// 通用参数
IotDeviceMessage message = new IotDeviceMessage()
.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now());
// 当前参数
message.setRequestId(requestId).setMethod(method).setParams(params)
.setData(data).setCode(code).setMsg(msg);
return message;
}
// ========== 核心方法:在 of 基础方法之上,添加对应 method ==========
public static IotDeviceMessage buildStateUpdateOnline() {
return requestOf(IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod(),
MapUtil.of("state", IotDeviceStateEnum.ONLINE.getState()));
}
public static IotDeviceMessage buildStateOffline() {
return requestOf(IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod(),
MapUtil.of("state", IotDeviceStateEnum.OFFLINE.getState()));
}
public static IotDeviceMessage buildOtaUpgrade(String version, String fileUrl, Long fileSize,
String fileDigestAlgorithm, String fileDigestValue) {
return requestOf(IotDeviceMessageMethodEnum.OTA_UPGRADE.getMethod(), MapUtil.builder()
.put("version", version).put("fileUrl", fileUrl).put("fileSize", fileSize)
.put("fileDigestAlgorithm", fileDigestAlgorithm).put("fileDigestValue", fileDigestValue)
.build());
}
}
package com.viewsh.module.iot.core.mq.message;
import cn.hutool.core.map.MapUtil;
import com.viewsh.framework.common.exception.enums.GlobalErrorCodeConstants;
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
import com.viewsh.module.iot.core.enums.IotDeviceStateEnum;
import com.viewsh.module.iot.core.util.IotDeviceMessageUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* IoT 设备消息
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class IotDeviceMessage {
/**
* 【消息总线】应用的设备消息 Topic由 iot-gateway 发给 iot-biz 进行消费
*/
public static final String MESSAGE_BUS_DEVICE_MESSAGE_TOPIC = "iot_device_message";
/**
* 【消息总线】设备消息 Topic由 iot-biz 发送给 iot-gateway 的某个 "server"(protocol) 进行消费
*
* 其中,%s 就是该"server"(protocol) 的标识
*/
public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "_%s";
/**
* 消息编号
*
* 由后端生成,通过 {@link IotDeviceMessageUtils#generateMessageId()}
*/
private String id;
/**
* 上报时间
*
* 由后端生成,当前时间
*/
private LocalDateTime reportTime;
/**
* 设备编号
*/
private Long deviceId;
/**
* 租户编号
*/
private Long tenantId;
/**
* 服务编号,该消息由哪个 server 发送
*/
private String serverId;
// ========== codec编解码字段 ==========
/**
* 请求编号
*
* 由设备生成,对应阿里云 IoT 的 Alink 协议中的 id、华为云 IoTDA 协议的 request_id
*/
private String requestId;
/**
* 请求方法
*
* 枚举 {@link IotDeviceMessageMethodEnum}
* 例如说thing.property.report 属性上报
*/
private String method;
/**
* 请求参数
*
* 例如说:属性上报的 properties、事件上报的 params
*/
private Object params;
/**
* 响应结果
*/
private Object data;
/**
* 响应错误码
*/
private Integer code;
/**
* 是否跳过业务层回复
* <p>
* true = 协议层已处理回复(如 JT808 通用应答),业务层无需再回复
* false 或 null = 需要业务层正常处理回复逻辑
*/
private Boolean skipReply;
/**
* 返回结果信息
*/
private String msg;
// ========== 基础方法:只传递"codec编解码字段" ==========
public static IotDeviceMessage requestOf(String method) {
return requestOf(null, method, null);
}
public static IotDeviceMessage requestOf(String method, Object params) {
return requestOf(null, method, params);
}
public static IotDeviceMessage requestOf(String requestId, String method, Object params) {
return of(requestId, method, params, null, null, null);
}
public static IotDeviceMessage replyOf(String requestId, String method,
Object data, Integer code, String msg) {
if (code == null) {
code = GlobalErrorCodeConstants.SUCCESS.getCode();
msg = GlobalErrorCodeConstants.SUCCESS.getMsg();
}
return of(requestId, method, null, data, code, msg);
}
public static IotDeviceMessage of(String requestId, String method,
Object params, Object data, Integer code, String msg) {
// 通用参数
IotDeviceMessage message = new IotDeviceMessage()
.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now());
// 当前参数
message.setRequestId(requestId).setMethod(method).setParams(params)
.setData(data).setCode(code).setMsg(msg);
return message;
}
// ========== 核心方法:在 of 基础方法之上,添加对应 method ==========
public static IotDeviceMessage buildStateUpdateOnline() {
return requestOf(IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod(),
MapUtil.of("state", IotDeviceStateEnum.ONLINE.getState()));
}
public static IotDeviceMessage buildStateOffline() {
return requestOf(IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod(),
MapUtil.of("state", IotDeviceStateEnum.OFFLINE.getState()));
}
public static IotDeviceMessage buildOtaUpgrade(String version, String fileUrl, Long fileSize,
String fileDigestAlgorithm, String fileDigestValue) {
return requestOf(IotDeviceMessageMethodEnum.OTA_UPGRADE.getMethod(), MapUtil.builder()
.put("version", version).put("fileUrl", fileUrl).put("fileSize", fileSize)
.put("fileDigestAlgorithm", fileDigestAlgorithm).put("fileDigestValue", fileDigestValue)
.build());
}
}

View File

@@ -1,167 +1,192 @@
package com.viewsh.module.iot.core.util;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import java.util.Map;
/**
* IoT 设备【消息】的工具类
*
* @author 芋道源码
*/
public class IotDeviceMessageUtils {
// ========== Message 相关 ==========
public static String generateMessageId() {
return IdUtil.fastSimpleUUID();
}
/**
* 是否是上行消息:由设备发送
*
* @param message 消息
* @return 是否
*/
@SuppressWarnings("SimplifiableConditionalExpression")
public static boolean isUpstreamMessage(IotDeviceMessage message) {
IotDeviceMessageMethodEnum methodEnum = IotDeviceMessageMethodEnum.of(message.getMethod());
Assert.notNull(methodEnum, "无法识别的消息方法:" + message.getMethod());
// 注意:回复消息时,需要取反
return !isReplyMessage(message) ? methodEnum.getUpstream() : !methodEnum.getUpstream();
}
/**
* 是否是回复消息,通过 {@link IotDeviceMessage#getCode()} 非空进行识别
*
* @param message 消息
* @return 是否
*/
public static boolean isReplyMessage(IotDeviceMessage message) {
return message.getCode() != null;
}
/**
* 提取消息中的标识符
*
* @param message 消息
* @return 标识符
*/
@SuppressWarnings("unchecked")
public static String getIdentifier(IotDeviceMessage message) {
if (message.getParams() == null) {
return null;
}
if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceMessageMethodEnum.SERVICE_INVOKE.getMethod())) {
Map<String, Object> params = (Map<String, Object>) message.getParams();
return MapUtil.getStr(params, "identifier");
} else if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod())) {
Map<String, Object> params = (Map<String, Object>) message.getParams();
return MapUtil.getStr(params, "state");
}
return null;
}
/**
* 从设备消息中提取指定标识符的属性值
* - 支持多种消息格式和属性值提取策略
* - 兼容现有的消息结构
* - 提供统一的属性值提取接口
* <p>
* 支持的提取策略(按优先级顺序):
* 1. 直接值:如果 params 不是 Map直接返回该值适用于简单消息
* 2. 标识符字段:从 params[identifier] 获取
* 3. properties 结构:从 params.properties[identifier] 获取(标准属性上报)
* 4. data 结构:从 params.data[identifier] 获取
* 5. value 字段:从 params.value 获取(单值消息)
* 6. 单值 Map如果 Map 只包含 identifier 和一个值,返回该值
*
* @param message 设备消息
* @param identifier 属性标识符
* @return 属性值,如果未找到则返回 null
*/
@SuppressWarnings("unchecked")
public static Object extractPropertyValue(IotDeviceMessage message, String identifier) {
Object params = message.getParams();
if (params == null) {
return null;
}
// 策略1如果 params 不是 Map直接返回该值适用于简单的单属性消息
if (!(params instanceof Map)) {
return params;
}
Map<String, Object> paramsMap = (Map<String, Object>) params;
// 策略2直接通过标识符获取属性值
Object directValue = paramsMap.get(identifier);
if (directValue != null) {
return directValue;
}
// 策略3从 properties 字段中获取(适用于标准属性上报消息)
Object properties = paramsMap.get("properties");
if (properties instanceof Map) {
Map<String, Object> propertiesMap = (Map<String, Object>) properties;
Object propertyValue = propertiesMap.get(identifier);
if (propertyValue != null) {
return propertyValue;
}
}
// 策略4从 data 字段中获取(适用于某些消息格式)
Object data = paramsMap.get("data");
if (data instanceof Map) {
Map<String, Object> dataMap = (Map<String, Object>) data;
Object dataValue = dataMap.get(identifier);
if (dataValue != null) {
return dataValue;
}
}
// 策略5从 value 字段中获取(适用于单值消息)
Object value = paramsMap.get("value");
if (value != null) {
return value;
}
// 策略6如果 Map 只有两个字段且包含 identifier返回另一个字段的值
if (paramsMap.size() == 2 && paramsMap.containsKey("identifier")) {
for (Map.Entry<String, Object> entry : paramsMap.entrySet()) {
if (!"identifier".equals(entry.getKey())) {
return entry.getValue();
}
}
}
// 未找到对应的属性值
return null;
}
// ========== Topic 相关 ==========
public static String buildMessageBusGatewayDeviceMessageTopic(String serverId) {
return String.format(IotDeviceMessage.MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC, serverId);
}
/**
* 生成服务器编号
*
* @param serverPort 服务器端口
* @return 服务器编号
*/
public static String generateServerId(Integer serverPort) {
String serverId = String.format("%s.%d", SystemUtil.getHostInfo().getAddress(), serverPort);
// 避免一些场景无法使用 . 符号,例如说 RocketMQ Topic
return serverId.replaceAll("\\.", "_");
}
package com.viewsh.module.iot.core.util;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import java.util.Map;
/**
* IoT 设备【消息】的工具类
*
* @author 芋道源码
*/
public class IotDeviceMessageUtils {
// ========== Message 相关 ==========
public static String generateMessageId() {
return IdUtil.fastSimpleUUID();
}
/**
* 是否是上行消息:由设备发送
*
* @param message 消息
* @return 是否
*/
@SuppressWarnings("SimplifiableConditionalExpression")
public static boolean isUpstreamMessage(IotDeviceMessage message) {
// 如果是回复消息,则认为是下行消息
if (isReplyMessage(message)) {
return false;
}
String method = message.getMethod();
// JT808 协议方法terminal 开头为上行platform 开头为下行
if (method != null && method.startsWith("jt808.")) {
return method.startsWith("jt808.terminal.");
}
// 标准物模型方法
IotDeviceMessageMethodEnum methodEnum = IotDeviceMessageMethodEnum.of(method);
Assert.notNull(methodEnum, "无法识别的消息方法:" + method);
return methodEnum.getUpstream();
}
/**
* 是否是回复消息
* <p>
* 判断逻辑code != null 表示是业务层的回复消息
*
* @param message 消息
* @return 是否
*/
public static boolean isReplyMessage(IotDeviceMessage message) {
return message.getCode() != null;
}
/**
* 是否跳过业务层回复
* <p>
* 判断逻辑skipReply=true 表示协议层已处理回复(如 JT808 通用应答),业务层无需再回复
*
* @param message 消息
* @return 是否
*/
public static boolean isSkipReply(IotDeviceMessage message) {
return Boolean.TRUE.equals(message.getSkipReply());
}
/**
* 提取消息中的标识符
*
* @param message 消息
* @return 标识符
*/
@SuppressWarnings("unchecked")
public static String getIdentifier(IotDeviceMessage message) {
if (message.getParams() == null) {
return null;
}
if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceMessageMethodEnum.SERVICE_INVOKE.getMethod())) {
Map<String, Object> params = (Map<String, Object>) message.getParams();
return MapUtil.getStr(params, "identifier");
} else if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod())) {
Map<String, Object> params = (Map<String, Object>) message.getParams();
return MapUtil.getStr(params, "state");
}
return null;
}
/**
* 从设备消息中提取指定标识符的属性值
* - 支持多种消息格式和属性值提取策略
* - 兼容现有的消息结构
* - 提供统一的属性值提取接口
* <p>
* 支持的提取策略(按优先级顺序):
* 1. 直接值:如果 params 不是 Map直接返回该值适用于简单消息
* 2. 标识符字段:从 params[identifier] 获取
* 3. properties 结构:从 params.properties[identifier] 获取(标准属性上报)
* 4. data 结构:从 params.data[identifier] 获取
* 5. value 字段:从 params.value 获取(单值消息)
* 6. 单值 Map如果 Map 只包含 identifier 和一个值,返回该值
*
* @param message 设备消息
* @param identifier 属性标识符
* @return 属性值,如果未找到则返回 null
*/
@SuppressWarnings("unchecked")
public static Object extractPropertyValue(IotDeviceMessage message, String identifier) {
Object params = message.getParams();
if (params == null) {
return null;
}
// 策略1如果 params 不是 Map直接返回该值适用于简单的单属性消息
if (!(params instanceof Map)) {
return params;
}
Map<String, Object> paramsMap = (Map<String, Object>) params;
// 策略2直接通过标识符获取属性值
Object directValue = paramsMap.get(identifier);
if (directValue != null) {
return directValue;
}
// 策略3从 properties 字段中获取(适用于标准属性上报消息)
Object properties = paramsMap.get("properties");
if (properties instanceof Map) {
Map<String, Object> propertiesMap = (Map<String, Object>) properties;
Object propertyValue = propertiesMap.get(identifier);
if (propertyValue != null) {
return propertyValue;
}
}
// 策略4从 data 字段中获取(适用于某些消息格式)
Object data = paramsMap.get("data");
if (data instanceof Map) {
Map<String, Object> dataMap = (Map<String, Object>) data;
Object dataValue = dataMap.get(identifier);
if (dataValue != null) {
return dataValue;
}
}
// 策略5从 value 字段中获取(适用于单值消息)
Object value = paramsMap.get("value");
if (value != null) {
return value;
}
// 策略6如果 Map 只有两个字段且包含 identifier返回另一个字段的值
if (paramsMap.size() == 2 && paramsMap.containsKey("identifier")) {
for (Map.Entry<String, Object> entry : paramsMap.entrySet()) {
if (!"identifier".equals(entry.getKey())) {
return entry.getValue();
}
}
}
// 未找到对应的属性值
return null;
}
// ========== Topic 相关 ==========
public static String buildMessageBusGatewayDeviceMessageTopic(String serverId) {
return String.format(IotDeviceMessage.MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC, serverId);
}
/**
* 生成服务器编号
*
* @param serverPort 服务器端口
* @return 服务器编号
*/
public static String generateServerId(Integer serverPort) {
String serverId = String.format("%s.%d", SystemUtil.getHostInfo().getAddress(), serverPort);
// 避免一些场景无法使用 . 符号,例如说 RocketMQ Topic
return serverId.replaceAll("\\.", "_");
}
}

View File

@@ -11,7 +11,6 @@ import com.viewsh.module.iot.gateway.codec.jt808.util.Jt808ProtocolUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -115,36 +114,23 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
int msgId = head.getId();
// 生成消息ID使用流水号作为标识
String messageId = head.getTerminalPhone() + "_" + head.getFlowId();
String terminalPhone = head.getTerminalPhone();
int flowId = head.getFlowId();
String messageId = terminalPhone + "_" + flowId;
// 根据消息ID确定物模型标准方法名
String method = getStandardMethodName(msgId);
Object params = parseMessageParams(dataPack, msgId);
// 构建元数据(保留在 params 中用于调试和追踪)
if (params instanceof Map) {
@SuppressWarnings("unchecked")
// 确保 paramsMap 是可变的,防止 parseMessageParams 返回不可变 Map 导致 put 报错
Map<String, Object> paramsMap = new HashMap<>((Map<String, Object>) params);
Map<String, Object> metadata = new HashMap<>();
metadata.put("jt808MsgId", String.format("0x%04X", msgId));
metadata.put("terminalPhone", head.getTerminalPhone());
metadata.put("flowId", head.getFlowId());
metadata.put("encryptionType", head.getEncryptionType());
paramsMap.put("_metadata", metadata);
// 更新 params 引用,确保使用的是包含 metadata 的可变 Map
params = paramsMap;
}
// 构建元数据并添加到 params 中用于调试和追踪)
params = enrichParamsWithMetadata(params, msgId, terminalPhone, flowId, head.getEncryptionType());
// 创建 IotDeviceMessage
IotDeviceMessage message = IotDeviceMessage.of(messageId, method, params, null, null, null);
message.setReportTime(LocalDateTime.now());
return message;
return IotDeviceMessage.of(messageId, method, params, null, null, null);
}
/**
* 根据 JT808 消息ID获取物模型标准方法名
*
* <p>
* 映射关系:
* - 0x0002 心跳 -> jt808.terminal.heartbeatJT808 专用,不映射到物模型)
* - 0x0200 位置上报 -> thing.property.post属性上报
@@ -214,18 +200,18 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
/**
* 解析按键事件为事件上报格式thing.event.post
*
* <p>
* 物模型标准格式:
* {
* "identifier": "button_event",
* "eventTime": 1234567890,
* "params": {
* "keyId": 1,
* "keyState": 1,
* "keyType": "short_press",
* "keyNumber": 1,
* "isLongPress": false
* }
* "identifier": "button_event",
* "eventTime": 1234567890,
* "params": {
* "keyId": 1,
* "keyState": 1,
* "keyType": "short_press",
* "keyNumber": 1,
* "isLongPress": false
* }
* }
*/
private Map<String, Object> parseButtonEventAsEvent(Jt808DataPack dataPack) {
@@ -233,20 +219,17 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
Map<String, Object> result = new HashMap<>();
// 统一使用一个事件标识符,通过 isLongPress 参数区分短按和长按
// 注意:必须使用 "identifier" 字段,以便 IotDeviceMessageUtils.getIdentifier() 正确提取
// 使用 identifier 字段(符合物模型标准格式),用于存储到数据库的 identifier 字段
result.put("identifier", "button_event");
// 事件时间戳
result.put("eventTime", System.currentTimeMillis());
// 事件参数(包含 isLongPress 字段用于区分短按和长按
// 事件参数(只保留 keyId 和 keyState 两个字段
// 通过 keyId 可以判断是长按还是短按keyId >= 0x0B 为长按
Map<String, Object> eventParams = new HashMap<>();
eventParams.put("keyId", event.getKeyId());
eventParams.put("keyState", event.getKeyState());
eventParams.put("keyType", event.getKeyType());
eventParams.put("keyNumber", event.getKeyNumber());
eventParams.put("isLongPress", event.getIsLongPress());
result.put("params", eventParams);
return result;
@@ -254,13 +237,13 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
/**
* 解析位置信息为属性上报格式thing.property.post
*
* <p>
* 物模型标准格式params 直接就是属性键值对
* {
* "latitude": 31.123456,
* "longitude": 121.123456,
* "batteryLevel": 80,
* ...
* "latitude": 31.123456,
* "longitude": 121.123456,
* "batteryLevel": 80,
* ...
* }
*/
private Map<String, Object> parseLocationInfoAsProperties(Jt808DataPack dataPack) {
@@ -430,15 +413,15 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
/**
* 编码服务调用thing.service.invoke
*
* <p>
* 根据服务标识符映射到不同的 JT808 指令
*
* <p>
* 消息格式:
* {
* "identifier": "服务标识符",
* "params": {
* // 服务参数
* }
* "identifier": "服务标识符",
* "params": {
* // 服务参数
* }
* }
*/
private byte[] encodeServiceInvoke(IotDeviceMessage message, String phone, int flowId) {
@@ -483,15 +466,15 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
/**
* 编码属性设置thing.property.set
*
* <p>
* 属性设置映射到 JT808 的参数设置指令 (0x8103)
*
* <p>
* 消息格式:
* {
* "properties": {
* "identifier1": value1,
* "identifier2": value2
* }
* "properties": {
* "identifier1": value1,
* "identifier2": value2
* }
* }
*/
private byte[] encodePropertySet(IotDeviceMessage message, String phone, int flowId) {
@@ -520,7 +503,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
/**
* 将物模型属性映射到 JT808 参数
*
* <p>
* 映射关系参考 JT808 协议标准:
* - 0x0029: 心跳发送间隔
* - 0x0027: 位置汇报间隔
@@ -560,35 +543,61 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
/**
* 提取终端手机号
*
* <p>
* 优先级:
* 1. 从 params._deviceName 中获取下发场景IotTcpDownstreamHandler 自动注入)
* 2. 从 params._metadata.terminalPhone 中获取(上行消息回复场景)
* 3. 从 params.phone 中获取(手动指定,向下兼容,不推荐)
*/
private String extractPhoneNumber(IotDeviceMessage message) {
if (!(message.getParams() instanceof Map)) {
log.error("[extractPhoneNumber][params 不是 Map 类型,消息: {}]", message);
throw new IllegalArgumentException("消息参数格式错误params 必须是 Map 类型");
// 尝试从 params 提取
if (message.getParams() instanceof Map) {
Map<?, ?> params = (Map<?, ?>) message.getParams();
String phone = extractPhoneFromMap(params);
if (phone != null) {
return phone;
}
}
Map<?, ?> params = (Map<?, ?>) message.getParams();
// 尝试从 data 提取(应答消息使用 replyOf 时,参数在 data 字段)
if (message.getData() instanceof Map) {
Map<?, ?> data = (Map<?, ?>) message.getData();
String phone = extractPhoneFromMap(data);
if (phone != null) {
return phone;
}
}
// 如果都获取不到,抛出异常
log.error("[extractPhoneNumber][无法提取终端手机号params: {}, data: {}]",
message.getParams(), message.getData());
throw new IllegalArgumentException(
"消息中缺少终端手机号。请确保设备的 deviceName 为终端手机号(纯数字),例如: \"13800138000\"");
}
/**
* 从 Map 中提取终端手机号
*/
private String extractPhoneFromMap(Map<?, ?> map) {
if (map == null) {
return null;
}
// 1. 优先从 _deviceName 获取(下发场景,由 IotTcpDownstreamHandler 注入)
Object deviceName = params.get("_deviceName");
Object deviceName = map.get("_deviceName");
if (deviceName != null && StrUtil.isNotBlank(deviceName.toString())) {
String deviceNameStr = deviceName.toString().trim();
// 验证是否为数字(终端手机号应该是纯数字)
if (deviceNameStr.matches("\\d+")) {
return deviceNameStr;
} else {
log.warn("[extractPhoneNumber][_deviceName 不是纯数字: {}]", deviceNameStr);
log.warn("[extractPhoneFromMap][_deviceName 不是纯数字: {}]", deviceNameStr);
}
}
// 2. 从 metadata 中获取(上行消息回复场景)
if (params.get("_metadata") instanceof Map) {
Map<?, ?> metadata = (Map<?, ?>) params.get("_metadata");
if (map.get("_metadata") instanceof Map) {
Map<?, ?> metadata = (Map<?, ?>) map.get("_metadata");
Object terminalPhone = metadata.get("terminalPhone");
if (terminalPhone != null && StrUtil.isNotBlank(terminalPhone.toString())) {
return terminalPhone.toString();
@@ -596,7 +605,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
}
// 3. 从 phone 字段获取(向下兼容,不推荐)
Object phone = params.get("phone");
Object phone = map.get("phone");
if (phone != null && StrUtil.isNotBlank(phone.toString())) {
String phoneStr = phone.toString().trim();
if (phoneStr.matches("\\d+")) {
@@ -604,34 +613,30 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
}
}
// 4. 如果都获取不到,抛出异常
log.error("[extractPhoneNumber][无法提取终端手机号params: {}]", params);
throw new IllegalArgumentException(
"消息中缺少终端手机号。请确保设备的 deviceName 为终端手机号(纯数字),例如: \"13800138000\"");
return null;
}
/**
* 提取流水号
*
* <p>
* 对于下发消息,如果没有指定流水号,则生成一个随机流水号
*/
private int extractFlowId(IotDeviceMessage message) {
// 尝试从 params 提取
if (message.getParams() instanceof Map) {
Map<?, ?> params = (Map<?, ?>) message.getParams();
// 尝试获取显式指定的流水号
Object flowId = params.get("flowId");
if (flowId instanceof Number) {
return ((Number) flowId).intValue();
Integer flowId = extractFlowIdFromMap(params);
if (flowId != null) {
return flowId;
}
}
// 尝试从 metadata 中获取(上行消息的流水号
if (params.get("_metadata") instanceof Map) {
Map<?, ?> metadata = (Map<?, ?>) params.get("_metadata");
Object metaFlowId = metadata.get("flowId");
if (metaFlowId instanceof Number) {
return ((Number) metaFlowId).intValue();
}
// 尝试从 data 提取(应答消息使用 replyOf 时,参数在 data 字段
if (message.getData() instanceof Map) {
Map<?, ?> data = (Map<?, ?>) message.getData();
Integer flowId = extractFlowIdFromMap(data);
if (flowId != null) {
return flowId;
}
}
@@ -639,19 +644,61 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
return (int) (System.currentTimeMillis() % 65535) + 1;
}
/**
* 从 Map 中提取流水号
*/
private Integer extractFlowIdFromMap(Map<?, ?> map) {
if (map == null) {
return null;
}
// 尝试获取显式指定的流水号
Object flowId = map.get("flowId");
if (flowId instanceof Number) {
return ((Number) flowId).intValue();
}
// 尝试从 metadata 中获取(上行消息的流水号)
if (map.get("_metadata") instanceof Map) {
Map<?, ?> metadata = (Map<?, ?>) map.get("_metadata");
Object metaFlowId = metadata.get("flowId");
if (metaFlowId instanceof Number) {
return ((Number) metaFlowId).intValue();
}
}
return null;
}
/**
* 获取参数为Map
* <p>
* 优先从 params 获取,如果 params 为空则从 data 获取(应答消息场景)
*/
@SuppressWarnings("unchecked")
private Map<String, Object> getParamsAsMap(IotDeviceMessage message) {
// 优先从 params 获取
if (message.getParams() instanceof Map) {
return (Map<String, Object>) message.getParams();
}
// 尝试JSON转换
// 从 data 获取(应答消息使用 replyOf 时,参数在 data 字段)
if (message.getData() instanceof Map) {
return (Map<String, Object>) message.getData();
}
// 尝试JSON转换 params
if (message.getParams() != null) {
String json = JsonUtils.toJsonString(message.getParams());
return JsonUtils.parseObject(json, Map.class);
}
// 尝试JSON转换 data
if (message.getData() != null) {
String json = JsonUtils.toJsonString(message.getData());
return JsonUtils.parseObject(json, Map.class);
}
return new HashMap<>();
}
@@ -682,4 +729,42 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
&& data[data.length - 1] == (byte) Jt808Constants.PKG_DELIMITER;
}
/**
* 为参数对象添加元数据
*
* @param params 原始参数对象
* @param msgId JT808 消息ID
* @param terminalPhone 终端手机号
* @param flowId 流水号
* @param encryptionType 加密类型
* @return 包含元数据的参数对象
*/
private Object enrichParamsWithMetadata(Object params, int msgId, String terminalPhone,
int flowId, int encryptionType) {
// 构建元数据 Map
Map<String, Object> metadata = new HashMap<>(4);
metadata.put("jt808MsgId", String.format("0x%04X", msgId));
metadata.put("terminalPhone", terminalPhone);
metadata.put("flowId", flowId);
metadata.put("encryptionType", encryptionType);
// 如果 params 是 Map 类型,则添加元数据
if (params instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> paramsMap = (Map<String, Object>) params;
// 创建新的可变 Map确保可以添加元数据
// 注意:某些解析方法可能返回不可变的 Map如 Map.of()),需要转换为可变 Map
Map<String, Object> enrichedParams = new HashMap<>(paramsMap);
enrichedParams.put("_metadata", metadata);
return enrichedParams;
}
// 如果 params 不是 Map则包装为包含元数据的 Map
// 保留原始 params 在 _data 字段中
Map<String, Object> wrapper = new HashMap<>(2);
wrapper.put("_data", params);
wrapper.put("_metadata", metadata);
return wrapper;
}
}

View File

@@ -114,8 +114,8 @@ public class Jt808ProtocolHandler extends AbstractProtocolHandler {
String productKey, String deviceName, String serverId) {
try {
// 1. 标记消息为"协议层已应答",业务层无需再发送 reply
// 设置 code = 0 后isReplyMessage() 返回 true业务层跳过 reply
message.setCode(0);
// 设置 skipReply = true,业务层跳过 reply
message.setSkipReply(true);
// 2. 发送消息到消息总线
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);

View File

@@ -169,10 +169,12 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
getSelf().createDeviceLogAsync(message);
// 3. 回复消息。前提:非 _reply 消息,并且非禁用回复的消息
// 条件1防止对"回复消息"再次回复,避免无限循环
// 条件2某些特定的method不需要回复如设备状态变更、OTA进度上报
// 条件3新增HTTP短连接场景因为已经在请求中直接响应了不需要再通过消息总线发送回复
if (IotDeviceMessageUtils.isReplyMessage(message)
// 条件1协议层已处理回复skipReply=true业务层无需再回复
// 条件2防止对"回复消息"再次回复,避免无限循环
// 条件3某些特定的method不需要回复如设备状态变更、OTA进度上报
// 条件4HTTP短连接场景因为已经在请求中直接响应了不需要再通过消息总线发送回复
if (IotDeviceMessageUtils.isSkipReply(message)
|| IotDeviceMessageUtils.isReplyMessage(message)
|| IotDeviceMessageMethodEnum.isReplyDisabled(message.getMethod())
|| StrUtil.isEmpty(message.getServerId())) {
return; // serverId 为空,不记录回复消息