feat: 添加JT808原始数据收集功能
Some checks failed
iot-test-platform CI/CD / build-and-deploy (push) Failing after 45s

This commit is contained in:
lzh
2026-01-16 22:42:39 +08:00
parent 31f2cc61a2
commit cb6874b942
7 changed files with 386 additions and 126 deletions

View File

@@ -1,6 +1,7 @@
package com.iot.transport.jt808.server;
import com.iot.transport.jt808.service.ApiLogService;
import com.iot.transport.jt808.service.RawDataCollectorService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
@@ -21,21 +22,23 @@ public class Jt808NettyServer implements CommandLineRunner {
@Autowired
private ApiLogService apiLogService;
@Autowired
private RawDataCollectorService rawDataCollectorService;
private TCPServer tcpServer;
@Override
public void run(String... args) throws Exception {
log.info("Initializing JT808 TCP Server on port: {}", port);
tcpServer = new TCPServer(port, apiLogService);
tcpServer = new TCPServer(port, apiLogService, rawDataCollectorService);
tcpServer.startServer();
}
// You might want to add a @PreDestroy method to stop the server gracefully
// @PreDestroy
// public void destroy() {
// if (tcpServer != null) {
// tcpServer.stopServer();
// }
// if (tcpServer != null) {
// tcpServer.stopServer();
// }
// }
}

View File

