diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTDeviceConfig.java b/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTDeviceConfig.java index 4a0825d0b..63e39d991 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTDeviceConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTDeviceConfig.java @@ -93,6 +93,8 @@ public class JTDeviceConfig { @ConfigAttribute(id = 0x28, description = "紧急报警时汇报时间间隔 单位为秒 值大于0") private Long reportingIntervalEmergencyAlarm; + + // TODO 未完待续 public Long getKeepaliveInterval() { diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/Jt808Decoder.java b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/Jt808Decoder.java index 1b7739b10..92011283f 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/Jt808Decoder.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/Jt808Decoder.java @@ -49,7 +49,6 @@ public class Jt808Decoder extends ByteToMessageDecoder { header.setMsgPro(buf.readUnsignedShort()); // 从消息属性中读取是否存在分包 boolean isSubpackage = (header.getMsgPro() >>> 13 & 1) == 1; - System.out.println("是否存在分包: " + isSubpackage); if (header.is2019Version()) { header.setVersion(buf.readUnsignedByte()); @@ -63,8 +62,14 @@ public class Jt808Decoder extends ByteToMessageDecoder { if (isSubpackage) { int packageCount = buf.readUnsignedShort(); int packageNumber = buf.readUnsignedShort(); - System.out.println("消息总包数: " + packageCount); - System.out.println("包序号: " + packageNumber); + MultiPacket multiPacket = MultiPacket.getInstance(header, packageNumber, packageCount, buf); + ByteBuf intactBuf = MultiPacketManager.INSTANCE.add(multiPacket); + if (intactBuf != null) { + buf = intactBuf; + }else { + return; + } + } Re handler = CodecFactory.getHandler(header.getMsgId()); if (handler == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/MultiPacket.java b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/MultiPacket.java new file mode 100644 index 000000000..4ccb25a0a --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/MultiPacket.java @@ -0,0 +1,85 @@ +package com.genersoft.iot.vmp.jt1078.codec.decode; + +import com.genersoft.iot.vmp.jt1078.proc.Header; +import io.netty.buffer.ByteBuf; + +/** + * 分包消息 + */ +public class MultiPacket { + + /** + * 消息头 + */ + private Header header; + + /** + * 包序号 + */ + private Integer number; + + /** + * 分包数量 + */ + private Integer count; + + /** + * 创建时间 + */ + private Long createTime; + + /** + * 消息内容 + */ + private ByteBuf byteBuf; + + public static MultiPacket getInstance(Header header, Integer number, Integer count, ByteBuf byteBuf) { + MultiPacket multiPacket = new MultiPacket(); + multiPacket.setHeader(header); + multiPacket.setNumber(number); + multiPacket.setCount(count); + multiPacket.setCreateTime(System.currentTimeMillis()); + multiPacket.setByteBuf(byteBuf); + return multiPacket; + } + + public Header getHeader() { + return header; + } + + public void setHeader(Header header) { + this.header = header; + } + + public Integer getNumber() { + return number; + } + + public void setNumber(Integer number) { + this.number = number; + } + + public Integer getCount() { + return count; + } + + public void setCount(Integer count) { + this.count = count; + } + + public Long getCreateTime() { + return createTime; + } + + public void setCreateTime(Long createTime) { + this.createTime = createTime; + } + + public ByteBuf getByteBuf() { + return byteBuf; + } + + public void setByteBuf(ByteBuf byteBuf) { + this.byteBuf = byteBuf; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/MultiPacketManager.java b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/MultiPacketManager.java new file mode 100644 index 000000000..ceade85da --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/MultiPacketManager.java @@ -0,0 +1,64 @@ +package com.genersoft.iot.vmp.jt1078.codec.decode; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +public enum MultiPacketManager { + INSTANCE; + private final static Logger logger = LoggerFactory.getLogger(MultiPacketManager.class); + + // 用与消息的缓存 + private final Map> packetMap = new ConcurrentHashMap<>(); + private final Map packetTimeMap = new ConcurrentHashMap<>(); + + MultiPacketManager() { + startLister(); + } + + /** + * 增加待合并的分包,如果分包接受完毕会返回完整的数据包 + */ + public ByteBuf add(MultiPacket packet) { + String key = packet.getHeader().getTerminalId() + "/" + packet.getHeader().getSn(); + List multiPackets = packetMap.computeIfAbsent(key, k -> new ArrayList<>(packet.getCount())); + multiPackets.add(packet); + packetTimeMap.put(key, System.currentTimeMillis()); + if (packet.getCount() == multiPackets.size()) { + // 所有分包接收完毕,排序后返回 + multiPackets.sort(Comparator.comparing(MultiPacket::getNumber)); + ByteBuf byteBuf = Unpooled.buffer(); + for (MultiPacket multiPacket : multiPackets) { + byteBuf.writeBytes(multiPacket.getByteBuf()); + } + packetMap.remove(key); + packetTimeMap.remove(key); + return byteBuf; + } + return null; + } + + private void startLister(){ + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + long expireTime = System.currentTimeMillis() - 2 * 1000; + if (!packetTimeMap.isEmpty()) { + for (String key : packetTimeMap.keySet()) { + if (packetTimeMap.get(key) < expireTime) { + logger.info("分包消息超时 key: {}", key); + packetTimeMap.remove(key); + packetMap.remove(key); + } + } + } + } + }, 2000L, 2000L); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/Header.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/Header.java index 20941d2e8..163db8277 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/Header.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/Header.java @@ -14,7 +14,7 @@ public class Header { // 消息体属性 Integer msgPro; - // 标识 + // 终端手机号 String terminalId; // 消息体流水号 diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0104.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0104.java index 888c251a4..5f37b61a7 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0104.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0104.java @@ -31,9 +31,7 @@ public class J0104 extends Re { @Override protected Rs decode0(ByteBuf buf, Header header, Session session) { respNo = buf.readUnsignedShort(); - System.err.println("应答流水号: " + respNo); paramLength = (int) buf.readUnsignedByte(); - System.err.println("参数项个数: " + paramLength); if (paramLength <= 0) { return null; } @@ -47,7 +45,6 @@ public class J0104 extends Re { allFieldMap.put(configAttribute.id(), field); } } - System.out.println("========"); for (int i = 0; i < paramLength; i++) { long id = buf.readUnsignedInt(); short length = buf.readUnsignedByte(); @@ -73,8 +70,6 @@ public class J0104 extends Re { } } - System.out.println(deviceConfig.toString()); - return null; }