Compare commits

...

12 Commits

Author SHA1 Message Date
lzh
016a1e18b7 Merge remote-tracking branch 'origin/cleaning' into cleaning
# Conflicts:
#	viewshanghai-module-iot/viewshanghai-module-iot-biz/src/main/java/com/viewshanghai/module/iot/service/device/message/IotDeviceMessageServiceImpl.java
#	viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/codec/jt808/IotJt808DeviceMessageCodec.java
#	viewshanghai-module-iot/viewshanghai-module-iot-gateway/src/main/java/com/viewshanghai/module/iot/gateway/protocol/tcp/handler/Jt808ProtocolHandler.java
2025-12-30 13:36:19 +08:00
lzh
7a81d9e478 fix: JT808事件消息eventId改为identifier
Some checks failed
aiot-platform CI/CD / build-and-deploy (push) Has been cancelled
2025-12-28 00:47:42 +08:00
lzh
ea55347651 fix: 业务层跳过 JT808 协议的回复消息生成
All checks were successful
aiot-platform CI/CD / build-and-deploy (push) Successful in 6m50s
2025-12-28 00:07:10 +08:00
lzh
7561ddf9e9 fix: JT808直接回复通用消息-处理数据上报 2025-12-28 00:06:59 +08:00
lzh
5489125cfa fix: docker部署打开8091端口 2025-12-26 16:17:36 +08:00
lzh
2f4c836d1b chore: iot-gateway的TCP连接打开 2025-12-26 14:27:47 +08:00
lzh
d1d7eec5c9 chore: JT808/2013协议解析融合到现有TCP连接体系,对这个协议的注册鉴权做了定制化改造(注册返回鉴权码(返回phone),鉴权凭借鉴权码) 2025-12-26 14:26:30 +08:00
lzh
1886afc150 chore: JT808/2013协议解析(TCP连接) 2025-12-26 14:23:16 +08:00
lzh
861a78c092 fix: 修复http上报数据自动返回消息的bug 2025-12-25 15:22:50 +08:00
lzh
03a569c269 chore: 1、设备接入验证增加(一型一密PRODUCT_SECRET、无鉴权NONE)
2、产品增加productSecret,authType
3、authType优先级 - 设备级 》 产品级
ps:暂未实现动态注册
2025-12-25 15:22:13 +08:00
lzh
9d6b53c4f6 chore: 客流计数器对接-数据解析、自定义返回DTO 2025-12-25 15:16:53 +08:00
lzh
4066d65da2 fix: 修复插入设备消息覆盖的bug(将TSDB插入device_message的ts字段从 NOW 改为 report_time) 2025-12-25 11:18:39 +08:00
2 changed files with 69 additions and 252 deletions

View File