@@ -1,6 +1,5 @@
package com.iot.transport.jt808.server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
@@ -21,13 +20,15 @@ import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import com.iot.transport.jt808.service.ApiLogService;
import com.iot.transport.jt808.service.RawDataCollectorService;
public class TCPServer {
private Logger log = LoggerFactory.getLogger(getClass());
private int port;
private ApiLogService apiLogService;
private ApiLogService apiLogService;
private RawDataCollectorService rawDataCollectorService;
private EventLoopGroup bossGroup = null;
private EventLoopGroup workerGroup = null;
private volatile boolean isRunning = false;
@@ -35,10 +36,11 @@ public class TCPServer {
public TCPServer() {
}
public TCPServer(int port, ApiLogService apiLogService) {
public TCPServer(int port, ApiLogService apiLogService, RawDataCollectorService rawDataCollectorService) {
this();
this.port = port;
this.apiLogService = apiLogService;
this.apiLogService = apiLogService;
this.rawDataCollectorService = rawDataCollectorService;
}
private void bind() throws Exception {
@@ -50,19 +52,19 @@ public class TCPServer {
.childHandler(new ChannelInitializer<SocketChannel>() { //
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("idleStateHandler",
new IdleStateHandler(Consts.TCP_CLIENT_IDLE, 0, 0, TimeUnit.MINUTES));
ch.pipeline().addLast(new LogDecoder());
// 1024表示单条消息的最大长度解码器在查找分隔符的时候达到该长度还没找到的话会抛异常
ch.pipeline().addLast(
new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { 0x7e }),
Unpooled.copiedBuffer(new byte[] { 0x7e, 0x7e })));
//ch.pipeline().addLast(new PackageDataDecoder());
ch.pipeline().addLast(new TCPServerHandler(apiLogService));
// ch.pipeline().addLast(new PackageDataDecoder());
ch.pipeline().addLast(new TCPServerHandler(apiLogService, rawDataCollectorService));
}
}).option(ChannelOption.SO_BACKLOG, 128) //
.childOption(ChannelOption.SO_KEEPALIVE, true);

View File

@@ -0,0 +1,227 @@
package com.iot.transport.jt808.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.iot.transport.jt808.entity.DataPack;
import com.iot.transport.jt808.entity.DataPack.PackHead;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 原始数据收集服务
* 用于收集JT808设备上传的所有原始数据和解析后的消息内容
* 便于分析设备的数据上传习惯
*/
@Service
public class RawDataCollectorService {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Value("${jt808.raw-data.enabled:true}")
private boolean enabled;
@Value("${jt808.raw-data.output-dir:./raw-data}")
private String outputDir;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final ConcurrentHashMap<String, BufferedWriter> writers = new ConcurrentHashMap<>();
private final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");
@PostConstruct
public void init() {
if (enabled) {
File dir = new File(outputDir);
if (!dir.exists()) {
dir.mkdirs();
}
logger.info("原始数据收集服务已启动, 输出目录: {}", dir.getAbsolutePath());
} else {
logger.info("原始数据收集服务已禁用");
}
}
@PreDestroy
public void shutdown() {
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 关闭所有writer
writers.forEach((key, writer) -> {
try {
writer.close();
} catch (IOException e) {
logger.error("关闭writer失败: {}", key, e);
}
});
writers.clear();
logger.info("原始数据收集服务已关闭");
}
/**
* 记录原始字节数据
*
* @param sessionId 会话ID
* @param rawBytes 原始字节数据
*/
public void collectRawBytes(String sessionId, byte[] rawBytes) {
if (!enabled || rawBytes == null || rawBytes.length == 0) {
return;
}
String hexData = bytesToHex(rawBytes);
String timestamp = LocalDateTime.now().format(timestampFormatter);
executor.submit(() -> {
try {
BufferedWriter writer = getWriter(sessionId);
writer.write("================================================================================\n");
writer.write(String.format("时间: %s\n", timestamp));
writer.write(String.format("会话ID: %s\n", sessionId));
writer.write(String.format("原始数据(HEX): %s\n", hexData));
writer.flush();
} catch (IOException e) {
logger.error("写入原始数据失败: {}", sessionId, e);
}
});
}
/**
* 记录解析后的消息
*
* @param sessionId 会话ID
* @param dataPack 解析后的数据包
* @param parsedDetails 解析后的详细信息
*/
public void collectParsedMessage(String sessionId, DataPack dataPack, String parsedDetails) {
if (!enabled || dataPack == null) {
return;
}
PackHead header = dataPack.getPackHead();
executor.submit(() -> {
try {
BufferedWriter writer = getWriter(sessionId);
writer.write("--------------------------------------------------------------------------------\n");
writer.write("解析结果:\n");
writer.write(String.format(" 设备号: %s\n", header.getTerminalPhone()));
writer.write(String.format(" 消息ID: 0x%04X (%s)\n", header.getId(), getMsgTypeName(header.getId())));
writer.write(String.format(" 流水号: %d\n", header.getFlowId()));
if (parsedDetails != null && !parsedDetails.isEmpty()) {
writer.write(String.format(" 消息体: %s\n", parsedDetails));
}
writer.write("================================================================================\n\n");
writer.flush();
} catch (IOException e) {
logger.error("写入解析数据失败: {}", sessionId, e);
}
});
}
/**
* 完整记录一条消息(原始数据+解析结果)
*/
public void collectMessage(String sessionId, byte[] rawBytes, DataPack dataPack, String parsedDetails) {
if (!enabled) {
return;
}
String hexData = rawBytes != null ? bytesToHex(rawBytes) : "N/A";
String timestamp = LocalDateTime.now().format(timestampFormatter);
PackHead header = dataPack != null ? dataPack.getPackHead() : null;
String terminalPhone = header != null ? header.getTerminalPhone() : sessionId;
executor.submit(() -> {
try {
// 使用设备号作为文件名(如果有的话)
String fileKey = (terminalPhone != null && !terminalPhone.isEmpty()) ? terminalPhone : sessionId;
BufferedWriter writer = getWriter(fileKey);
writer.write("================================================================================\n");
writer.write(String.format("时间: %s\n", timestamp));
if (terminalPhone != null && !terminalPhone.isEmpty()) {
writer.write(String.format("设备号: %s\n", terminalPhone));
}
writer.write(String.format("会话ID: %s\n", sessionId));
writer.write(String.format("原始数据(HEX): %s\n", hexData));
if (header != null) {
writer.write("--------------------------------------------------------------------------------\n");
writer.write("解析结果:\n");
writer.write(
String.format(" 消息ID: 0x%04X (%s)\n", header.getId(), getMsgTypeName(header.getId())));
writer.write(String.format(" 流水号: %d\n", header.getFlowId()));
if (parsedDetails != null && !parsedDetails.isEmpty()) {
writer.write(String.format(" 消息体: %s\n", parsedDetails));
}
}
writer.write("================================================================================\n\n");
writer.flush();
} catch (IOException e) {
logger.error("写入消息数据失败: {}", sessionId, e);
}
});
}
private BufferedWriter getWriter(String key) throws IOException {
String dateStr = LocalDateTime.now().format(dateFormatter);
String fileName = String.format("%s_%s.txt", key, dateStr);
String fullPath = outputDir + File.separator + fileName;
// 检查是否需要创建新的writer日期变化或首次创建
String writerKey = key + "_" + dateStr;
return writers.computeIfAbsent(writerKey, k -> {
try {
return new BufferedWriter(new FileWriter(fullPath, true));
} catch (IOException e) {
logger.error("创建writer失败: {}", fullPath, e);
throw new RuntimeException(e);
}
});
}
private String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02X", b));
}
return sb.toString();
}
private String getMsgTypeName(int msgId) {
return switch (msgId) {
case 0x0001 -> "终端通用应答";
case 0x0002 -> "终端心跳";
case 0x0003 -> "终端注销";
case 0x0100 -> "终端注册";
case 0x0102 -> "终端鉴权";
case 0x0200 -> "位置上报";
case 0x0704 -> "批量位置上报";
case 0x0006 -> "按键事件";
default -> "未知消息";
};
}
public boolean isEnabled() {
return enabled;
}
}

