From 593b16fc01c8b391cf5eb59b73031f1c26119ebf Mon Sep 17 00:00:00 2001 From: lzh Date: Mon, 19 Jan 2026 17:15:47 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=9B=9E=E5=A4=8D=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/RawDataCollectorService.java | 25 ++ .../jt808/service/codec/DataDecoder.java | 243 +++++++++++------- .../jt808/service/handler/MessageHandler.java | 43 +++- .../service/handler/TCPServerHandler.java | 29 +++ 4 files changed, 247 insertions(+), 93 deletions(-) diff --git a/src/main/java/com/iot/transport/jt808/service/RawDataCollectorService.java b/src/main/java/com/iot/transport/jt808/service/RawDataCollectorService.java index 0a61882..c57c413 100644 --- a/src/main/java/com/iot/transport/jt808/service/RawDataCollectorService.java +++ b/src/main/java/com/iot/transport/jt808/service/RawDataCollectorService.java @@ -103,6 +103,31 @@ public class RawDataCollectorService { }); } + /** + * 记录平台下发的原始数据 + */ + public void collectOutgoing(String key, byte[] rawBytes) { + if (!enabled || rawBytes == null || rawBytes.length == 0) { + return; + } + + String hexData = bytesToHex(rawBytes); + String timestamp = LocalDateTime.now().format(timestampFormatter); + + executor.submit(() -> { + try { + BufferedWriter writer = getWriter(key); + writer.write("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n"); + writer.write(String.format("时间: %s [平台下发]\n", timestamp)); + writer.write(String.format("数据(HEX): %s\n", hexData)); + writer.write("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n\n"); + writer.flush(); + } catch (IOException e) { + logger.error("写入下发数据失败: {}", key, e); + } + }); + } + /** * 记录解析后的消息 * diff --git a/src/main/java/com/iot/transport/jt808/service/codec/DataDecoder.java b/src/main/java/com/iot/transport/jt808/service/codec/DataDecoder.java index 2b1ff7b..b6cff5d 100644 --- a/src/main/java/com/iot/transport/jt808/service/codec/DataDecoder.java +++ b/src/main/java/com/iot/transport/jt808/service/codec/DataDecoder.java @@ -42,8 +42,10 @@ public class DataDecoder { public DataPack bytes2PackageData(byte[] data) { DataPack ret = new DataPack(); - // 0. 终端套接字地址信息 - // ret.setChannel(msg.getChannel()); + // 0. JT808协议反转义处理 + // 0x7D 0x02 -> 0x7E + // 0x7D 0x01 -> 0x7D + data = doReceiveUnescape(data); // 1. 16byte 或 12byte 消息头 PackHead msgHeader = this.parseMsgHeaderFromBytes(data); @@ -61,8 +63,6 @@ public class DataDecoder { ret.setBodyBytes(tmp); // 3. 去掉分隔符之后,最后一位就是校验码 - // int checkSumInPkg = - // this.bitOperator.oneByteToInteger(data[data.length - 1]); int checkSumInPkg = data[data.length - 1]; int calculatedCheckSum = this.bitUtil.getCheckSum4JT808(data, 0, data.length - 1); ret.setCheckSum(checkSumInPkg); @@ -72,6 +72,54 @@ public class DataDecoder { return ret; } + /** + * JT808协议反转义处理 + * 接收消息时:先转义还原 + * 0x7D 0x02 -> 0x7E (标识位) + * 0x7D 0x01 -> 0x7D (转义字符) + * + * @param data 转义后的数据 + * @return 还原后的原始数据 + */ + private byte[] doReceiveUnescape(byte[] data) { + if (data == null || data.length < 2) { + return data; + } + + // 使用动态数组收集结果 + byte[] result = new byte[data.length]; + int resultIndex = 0; + + for (int i = 0; i < data.length; i++) { + if (data[i] == 0x7D && i + 1 < data.length) { + // 检查下一个字节 + if (data[i + 1] == 0x02) { + // 0x7D 0x02 -> 0x7E + result[resultIndex++] = 0x7E; + i++; // 跳过下一个字节 + } else if (data[i + 1] == 0x01) { + // 0x7D 0x01 -> 0x7D + result[resultIndex++] = 0x7D; + i++; // 跳过下一个字节 + } else { + // 不是有效的转义序列,保留原字节 + result[resultIndex++] = data[i]; + } + } else { + result[resultIndex++] = data[i]; + } + } + + // 返回实际长度的数组 + if (resultIndex == data.length) { + return result; + } else { + byte[] finalResult = new byte[resultIndex]; + System.arraycopy(result, 0, finalResult, 0, resultIndex); + return finalResult; + } + } + private PackHead parseMsgHeaderFromBytes(byte[] data) { PackHead msgHeader = new PackHead(); @@ -213,61 +261,63 @@ public class DataDecoder { return ret; } - public ButtonEventPack toButtonEventPack(DataPack packageData) { - ButtonEventPack ret = new ButtonEventPack(packageData); - byte[] data = ret.getBodyBytes(); - - // 1. byte[0] 按键ID - if (data.length > 0) { - ret.setKeyId(this.parseIntFromBytes(data, 0, 1)); - } - - // 2. byte[1] 按键状态/次数 - if (data.length > 1) { - ret.setKeyState(this.parseIntFromBytes(data, 1, 1)); - } - + public ButtonEventPack toButtonEventPack(DataPack packageData) { + ButtonEventPack ret = new ButtonEventPack(packageData); + byte[] data = ret.getBodyBytes(); + + // 1. byte[0] 按键ID + if (data.length > 0) { + ret.setKeyId(this.parseIntFromBytes(data, 0, 1)); + } + + // 2. byte[1] 按键状态/次数 + if (data.length > 1) { + ret.setKeyState(this.parseIntFromBytes(data, 1, 1)); + } + return ret; } - public BatchLocationPack toBatchLocationPack(DataPack packageData) { - BatchLocationPack ret = new BatchLocationPack(packageData); - byte[] data = ret.getBodyBytes(); - - // 1. 数据项个数 (WORD) - int count = this.parseIntFromBytes(data, 0, 2); - ret.setCount(count); - - // 2. 位置数据类型 (BYTE) - ret.setType(this.parseIntFromBytes(data, 2, 1)); - - // 3. 循环解析位置数据项 - int index = 3; - for (int i = 0; i < count; i++) { - if (index + 2 > data.length) break; - - // 每一项的数据长度 (WORD) - int itemLen = this.parseIntFromBytes(data, index, 2); - index += 2; - - if (index + itemLen > data.length) break; - - // 提取该项的字节数组 - byte[] itemBytes = new byte[itemLen]; - System.arraycopy(data, index, itemBytes, 0, itemLen); - - // 使用 toLocationInfoUploadMsg 复用解析逻辑 - // 构造一个临时的 DataPack - DataPack itemPack = new DataPack(); - itemPack.setBodyBytes(itemBytes); - LocationPack loc = this.toLocationInfoUploadMsg(itemPack); - ret.addItem(loc); - - index += itemLen; - } - - return ret; - } + public BatchLocationPack toBatchLocationPack(DataPack packageData) { + BatchLocationPack ret = new BatchLocationPack(packageData); + byte[] data = ret.getBodyBytes(); + + // 1. 数据项个数 (WORD) + int count = this.parseIntFromBytes(data, 0, 2); + ret.setCount(count); + + // 2. 位置数据类型 (BYTE) + ret.setType(this.parseIntFromBytes(data, 2, 1)); + + // 3. 循环解析位置数据项 + int index = 3; + for (int i = 0; i < count; i++) { + if (index + 2 > data.length) + break; + + // 每一项的数据长度 (WORD) + int itemLen = this.parseIntFromBytes(data, index, 2); + index += 2; + + if (index + itemLen > data.length) + break; + + // 提取该项的字节数组 + byte[] itemBytes = new byte[itemLen]; + System.arraycopy(data, index, itemBytes, 0, itemLen); + + // 使用 toLocationInfoUploadMsg 复用解析逻辑 + // 构造一个临时的 DataPack + DataPack itemPack = new DataPack(); + itemPack.setBodyBytes(itemBytes); + LocationPack loc = this.toLocationInfoUploadMsg(itemPack); + ret.addItem(loc); + + index += itemLen; + } + + return ret; + } public LocationPack toLocationInfoUploadMsg(DataPack packageData) { LocationPack ret = new LocationPack(packageData); @@ -284,7 +334,7 @@ public class DataDecoder { // 5. byte[16-17] 高程(WORD(16)) 海拔高度,单位为米( m) ret.setElevation(this.parseIntFromBytes(data, 16, 2)); // byte[18-19] 速度(WORD) 1/10km/h - ret.setSpeed(this.parseIntFromBytes(data, 18, 2) / 10.0f); + ret.setSpeed(this.parseIntFromBytes(data, 18, 2) / 10.0f); // byte[20-21] 方向(WORD) 0-359,正北为 0,顺时针 ret.setDirection(this.parseIntFromBytes(data, 20, 2)); // byte[22-x] 时间(BCD[6]) YY-MM-DD-hh-mm-ss @@ -294,41 +344,50 @@ public class DataDecoder { byte[] tmp = new byte[6]; System.arraycopy(data, 22, tmp, 0, 6); String time = this.parseBcdStringFromBytes(data, 22, 6); - - // 扩展协议解析 (Starting from index 28) - int index = 28; - - while (index < data.length) { - int extId = data[index] & 0xFF; - // Ensure we have at least ID and Length bytes - if (index + 1 >= data.length) break; - int extLen = data[index + 1] & 0xFF; - - // Validate remaining length - if (index + 2 + extLen > data.length) { - log.warn("Extension length out of bounds: ID={}, Len={}, Remaining={}", - String.format("0x%02X", extId), extLen, data.length - index - 2); - break; - } - - int contentStart = index + 2; - - ExtensionParser parser = ExtensionParserFactory.getParser(extId); - if (parser != null) { - try { - parser.parse(ret, data, contentStart, extLen); - } catch (Exception e) { - log.error("Failed to parse extension 0x{}: {}", String.format("%02X", extId), e.getMessage()); - } - } else { - // Store unknown extensions as raw hex string - byte[] raw = new byte[extLen]; - System.arraycopy(data, contentStart, raw, 0, extLen); - ret.addExtension(extId, HexUtil.toHexString(raw)); - } - - index += (2 + extLen); - } + + try { + java.text.SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyMMddHHmmss"); + sdf.setTimeZone(java.util.TimeZone.getTimeZone("GMT+8")); + ret.setTime(sdf.parse(time)); + } catch (Exception e) { + log.error("解析时间出错:{}", e.getMessage()); + } + + // 扩展协议解析 (Starting from index 28) + int index = 28; + + while (index < data.length) { + int extId = data[index] & 0xFF; + // Ensure we have at least ID and Length bytes + if (index + 1 >= data.length) + break; + int extLen = data[index + 1] & 0xFF; + + // Validate remaining length + if (index + 2 + extLen > data.length) { + log.warn("Extension length out of bounds: ID={}, Len={}, Remaining={}", + String.format("0x%02X", extId), extLen, data.length - index - 2); + break; + } + + int contentStart = index + 2; + + ExtensionParser parser = ExtensionParserFactory.getParser(extId); + if (parser != null) { + try { + parser.parse(ret, data, contentStart, extLen); + } catch (Exception e) { + log.error("Failed to parse extension 0x{}: {}", String.format("%02X", extId), e.getMessage()); + } + } else { + // Store unknown extensions as raw hex string + byte[] raw = new byte[extLen]; + System.arraycopy(data, contentStart, raw, 0, extLen); + ret.addExtension(extId, HexUtil.toHexString(raw)); + } + + index += (2 + extLen); + } return ret; } diff --git a/src/main/java/com/iot/transport/jt808/service/handler/MessageHandler.java b/src/main/java/com/iot/transport/jt808/service/handler/MessageHandler.java index 17ea803..10e9620 100644 --- a/src/main/java/com/iot/transport/jt808/service/handler/MessageHandler.java +++ b/src/main/java/com/iot/transport/jt808/service/handler/MessageHandler.java @@ -9,6 +9,11 @@ import com.iot.transport.jt808.server.SessionManager; import com.iot.transport.jt808.service.codec.DataDecoder; import com.iot.transport.jt808.service.codec.DataEncoder; +import com.iot.transport.jt808.service.ApiLogService; +import com.iot.transport.jt808.service.RawDataCollectorService; +import java.util.HashMap; +import java.util.Map; + import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; @@ -23,12 +28,23 @@ public abstract class MessageHandler { protected DataDecoder decoder; protected SessionManager sessionManager; + protected ApiLogService apiLogService; + protected RawDataCollectorService rawDataCollectorService; + public MessageHandler() { this.msgEncoder = new DataEncoder(); this.decoder = new DataDecoder(); this.sessionManager = SessionManager.getInstance(); } + public void setApiLogService(ApiLogService apiLogService) { + this.apiLogService = apiLogService; + } + + public void setRawDataCollectorService(RawDataCollectorService rawDataCollectorService) { + this.rawDataCollectorService = rawDataCollectorService; + } + protected ByteBuf getByteBuf(byte[] arr) { ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(arr.length); byteBuf.writeBytes(arr); @@ -36,6 +52,32 @@ public abstract class MessageHandler { } public void send2Client(Channel channel, byte[] arr) throws InterruptedException { + log.info("<< [发送数据] Hex: {}", com.iot.transport.jt808.util.HexUtil.toHexString(arr)); + + // Find Session to identify the user + String key = Session.buildId(channel); + Session session = this.sessionManager.findBySessionId(key); + if (session != null && session.getTerminalPhone() != null) { + key = session.getTerminalPhone(); + } + + // 1. Raw Data Collection + if (rawDataCollectorService != null) { + rawDataCollectorService.collectOutgoing(key, arr); + } + + // 2. Frontend Broadcast + if (apiLogService != null) { + Map logMap = new HashMap<>(); + logMap.put("direction", "SEND"); + logMap.put("hex", com.iot.transport.jt808.util.HexUtil.toHexString(arr)); + logMap.put("details", "平台回复 (Hex Only)"); + logMap.put("id", key); + logMap.put("phone", key); + + apiLogService.broadcastLog("TCP", logMap); + } + ChannelFuture future = channel.writeAndFlush(Unpooled.copiedBuffer(arr)).sync(); if (!future.isSuccess()) { log.error("发送数据出错:{}", future.cause()); @@ -55,6 +97,5 @@ public abstract class MessageHandler { return this.getFlowId(channel, 0); } - public abstract void process(DataPack req); } diff --git a/src/main/java/com/iot/transport/jt808/service/handler/TCPServerHandler.java b/src/main/java/com/iot/transport/jt808/service/handler/TCPServerHandler.java index 618b925..5c6a332 100644 --- a/src/main/java/com/iot/transport/jt808/service/handler/TCPServerHandler.java +++ b/src/main/java/com/iot/transport/jt808/service/handler/TCPServerHandler.java @@ -25,6 +25,7 @@ import com.iot.transport.jt808.common.Consts; import com.iot.transport.jt808.entity.request.BatteryVersionInfo; import com.iot.transport.jt808.entity.request.BluetoothInfo; import com.iot.transport.jt808.entity.request.LocationPack; +import com.iot.transport.jt808.entity.request.RegisterPack; import com.iot.transport.jt808.entity.request.ButtonEventPack; import com.iot.transport.jt808.entity.request.BatchLocationPack; import java.util.List; @@ -123,6 +124,17 @@ public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1) locMap.put("lon", locPack.getLongitude()); logMap.put("location", locMap); + // BroadCast Badge Event (Logic for Dashboard Update) + apiLogService.broadcastLog("TCP", logMap); + + // Remove Badge fields to send a normal log for Statistics + logMap.remove("type"); + logMap.remove("location"); + logMap.remove("battery"); + logMap.remove("status"); + logMap.remove("bluetooth"); + logMap.remove("id"); + } catch (Exception e) { logMap.put("details", packageData.toString() + " (Parse Error: " + e.getMessage() + ")"); } @@ -169,6 +181,21 @@ public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1) } catch (Exception e) { logMap.put("details", packageData.toString() + " (Parse Error: " + e.getMessage() + ")"); } + } else if (msgId == Consts.MSGID_REGISTER || msgId == 0x0100) { + try { + RegisterPack registerPack = this.decoder.toTerminalRegisterMsg(packageData); + logMap.put("details", registerPack.toString()); + } catch (Exception e) { + logMap.put("details", packageData.toString() + " (Parse Error: " + e.getMessage() + ")"); + } + } else if (msgId == Consts.MSGID_AUTHENTICATION || msgId == 0x0102) { + try { + com.iot.transport.jt808.entity.request.AuthenticationPack authPack = new com.iot.transport.jt808.entity.request.AuthenticationPack( + packageData); + logMap.put("details", authPack.toString()); + } catch (Exception e) { + logMap.put("details", packageData.toString() + " (Parse Error: " + e.getMessage() + ")"); + } } else { logMap.put("details", packageData.toString()); } @@ -178,6 +205,8 @@ public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1) MessageHandler handler = MessageHandlerFactory.getInstance(msgId); if (handler != null) { + handler.setApiLogService(this.apiLogService); + handler.setRawDataCollectorService(this.rawDataCollectorService); handler.process(packageData); } else { // 其他情况 logger.error("[未知消息类型],msgId={},phone={},package={}", header.getId(), header.getTerminalPhone(),