1078-添加发送分包支持

This commit is contained in:
648540858
2024-04-29 16:27:01 +08:00
parent 06b27beedc
commit 91b81ceb7a
2 changed files with 51 additions and 15 deletions

View File

@@ -10,6 +10,8 @@ import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* @author QingtaiJiang
* @date 2023/4/27 18:10
@@ -22,12 +24,12 @@ public class Jt808Encoder extends MessageToByteEncoder<Rs> {
protected void encode(ChannelHandlerContext ctx, Rs msg, ByteBuf out) throws Exception {
Session session = ctx.channel().attr(Session.KEY).get();
ByteBuf encode = Jt808EncoderCmd.encode(msg, session, session.nextSerialNo());
if(encode!=null){
log.info("< {} hex:{}", session, ByteBufUtil.hexDump(encode));
out.writeBytes(encode);
List<ByteBuf> encodeList = Jt808EncoderCmd.encode(msg, session, session.nextSerialNo());
if(encodeList!=null && !encodeList.isEmpty()){
for (ByteBuf byteBuf : encodeList) {
log.info("< {} hex:{}", session, ByteBufUtil.hexDump(byteBuf));
out.writeBytes(byteBuf);
}
}
}
}

View File

@@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.util.LinkedList;
import java.util.List;
/**
* @author QingtaiJiang
@@ -31,35 +32,63 @@ public class Jt808EncoderCmd extends MessageToByteEncoder<Cmd> {
protected void encode(ChannelHandlerContext ctx, Cmd cmd, ByteBuf out) throws Exception {
Session session = ctx.channel().attr(Session.KEY).get();
Rs msg = cmd.getRs();
ByteBuf encode = encode(msg, session, cmd.getPackageNo().intValue());
if (encode != null) {
log.info("< {} hex:{}", session, ByteBufUtil.hexDump(encode));
out.writeBytes(encode);
List<ByteBuf> encodeList = encode(msg, session, cmd.getPackageNo().intValue());
if (encodeList != null && !encodeList.isEmpty()) {
for (ByteBuf byteBuf : encodeList) {
log.info("< {} hex:{}", session, ByteBufUtil.hexDump(byteBuf));
out.writeBytes(byteBuf);
}
}
}
public static ByteBuf encode(Rs msg, Session session, Integer packageNo) {
public static List<ByteBuf> encode(Rs msg, Session session, Integer packageNo) {
String id = msg.getClass().getAnnotation(MsgId.class).id();
if (!StringUtils.hasLength(id)) {
log.error("Not find msgId");
return null;
}
ByteBuf encode = msg.encode();
Header header = msg.getHeader();
List<ByteBuf> byteBufList = new LinkedList<>();
if (encode.readableBytes() > 1024) {
int index = 1;
int total = encode.readableBytes()/1024 + 1;
while (encode.isReadable()) {
ByteBuf byteBuf;
if (index == total) {
byteBuf = buildMsgByte(header, id, session, packageNo, encode.readRetainedSlice(encode.readableBytes() - index * 1024), index, total);
}else {
byteBuf = buildMsgByte(header, id, session, packageNo, encode.readBytes(1024), index, total);
}
byteBufList.add(byteBuf);
index ++;
}
}else {
byteBufList.add(buildMsgByte(header, id, session, packageNo, encode, 0, 0));
}
return byteBufList;
}
// 分包
private static ByteBuf buildMsgByte(Header header, String id, Session session, Integer packageNo, ByteBuf encode, Integer packetIndex, Integer packetTotal) {
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeBytes(ByteBufUtil.decodeHexDump(id));
ByteBuf encode = msg.encode();
Header header = msg.getHeader();
if (header == null) {
header = session.getHeader();
}
if (header.is2019Version()) {
int msgBody = encode.readableBytes() | 1 << 14;
if (packetIndex > 0) {
msgBody = msgBody | 1 << 13;
}
// 消息体属性
byteBuf.writeShort(encode.readableBytes() | 1 << 14);
byteBuf.writeShort(msgBody);
// 版本号
byteBuf.writeByte(header.getVersion());
@@ -76,6 +105,11 @@ public class Jt808EncoderCmd extends MessageToByteEncoder<Cmd> {
// 消息体流水号
byteBuf.writeShort(packageNo);
if (packetIndex > 0) {
byteBuf.writeShort(packetTotal);
byteBuf.writeShort(packetIndex);
}
// 写入消息体
byteBuf.writeBytes(encode);