View File

@@ -17,6 +17,7 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import com.iot.transport.jt808.service.ApiLogService;
import com.iot.transport.jt808.service.RawDataCollectorService;
import java.util.HashMap;
import java.util.Map;
@@ -30,35 +31,40 @@ import java.util.List;
public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1)
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Logger logger = LoggerFactory.getLogger(getClass());
private final DataDecoder decoder;
private final SessionManager sessionManager;
private final DataDecoder decoder;
private final SessionManager sessionManager;
private final ApiLogService apiLogService;
private final RawDataCollectorService rawDataCollectorService;
public TCPServerHandler(ApiLogService apiLogService) {
this.decoder = new DataDecoder();
this.sessionManager = SessionManager.getInstance();
// 保存当前处理的原始字节,用于记录
private byte[] currentRawBytes;
public TCPServerHandler(ApiLogService apiLogService, RawDataCollectorService rawDataCollectorService) {
this.decoder = new DataDecoder();
this.sessionManager = SessionManager.getInstance();
this.apiLogService = apiLogService;
}
/**
*
* 处理业务逻辑
*
* @param packageData
* @throws IllegalAccessException
* @throws InstantiationException
*
*/
private void processPackageData(DataPack packageData) throws InstantiationException, IllegalAccessException {
PackHead header = packageData.getPackHead();
Integer msgId = header.getId();
this.rawDataCollectorService = rawDataCollectorService;
}
/**
*
* 处理业务逻辑
*
* @param packageData
* @throws IllegalAccessException
* @throws InstantiationException
*
*/
private void processPackageData(DataPack packageData) throws InstantiationException, IllegalAccessException {
PackHead header = packageData.getPackHead();
Integer msgId = header.getId();
String terminalPhone = header.getTerminalPhone();
logger.info("消息头部msgid={}, phone={}, flowid={}", msgId, terminalPhone, header.getFlowId());
String sessionId = Session.buildId(packageData.getChannel());
logger.info("消息头部msgid={}, phone={}, flowid={}", msgId, terminalPhone, header.getFlowId());
// Update Session with Phone Number if available
if (terminalPhone != null && !terminalPhone.isEmpty()) {
@@ -80,8 +86,9 @@ public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1)
logMap.put("phone", header.getTerminalPhone());
logMap.put("flowId", header.getFlowId());
logMap.put("summary", "TCP Message Received");
// Special handling for Location Upload (0x0200) to show parsed details including Bluetooth
// Special handling for Location Upload (0x0200) to show parsed details
// including Bluetooth
if (msgId == Consts.MSGID_LOCATION_UPLOAD || msgId == 0x0200) {
try {
LocationPack locPack = this.decoder.toLocationInfoUploadMsg(packageData);
@@ -90,7 +97,7 @@ public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1)
// Enhanced fields for Frontend Dashboard
logMap.put("type", "badge");
logMap.put("id", header.getTerminalPhone());
// Battery
BatteryVersionInfo batInfo = locPack.getBatteryVersionInfo();
if (batInfo != null) {
@@ -109,7 +116,7 @@ public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1)
if (bles != null) {
logMap.put("bluetooth", bles);
}
// Location
Map<String, Object> locMap = new HashMap<>();
locMap.put("lat", locPack.getLatitude());
@@ -125,14 +132,14 @@ public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1)
logMap.put("details", btnPack.toString());
logMap.put("type", "badge"); // Keep it as badge type to update the device
logMap.put("id", header.getTerminalPhone());
// Add button event info
Map<String, Object> btnInfo = new HashMap<>();
btnInfo.put("keyId", btnPack.getKeyId());
btnInfo.put("keyState", btnPack.getKeyState());
btnInfo.put("timestamp", System.currentTimeMillis());
logMap.put("buttonEvent", btnInfo);
} catch (Exception e) {
logMap.put("details", packageData.toString() + " (Parse Error: " + e.getMessage() + ")");
}
@@ -142,17 +149,17 @@ public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1)
logMap.put("details", "Batch Upload: " + batchPack.getCount() + " items");
logMap.put("type", "badge");
logMap.put("id", header.getTerminalPhone());
// Use the latest location item to update the UI
if (batchPack.getItems() != null && !batchPack.getItems().isEmpty()) {
LocationPack lastLoc = batchPack.getItems().get(batchPack.getItems().size() - 1);
// Location
Map<String, Object> locMap = new HashMap<>();
locMap.put("lat", lastLoc.getLatitude());
locMap.put("lon", lastLoc.getLongitude());
logMap.put("location", locMap);
// Battery
BatteryVersionInfo batInfo = lastLoc.getBatteryVersionInfo();
if (batInfo != null) {
@@ -165,85 +172,93 @@ public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1)
} else {
logMap.put("details", packageData.toString());
}
apiLogService.broadcastLog("TCP", logMap);
}
MessageHandler handler = MessageHandlerFactory.getInstance(msgId);
if(handler != null){
handler.process(packageData);
}else { // 其他情况
logger.error("[未知消息类型],msgId={},phone={},package={}", header.getId(), header.getTerminalPhone(), packageData);
}
}
MessageHandler handler = MessageHandlerFactory.getInstance(msgId);
if (handler != null) {
handler.process(packageData);
} else { // 其他情况
logger.error("[未知消息类型],msgId={},phone={},package={}", header.getId(), header.getTerminalPhone(),
packageData);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException { // (2)
try {
ByteBuf buf = (ByteBuf) msg;
if (buf.readableBytes() <= 0) {
// ReferenceCountUtil.safeRelease(msg);
return;
}
// 记录原始数据和解析结果
if (rawDataCollectorService != null && rawDataCollectorService.isEnabled()) {
String parsedDetails = packageData.toString();
rawDataCollectorService.collectMessage(sessionId, currentRawBytes, packageData, parsedDetails);
}
}
byte[] bs = new byte[buf.readableBytes()];
buf.readBytes(bs);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException { // (2)
try {
ByteBuf buf = (ByteBuf) msg;
if (buf.readableBytes() <= 0) {
// ReferenceCountUtil.safeRelease(msg);
return;
}
// 字节数据转换为针对于808消息结构的实体类
DataPack pkg = this.decoder.bytes2PackageData(bs);
// 引用channel,以便回送数据给硬件
pkg.setChannel(ctx.channel());
processPackageData(pkg);
}catch (Exception e) {
// TODO: handle exception
logger.error("消息处理异常", e);
} finally {
release(msg);
}
}
byte[] bs = new byte[buf.readableBytes()];
buf.readBytes(bs);
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
logger.error("发生异常:{}", cause);
//cause.printStackTrace();
}
// 保存原始字节用于记录
this.currentRawBytes = bs;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Session session = Session.buildSession(ctx.channel());
sessionManager.put(session.getId(), session);
logger.debug("终端连接:{}", session);
}
// 字节数据转换为针对于808消息结构的实体类
DataPack pkg = this.decoder.bytes2PackageData(bs);
// 引用channel,以便回送数据给硬件
pkg.setChannel(ctx.channel());
processPackageData(pkg);
} catch (Exception e) {
// TODO: handle exception
logger.error("消息处理异常", e);
} finally {
release(msg);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final String sessionId = ctx.channel().id().asLongText();
Session session = sessionManager.findBySessionId(sessionId);
this.sessionManager.removeBySessionId(sessionId);
logger.debug("终端断开连接:{}", session);
ctx.channel().close();
// ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
logger.error("发生异常:{}", cause);
// cause.printStackTrace();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
logger.error("服务器主动断开连接:{}", session);
ctx.close();
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Session session = Session.buildSession(ctx.channel());
sessionManager.put(session.getId(), session);
logger.debug("终端连接:{}", session);
}
private void release(Object msg) {
try {
ReferenceCountUtil.release(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
final String sessionId = ctx.channel().id().asLongText();
Session session = sessionManager.findBySessionId(sessionId);
this.sessionManager.removeBySessionId(sessionId);
logger.debug("终端断开连接:{}", session);
ctx.channel().close();
// ctx.close();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
logger.error("服务器主动断开连接:{}", session);
ctx.close();
}
}
}
private void release(Object msg) {
try {
ReferenceCountUtil.release(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@@ -2,7 +2,10 @@ server:
port: 8080
jt808:
port: 20048
port: 8091
raw-data:
enabled: true
output-dir: ./raw-data
logging:
level:

View File

@@ -2,7 +2,10 @@ server:
port: 8080
jt808:
port: 20048
port: 8091
raw-data:
enabled: true
output-dir: ./raw-data
logging:
level:

View File

@@ -383,7 +383,7 @@
<div v-if="counter.latestData" class="small text-muted d-flex justify-content-between">
<span><i class="far fa-clock me-1"></i>{{ formatCounterTime(counter.latestData.time) }}</span>
<span>
<i class="fas fa-battery-half me-1"></i>Rx:{{counter.latestData.rxBat}}% / Tx:{{counter.latestData.txBat}}%
<i class="fas fa-battery-half me-1"></i>接收端电量:{{counter.latestData.rxBat}}% / 电量:{{counter.latestData.txBat}}%
</span>
</div>
</div>
@@ -576,8 +576,15 @@
headers: {'Content-Type': 'application/json'},
body: JSON.stringify(payload)
});
if (!res.ok) {
throw new Error(`HTTP ${res.status}`);
}
const result = await res.json();
if (result.code !== 200) alert('Error: ' + result.message);
if (result.code !== 200) {
alert('Error: ' + (result.message || result.msg || '未知错误'));
}
} catch (e) {
alert('发送失败: ' + e.message);
}
@@ -647,7 +654,7 @@
});
const result = await res.json();
if (result.code === 200) alert('指令已发送');
else alert('失败: ' + result.message);
else alert('失败: ' + (result.message || '未知错误'));
} else if (this.commandForm.apiType === 'text') {
// 调用 /api/v1/device/command/text
@@ -663,7 +670,7 @@
});
const result = await res.json();
if (result.code === 200) alert('指令已发送');
else alert('失败: ' + result.message);
else alert('失败: ' + (result.message || '未知错误'));
} else {
// 通用指令接口 /api/v1/device/command/send
@@ -676,7 +683,7 @@
});
const result = await res.json();
if (result.code === 200) alert('指令已发送');
else alert('失败: ' + result.message);
else alert('失败: ' + (result.message || '未知错误'));
}
} catch (e) {
alert('发送异常: ' + e.message);