@@ -58,7 +58,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
// 根据方法名路由到不同的编码器
return switch (method) {
// === 标准物模型方法(下行) ===
// === 标准物模型方法 ===
case "thing.service.invoke" -> encodeServiceInvoke(message, phone, flowId); // 服务调用
case "thing.property.set" -> encodePropertySet(message, phone, flowId); // 属性设置
@@ -67,14 +67,8 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
case "jt808.platform.registerResp", "registerResp" -> encodeRegisterResp(message, phone, flowId);
case "jt808.platform.textDown", "textDown" -> encodeTextDown(message, phone, flowId);
// === 上行消息方法(不应用于下行) ===
case "thing.property.post", "thing.property.report" -> {
log.warn("[encode][JT808 协议不支持下行属性上报方法: {},请使用 thing.property.set 进行属性设置]", method);
yield new byte[0];
}
default -> {
log.warn("[encode][不支持的消息方法: {}JT808 协议仅支持下行方法: thing.service.invoke, thing.property.set]", method);
log.warn("[encode][不支持的消息方法: {}]", method);
yield new byte[0];
}
};
@@ -88,27 +82,15 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
public IotDeviceMessage decode(byte[] bytes) {
Assert.notNull(bytes, "待解码数据不能为空");
Assert.isTrue(bytes.length >= 12, "数据包长度不足");
try {
// 1. 验证并去除首尾标识符 0x7e
// JT808格式: 0x7e + 消息头 + 消息体 + 校验码 + 0x7e
if (bytes[0] != (byte) 0x7e || bytes[bytes.length - 1] != (byte) 0x7e) {
throw new IllegalArgumentException("JT808消息格式错误缺少首尾标识符0x7e");
}
// 2. 反转义处理处理0x7d01->0x7d, 0x7d02->0x7e
// doEscape4Receive会保留start之前和end之后的字节所以传入1和length-1来处理中间部分
// 1. 反转义(去除首尾标识符
byte[] unescapedBytes = protocolUtil.doEscape4Receive(bytes, 1, bytes.length - 1);
// 3. 从反转义后的数据中提取实际内容去掉首尾的0x7e
// doEscape4Receive保留了首尾的0x7e需要再次去除
byte[] actualData = new byte[unescapedBytes.length - 2];
System.arraycopy(unescapedBytes, 1, actualData, 0, actualData.length);
// 4. 解析为 JT808 数据包此时数据已经是纯净的消息内容不含0x7e
Jt808DataPack dataPack = decoder.bytes2PackageData(actualData);
// 5. 转换为统一的 IotDeviceMessage
// 2. 解析为 JT808 数据包
Jt808DataPack dataPack = decoder.bytes2PackageData(unescapedBytes);
// 3. 转换为统一的 IotDeviceMessage
return convertToIotDeviceMessage(dataPack);
} catch (Exception e) {
log.error("[decode][JT808 消息解码失败,数据长度: {}]", bytes.length, e);
@@ -118,74 +100,34 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
/**
* 将 JT808 数据包转换为 IotDeviceMessage
*
* @param dataPack JT808 数据包
* @return IotDeviceMessage 消息对象
*/
private IotDeviceMessage convertToIotDeviceMessage(Jt808DataPack dataPack) {
Assert.notNull(dataPack, "JT808 数据包不能为空");
Jt808DataPack.PackHead head = dataPack.getPackHead();
Assert.notNull(head, "JT808 数据包头部不能为空");
int msgId = head.getId();
String terminalPhone = head.getTerminalPhone();
int flowId = head.getFlowId();
// 生成请求ID用于追踪和关联格式终端手机号_流水号_时间戳
// 添加时间戳确保唯一性,避免同一终端同一流水号重复
String requestId = String.format("%s_%d_%d", terminalPhone, flowId, System.currentTimeMillis());
// 生成消息ID使用流水号作为标识
String messageId = head.getTerminalPhone() + "_" + head.getFlowId();
// 根据消息ID确定物模型标准方法名
String method = getStandardMethodName(msgId);
// 解析消息参数
Object params = parseMessageParams(dataPack, msgId);
// 构建元数据并添加到 params 中用于调试和追踪)
params = enrichParamsWithMetadata(params, msgId, terminalPhone, flowId, head.getEncryptionType());
// 创建 IotDeviceMessageof 方法会自动生成 id 和 reportTime
return IotDeviceMessage.of(requestId, method, params, null, null, null);
}
/**
* 为参数对象添加元数据
*
* @param params 原始参数对象
* @param msgId JT808 消息ID
* @param terminalPhone 终端手机号
* @param flowId 流水号
* @param encryptionType 加密类型
* @return 包含元数据的参数对象
*/
private Object enrichParamsWithMetadata(Object params, int msgId, String terminalPhone,
int flowId, int encryptionType) {
// 构建元数据 Map
Map<String, Object> metadata = new HashMap<>(4);
metadata.put("jt808MsgId", String.format("0x%04X", msgId));
metadata.put("terminalPhone", terminalPhone);
metadata.put("flowId", flowId);
metadata.put("encryptionType", encryptionType);
// 如果 params 是 Map 类型,则添加元数据
// 构建元数据(保留在 params 中用于调试和追踪)
if (params instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> paramsMap = (Map<String, Object>) params;
// 创建新的可变 Map确保可以添加元数据
// 注意:某些解析方法可能返回不可变的 Map如 Map.of()),需要转换为可变 Map
Map<String, Object> enrichedParams = new HashMap<>(paramsMap);
enrichedParams.put("_metadata", metadata);
return enrichedParams;
Map<String, Object> metadata = new HashMap<>();
metadata.put("jt808MsgId", String.format("0x%04X", msgId));
metadata.put("terminalPhone", head.getTerminalPhone());
metadata.put("flowId", head.getFlowId());
metadata.put("encryptionType", head.getEncryptionType());
paramsMap.put("_metadata", metadata);
}
// 如果 params 不是 Map则包装为包含元数据的 Map
// 保留原始 params 在 _data 字段中
Map<String, Object> wrapper = new HashMap<>(2);
wrapper.put("_data", params);
wrapper.put("_metadata", metadata);
return wrapper;
// 创建 IotDeviceMessage
IotDeviceMessage message = IotDeviceMessage.of(messageId, method, params, null, null, null);
message.setReportTime(LocalDateTime.now());
return message;
}
/**
@@ -604,163 +546,98 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec {
/**
* 提取终端手机号
*
*
* 优先级:
* 1. 从 params._deviceName 中获取下发场景IotTcpDownstreamHandler 自动注入)
* 2. 从 params._metadata.terminalPhone 中获取(上行消息回复场景)
* 3. 从 params.phone 中获取(手动指定,向下兼容,不推荐)
* 4. 从 data.phone 中获取(应答消息场景,使用 replyOf 构建的消息)
*/
private String extractPhoneNumber(IotDeviceMessage message) {
// 尝试从 params 提取
if (message.getParams() instanceof Map) {
Map<?, ?> params = (Map<?, ?>) message.getParams();
String phone = extractPhoneFromMap(params);
if (phone != null) {
return phone;
}
if (!(message.getParams() instanceof Map)) {
log.error("[extractPhoneNumber][params 不是 Map 类型,消息: {}]", message);
throw new IllegalArgumentException("消息参数格式错误params 必须是 Map 类型");
}
// 尝试从 data 提取(应答消息使用 replyOf 时,参数在 data 字段)
if (message.getData() instanceof Map) {
Map<?, ?> data = (Map<?, ?>) message.getData();
String phone = extractPhoneFromMap(data);
if (phone != null) {
return phone;
}
}
// 如果都获取不到,抛出异常
log.error("[extractPhoneNumber][无法提取终端手机号params: {}, data: {}]",
message.getParams(), message.getData());
throw new IllegalArgumentException(
"消息中缺少终端手机号。请确保设备的 deviceName 为终端手机号(纯数字),例如: \"13800138000\"");
}
/**
* 从 Map 中提取终端手机号
*/
private String extractPhoneFromMap(Map<?, ?> map) {
if (map == null) {
return null;
}
Map<?, ?> params = (Map<?, ?>) message.getParams();
// 1. 优先从 _deviceName 获取(下发场景,由 IotTcpDownstreamHandler 注入)
Object deviceName = map.get("_deviceName");
Object deviceName = params.get("_deviceName");
if (deviceName != null && StrUtil.isNotBlank(deviceName.toString())) {
String deviceNameStr = deviceName.toString().trim();
// 验证是否为数字(终端手机号应该是纯数字)
if (deviceNameStr.matches("\\d+")) {
return deviceNameStr;
} else {
log.warn("[extractPhoneFromMap][_deviceName 不是纯数字: {}]", deviceNameStr);
log.warn("[extractPhoneNumber][_deviceName 不是纯数字: {}]", deviceNameStr);
}
}
// 2. 从 metadata 中获取(上行消息回复场景)
if (map.get("_metadata") instanceof Map) {
Map<?, ?> metadata = (Map<?, ?>) map.get("_metadata");
if (params.get("_metadata") instanceof Map) {
Map<?, ?> metadata = (Map<?, ?>) params.get("_metadata");
Object terminalPhone = metadata.get("terminalPhone");
if (terminalPhone != null && StrUtil.isNotBlank(terminalPhone.toString())) {
return terminalPhone.toString();
}
}
// 3. 从 phone 字段获取(向下兼容,不推荐)
Object phone = map.get("phone");
Object phone = params.get("phone");
if (phone != null && StrUtil.isNotBlank(phone.toString())) {
String phoneStr = phone.toString().trim();
if (phoneStr.matches("\\d+")) {
return phoneStr;
}
}
return null;
// 4. 如果都获取不到,抛出异常
log.error("[extractPhoneNumber][无法提取终端手机号params: {}]", params);
throw new IllegalArgumentException(
"消息中缺少终端手机号。请确保设备的 deviceName 为终端手机号(纯数字),例如: \"13800138000\"");
}
/**
* 提取流水号
*
*
* 对于下发消息,如果没有指定流水号,则生成一个随机流水号
*/
private int extractFlowId(IotDeviceMessage message) {
// 尝试从 params 提取
if (message.getParams() instanceof Map) {
Map<?, ?> params = (Map<?, ?>) message.getParams();
Integer flowId = extractFlowIdFromMap(params);
if (flowId != null) {
return flowId;
// 尝试获取显式指定的流水号
Object flowId = params.get("flowId");
if (flowId instanceof Number) {
return ((Number) flowId).intValue();
}
// 尝试从 metadata 中获取(上行消息的流水号)
if (params.get("_metadata") instanceof Map) {
Map<?, ?> metadata = (Map<?, ?>) params.get("_metadata");
Object metaFlowId = metadata.get("flowId");
if (metaFlowId instanceof Number) {
return ((Number) metaFlowId).intValue();
}
}
}
// 尝试从 data 提取(应答消息使用 replyOf 时,参数在 data 字段)
if (message.getData() instanceof Map) {
Map<?, ?> data = (Map<?, ?>) message.getData();
Integer flowId = extractFlowIdFromMap(data);
if (flowId != null) {
return flowId;
}
}
// 生成随机流水号1-65535
return (int) (System.currentTimeMillis() % 65535) + 1;
}
/**
* 从 Map 中提取流水号
*/
private Integer extractFlowIdFromMap(Map<?, ?> map) {
if (map == null) {
return null;
}
// 尝试获取显式指定的流水号
Object flowId = map.get("flowId");
if (flowId instanceof Number) {
return ((Number) flowId).intValue();
}
// 尝试从 metadata 中获取(上行消息的流水号)
if (map.get("_metadata") instanceof Map) {
Map<?, ?> metadata = (Map<?, ?>) map.get("_metadata");
Object metaFlowId = metadata.get("flowId");
if (metaFlowId instanceof Number) {
return ((Number) metaFlowId).intValue();
}
}
return null;
}
/**
* 获取参数为Map
*
* 优先从 params 获取,如果 params 为空则从 data 获取(应答消息场景)
*/
@SuppressWarnings("unchecked")
private Map<String, Object> getParamsAsMap(IotDeviceMessage message) {
// 优先从 params 获取
if (message.getParams() instanceof Map) {
return (Map<String, Object>) message.getParams();
}
// 从 data 获取(应答消息使用 replyOf 时,参数在 data 字段)
if (message.getData() instanceof Map) {
return (Map<String, Object>) message.getData();
}
// 尝试JSON转换 params
// 尝试JSON转换
if (message.getParams() != null) {
String json = JsonUtils.toJsonString(message.getParams());
return JsonUtils.parseObject(json, Map.class);
}
// 尝试JSON转换 data
if (message.getData() != null) {
String json = JsonUtils.toJsonString(message.getData());
return JsonUtils.parseObject(json, Map.class);
}
return new HashMap<>();
}

View File

@@ -7,7 +7,6 @@ import com.viewshanghai.framework.common.pojo.CommonResult;
import com.viewshanghai.module.iot.core.biz.IotDeviceCommonApi;
import com.viewshanghai.module.iot.core.biz.dto.IotDeviceGetReqDTO;
import com.viewshanghai.module.iot.core.biz.dto.IotDeviceRespDTO;
import com.viewshanghai.module.iot.core.enums.IotAuthTypeEnum;
import com.viewshanghai.module.iot.core.mq.message.IotDeviceMessage;
import com.viewshanghai.module.iot.gateway.codec.jt808.IotJt808DeviceMessageCodec;
import com.viewshanghai.module.iot.gateway.service.device.message.IotDeviceMessageService;
@@ -246,30 +245,22 @@ public class Jt808ProtocolHandler extends AbstractProtocolHandler {
return AuthResult.failure("设备不存在");
}
// 3. 从 Redis 获取鉴权码(优先使用注册时缓存的鉴权码)
// 3. 从 Redis 获取鉴权码
String redisKey = String.format(JT808_AUTH_TOKEN, terminalPhone);
String cachedAuthToken = stringRedisTemplate.opsForValue().get(redisKey);
boolean authSuccess = false;
if (StrUtil.isNotBlank(cachedAuthToken)) {
// 3.1 如果找到缓存,验证鉴权码是否匹配
if (authCode.equals(cachedAuthToken)) {
authSuccess = true;
log.debug("[handleAuth][使用缓存的鉴权码验证成功,终端手机号: {}]", terminalPhone);
} else {
log.warn("[handleAuth][缓存的鉴权码验证失败,终端手机号: {}, 期望: {}, 实际: {}]",
terminalPhone, cachedAuthToken, authCode);
}
} else {
// 3.2 如果未找到缓存,尝试直接验证(支持跳过注册步骤)
log.info("[handleAuth][未找到鉴权码缓存,尝试直接验证,终端手机号: {}]", terminalPhone);
authSuccess = validateAuthCodeDirectly(device, authCode, terminalPhone);
if (StrUtil.isBlank(cachedAuthToken)) {
log.warn("[handleAuth][未找到鉴权码缓存,终端手机号: {},可能未注册或缓存已过期]", terminalPhone);
sendCommonResp(socket, terminalPhone, flowId, 0x0102, (byte) 1, codecType, message.getRequestId());
return AuthResult.failure("未找到鉴权码,请先注册");
}
if (!authSuccess) {
// 验证鉴权码是否匹配Redis 自动处理过期,无需手动检查)
if (!authCode.equals(cachedAuthToken)) {
log.warn("[handleAuth][鉴权码验证失败,终端手机号: {}, 期望: {}, 实际: {}]",
terminalPhone, cachedAuthToken, authCode);
sendCommonResp(socket, terminalPhone, flowId, 0x0102, (byte) 1, codecType, message.getRequestId());
return AuthResult.failure("鉴权码验证失败");
return AuthResult.failure("鉴权码错误");
}
// 4. 发送通用应答成功result_code=0
@@ -298,58 +289,7 @@ public class Jt808ProtocolHandler extends AbstractProtocolHandler {
}
}
// ========== 鉴权码生成和验证策略 ==========
/**
* 直接验证鉴权码(跳过注册步骤)
* <p>
* 当设备未注册或缓存过期时,根据设备的认证类型直接验证鉴权码
*
* @param device 设备信息
* @param authCode 设备发送的鉴权码
* @param terminalPhone 终端手机号
* @return true-验证成功false-验证失败
*/
private boolean validateAuthCodeDirectly(IotDeviceRespDTO device, String authCode, String terminalPhone) {
try {
// 获取生效的认证类型(设备级优先,否则使用产品级)
String effectiveAuthType = StrUtil.isNotBlank(device.getAuthType())
? device.getAuthType()
: device.getProductAuthType();
if (StrUtil.isBlank(effectiveAuthType)) {
log.warn("[validateAuthCodeDirectly][认证类型为空,使用默认策略(终端手机号),终端手机号: {}]", terminalPhone);
// 默认策略:使用终端手机号作为鉴权码
return terminalPhone.equals(authCode);
}
// 根据认证类型验证
if (IotAuthTypeEnum.NONE.getType().equals(effectiveAuthType)) {
// 免鉴权:直接通过(或使用终端手机号验证)
log.debug("[validateAuthCodeDirectly][免鉴权模式,终端手机号: {}]", terminalPhone);
return true;
} else if (IotAuthTypeEnum.SECRET.getType().equals(effectiveAuthType)) {
// 一机一密:使用设备的 deviceSecret
// String deviceSecret = device.getDeviceSecret();
return terminalPhone.equals(authCode);
} else if (IotAuthTypeEnum.PRODUCT_SECRET.getType().equals(effectiveAuthType)) {
// 一型一密:使用产品的 productSecret
// 注意:设备信息中可能没有 productSecret需要从产品获取
// 这里暂时使用终端手机号作为回退策略
log.warn("[validateAuthCodeDirectly][一型一密需要产品密钥,当前暂不支持直接验证,终端手机号: {},使用终端手机号作为回退", terminalPhone);
return terminalPhone.equals(authCode);
} else {
// 其他类型:使用终端手机号作为回退策略
log.warn("[validateAuthCodeDirectly][未知认证类型: {},使用终端手机号作为回退,终端手机号: {}]",
effectiveAuthType, terminalPhone);
return terminalPhone.equals(authCode);
}
} catch (Exception e) {
log.error("[validateAuthCodeDirectly][直接验证鉴权码异常,终端手机号: {}]", terminalPhone, e);
return false;
}
}
// ========== 鉴权码生成策略 ==========
/**
* 生成鉴权码