1078-支持分包消息

This commit is contained in:
648540858
2024-04-20 13:03:39 +08:00
parent 50e2565643
commit 975e94eee1
6 changed files with 160 additions and 9 deletions

View File

@@ -93,6 +93,8 @@ public class JTDeviceConfig {
@ConfigAttribute(id = 0x28, description = "紧急报警时汇报时间间隔 单位为秒 值大于0")
private Long reportingIntervalEmergencyAlarm;
// TODO 未完待续
public Long getKeepaliveInterval() {

View File

@@ -49,7 +49,6 @@ public class Jt808Decoder extends ByteToMessageDecoder {
header.setMsgPro(buf.readUnsignedShort());
// 从消息属性中读取是否存在分包
boolean isSubpackage = (header.getMsgPro() >>> 13 & 1) == 1;
System.out.println("是否存在分包: " + isSubpackage);
if (header.is2019Version()) {
header.setVersion(buf.readUnsignedByte());
@@ -63,8 +62,14 @@ public class Jt808Decoder extends ByteToMessageDecoder {
if (isSubpackage) {
int packageCount = buf.readUnsignedShort();
int packageNumber = buf.readUnsignedShort();
System.out.println("消息总包数: " + packageCount);
System.out.println("包序号: " + packageNumber);
MultiPacket multiPacket = MultiPacket.getInstance(header, packageNumber, packageCount, buf);
ByteBuf intactBuf = MultiPacketManager.INSTANCE.add(multiPacket);
if (intactBuf != null) {
buf = intactBuf;
}else {
return;
}
}
Re handler = CodecFactory.getHandler(header.getMsgId());
if (handler == null) {

View File

@@ -0,0 +1,85 @@
package com.genersoft.iot.vmp.jt1078.codec.decode;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import io.netty.buffer.ByteBuf;
/**
* 分包消息
*/
public class MultiPacket {
/**
* 消息头
*/
private Header header;
/**
* 包序号
*/
private Integer number;
/**
* 分包数量
*/
private Integer count;
/**
* 创建时间
*/
private Long createTime;
/**
* 消息内容
*/
private ByteBuf byteBuf;
public static MultiPacket getInstance(Header header, Integer number, Integer count, ByteBuf byteBuf) {
MultiPacket multiPacket = new MultiPacket();
multiPacket.setHeader(header);
multiPacket.setNumber(number);
multiPacket.setCount(count);
multiPacket.setCreateTime(System.currentTimeMillis());
multiPacket.setByteBuf(byteBuf);
return multiPacket;
}
public Header getHeader() {
return header;
}
public void setHeader(Header header) {
this.header = header;
}
public Integer getNumber() {
return number;
}
public void setNumber(Integer number) {
this.number = number;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public Long getCreateTime() {
return createTime;
}
public void setCreateTime(Long createTime) {
this.createTime = createTime;
}
public ByteBuf getByteBuf() {
return byteBuf;
}
public void setByteBuf(ByteBuf byteBuf) {
this.byteBuf = byteBuf;
}
}

View File

@@ -0,0 +1,64 @@
package com.genersoft.iot.vmp.jt1078.codec.decode;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public enum MultiPacketManager {
INSTANCE;
private final static Logger logger = LoggerFactory.getLogger(MultiPacketManager.class);
// 用与消息的缓存
private final Map<String, List<MultiPacket>> packetMap = new ConcurrentHashMap<>();
private final Map<String, Long> packetTimeMap = new ConcurrentHashMap<>();
MultiPacketManager() {
startLister();
}
/**
* 增加待合并的分包,如果分包接受完毕会返回完整的数据包
*/
public ByteBuf add(MultiPacket packet) {
String key = packet.getHeader().getTerminalId() + "/" + packet.getHeader().getSn();
List<MultiPacket> multiPackets = packetMap.computeIfAbsent(key, k -> new ArrayList<>(packet.getCount()));
multiPackets.add(packet);
packetTimeMap.put(key, System.currentTimeMillis());
if (packet.getCount() == multiPackets.size()) {
// 所有分包接收完毕,排序后返回
multiPackets.sort(Comparator.comparing(MultiPacket::getNumber));
ByteBuf byteBuf = Unpooled.buffer();
for (MultiPacket multiPacket : multiPackets) {
byteBuf.writeBytes(multiPacket.getByteBuf());
}
packetMap.remove(key);
packetTimeMap.remove(key);
return byteBuf;
}
return null;
}
private void startLister(){
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
long expireTime = System.currentTimeMillis() - 2 * 1000;
if (!packetTimeMap.isEmpty()) {
for (String key : packetTimeMap.keySet()) {
if (packetTimeMap.get(key) < expireTime) {
logger.info("分包消息超时 key: {}", key);
packetTimeMap.remove(key);
packetMap.remove(key);
}
}
}
}
}, 2000L, 2000L);
}
}

View File

@@ -14,7 +14,7 @@ public class Header {
// 消息体属性
Integer msgPro;
// 标识
// 终端手机号
String terminalId;
// 消息体流水号

View File

@@ -31,9 +31,7 @@ public class J0104 extends Re {
@Override
protected Rs decode0(ByteBuf buf, Header header, Session session) {
respNo = buf.readUnsignedShort();
System.err.println("应答流水号: " + respNo);
paramLength = (int) buf.readUnsignedByte();
System.err.println("参数项个数: " + paramLength);
if (paramLength <= 0) {
return null;
}
@@ -47,7 +45,6 @@ public class J0104 extends Re {
allFieldMap.put(configAttribute.id(), field);
}
}
System.out.println("========");
for (int i = 0; i < paramLength; i++) {
long id = buf.readUnsignedInt();
short length = buf.readUnsignedByte();
@@ -73,8 +70,6 @@ public class J0104 extends Re {
}
}
System.out.println(deviceConfig.toString());
return null;
}