From 0cfd659bd83cc1124d08b9122bb45e1deead3190 Mon Sep 17 00:00:00 2001 From: lzh Date: Fri, 16 Jan 2026 13:24:39 +0800 Subject: [PATCH] fix(iot): fix UnsupportedOperationException in JT808 codec --- .../jt808/IotJt808DeviceMessageCodec.java | 155 +++++++++--------- 1 file changed, 79 insertions(+), 76 deletions(-) diff --git a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/jt808/IotJt808DeviceMessageCodec.java b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/jt808/IotJt808DeviceMessageCodec.java index 1c13d5b..e72b1f6 100644 --- a/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/jt808/IotJt808DeviceMessageCodec.java +++ b/viewsh-module-iot/viewsh-module-iot-gateway/src/main/java/com/viewsh/module/iot/gateway/codec/jt808/IotJt808DeviceMessageCodec.java @@ -49,7 +49,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { public byte[] encode(IotDeviceMessage message) { Assert.notNull(message, "消息不能为空"); Assert.notBlank(message.getMethod(), "消息方法不能为空"); - + try { // 从消息中提取必要信息 String phone = extractPhoneNumber(message); @@ -61,12 +61,12 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { // === 标准物模型方法 === case "thing.service.invoke" -> encodeServiceInvoke(message, phone, flowId); // 服务调用 case "thing.property.set" -> encodePropertySet(message, phone, flowId); // 属性设置 - + // === JT808 内部协议方法(向下兼容) === case "jt808.platform.commonResp", "commonResp" -> encodeCommonResp(message, phone, flowId); case "jt808.platform.registerResp", "registerResp" -> encodeRegisterResp(message, phone, flowId); case "jt808.platform.textDown", "textDown" -> encodeTextDown(message, phone, flowId); - + default -> { log.warn("[encode][不支持的消息方法: {}]", method); yield new byte[0]; @@ -82,14 +82,14 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { public IotDeviceMessage decode(byte[] bytes) { Assert.notNull(bytes, "待解码数据不能为空"); Assert.isTrue(bytes.length >= 12, "数据包长度不足"); - + try { // 1. 反转义(去除首尾标识符) byte[] unescapedBytes = protocolUtil.doEscape4Receive(bytes, 1, bytes.length - 1); - + // 2. 解析为 JT808 数据包 Jt808DataPack dataPack = decoder.bytes2PackageData(unescapedBytes); - + // 3. 转换为统一的 IotDeviceMessage return convertToIotDeviceMessage(dataPack); } catch (Exception e) { @@ -104,26 +104,29 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { private IotDeviceMessage convertToIotDeviceMessage(Jt808DataPack dataPack) { Jt808DataPack.PackHead head = dataPack.getPackHead(); int msgId = head.getId(); - + // 生成消息ID(使用流水号作为标识) String messageId = head.getTerminalPhone() + "_" + head.getFlowId(); - + // 根据消息ID确定物模型标准方法名 String method = getStandardMethodName(msgId); Object params = parseMessageParams(dataPack, msgId); - + // 构建元数据(保留在 params 中,用于调试和追踪) if (params instanceof Map) { @SuppressWarnings("unchecked") - Map paramsMap = (Map) params; + // 确保 paramsMap 是可变的,防止 parseMessageParams 返回不可变 Map 导致 put 报错 + Map paramsMap = new HashMap<>((Map) params); Map metadata = new HashMap<>(); metadata.put("jt808MsgId", String.format("0x%04X", msgId)); metadata.put("terminalPhone", head.getTerminalPhone()); metadata.put("flowId", head.getFlowId()); metadata.put("encryptionType", head.getEncryptionType()); paramsMap.put("_metadata", metadata); + // 更新 params 引用,确保使用的是包含 metadata 的可变 Map + params = paramsMap; } - + // 创建 IotDeviceMessage IotDeviceMessage message = IotDeviceMessage.of(messageId, method, params, null, null, null); message.setReportTime(LocalDateTime.now()); @@ -132,7 +135,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { /** * 根据 JT808 消息ID获取物模型标准方法名 - * + * * 映射关系: * - 0x0002 心跳 -> thing.state.update(设备状态更新) * - 0x0200 位置上报 -> thing.property.post(属性上报) @@ -146,20 +149,20 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { return switch (msgId) { // 设备状态类 case 0x0002 -> IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod(); // 心跳 -> 状态更新 - + // 属性上报类 case 0x0200 -> IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(); // 位置信息汇报 -> 属性上报 case 0x0704 -> IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod(); // 批量位置上传 -> 属性上报 - + // 事件上报类 case 0x0006 -> IotDeviceMessageMethodEnum.EVENT_POST.getMethod(); // 按键事件 -> 事件上报 - + // 内部协议类(不映射到标准方法,使用 JT808 特定方法名) case 0x0001 -> "jt808.terminal.commonResp"; // 终端通用应答 case 0x0100 -> "jt808.terminal.register"; // 终端注册 case 0x0102 -> "jt808.terminal.auth"; // 终端鉴权 case 0x0003 -> "jt808.terminal.logout"; // 终端注销 - + // 未知消息 default -> "jt808.unknown.0x" + Integer.toHexString(msgId); }; @@ -170,18 +173,18 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { */ private Object parseMessageParams(Jt808DataPack dataPack, int msgId) { byte[] bodyBytes = dataPack.getBodyBytes(); - + // 针对不同消息类型进行特殊解析,返回符合物模型标准的格式 return switch (msgId) { // thing.property.post - 返回 properties case 0x0200, 0x0704 -> parseLocationInfoAsProperties(dataPack); - + // thing.state.update - 返回 state 信息 case 0x0002 -> parseHeartbeatAsState(); - + // thing.event.post - 返回 event 信息 case 0x0006 -> parseButtonEventAsEvent(dataPack); - + // JT808 内部协议消息 - 保持原有格式 case 0x0100 -> parseRegisterInfo(dataPack); case 0x0102 -> parseAuthInfo(bodyBytes); @@ -201,7 +204,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { /** * 解析按键事件为事件上报格式(thing.event.post) - * + * * 物模型标准格式: * { * "eventId": "button_event", @@ -217,15 +220,15 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { */ private Map parseButtonEventAsEvent(Jt808DataPack dataPack) { Jt808ButtonEvent event = decoder.toButtonEventMsg(dataPack); - + Map result = new HashMap<>(); - + // 统一使用一个事件标识符,通过 isLongPress 参数区分短按和长按 result.put("eventId", "button_event"); - + // 事件时间戳 result.put("eventTime", System.currentTimeMillis()); - + // 事件参数(包含 isLongPress 字段用于区分短按和长按) Map eventParams = new HashMap<>(); eventParams.put("keyId", event.getKeyId()); @@ -234,13 +237,13 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { eventParams.put("keyNumber", event.getKeyNumber()); eventParams.put("isLongPress", event.getIsLongPress()); result.put("params", eventParams); - + return result; } /** * 解析位置信息为属性上报格式(thing.property.post) - * + * * 物模型标准格式:params 直接就是属性键值对 * { * "latitude": 31.123456, @@ -251,10 +254,10 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { */ private Map parseLocationInfoAsProperties(Jt808DataPack dataPack) { Jt808LocationInfo locationInfo = decoder.toLocationInfoUploadMsg(dataPack); - + // 物模型属性集合(所有属性平铺在同一层) Map properties = new HashMap<>(); - + // === 基础位置信息(核心属性) === properties.put("latitude", locationInfo.getLatitude()); properties.put("longitude", locationInfo.getLongitude()); @@ -262,13 +265,13 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { properties.put("speed", locationInfo.getSpeed()); properties.put("direction", locationInfo.getDirection()); properties.put("time", locationInfo.getTime()); - + // 状态和告警字段(转为整数便于处理) properties.put("warningFlag", locationInfo.getWarningFlagField()); properties.put("status", locationInfo.getStatusField()); - + // === 扩展信息(物模型属性) === - + // 电池信息 if (locationInfo.getBatteryInfo() != null) { Jt808BatteryInfo batteryInfo = locationInfo.getBatteryInfo(); @@ -282,17 +285,17 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { properties.put("iccid", batteryInfo.getIccid()); } } - + // 里程 if (locationInfo.getMileage() != null) { properties.put("mileage", locationInfo.getMileage()); } - + // 信号强度 if (locationInfo.getSignalStrength() != null) { properties.put("signalStrength", locationInfo.getSignalStrength()); } - + // 蓝牙设备列表 if (locationInfo.getBluetoothInfos() != null && !locationInfo.getBluetoothInfos().isEmpty()) { List> bluetoothList = new ArrayList<>(); @@ -308,12 +311,12 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { } properties.put("bluetoothDevices", bluetoothList); } - + // 保留原始扩展字段(用于调试和高级用途) if (locationInfo.getExtensions() != null && !locationInfo.getExtensions().isEmpty()) { properties.put("_rawExtensions", locationInfo.getExtensions()); } - + return properties; } @@ -324,9 +327,9 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { Map result = new HashMap<>(); try { // 解析注册消息体 - com.viewsh.module.iot.gateway.codec.jt808.entity.Jt808RegisterInfo registerInfo = + com.viewsh.module.iot.gateway.codec.jt808.entity.Jt808RegisterInfo registerInfo = decoder.toRegisterMsg(dataPack); - + result.put("provinceId", registerInfo.getProvinceId()); result.put("cityId", registerInfo.getCityId()); result.put("manufacturerId", registerInfo.getManufacturerId()); @@ -365,7 +368,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { if (bodyBytes == null || bodyBytes.length == 0) { return Map.of(); } - + // 尝试解析为字符串,如果失败则返回十六进制 try { String str = new String(bodyBytes, Jt808Constants.DEFAULT_CHARSET); @@ -375,7 +378,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { } } catch (Exception ignored) { } - + return Map.of("rawData", bytesToHex(bodyBytes)); } @@ -387,7 +390,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { int replyFlowId = ((Number) params.getOrDefault("replyFlowId", 0)).intValue(); int replyId = ((Number) params.getOrDefault("replyId", 0)).intValue(); byte replyCode = ((Number) params.getOrDefault("replyCode", 0)).byteValue(); - + return encoder.encodeCommonResp(phone, replyFlowId, replyId, replyCode, flowId); } @@ -399,7 +402,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { int replyFlowId = ((Number) params.getOrDefault("replyFlowId", 0)).intValue(); byte replyCode = ((Number) params.getOrDefault("replyCode", 0)).byteValue(); String authToken = (String) params.get("authToken"); - + return encoder.encodeRegisterResp(phone, replyFlowId, replyCode, authToken, flowId); } @@ -410,15 +413,15 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { Map params = getParamsAsMap(message); byte flag = ((Number) params.getOrDefault("flag", 0)).byteValue(); String content = (String) params.getOrDefault("content", ""); - + return encoder.encodeTextInfoDown(phone, flag, content, flowId); } /** * 编码服务调用(thing.service.invoke) - * + * * 根据服务标识符映射到不同的 JT808 指令 - * + * * 消息格式: * { * "identifier": "服务标识符", @@ -429,29 +432,29 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { */ private byte[] encodeServiceInvoke(IotDeviceMessage message, String phone, int flowId) { Map params = getParamsAsMap(message); - + // 获取服务标识符 String serviceIdentifier = (String) params.get("identifier"); if (StrUtil.isBlank(serviceIdentifier)) { log.error("[encodeServiceInvoke][服务标识符为空]"); return new byte[0]; } - + // 获取服务参数 @SuppressWarnings("unchecked") Map serviceParams = (Map) params.getOrDefault("params", new HashMap<>()); - + // 根据服务标识符路由到不同的 JT808 指令 return switch (serviceIdentifier) { case "TTS" -> { // 语音播报服务 -> JT808 文本信息下发 (0x8300) String ttsText = (String) serviceParams.getOrDefault("tts_text", ""); int ttsFlag = ((Number) serviceParams.getOrDefault("tts_flag", 4)).intValue(); // 默认 4-TTS 播读 - + log.info("[encodeServiceInvoke][TTS 语音播报] phone={}, flag={}, text={}", phone, ttsFlag, ttsText); yield encoder.encodeTextInfoDown(phone, (byte) ttsFlag, ttsText, flowId); } - + case "locationQuery" -> { // 位置查询服务 -> JT808 位置信息查询 (0x8201) log.info("[encodeServiceInvoke][位置查询] phone={}", phone); @@ -459,7 +462,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { log.warn("[encodeServiceInvoke][位置查询服务暂未实现]"); yield new byte[0]; } - + default -> { log.warn("[encodeServiceInvoke][不支持的服务标识符: {}]", serviceIdentifier); yield new byte[0]; @@ -469,9 +472,9 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { /** * 编码属性设置(thing.property.set) - * + * * 属性设置映射到 JT808 的参数设置指令 (0x8103) - * + * * 消息格式: * { * "properties": { @@ -482,23 +485,23 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { */ private byte[] encodePropertySet(IotDeviceMessage message, String phone, int flowId) { Map params = getParamsAsMap(message); - + @SuppressWarnings("unchecked") Map properties = (Map) params.get("properties"); if (properties == null || properties.isEmpty()) { log.error("[encodePropertySet][属性列表为空]"); return new byte[0]; } - + // 将物模型属性映射到 JT808 参数 Map jt808Params = mapPropertiesToJt808Params(properties); if (jt808Params.isEmpty()) { log.warn("[encodePropertySet][没有可映射的 JT808 参数]"); return new byte[0]; } - + log.info("[encodePropertySet][属性设置] phone={}, params={}", phone, jt808Params); - + // TODO: 实现 encoder.encodeParamSettings(phone, jt808Params, flowId); log.warn("[encodePropertySet][参数设置指令暂未实现]"); return new byte[0]; @@ -506,19 +509,19 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { /** * 将物模型属性映射到 JT808 参数 - * + * * 映射关系参考 JT808 协议标准: * - 0x0029: 心跳发送间隔 * - 0x0027: 位置汇报间隔 * - 0x0028: 休眠时汇报间隔 * - 等等... - * + * * @param properties 物模型属性 * @return JT808 参数映射(参数ID -> 参数值) */ private Map mapPropertiesToJt808Params(Map properties) { Map jt808Params = new HashMap<>(); - + properties.forEach((identifier, value) -> { Integer paramId = switch (identifier) { case "heartbeatInterval" -> 0x0029; // 心跳发送间隔(单位:秒) @@ -531,22 +534,22 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { // 更多映射关系可根据实际需求添加 default -> null; }; - + if (paramId != null) { jt808Params.put(paramId, value); - log.debug("[mapPropertiesToJt808Params][属性映射] {} -> 0x{} = {}", + log.debug("[mapPropertiesToJt808Params][属性映射] {} -> 0x{} = {}", identifier, Integer.toHexString(paramId), value); } else { log.warn("[mapPropertiesToJt808Params][未知的属性标识符: {}]", identifier); } }); - + return jt808Params; } /** * 提取终端手机号 - * + * * 优先级: * 1. 从 params._deviceName 中获取(下发场景,IotTcpDownstreamHandler 自动注入) * 2. 从 params._metadata.terminalPhone 中获取(上行消息回复场景) @@ -557,9 +560,9 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { log.error("[extractPhoneNumber][params 不是 Map 类型,消息: {}]", message); throw new IllegalArgumentException("消息参数格式错误,params 必须是 Map 类型"); } - + Map params = (Map) message.getParams(); - + // 1. 优先从 _deviceName 获取(下发场景,由 IotTcpDownstreamHandler 注入) Object deviceName = params.get("_deviceName"); if (deviceName != null && StrUtil.isNotBlank(deviceName.toString())) { @@ -571,7 +574,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { log.warn("[extractPhoneNumber][_deviceName 不是纯数字: {}]", deviceNameStr); } } - + // 2. 从 metadata 中获取(上行消息回复场景) if (params.get("_metadata") instanceof Map) { Map metadata = (Map) params.get("_metadata"); @@ -580,7 +583,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { return terminalPhone.toString(); } } - + // 3. 从 phone 字段获取(向下兼容,不推荐) Object phone = params.get("phone"); if (phone != null && StrUtil.isNotBlank(phone.toString())) { @@ -589,7 +592,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { return phoneStr; } } - + // 4. 如果都获取不到,抛出异常 log.error("[extractPhoneNumber][无法提取终端手机号,params: {}]", params); throw new IllegalArgumentException( @@ -598,19 +601,19 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { /** * 提取流水号 - * + * * 对于下发消息,如果没有指定流水号,则生成一个随机流水号 */ private int extractFlowId(IotDeviceMessage message) { if (message.getParams() instanceof Map) { Map params = (Map) message.getParams(); - + // 尝试获取显式指定的流水号 Object flowId = params.get("flowId"); if (flowId instanceof Number) { return ((Number) flowId).intValue(); } - + // 尝试从 metadata 中获取(上行消息的流水号) if (params.get("_metadata") instanceof Map) { Map metadata = (Map) params.get("_metadata"); @@ -620,7 +623,7 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { } } } - + // 生成随机流水号(1-65535) return (int) (System.currentTimeMillis() % 65535) + 1; } @@ -663,8 +666,8 @@ public class IotJt808DeviceMessageCodec implements IotDeviceMessageCodec { */ public static boolean isJt808Format(byte[] data) { // 检查起止标识符 0x7e - return data != null && data.length >= 12 - && data[0] == (byte) Jt808Constants.PKG_DELIMITER + return data != null && data.length >= 12 + && data[0] == (byte) Jt808Constants.PKG_DELIMITER && data[data.length - 1] == (byte) Jt808Constants.PKG_DELIMITER; }