Compare commits

...

3 Commits

Author SHA1 Message Date
lzh
7a81d9e478 fix: JT808事件消息eventId改为identifier
Some checks failed
aiot-platform CI/CD / build-and-deploy (push) Has been cancelled
2025-12-28 00:47:42 +08:00
lzh
ea55347651 fix: 业务层跳过 JT808 协议的回复消息生成
All checks were successful
aiot-platform CI/CD / build-and-deploy (push) Successful in 6m50s
2025-12-28 00:07:10 +08:00
lzh
7561ddf9e9 fix: JT808直接回复通用消息-处理数据上报 2025-12-28 00:06:59 +08:00
3 changed files with 145 additions and 37 deletions

View File

@@ -23,6 +23,8 @@ import com.viewshanghai.module.iot.dal.tdengine.IotDeviceMessageMapper;
import com.viewshanghai.module.iot.service.device.IotDeviceService; import com.viewshanghai.module.iot.service.device.IotDeviceService;
import com.viewshanghai.module.iot.service.device.property.IotDevicePropertyService; import com.viewshanghai.module.iot.service.device.property.IotDevicePropertyService;
import com.viewshanghai.module.iot.service.ota.IotOtaTaskRecordService; import com.viewshanghai.module.iot.service.ota.IotOtaTaskRecordService;
import com.viewshanghai.module.iot.service.product.IotProductService;
import com.viewshanghai.module.iot.dal.dataobject.product.IotProductDO;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.base.Objects; import com.google.common.base.Objects;
@@ -66,6 +68,9 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
@Resource @Resource
private IotDeviceMessageProducer deviceMessageProducer; private IotDeviceMessageProducer deviceMessageProducer;
@Resource
private IotProductService productService;
@Override @Override
public void defineDeviceMessageStable() { public void defineDeviceMessageStable() {
if (StrUtil.isNotEmpty(deviceMessageMapper.showSTable())) { if (StrUtil.isNotEmpty(deviceMessageMapper.showSTable())) {
@@ -172,11 +177,21 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
// 条件1防止对"回复消息"再次回复,避免无限循环 // 条件1防止对"回复消息"再次回复,避免无限循环
// 条件2某些特定的method不需要回复如设备状态变更、OTA进度上报 // 条件2某些特定的method不需要回复如设备状态变更、OTA进度上报
// 条件3新增HTTP短连接场景因为已经在请求中直接响应了不需要再通过消息总线发送回复 // 条件3新增HTTP短连接场景因为已经在请求中直接响应了不需要再通过消息总线发送回复
// 条件4新增JT808 协议由协议处理器自动生成通用应答0x8001业务层不需要生成回复消息
if (IotDeviceMessageUtils.isReplyMessage(message) if (IotDeviceMessageUtils.isReplyMessage(message)
|| IotDeviceMessageMethodEnum.isReplyDisabled(message.getMethod()) || IotDeviceMessageMethodEnum.isReplyDisabled(message.getMethod())
|| StrUtil.isEmpty(message.getServerId())) { || StrUtil.isEmpty(message.getServerId())) {
return; // serverId 为空,不记录回复消息 return; // serverId 为空,不记录回复消息
} }
// 检查设备的编解码类型,如果是 JT808则跳过回复消息生成
// JT808 协议的回复由协议处理器Jt808ProtocolHandler自动生成通用应答
IotProductDO product = productService.getProductFromCache(device.getProductId());
if (product != null && "JT808".equals(product.getCodecType())) {
log.debug("[handleUpstreamDeviceMessage][JT808 协议消息,跳过业务层回复消息生成,由协议处理器自动生成通用应答]");
return;
}
try { try {
IotDeviceMessage replyMessage = IotDeviceMessage.replyOf(message.getRequestId(), message.getMethod(), replyData, IotDeviceMessage replyMessage = IotDeviceMessage.replyOf(message.getRequestId(), message.getMethod(), replyData,
serviceException != null ? serviceException.getCode() : null, serviceException != null ? serviceException.getCode() : null,

View File

@@ -68,8 +68,12 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
case "jt808.platform.textDown", "textDown" -> encodeTextDown(message, phone, flowId); case "jt808.platform.textDown", "textDown" -> encodeTextDown(message, phone, flowId);
// === 上行消息方法(不应用于下行) === // === 上行消息方法(不应用于下行) ===
// 注意:正常情况下不应该到达这里,因为:
// 1. 业务层已跳过 JT808 协议的回复消息生成
// 2. 协议处理器会自动生成通用应答
// 这里保留作为防御性检查
case "thing.property.post", "thing.property.report" -> { case "thing.property.post", "thing.property.report" -> {
log.warn("[encode][JT808 协议不支持下行属性上报方法: {},请使用 thing.property.set 进行属性设置]", method); log.warn("[encode][JT808 协议不支持下行属性上报方法: {},请使用 thing.property.set 进行属性设置。如果这是回复消息,应使用 jt808.platform.commonResp 方法]", method);
yield new byte[0]; yield new byte[0];
} }
@@ -84,6 +88,16 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
} }
} }
/**
* 判断是否为上行消息方法
*/
private boolean isUpstreamMethod(String method) {
return "thing.property.post".equals(method)
|| "thing.property.report".equals(method)
|| "thing.event.post".equals(method)
|| "thing.state.update".equals(method);
}
@Override @Override
public IotDeviceMessage decode(byte[] bytes) { public IotDeviceMessage decode(byte[] bytes) {
Assert.notNull(bytes, "待解码数据不能为空"); Assert.notNull(bytes, "待解码数据不能为空");
@@ -259,38 +273,41 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
/** /**
* 解析按键事件为事件上报格式thing.event.post * 解析按键事件为事件上报格式thing.event.post
* * <p>
* 物模型标准格式 * 根据 JT808 协议
* - 短按keyId = 0x01-0x0A对应1-10号键keyState = 按键次数
* - 长按keyId = 0x0B-0x0E对应长按1-4号键keyState = 按键次数
* <p>
* 物模型标准格式(简化版,只保留 keyId 和 keyState
* { * {
* "eventId": "button_event", * "identifier": "button_event", // 事件标识符,用于存储到数据库的 identifier 字段
* "eventTime": 1234567890, * "eventTime": 1234567890,
* "params": { * "params": {
* "keyId": 1, * "keyId": 1, // 0x01=短按1号键, 0x0B=长按1号键
* "keyState": 1, * "keyState": 1 // 按键次数0x01=按键一次)
* "keyType": "short_press",
* "keyNumber": 1,
* "isLongPress": false
* } * }
* } * }
* <p>
* 判断长按/短按:通过 keyId 判断
* - keyId < 0x0B短按
* - keyId >= 0x0B长按
*/ */
private Map<String, Object> parseButtonEventAsEvent(Jt808DataPack dataPack) { private Map<String, Object> parseButtonEventAsEvent(Jt808DataPack dataPack) {
Jt808ButtonEvent event = decoder.toButtonEventMsg(dataPack); Jt808ButtonEvent event = decoder.toButtonEventMsg(dataPack);
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
// 统一使用一个事件标识符,通过 isLongPress 参数区分短按和长按 // 使用 identifier 字段(符合物模型标准格式),用于存储到数据库的 identifier 字段
result.put("eventId", "button_event"); result.put("identifier", "button_event");
// 事件时间戳 // 事件时间戳
result.put("eventTime", System.currentTimeMillis()); result.put("eventTime", System.currentTimeMillis());
// 事件参数(包含 isLongPress 字段用于区分短按和长按 // 事件参数(只保留 keyId 和 keyState 两个字段
// 通过 keyId 可以判断是长按还是短按keyId >= 0x0B 为长按
Map<String, Object> eventParams = new HashMap<>(); Map<String, Object> eventParams = new HashMap<>();
eventParams.put("keyId", event.getKeyId()); eventParams.put("keyId", event.getKeyId());
eventParams.put("keyState", event.getKeyState()); eventParams.put("keyState", event.getKeyState());
eventParams.put("keyType", event.getKeyType());
eventParams.put("keyNumber", event.getKeyNumber());
eventParams.put("isLongPress", event.getIsLongPress());
result.put("params", eventParams); result.put("params", eventParams);
return result; return result;

View File

@@ -113,11 +113,109 @@ public class Jt808ProtocolHandler extends AbstractProtocolHandler {
log.info("[handleBusinessMessage][JT808 业务消息已发送clientId: {}, method: {}, messageId: {}]", log.info("[handleBusinessMessage][JT808 业务消息已发送clientId: {}, method: {}, messageId: {}]",
clientId, message.getMethod(), message.getId()); clientId, message.getMethod(), message.getId());
// JT808 协议要求对每个上行消息都要回复通用应答0x8001
// 从消息的 requestId 中提取流水号和消息ID格式terminalPhone_flowId_timestamp
// 从消息的 metadata 中提取终端手机号和原始消息ID
String terminalPhone = extractTerminalPhone(message);
int flowId = extractFlowId(message);
int msgId = extractJt808MsgId(message);
if (terminalPhone != null && flowId > 0 && msgId > 0) {
// 发送通用应答(成功)
sendCommonResp(socket, terminalPhone, flowId, msgId, (byte) 0, codecType, message.getRequestId());
} else {
log.warn("[handleBusinessMessage][无法提取 JT808 消息信息跳过通用应答terminalPhone: {}, flowId: {}, msgId: {}]",
terminalPhone, flowId, msgId);
}
} catch (Exception e) { } catch (Exception e) {
log.error("[handleBusinessMessage][JT808 业务消息处理异常clientId: {}]", clientId, e); log.error("[handleBusinessMessage][JT808 业务消息处理异常clientId: {}]", clientId, e);
} }
} }
/**
* 从消息中提取 JT808 流水号
* <p>
* 提取优先级:
* 1. 从 metadata.flowId 中提取(最可靠)
* 2. 从 requestId 中提取格式terminalPhone_flowId_timestamp
*/
@SuppressWarnings("unchecked")
private int extractFlowId(IotDeviceMessage message) {
// 优先从 metadata 中提取
if (message.getParams() instanceof Map) {
Map<?, ?> params = (Map<?, ?>) message.getParams();
if (params.get("_metadata") instanceof Map) {
Map<?, ?> metadata = (Map<?, ?>) params.get("_metadata");
Object flowId = metadata.get("flowId");
if (flowId instanceof Number) {
return ((Number) flowId).intValue();
}
}
}
// 如果 metadata 中没有,尝试从 requestId 中提取
String requestId = message.getRequestId();
if (requestId != null && requestId.contains("_")) {
try {
String[] parts = requestId.split("_");
if (parts.length >= 2) {
return Integer.parseInt(parts[1]);
}
} catch (Exception e) {
log.debug("[extractFlowId][从 requestId 提取流水号失败: {}]", requestId);
}
}
return 0;
}
/**
* 从消息中提取 JT808 消息ID
* 从 metadata 中获取原始 JT808 消息ID格式0x%04X如 "0x0200"
*/
@SuppressWarnings("unchecked")
private int extractJt808MsgId(IotDeviceMessage message) {
Object params = message.getParams();
if (params instanceof Map) {
Map<String, Object> paramsMap = (Map<String, Object>) params;
// 尝试从 _metadata 中获取 jt808MsgId格式0x%04X
Object metadata = paramsMap.get("_metadata");
if (metadata instanceof Map) {
Map<String, Object> metadataMap = (Map<String, Object>) metadata;
Object jt808MsgIdObj = metadataMap.get("jt808MsgId");
if (jt808MsgIdObj instanceof String) {
String jt808MsgIdStr = (String) jt808MsgIdObj;
try {
// 解析 "0x0200" 格式的字符串
if (jt808MsgIdStr.startsWith("0x") || jt808MsgIdStr.startsWith("0X")) {
return Integer.parseInt(jt808MsgIdStr.substring(2), 16);
} else {
return Integer.parseInt(jt808MsgIdStr, 16);
}
} catch (NumberFormatException e) {
log.debug("[extractJt808MsgId][解析 jt808MsgId 失败: {}]", jt808MsgIdStr);
}
}
}
}
// 如果无法从 metadata 中提取,尝试根据 method 映射到 JT808 消息ID
String method = message.getMethod();
return mapMethodToJt808MsgId(method);
}
/**
* 将物模型方法名映射到 JT808 消息ID
*/
private int mapMethodToJt808MsgId(String method) {
return switch (method) {
case "thing.property.post", "thing.property.report" -> 0x0200; // 位置信息汇报
case "thing.event.post" -> 0x0006; // 按键事件
case "thing.state.update" -> 0x0002; // 心跳
default -> 0x0001; // 默认使用通用应答
};
}
@Override @Override
public void sendResponse(NetSocket socket, IotDeviceMessage originalMessage, public void sendResponse(NetSocket socket, IotDeviceMessage originalMessage,
boolean success, String message, String codecType) { boolean success, String message, String codecType) {
@@ -522,28 +620,6 @@ public class Jt808ProtocolHandler extends AbstractProtocolHandler {
return null; return null;
} }
/**
* 从消息中提取流水号
* <p>
* 流水号存储在消息的 _metadata.flowId 字段中
*
* @param message 设备消息
* @return 流水号,提取失败返回 1
*/
@SuppressWarnings("unchecked")
private int extractFlowId(IotDeviceMessage message) {
if (message.getParams() instanceof Map) {
Map<?, ?> params = (Map<?, ?>) message.getParams();
if (params.get("_metadata") instanceof Map) {
Map<?, ?> metadata = (Map<?, ?>) params.get("_metadata");
Object flowId = metadata.get("flowId");
if (flowId instanceof Number) {
return ((Number) flowId).intValue();
}
}
}
return 1;
}
/** /**
* 从消息中提取鉴权码 * 从消息中提取鉴权码