feat: 回复日志打印
All checks were successful
iot-test-platform CI/CD / build-and-deploy (push) Successful in 35s

This commit is contained in:
lzh
2026-01-19 17:15:47 +08:00
parent fd483799a9
commit 593b16fc01
4 changed files with 247 additions and 93 deletions

View File

@@ -103,6 +103,31 @@ public class RawDataCollectorService {
});
}
/**
* 记录平台下发的原始数据
*/
public void collectOutgoing(String key, byte[] rawBytes) {
if (!enabled || rawBytes == null || rawBytes.length == 0) {
return;
}
String hexData = bytesToHex(rawBytes);
String timestamp = LocalDateTime.now().format(timestampFormatter);
executor.submit(() -> {
try {
BufferedWriter writer = getWriter(key);
writer.write("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n");
writer.write(String.format("时间: %s [平台下发]\n", timestamp));
writer.write(String.format("数据(HEX): %s\n", hexData));
writer.write("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n\n");
writer.flush();
} catch (IOException e) {
logger.error("写入下发数据失败: {}", key, e);
}
});
}
/**
* 记录解析后的消息
*

View File

@@ -42,8 +42,10 @@ public class DataDecoder {
public DataPack bytes2PackageData(byte[] data) {
DataPack ret = new DataPack();
// 0. 终端套接字地址信息
// ret.setChannel(msg.getChannel());
// 0. JT808协议反转义处理
// 0x7D 0x02 -> 0x7E
// 0x7D 0x01 -> 0x7D
data = doReceiveUnescape(data);
// 1. 16byte 或 12byte 消息头
PackHead msgHeader = this.parseMsgHeaderFromBytes(data);
@@ -61,8 +63,6 @@ public class DataDecoder {
ret.setBodyBytes(tmp);
// 3. 去掉分隔符之后,最后一位就是校验码
// int checkSumInPkg =
// this.bitOperator.oneByteToInteger(data[data.length - 1]);
int checkSumInPkg = data[data.length - 1];
int calculatedCheckSum = this.bitUtil.getCheckSum4JT808(data, 0, data.length - 1);
ret.setCheckSum(checkSumInPkg);
@@ -72,6 +72,54 @@ public class DataDecoder {
return ret;
}
/**
* JT808协议反转义处理
* 接收消息时:先转义还原
* 0x7D 0x02 -> 0x7E (标识位)
* 0x7D 0x01 -> 0x7D (转义字符)
*
* @param data 转义后的数据
* @return 还原后的原始数据
*/
private byte[] doReceiveUnescape(byte[] data) {
if (data == null || data.length < 2) {
return data;
}
// 使用动态数组收集结果
byte[] result = new byte[data.length];
int resultIndex = 0;
for (int i = 0; i < data.length; i++) {
if (data[i] == 0x7D && i + 1 < data.length) {
// 检查下一个字节
if (data[i + 1] == 0x02) {
// 0x7D 0x02 -> 0x7E
result[resultIndex++] = 0x7E;
i++; // 跳过下一个字节
} else if (data[i + 1] == 0x01) {
// 0x7D 0x01 -> 0x7D
result[resultIndex++] = 0x7D;
i++; // 跳过下一个字节
} else {
// 不是有效的转义序列,保留原字节
result[resultIndex++] = data[i];
}
} else {
result[resultIndex++] = data[i];
}
}
// 返回实际长度的数组
if (resultIndex == data.length) {
return result;
} else {
byte[] finalResult = new byte[resultIndex];
System.arraycopy(result, 0, finalResult, 0, resultIndex);
return finalResult;
}
}
private PackHead parseMsgHeaderFromBytes(byte[] data) {
PackHead msgHeader = new PackHead();
@@ -244,13 +292,15 @@ public class DataDecoder {
// 3. 循环解析位置数据项
int index = 3;
for (int i = 0; i < count; i++) {
if (index + 2 > data.length) break;
if (index + 2 > data.length)
break;
// 每一项的数据长度 (WORD)
int itemLen = this.parseIntFromBytes(data, index, 2);
index += 2;
if (index + itemLen > data.length) break;
if (index + itemLen > data.length)
break;
// 提取该项的字节数组
byte[] itemBytes = new byte[itemLen];
@@ -295,13 +345,22 @@ public class DataDecoder {
System.arraycopy(data, 22, tmp, 0, 6);
String time = this.parseBcdStringFromBytes(data, 22, 6);
try {
java.text.SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyMMddHHmmss");
sdf.setTimeZone(java.util.TimeZone.getTimeZone("GMT+8"));
ret.setTime(sdf.parse(time));
} catch (Exception e) {
log.error("解析时间出错:{}", e.getMessage());
}
// 扩展协议解析 (Starting from index 28)
int index = 28;
while (index < data.length) {
int extId = data[index] & 0xFF;
// Ensure we have at least ID and Length bytes
if (index + 1 >= data.length) break;
if (index + 1 >= data.length)
break;
int extLen = data[index + 1] & 0xFF;
// Validate remaining length

View File

@@ -9,6 +9,11 @@ import com.iot.transport.jt808.server.SessionManager;
import com.iot.transport.jt808.service.codec.DataDecoder;
import com.iot.transport.jt808.service.codec.DataEncoder;
import com.iot.transport.jt808.service.ApiLogService;
import com.iot.transport.jt808.service.RawDataCollectorService;
import java.util.HashMap;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
@@ -23,12 +28,23 @@ public abstract class MessageHandler {
protected DataDecoder decoder;
protected SessionManager sessionManager;
protected ApiLogService apiLogService;
protected RawDataCollectorService rawDataCollectorService;
public MessageHandler() {
this.msgEncoder = new DataEncoder();
this.decoder = new DataDecoder();
this.sessionManager = SessionManager.getInstance();
}
public void setApiLogService(ApiLogService apiLogService) {
this.apiLogService = apiLogService;
}
public void setRawDataCollectorService(RawDataCollectorService rawDataCollectorService) {
this.rawDataCollectorService = rawDataCollectorService;
}
protected ByteBuf getByteBuf(byte[] arr) {
ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(arr.length);
byteBuf.writeBytes(arr);
@@ -36,6 +52,32 @@ public abstract class MessageHandler {
}
public void send2Client(Channel channel, byte[] arr) throws InterruptedException {
log.info("<< [发送数据] Hex: {}", com.iot.transport.jt808.util.HexUtil.toHexString(arr));
// Find Session to identify the user
String key = Session.buildId(channel);
Session session = this.sessionManager.findBySessionId(key);
if (session != null && session.getTerminalPhone() != null) {
key = session.getTerminalPhone();
}
// 1. Raw Data Collection
if (rawDataCollectorService != null) {
rawDataCollectorService.collectOutgoing(key, arr);
}
// 2. Frontend Broadcast
if (apiLogService != null) {
Map<String, Object> logMap = new HashMap<>();
logMap.put("direction", "SEND");
logMap.put("hex", com.iot.transport.jt808.util.HexUtil.toHexString(arr));
logMap.put("details", "平台回复 (Hex Only)");
logMap.put("id", key);
logMap.put("phone", key);
apiLogService.broadcastLog("TCP", logMap);
}
ChannelFuture future = channel.writeAndFlush(Unpooled.copiedBuffer(arr)).sync();
if (!future.isSuccess()) {
log.error("发送数据出错:{}", future.cause());
@@ -55,6 +97,5 @@ public abstract class MessageHandler {
return this.getFlowId(channel, 0);
}
public abstract void process(DataPack req);
}

View File

@@ -25,6 +25,7 @@ import com.iot.transport.jt808.common.Consts;
import com.iot.transport.jt808.entity.request.BatteryVersionInfo;
import com.iot.transport.jt808.entity.request.BluetoothInfo;
import com.iot.transport.jt808.entity.request.LocationPack;
import com.iot.transport.jt808.entity.request.RegisterPack;
import com.iot.transport.jt808.entity.request.ButtonEventPack;
import com.iot.transport.jt808.entity.request.BatchLocationPack;
import java.util.List;
@@ -123,6 +124,17 @@ public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1)
locMap.put("lon", locPack.getLongitude());
logMap.put("location", locMap);
// BroadCast Badge Event (Logic for Dashboard Update)
apiLogService.broadcastLog("TCP", logMap);
// Remove Badge fields to send a normal log for Statistics
logMap.remove("type");
logMap.remove("location");
logMap.remove("battery");
logMap.remove("status");
logMap.remove("bluetooth");
logMap.remove("id");
} catch (Exception e) {
logMap.put("details", packageData.toString() + " (Parse Error: " + e.getMessage() + ")");
}
@@ -169,6 +181,21 @@ public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1)
} catch (Exception e) {
logMap.put("details", packageData.toString() + " (Parse Error: " + e.getMessage() + ")");
}
} else if (msgId == Consts.MSGID_REGISTER || msgId == 0x0100) {
try {
RegisterPack registerPack = this.decoder.toTerminalRegisterMsg(packageData);
logMap.put("details", registerPack.toString());
} catch (Exception e) {
logMap.put("details", packageData.toString() + " (Parse Error: " + e.getMessage() + ")");
}
} else if (msgId == Consts.MSGID_AUTHENTICATION || msgId == 0x0102) {
try {
com.iot.transport.jt808.entity.request.AuthenticationPack authPack = new com.iot.transport.jt808.entity.request.AuthenticationPack(
packageData);
logMap.put("details", authPack.toString());
} catch (Exception e) {
logMap.put("details", packageData.toString() + " (Parse Error: " + e.getMessage() + ")");
}
} else {
logMap.put("details", packageData.toString());
}
@@ -178,6 +205,8 @@ public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1)
MessageHandler handler = MessageHandlerFactory.getInstance(msgId);
if (handler != null) {
handler.setApiLogService(this.apiLogService);
handler.setRawDataCollectorService(this.rawDataCollectorService);
handler.process(packageData);
} else { // 其他情况
logger.error("[未知消息类型],msgId={},phone={},package={}", header.getId(), header.getTerminalPhone(),