Compare commits
4 Commits
c75696c644
...
4e387e410c
| Author | SHA1 | Date | |
|---|---|---|---|
| 4e387e410c | |||
| 110cfc8cf8 | |||
| 0c2cd85915 | |||
| 529171aca6 |
@@ -1,151 +1,160 @@
|
||||
package com.viewsh.module.iot.core.mq.message;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import com.viewsh.framework.common.exception.enums.GlobalErrorCodeConstants;
|
||||
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||
import com.viewsh.module.iot.core.enums.IotDeviceStateEnum;
|
||||
import com.viewsh.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* IoT 设备消息
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class IotDeviceMessage {
|
||||
|
||||
/**
|
||||
* 【消息总线】应用的设备消息 Topic,由 iot-gateway 发给 iot-biz 进行消费
|
||||
*/
|
||||
public static final String MESSAGE_BUS_DEVICE_MESSAGE_TOPIC = "iot_device_message";
|
||||
|
||||
/**
|
||||
* 【消息总线】设备消息 Topic,由 iot-biz 发送给 iot-gateway 的某个 "server"(protocol) 进行消费
|
||||
*
|
||||
* 其中,%s 就是该"server"(protocol) 的标识
|
||||
*/
|
||||
public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "_%s";
|
||||
|
||||
/**
|
||||
* 消息编号
|
||||
*
|
||||
* 由后端生成,通过 {@link IotDeviceMessageUtils#generateMessageId()}
|
||||
*/
|
||||
private String id;
|
||||
/**
|
||||
* 上报时间
|
||||
*
|
||||
* 由后端生成,当前时间
|
||||
*/
|
||||
private LocalDateTime reportTime;
|
||||
|
||||
/**
|
||||
* 设备编号
|
||||
*/
|
||||
private Long deviceId;
|
||||
/**
|
||||
* 租户编号
|
||||
*/
|
||||
private Long tenantId;
|
||||
|
||||
/**
|
||||
* 服务编号,该消息由哪个 server 发送
|
||||
*/
|
||||
private String serverId;
|
||||
|
||||
// ========== codec(编解码)字段 ==========
|
||||
|
||||
/**
|
||||
* 请求编号
|
||||
*
|
||||
* 由设备生成,对应阿里云 IoT 的 Alink 协议中的 id、华为云 IoTDA 协议的 request_id
|
||||
*/
|
||||
private String requestId;
|
||||
/**
|
||||
* 请求方法
|
||||
*
|
||||
* 枚举 {@link IotDeviceMessageMethodEnum}
|
||||
* 例如说:thing.property.report 属性上报
|
||||
*/
|
||||
private String method;
|
||||
/**
|
||||
* 请求参数
|
||||
*
|
||||
* 例如说:属性上报的 properties、事件上报的 params
|
||||
*/
|
||||
private Object params;
|
||||
/**
|
||||
* 响应结果
|
||||
*/
|
||||
private Object data;
|
||||
/**
|
||||
* 响应错误码
|
||||
*/
|
||||
private Integer code;
|
||||
/**
|
||||
* 返回结果信息
|
||||
*/
|
||||
private String msg;
|
||||
|
||||
// ========== 基础方法:只传递"codec(编解码)字段" ==========
|
||||
|
||||
public static IotDeviceMessage requestOf(String method) {
|
||||
return requestOf(null, method, null);
|
||||
}
|
||||
|
||||
public static IotDeviceMessage requestOf(String method, Object params) {
|
||||
return requestOf(null, method, params);
|
||||
}
|
||||
|
||||
public static IotDeviceMessage requestOf(String requestId, String method, Object params) {
|
||||
return of(requestId, method, params, null, null, null);
|
||||
}
|
||||
|
||||
public static IotDeviceMessage replyOf(String requestId, String method,
|
||||
Object data, Integer code, String msg) {
|
||||
if (code == null) {
|
||||
code = GlobalErrorCodeConstants.SUCCESS.getCode();
|
||||
msg = GlobalErrorCodeConstants.SUCCESS.getMsg();
|
||||
}
|
||||
return of(requestId, method, null, data, code, msg);
|
||||
}
|
||||
|
||||
public static IotDeviceMessage of(String requestId, String method,
|
||||
Object params, Object data, Integer code, String msg) {
|
||||
// 通用参数
|
||||
IotDeviceMessage message = new IotDeviceMessage()
|
||||
.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now());
|
||||
// 当前参数
|
||||
message.setRequestId(requestId).setMethod(method).setParams(params)
|
||||
.setData(data).setCode(code).setMsg(msg);
|
||||
return message;
|
||||
}
|
||||
|
||||
// ========== 核心方法:在 of 基础方法之上,添加对应 method ==========
|
||||
|
||||
public static IotDeviceMessage buildStateUpdateOnline() {
|
||||
return requestOf(IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod(),
|
||||
MapUtil.of("state", IotDeviceStateEnum.ONLINE.getState()));
|
||||
}
|
||||
|
||||
public static IotDeviceMessage buildStateOffline() {
|
||||
return requestOf(IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod(),
|
||||
MapUtil.of("state", IotDeviceStateEnum.OFFLINE.getState()));
|
||||
}
|
||||
|
||||
public static IotDeviceMessage buildOtaUpgrade(String version, String fileUrl, Long fileSize,
|
||||
String fileDigestAlgorithm, String fileDigestValue) {
|
||||
return requestOf(IotDeviceMessageMethodEnum.OTA_UPGRADE.getMethod(), MapUtil.builder()
|
||||
.put("version", version).put("fileUrl", fileUrl).put("fileSize", fileSize)
|
||||
.put("fileDigestAlgorithm", fileDigestAlgorithm).put("fileDigestValue", fileDigestValue)
|
||||
.build());
|
||||
}
|
||||
|
||||
}
|
||||
package com.viewsh.module.iot.core.mq.message;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import com.viewsh.framework.common.exception.enums.GlobalErrorCodeConstants;
|
||||
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||
import com.viewsh.module.iot.core.enums.IotDeviceStateEnum;
|
||||
import com.viewsh.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* IoT 设备消息
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class IotDeviceMessage {
|
||||
|
||||
/**
|
||||
* 【消息总线】应用的设备消息 Topic,由 iot-gateway 发给 iot-biz 进行消费
|
||||
*/
|
||||
public static final String MESSAGE_BUS_DEVICE_MESSAGE_TOPIC = "iot_device_message";
|
||||
|
||||
/**
|
||||
* 【消息总线】设备消息 Topic,由 iot-biz 发送给 iot-gateway 的某个 "server"(protocol) 进行消费
|
||||
*
|
||||
* 其中,%s 就是该"server"(protocol) 的标识
|
||||
*/
|
||||
public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "_%s";
|
||||
|
||||
/**
|
||||
* 消息编号
|
||||
*
|
||||
* 由后端生成,通过 {@link IotDeviceMessageUtils#generateMessageId()}
|
||||
*/
|
||||
private String id;
|
||||
/**
|
||||
* 上报时间
|
||||
*
|
||||
* 由后端生成,当前时间
|
||||
*/
|
||||
private LocalDateTime reportTime;
|
||||
|
||||
/**
|
||||
* 设备编号
|
||||
*/
|
||||
private Long deviceId;
|
||||
/**
|
||||
* 租户编号
|
||||
*/
|
||||
private Long tenantId;
|
||||
|
||||
/**
|
||||
* 服务编号,该消息由哪个 server 发送
|
||||
*/
|
||||
private String serverId;
|
||||
|
||||
// ========== codec(编解码)字段 ==========
|
||||
|
||||
/**
|
||||
* 请求编号
|
||||
*
|
||||
* 由设备生成,对应阿里云 IoT 的 Alink 协议中的 id、华为云 IoTDA 协议的 request_id
|
||||
*/
|
||||
private String requestId;
|
||||
/**
|
||||
* 请求方法
|
||||
*
|
||||
* 枚举 {@link IotDeviceMessageMethodEnum}
|
||||
* 例如说:thing.property.report 属性上报
|
||||
*/
|
||||
private String method;
|
||||
/**
|
||||
* 请求参数
|
||||
*
|
||||
* 例如说:属性上报的 properties、事件上报的 params
|
||||
*/
|
||||
private Object params;
|
||||
/**
|
||||
* 响应结果
|
||||
*/
|
||||
private Object data;
|
||||
/**
|
||||
* 响应错误码
|
||||
*/
|
||||
private Integer code;
|
||||
|
||||
/**
|
||||
* 是否跳过业务层回复
|
||||
* <p>
|
||||
* true = 协议层已处理回复(如 JT808 通用应答),业务层无需再回复
|
||||
* false 或 null = 需要业务层正常处理回复逻辑
|
||||
*/
|
||||
private Boolean skipReply;
|
||||
|
||||
/**
|
||||
* 返回结果信息
|
||||
*/
|
||||
private String msg;
|
||||
|
||||
// ========== 基础方法:只传递"codec(编解码)字段" ==========
|
||||
|
||||
public static IotDeviceMessage requestOf(String method) {
|
||||
return requestOf(null, method, null);
|
||||
}
|
||||
|
||||
public static IotDeviceMessage requestOf(String method, Object params) {
|
||||
return requestOf(null, method, params);
|
||||
}
|
||||
|
||||
public static IotDeviceMessage requestOf(String requestId, String method, Object params) {
|
||||
return of(requestId, method, params, null, null, null);
|
||||
}
|
||||
|
||||
public static IotDeviceMessage replyOf(String requestId, String method,
|
||||
Object data, Integer code, String msg) {
|
||||
if (code == null) {
|
||||
code = GlobalErrorCodeConstants.SUCCESS.getCode();
|
||||
msg = GlobalErrorCodeConstants.SUCCESS.getMsg();
|
||||
}
|
||||
return of(requestId, method, null, data, code, msg);
|
||||
}
|
||||
|
||||
public static IotDeviceMessage of(String requestId, String method,
|
||||
Object params, Object data, Integer code, String msg) {
|
||||
// 通用参数
|
||||
IotDeviceMessage message = new IotDeviceMessage()
|
||||
.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now());
|
||||
// 当前参数
|
||||
message.setRequestId(requestId).setMethod(method).setParams(params)
|
||||
.setData(data).setCode(code).setMsg(msg);
|
||||
return message;
|
||||
}
|
||||
|
||||
// ========== 核心方法:在 of 基础方法之上,添加对应 method ==========
|
||||
|
||||
public static IotDeviceMessage buildStateUpdateOnline() {
|
||||
return requestOf(IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod(),
|
||||
MapUtil.of("state", IotDeviceStateEnum.ONLINE.getState()));
|
||||
}
|
||||
|
||||
public static IotDeviceMessage buildStateOffline() {
|
||||
return requestOf(IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod(),
|
||||
MapUtil.of("state", IotDeviceStateEnum.OFFLINE.getState()));
|
||||
}
|
||||
|
||||
public static IotDeviceMessage buildOtaUpgrade(String version, String fileUrl, Long fileSize,
|
||||
String fileDigestAlgorithm, String fileDigestValue) {
|
||||
return requestOf(IotDeviceMessageMethodEnum.OTA_UPGRADE.getMethod(), MapUtil.builder()
|
||||
.put("version", version).put("fileUrl", fileUrl).put("fileSize", fileSize)
|
||||
.put("fileDigestAlgorithm", fileDigestAlgorithm).put("fileDigestValue", fileDigestValue)
|
||||
.build());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,167 +1,192 @@
|
||||
package com.viewsh.module.iot.core.util;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.system.SystemUtil;
|
||||
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* IoT 设备【消息】的工具类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public class IotDeviceMessageUtils {
|
||||
|
||||
// ========== Message 相关 ==========
|
||||
|
||||
public static String generateMessageId() {
|
||||
return IdUtil.fastSimpleUUID();
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否是上行消息:由设备发送
|
||||
*
|
||||
* @param message 消息
|
||||
* @return 是否
|
||||
*/
|
||||
@SuppressWarnings("SimplifiableConditionalExpression")
|
||||
public static boolean isUpstreamMessage(IotDeviceMessage message) {
|
||||
IotDeviceMessageMethodEnum methodEnum = IotDeviceMessageMethodEnum.of(message.getMethod());
|
||||
Assert.notNull(methodEnum, "无法识别的消息方法:" + message.getMethod());
|
||||
// 注意:回复消息时,需要取反
|
||||
return !isReplyMessage(message) ? methodEnum.getUpstream() : !methodEnum.getUpstream();
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否是回复消息,通过 {@link IotDeviceMessage#getCode()} 非空进行识别
|
||||
*
|
||||
* @param message 消息
|
||||
* @return 是否
|
||||
*/
|
||||
public static boolean isReplyMessage(IotDeviceMessage message) {
|
||||
return message.getCode() != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 提取消息中的标识符
|
||||
*
|
||||
* @param message 消息
|
||||
* @return 标识符
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static String getIdentifier(IotDeviceMessage message) {
|
||||
if (message.getParams() == null) {
|
||||
return null;
|
||||
}
|
||||
if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
|
||||
IotDeviceMessageMethodEnum.SERVICE_INVOKE.getMethod())) {
|
||||
Map<String, Object> params = (Map<String, Object>) message.getParams();
|
||||
return MapUtil.getStr(params, "identifier");
|
||||
} else if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod())) {
|
||||
Map<String, Object> params = (Map<String, Object>) message.getParams();
|
||||
return MapUtil.getStr(params, "state");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 从设备消息中提取指定标识符的属性值
|
||||
* - 支持多种消息格式和属性值提取策略
|
||||
* - 兼容现有的消息结构
|
||||
* - 提供统一的属性值提取接口
|
||||
* <p>
|
||||
* 支持的提取策略(按优先级顺序):
|
||||
* 1. 直接值:如果 params 不是 Map,直接返回该值(适用于简单消息)
|
||||
* 2. 标识符字段:从 params[identifier] 获取
|
||||
* 3. properties 结构:从 params.properties[identifier] 获取(标准属性上报)
|
||||
* 4. data 结构:从 params.data[identifier] 获取
|
||||
* 5. value 字段:从 params.value 获取(单值消息)
|
||||
* 6. 单值 Map:如果 Map 只包含 identifier 和一个值,返回该值
|
||||
*
|
||||
* @param message 设备消息
|
||||
* @param identifier 属性标识符
|
||||
* @return 属性值,如果未找到则返回 null
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static Object extractPropertyValue(IotDeviceMessage message, String identifier) {
|
||||
Object params = message.getParams();
|
||||
if (params == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 策略1:如果 params 不是 Map,直接返回该值(适用于简单的单属性消息)
|
||||
if (!(params instanceof Map)) {
|
||||
return params;
|
||||
}
|
||||
|
||||
Map<String, Object> paramsMap = (Map<String, Object>) params;
|
||||
|
||||
// 策略2:直接通过标识符获取属性值
|
||||
Object directValue = paramsMap.get(identifier);
|
||||
if (directValue != null) {
|
||||
return directValue;
|
||||
}
|
||||
|
||||
// 策略3:从 properties 字段中获取(适用于标准属性上报消息)
|
||||
Object properties = paramsMap.get("properties");
|
||||
if (properties instanceof Map) {
|
||||
Map<String, Object> propertiesMap = (Map<String, Object>) properties;
|
||||
Object propertyValue = propertiesMap.get(identifier);
|
||||
if (propertyValue != null) {
|
||||
return propertyValue;
|
||||
}
|
||||
}
|
||||
|
||||
// 策略4:从 data 字段中获取(适用于某些消息格式)
|
||||
Object data = paramsMap.get("data");
|
||||
if (data instanceof Map) {
|
||||
Map<String, Object> dataMap = (Map<String, Object>) data;
|
||||
Object dataValue = dataMap.get(identifier);
|
||||
if (dataValue != null) {
|
||||
return dataValue;
|
||||
}
|
||||
}
|
||||
|
||||
// 策略5:从 value 字段中获取(适用于单值消息)
|
||||
Object value = paramsMap.get("value");
|
||||
if (value != null) {
|
||||
return value;
|
||||
}
|
||||
|
||||
// 策略6:如果 Map 只有两个字段且包含 identifier,返回另一个字段的值
|
||||
if (paramsMap.size() == 2 && paramsMap.containsKey("identifier")) {
|
||||
for (Map.Entry<String, Object> entry : paramsMap.entrySet()) {
|
||||
if (!"identifier".equals(entry.getKey())) {
|
||||
return entry.getValue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 未找到对应的属性值
|
||||
return null;
|
||||
}
|
||||
|
||||
// ========== Topic 相关 ==========
|
||||
|
||||
public static String buildMessageBusGatewayDeviceMessageTopic(String serverId) {
|
||||
return String.format(IotDeviceMessage.MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC, serverId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成服务器编号
|
||||
*
|
||||
* @param serverPort 服务器端口
|
||||
* @return 服务器编号
|
||||
*/
|
||||
public static String generateServerId(Integer serverPort) {
|
||||
String serverId = String.format("%s.%d", SystemUtil.getHostInfo().getAddress(), serverPort);
|
||||
// 避免一些场景无法使用 . 符号,例如说 RocketMQ Topic
|
||||
return serverId.replaceAll("\\.", "_");
|
||||
}
|
||||
|
||||
package com.viewsh.module.iot.core.util;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.system.SystemUtil;
|
||||
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* IoT 设备【消息】的工具类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public class IotDeviceMessageUtils {
|
||||
|
||||
// ========== Message 相关 ==========
|
||||
|
||||
public static String generateMessageId() {
|
||||
return IdUtil.fastSimpleUUID();
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否是上行消息:由设备发送
|
||||
*
|
||||
* @param message 消息
|
||||
* @return 是否
|
||||
*/
|
||||
@SuppressWarnings("SimplifiableConditionalExpression")
|
||||
public static boolean isUpstreamMessage(IotDeviceMessage message) {
|
||||
// 如果是回复消息,则认为是下行消息
|
||||
if (isReplyMessage(message)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
String method = message.getMethod();
|
||||
// JT808 协议方法:terminal 开头为上行,platform 开头为下行
|
||||
if (method != null && method.startsWith("jt808.")) {
|
||||
return method.startsWith("jt808.terminal.");
|
||||
}
|
||||
|
||||
// 标准物模型方法
|
||||
IotDeviceMessageMethodEnum methodEnum = IotDeviceMessageMethodEnum.of(method);
|
||||
Assert.notNull(methodEnum, "无法识别的消息方法:" + method);
|
||||
return methodEnum.getUpstream();
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否是回复消息
|
||||
* <p>
|
||||
* 判断逻辑:code != null 表示是业务层的回复消息
|
||||
*
|
||||
* @param message 消息
|
||||
* @return 是否
|
||||
*/
|
||||
public static boolean isReplyMessage(IotDeviceMessage message) {
|
||||
return message.getCode() != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 是否跳过业务层回复
|
||||
* <p>
|
||||
* 判断逻辑:skipReply=true 表示协议层已处理回复(如 JT808 通用应答),业务层无需再回复
|
||||
*
|
||||
* @param message 消息
|
||||
* @return 是否
|
||||
*/
|
||||
public static boolean isSkipReply(IotDeviceMessage message) {
|
||||
return Boolean.TRUE.equals(message.getSkipReply());
|
||||
}
|
||||
|
||||
/**
|
||||
* 提取消息中的标识符
|
||||
*
|
||||
* @param message 消息
|
||||
* @return 标识符
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static String getIdentifier(IotDeviceMessage message) {
|
||||
if (message.getParams() == null) {
|
||||
return null;
|
||||
}
|
||||
if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
|
||||
IotDeviceMessageMethodEnum.SERVICE_INVOKE.getMethod())) {
|
||||
Map<String, Object> params = (Map<String, Object>) message.getParams();
|
||||
return MapUtil.getStr(params, "identifier");
|
||||
} else if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod())) {
|
||||
Map<String, Object> params = (Map<String, Object>) message.getParams();
|
||||
return MapUtil.getStr(params, "state");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 从设备消息中提取指定标识符的属性值
|
||||
* - 支持多种消息格式和属性值提取策略
|
||||
* - 兼容现有的消息结构
|
||||
* - 提供统一的属性值提取接口
|
||||
* <p>
|
||||
* 支持的提取策略(按优先级顺序):
|
||||
* 1. 直接值:如果 params 不是 Map,直接返回该值(适用于简单消息)
|
||||
* 2. 标识符字段:从 params[identifier] 获取
|
||||
* 3. properties 结构:从 params.properties[identifier] 获取(标准属性上报)
|
||||
* 4. data 结构:从 params.data[identifier] 获取
|
||||
* 5. value 字段:从 params.value 获取(单值消息)
|
||||
* 6. 单值 Map:如果 Map 只包含 identifier 和一个值,返回该值
|
||||
*
|
||||
* @param message 设备消息
|
||||
* @param identifier 属性标识符
|
||||
* @return 属性值,如果未找到则返回 null
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static Object extractPropertyValue(IotDeviceMessage message, String identifier) {
|
||||
Object params = message.getParams();
|
||||
if (params == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 策略1:如果 params 不是 Map,直接返回该值(适用于简单的单属性消息)
|
||||
if (!(params instanceof Map)) {
|
||||
return params;
|
||||
}
|
||||
|
||||
Map<String, Object> paramsMap = (Map<String, Object>) params;
|
||||
|
||||
// 策略2:直接通过标识符获取属性值
|
||||
Object directValue = paramsMap.get(identifier);
|
||||
if (directValue != null) {
|
||||
return directValue;
|
||||
}
|
||||
|
||||
// 策略3:从 properties 字段中获取(适用于标准属性上报消息)
|
||||
Object properties = paramsMap.get("properties");
|
||||
if (properties instanceof Map) {
|
||||
Map<String, Object> propertiesMap = (Map<String, Object>) properties;
|
||||
Object propertyValue = propertiesMap.get(identifier);
|
||||
if (propertyValue != null) {
|
||||
return propertyValue;
|
||||
}
|
||||
}
|
||||
|
||||
// 策略4:从 data 字段中获取(适用于某些消息格式)
|
||||
Object data = paramsMap.get("data");
|
||||
if (data instanceof Map) {
|
||||
Map<String, Object> dataMap = (Map<String, Object>) data;
|
||||
Object dataValue = dataMap.get(identifier);
|
||||
if (dataValue != null) {
|
||||
return dataValue;
|
||||
}
|
||||
}
|
||||
|
||||
// 策略5:从 value 字段中获取(适用于单值消息)
|
||||
Object value = paramsMap.get("value");
|
||||
if (value != null) {
|
||||
return value;
|
||||
}
|
||||
|
||||
// 策略6:如果 Map 只有两个字段且包含 identifier,返回另一个字段的值
|
||||
if (paramsMap.size() == 2 && paramsMap.containsKey("identifier")) {
|
||||
for (Map.Entry<String, Object> entry : paramsMap.entrySet()) {
|
||||
if (!"identifier".equals(entry.getKey())) {
|
||||
return entry.getValue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 未找到对应的属性值
|
||||
return null;
|
||||
}
|
||||
|
||||
// ========== Topic 相关 ==========
|
||||
|
||||
public static String buildMessageBusGatewayDeviceMessageTopic(String serverId) {
|
||||
return String.format(IotDeviceMessage.MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC, serverId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成服务器编号
|
||||
*
|
||||
* @param serverPort 服务器端口
|
||||
* @return 服务器编号
|
||||
*/
|
||||
public static String generateServerId(Integer serverPort) {
|
||||
String serverId = String.format("%s.%d", SystemUtil.getHostInfo().getAddress(), serverPort);
|
||||
// 避免一些场景无法使用 . 符号,例如说 RocketMQ Topic
|
||||
return serverId.replaceAll("\\.", "_");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -11,7 +11,6 @@ import com.viewsh.module.iot.gateway.codec.jt808.util.Jt808ProtocolUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@@ -115,36 +114,23 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
int msgId = head.getId();
|
||||
|
||||
// 生成消息ID(使用流水号作为标识)
|
||||
String messageId = head.getTerminalPhone() + "_" + head.getFlowId();
|
||||
String terminalPhone = head.getTerminalPhone();
|
||||
int flowId = head.getFlowId();
|
||||
String messageId = terminalPhone + "_" + flowId;
|
||||
|
||||
// 根据消息ID确定物模型标准方法名
|
||||
String method = getStandardMethodName(msgId);
|
||||
Object params = parseMessageParams(dataPack, msgId);
|
||||
|
||||
// 构建元数据(保留在 params 中,用于调试和追踪)
|
||||
if (params instanceof Map) {
|
||||
@SuppressWarnings("unchecked")
|
||||
// 确保 paramsMap 是可变的,防止 parseMessageParams 返回不可变 Map 导致 put 报错
|
||||
Map<String, Object> paramsMap = new HashMap<>((Map<String, Object>) params);
|
||||
Map<String, Object> metadata = new HashMap<>();
|
||||
metadata.put("jt808MsgId", String.format("0x%04X", msgId));
|
||||
metadata.put("terminalPhone", head.getTerminalPhone());
|
||||
metadata.put("flowId", head.getFlowId());
|
||||
metadata.put("encryptionType", head.getEncryptionType());
|
||||
paramsMap.put("_metadata", metadata);
|
||||
// 更新 params 引用,确保使用的是包含 metadata 的可变 Map
|
||||
params = paramsMap;
|
||||
}
|
||||
// 构建元数据并添加到 params 中(用于调试和追踪)
|
||||
params = enrichParamsWithMetadata(params, msgId, terminalPhone, flowId, head.getEncryptionType());
|
||||
|
||||
// 创建 IotDeviceMessage
|
||||
IotDeviceMessage message = IotDeviceMessage.of(messageId, method, params, null, null, null);
|
||||
message.setReportTime(LocalDateTime.now());
|
||||
return message;
|
||||
return IotDeviceMessage.of(messageId, method, params, null, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据 JT808 消息ID获取物模型标准方法名
|
||||
*
|
||||
* <p>
|
||||
* 映射关系:
|
||||
* - 0x0002 心跳 -> jt808.terminal.heartbeat(JT808 专用,不映射到物模型)
|
||||
* - 0x0200 位置上报 -> thing.property.post(属性上报)
|
||||
@@ -214,18 +200,18 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
|
||||
/**
|
||||
* 解析按键事件为事件上报格式(thing.event.post)
|
||||
*
|
||||
* <p>
|
||||
* 物模型标准格式:
|
||||
* {
|
||||
* "identifier": "button_event",
|
||||
* "eventTime": 1234567890,
|
||||
* "params": {
|
||||
* "keyId": 1,
|
||||
* "keyState": 1,
|
||||
* "keyType": "short_press",
|
||||
* "keyNumber": 1,
|
||||
* "isLongPress": false
|
||||
* }
|
||||
* "identifier": "button_event",
|
||||
* "eventTime": 1234567890,
|
||||
* "params": {
|
||||
* "keyId": 1,
|
||||
* "keyState": 1,
|
||||
* "keyType": "short_press",
|
||||
* "keyNumber": 1,
|
||||
* "isLongPress": false
|
||||
* }
|
||||
* }
|
||||
*/
|
||||
private Map<String, Object> parseButtonEventAsEvent(Jt808DataPack dataPack) {
|
||||
@@ -233,20 +219,17 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
|
||||
// 统一使用一个事件标识符,通过 isLongPress 参数区分短按和长按
|
||||
// 注意:必须使用 "identifier" 字段,以便 IotDeviceMessageUtils.getIdentifier() 正确提取
|
||||
// 使用 identifier 字段(符合物模型标准格式),用于存储到数据库的 identifier 字段
|
||||
result.put("identifier", "button_event");
|
||||
|
||||
// 事件时间戳
|
||||
result.put("eventTime", System.currentTimeMillis());
|
||||
|
||||
// 事件参数(包含 isLongPress 字段用于区分短按和长按)
|
||||
// 事件参数(只保留 keyId 和 keyState 两个字段)
|
||||
// 通过 keyId 可以判断是长按还是短按:keyId >= 0x0B 为长按
|
||||
Map<String, Object> eventParams = new HashMap<>();
|
||||
eventParams.put("keyId", event.getKeyId());
|
||||
eventParams.put("keyState", event.getKeyState());
|
||||
eventParams.put("keyType", event.getKeyType());
|
||||
eventParams.put("keyNumber", event.getKeyNumber());
|
||||
eventParams.put("isLongPress", event.getIsLongPress());
|
||||
result.put("params", eventParams);
|
||||
|
||||
return result;
|
||||
@@ -254,13 +237,13 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
|
||||
/**
|
||||
* 解析位置信息为属性上报格式(thing.property.post)
|
||||
*
|
||||
* <p>
|
||||
* 物模型标准格式:params 直接就是属性键值对
|
||||
* {
|
||||
* "latitude": 31.123456,
|
||||
* "longitude": 121.123456,
|
||||
* "batteryLevel": 80,
|
||||
* ...
|
||||
* "latitude": 31.123456,
|
||||
* "longitude": 121.123456,
|
||||
* "batteryLevel": 80,
|
||||
* ...
|
||||
* }
|
||||
*/
|
||||
private Map<String, Object> parseLocationInfoAsProperties(Jt808DataPack dataPack) {
|
||||
@@ -430,15 +413,15 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
|
||||
/**
|
||||
* 编码服务调用(thing.service.invoke)
|
||||
*
|
||||
* <p>
|
||||
* 根据服务标识符映射到不同的 JT808 指令
|
||||
*
|
||||
* <p>
|
||||
* 消息格式:
|
||||
* {
|
||||
* "identifier": "服务标识符",
|
||||
* "params": {
|
||||
* // 服务参数
|
||||
* }
|
||||
* "identifier": "服务标识符",
|
||||
* "params": {
|
||||
* // 服务参数
|
||||
* }
|
||||
* }
|
||||
*/
|
||||
private byte[] encodeServiceInvoke(IotDeviceMessage message, String phone, int flowId) {
|
||||
@@ -483,15 +466,15 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
|
||||
/**
|
||||
* 编码属性设置(thing.property.set)
|
||||
*
|
||||
* <p>
|
||||
* 属性设置映射到 JT808 的参数设置指令 (0x8103)
|
||||
*
|
||||
* <p>
|
||||
* 消息格式:
|
||||
* {
|
||||
* "properties": {
|
||||
* "identifier1": value1,
|
||||
* "identifier2": value2
|
||||
* }
|
||||
* "properties": {
|
||||
* "identifier1": value1,
|
||||
* "identifier2": value2
|
||||
* }
|
||||
* }
|
||||
*/
|
||||
private byte[] encodePropertySet(IotDeviceMessage message, String phone, int flowId) {
|
||||
@@ -520,7 +503,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
|
||||
/**
|
||||
* 将物模型属性映射到 JT808 参数
|
||||
*
|
||||
* <p>
|
||||
* 映射关系参考 JT808 协议标准:
|
||||
* - 0x0029: 心跳发送间隔
|
||||
* - 0x0027: 位置汇报间隔
|
||||
@@ -560,35 +543,61 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
|
||||
/**
|
||||
* 提取终端手机号
|
||||
*
|
||||
* <p>
|
||||
* 优先级:
|
||||
* 1. 从 params._deviceName 中获取(下发场景,IotTcpDownstreamHandler 自动注入)
|
||||
* 2. 从 params._metadata.terminalPhone 中获取(上行消息回复场景)
|
||||
* 3. 从 params.phone 中获取(手动指定,向下兼容,不推荐)
|
||||
*/
|
||||
private String extractPhoneNumber(IotDeviceMessage message) {
|
||||
if (!(message.getParams() instanceof Map)) {
|
||||
log.error("[extractPhoneNumber][params 不是 Map 类型,消息: {}]", message);
|
||||
throw new IllegalArgumentException("消息参数格式错误,params 必须是 Map 类型");
|
||||
// 尝试从 params 提取
|
||||
if (message.getParams() instanceof Map) {
|
||||
Map<?, ?> params = (Map<?, ?>) message.getParams();
|
||||
String phone = extractPhoneFromMap(params);
|
||||
if (phone != null) {
|
||||
return phone;
|
||||
}
|
||||
}
|
||||
|
||||
Map<?, ?> params = (Map<?, ?>) message.getParams();
|
||||
// 尝试从 data 提取(应答消息使用 replyOf 时,参数在 data 字段)
|
||||
if (message.getData() instanceof Map) {
|
||||
Map<?, ?> data = (Map<?, ?>) message.getData();
|
||||
String phone = extractPhoneFromMap(data);
|
||||
if (phone != null) {
|
||||
return phone;
|
||||
}
|
||||
}
|
||||
|
||||
// 如果都获取不到,抛出异常
|
||||
log.error("[extractPhoneNumber][无法提取终端手机号,params: {}, data: {}]",
|
||||
message.getParams(), message.getData());
|
||||
throw new IllegalArgumentException(
|
||||
"消息中缺少终端手机号。请确保设备的 deviceName 为终端手机号(纯数字),例如: \"13800138000\"");
|
||||
}
|
||||
|
||||
/**
|
||||
* 从 Map 中提取终端手机号
|
||||
*/
|
||||
private String extractPhoneFromMap(Map<?, ?> map) {
|
||||
if (map == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 1. 优先从 _deviceName 获取(下发场景,由 IotTcpDownstreamHandler 注入)
|
||||
Object deviceName = params.get("_deviceName");
|
||||
Object deviceName = map.get("_deviceName");
|
||||
if (deviceName != null && StrUtil.isNotBlank(deviceName.toString())) {
|
||||
String deviceNameStr = deviceName.toString().trim();
|
||||
// 验证是否为数字(终端手机号应该是纯数字)
|
||||
if (deviceNameStr.matches("\\d+")) {
|
||||
return deviceNameStr;
|
||||
} else {
|
||||
log.warn("[extractPhoneNumber][_deviceName 不是纯数字: {}]", deviceNameStr);
|
||||
log.warn("[extractPhoneFromMap][_deviceName 不是纯数字: {}]", deviceNameStr);
|
||||
}
|
||||
}
|
||||
|
||||
// 2. 从 metadata 中获取(上行消息回复场景)
|
||||
if (params.get("_metadata") instanceof Map) {
|
||||
Map<?, ?> metadata = (Map<?, ?>) params.get("_metadata");
|
||||
if (map.get("_metadata") instanceof Map) {
|
||||
Map<?, ?> metadata = (Map<?, ?>) map.get("_metadata");
|
||||
Object terminalPhone = metadata.get("terminalPhone");
|
||||
if (terminalPhone != null && StrUtil.isNotBlank(terminalPhone.toString())) {
|
||||
return terminalPhone.toString();
|
||||
@@ -596,7 +605,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
}
|
||||
|
||||
// 3. 从 phone 字段获取(向下兼容,不推荐)
|
||||
Object phone = params.get("phone");
|
||||
Object phone = map.get("phone");
|
||||
if (phone != null && StrUtil.isNotBlank(phone.toString())) {
|
||||
String phoneStr = phone.toString().trim();
|
||||
if (phoneStr.matches("\\d+")) {
|
||||
@@ -604,34 +613,30 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
}
|
||||
}
|
||||
|
||||
// 4. 如果都获取不到,抛出异常
|
||||
log.error("[extractPhoneNumber][无法提取终端手机号,params: {}]", params);
|
||||
throw new IllegalArgumentException(
|
||||
"消息中缺少终端手机号。请确保设备的 deviceName 为终端手机号(纯数字),例如: \"13800138000\"");
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 提取流水号
|
||||
*
|
||||
* <p>
|
||||
* 对于下发消息,如果没有指定流水号,则生成一个随机流水号
|
||||
*/
|
||||
private int extractFlowId(IotDeviceMessage message) {
|
||||
// 尝试从 params 提取
|
||||
if (message.getParams() instanceof Map) {
|
||||
Map<?, ?> params = (Map<?, ?>) message.getParams();
|
||||
|
||||
// 尝试获取显式指定的流水号
|
||||
Object flowId = params.get("flowId");
|
||||
if (flowId instanceof Number) {
|
||||
return ((Number) flowId).intValue();
|
||||
Integer flowId = extractFlowIdFromMap(params);
|
||||
if (flowId != null) {
|
||||
return flowId;
|
||||
}
|
||||
}
|
||||
|
||||
// 尝试从 metadata 中获取(上行消息的流水号)
|
||||
if (params.get("_metadata") instanceof Map) {
|
||||
Map<?, ?> metadata = (Map<?, ?>) params.get("_metadata");
|
||||
Object metaFlowId = metadata.get("flowId");
|
||||
if (metaFlowId instanceof Number) {
|
||||
return ((Number) metaFlowId).intValue();
|
||||
}
|
||||
// 尝试从 data 提取(应答消息使用 replyOf 时,参数在 data 字段)
|
||||
if (message.getData() instanceof Map) {
|
||||
Map<?, ?> data = (Map<?, ?>) message.getData();
|
||||
Integer flowId = extractFlowIdFromMap(data);
|
||||
if (flowId != null) {
|
||||
return flowId;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -639,19 +644,61 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
return (int) (System.currentTimeMillis() % 65535) + 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* 从 Map 中提取流水号
|
||||
*/
|
||||
private Integer extractFlowIdFromMap(Map<?, ?> map) {
|
||||
if (map == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 尝试获取显式指定的流水号
|
||||
Object flowId = map.get("flowId");
|
||||
if (flowId instanceof Number) {
|
||||
return ((Number) flowId).intValue();
|
||||
}
|
||||
|
||||
// 尝试从 metadata 中获取(上行消息的流水号)
|
||||
if (map.get("_metadata") instanceof Map) {
|
||||
Map<?, ?> metadata = (Map<?, ?>) map.get("_metadata");
|
||||
Object metaFlowId = metadata.get("flowId");
|
||||
if (metaFlowId instanceof Number) {
|
||||
return ((Number) metaFlowId).intValue();
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取参数为Map
|
||||
* <p>
|
||||
* 优先从 params 获取,如果 params 为空则从 data 获取(应答消息场景)
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Map<String, Object> getParamsAsMap(IotDeviceMessage message) {
|
||||
// 优先从 params 获取
|
||||
if (message.getParams() instanceof Map) {
|
||||
return (Map<String, Object>) message.getParams();
|
||||
}
|
||||
// 尝试JSON转换
|
||||
|
||||
// 从 data 获取(应答消息使用 replyOf 时,参数在 data 字段)
|
||||
if (message.getData() instanceof Map) {
|
||||
return (Map<String, Object>) message.getData();
|
||||
}
|
||||
|
||||
// 尝试JSON转换 params
|
||||
if (message.getParams() != null) {
|
||||
String json = JsonUtils.toJsonString(message.getParams());
|
||||
return JsonUtils.parseObject(json, Map.class);
|
||||
}
|
||||
|
||||
// 尝试JSON转换 data
|
||||
if (message.getData() != null) {
|
||||
String json = JsonUtils.toJsonString(message.getData());
|
||||
return JsonUtils.parseObject(json, Map.class);
|
||||
}
|
||||
|
||||
return new HashMap<>();
|
||||
}
|
||||
|
||||
@@ -682,4 +729,42 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
|
||||
&& data[data.length - 1] == (byte) Jt808Constants.PKG_DELIMITER;
|
||||
}
|
||||
|
||||
/**
|
||||
* 为参数对象添加元数据
|
||||
*
|
||||
* @param params 原始参数对象
|
||||
* @param msgId JT808 消息ID
|
||||
* @param terminalPhone 终端手机号
|
||||
* @param flowId 流水号
|
||||
* @param encryptionType 加密类型
|
||||
* @return 包含元数据的参数对象
|
||||
*/
|
||||
private Object enrichParamsWithMetadata(Object params, int msgId, String terminalPhone,
|
||||
int flowId, int encryptionType) {
|
||||
// 构建元数据 Map
|
||||
Map<String, Object> metadata = new HashMap<>(4);
|
||||
metadata.put("jt808MsgId", String.format("0x%04X", msgId));
|
||||
metadata.put("terminalPhone", terminalPhone);
|
||||
metadata.put("flowId", flowId);
|
||||
metadata.put("encryptionType", encryptionType);
|
||||
|
||||
// 如果 params 是 Map 类型,则添加元数据
|
||||
if (params instanceof Map) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> paramsMap = (Map<String, Object>) params;
|
||||
|
||||
// 创建新的可变 Map,确保可以添加元数据
|
||||
// 注意:某些解析方法可能返回不可变的 Map(如 Map.of()),需要转换为可变 Map
|
||||
Map<String, Object> enrichedParams = new HashMap<>(paramsMap);
|
||||
enrichedParams.put("_metadata", metadata);
|
||||
return enrichedParams;
|
||||
}
|
||||
|
||||
// 如果 params 不是 Map,则包装为包含元数据的 Map
|
||||
// 保留原始 params 在 _data 字段中
|
||||
Map<String, Object> wrapper = new HashMap<>(2);
|
||||
wrapper.put("_data", params);
|
||||
wrapper.put("_metadata", metadata);
|
||||
return wrapper;
|
||||
}
|
||||
}
|
||||
@@ -114,8 +114,8 @@ public class Jt808ProtocolHandler extends AbstractProtocolHandler {
|
||||
String productKey, String deviceName, String serverId) {
|
||||
try {
|
||||
// 1. 标记消息为"协议层已应答",业务层无需再发送 reply
|
||||
// 设置 code = 0 后,isReplyMessage() 返回 true,业务层跳过 reply
|
||||
message.setCode(0);
|
||||
// 设置 skipReply = true 后,业务层跳过 reply
|
||||
message.setSkipReply(true);
|
||||
|
||||
// 2. 发送消息到消息总线
|
||||
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);
|
||||
|
||||
@@ -169,10 +169,12 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
||||
getSelf().createDeviceLogAsync(message);
|
||||
|
||||
// 3. 回复消息。前提:非 _reply 消息,并且非禁用回复的消息
|
||||
// 条件1:防止对"回复消息"再次回复,避免无限循环
|
||||
// 条件2:某些特定的method不需要回复(如设备状态变更、OTA进度上报)
|
||||
// 条件3(新增):HTTP短连接场景,因为已经在请求中直接响应了,不需要再通过消息总线发送回复
|
||||
if (IotDeviceMessageUtils.isReplyMessage(message)
|
||||
// 条件1:协议层已处理回复(skipReply=true),业务层无需再回复
|
||||
// 条件2:防止对"回复消息"再次回复,避免无限循环
|
||||
// 条件3:某些特定的method不需要回复(如设备状态变更、OTA进度上报)
|
||||
// 条件4:HTTP短连接场景,因为已经在请求中直接响应了,不需要再通过消息总线发送回复
|
||||
if (IotDeviceMessageUtils.isSkipReply(message)
|
||||
|| IotDeviceMessageUtils.isReplyMessage(message)
|
||||
|| IotDeviceMessageMethodEnum.isReplyDisabled(message.getMethod())
|
||||
|| StrUtil.isEmpty(message.getServerId())) {
|
||||
return; // serverId 为空,不记录回复消息
|
||||
|
||||
Reference in New Issue
Block a user