[1078] 修复分包合并,优化设备列表排序,优化设备上线逻辑
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
package com.genersoft.iot.vmp.jt1078.bean;
|
package com.genersoft.iot.vmp.jt1078.bean;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.ByteBufUtil;
|
||||||
import io.swagger.v3.oas.annotations.media.Schema;
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
@@ -37,10 +38,9 @@ public class JTMediaEventInfo {
|
|||||||
jtMediaEventInfo.setCode(buf.readUnsignedByte());
|
jtMediaEventInfo.setCode(buf.readUnsignedByte());
|
||||||
jtMediaEventInfo.setEventCode(buf.readUnsignedByte());
|
jtMediaEventInfo.setEventCode(buf.readUnsignedByte());
|
||||||
jtMediaEventInfo.setChannelId(buf.readUnsignedByte());
|
jtMediaEventInfo.setChannelId(buf.readUnsignedByte());
|
||||||
if (buf.readableBytes() > 0) {
|
if (buf.readableBytes() > 28) {
|
||||||
ByteBuf byteBuf = buf.readSlice(28);
|
ByteBuf byteBuf = buf.readSlice(28);
|
||||||
jtMediaEventInfo.setPositionBaseInfo(JTPositionBaseInfo.decode(byteBuf));
|
jtMediaEventInfo.setPositionBaseInfo(JTPositionBaseInfo.decode(byteBuf));
|
||||||
|
|
||||||
byte[] bytes = new byte[buf.readableBytes()];
|
byte[] bytes = new byte[buf.readableBytes()];
|
||||||
buf.readBytes(bytes);
|
buf.readBytes(bytes);
|
||||||
jtMediaEventInfo.setMediaData(bytes);
|
jtMediaEventInfo.setMediaData(bytes);
|
||||||
|
|||||||
@@ -43,10 +43,11 @@ public class Jt808Decoder extends ByteToMessageDecoder {
|
|||||||
Map<String, List<String>> dumpMap = new ConcurrentHashMap<>();
|
Map<String, List<String>> dumpMap = new ConcurrentHashMap<>();
|
||||||
@Override
|
@Override
|
||||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||||
|
in.retain();
|
||||||
Session session = ctx.channel().attr(Session.KEY).get();
|
Session session = ctx.channel().attr(Session.KEY).get();
|
||||||
log.info("> {} hex: 7e{}7e", session, ByteBufUtil.hexDump(in));
|
log.info("> {} hex: 7e{}7e", session, ByteBufUtil.hexDump(in));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
// 按照部标定义执行校验和转义
|
||||||
ByteBuf buf = unEscapeAndCheck(in);
|
ByteBuf buf = unEscapeAndCheck(in);
|
||||||
|
|
||||||
Header header = new Header();
|
Header header = new Header();
|
||||||
@@ -65,34 +66,20 @@ public class Jt808Decoder extends ByteToMessageDecoder {
|
|||||||
if (isSubpackage) {
|
if (isSubpackage) {
|
||||||
int packageCount = buf.readUnsignedShort();
|
int packageCount = buf.readUnsignedShort();
|
||||||
int packageNumber = buf.readUnsignedShort();
|
int packageNumber = buf.readUnsignedShort();
|
||||||
// List<String> strings = dumpMap.get(header.getPhoneNumber());
|
log.debug("[分包消息] header: {}, 序号: {}, 总数: {}", header, packageNumber, packageCount);
|
||||||
// if (strings == null) {
|
// 缓存带合并的分包消息
|
||||||
// strings = new ArrayList<>();
|
ByteBuf intactBuf = MultiPacketManager.INSTANCE.add(header, packageCount, buf);
|
||||||
// }
|
if (intactBuf == null) {
|
||||||
// strings.add(dump);
|
|
||||||
// if (strings.size() == packageCount) {
|
|
||||||
// for (int i = 0; i < strings.size(); i++) {
|
|
||||||
// if (i == strings.size() - 1) {
|
|
||||||
// System.out.println(strings.get(i));
|
|
||||||
// }else {
|
|
||||||
// System.out.print(strings.get(i));
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
MultiPacket multiPacket = MultiPacket.getInstance(header, packageNumber, packageCount, buf);
|
|
||||||
ByteBuf intactBuf = MultiPacketManager.INSTANCE.add(multiPacket);
|
|
||||||
if (intactBuf != null) {
|
|
||||||
buf = intactBuf;
|
|
||||||
}else {
|
|
||||||
in.skipBytes(in.readableBytes());
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
buf = intactBuf;
|
||||||
}
|
}
|
||||||
Re handler = CodecFactory.getHandler(header.getMsgId());
|
Re handler = CodecFactory.getHandler(header.getMsgId());
|
||||||
if (handler == null) {
|
if (handler == null) {
|
||||||
log.error("get msgId is null {}", header.getMsgId());
|
log.error("get msgId is null {}", header.getMsgId());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
buf.retain();
|
||||||
Rs decode = handler.decode(buf, header, session, service);
|
Rs decode = handler.decode(buf, header, session, service);
|
||||||
ApplicationEvent applicationEvent = handler.getEvent();
|
ApplicationEvent applicationEvent = handler.getEvent();
|
||||||
if (applicationEvent != null) {
|
if (applicationEvent != null) {
|
||||||
@@ -167,7 +154,7 @@ public class Jt808Decoder extends ByteToMessageDecoder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (calculationCheckSum == checkSum) {
|
if (calculationCheckSum == checkSum) {
|
||||||
if (bufList.size() == 0) {
|
if (bufList.isEmpty()) {
|
||||||
return byteBuf.slice(low, high);
|
return byteBuf.slice(low, high);
|
||||||
} else {
|
} else {
|
||||||
bufList.add(byteBuf.slice(low, high - low));
|
bufList.add(byteBuf.slice(low, high - low));
|
||||||
|
|||||||
@@ -2,10 +2,12 @@ package com.genersoft.iot.vmp.jt1078.codec.decode;
|
|||||||
|
|
||||||
import com.genersoft.iot.vmp.jt1078.proc.Header;
|
import com.genersoft.iot.vmp.jt1078.proc.Header;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 分包消息
|
* 分包消息
|
||||||
*/
|
*/
|
||||||
|
@Data
|
||||||
public class MultiPacket {
|
public class MultiPacket {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -33,7 +35,15 @@ public class MultiPacket {
|
|||||||
*/
|
*/
|
||||||
private ByteBuf byteBuf;
|
private ByteBuf byteBuf;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 消息内容
|
||||||
|
*/
|
||||||
|
private String bufStr;
|
||||||
|
|
||||||
public static MultiPacket getInstance(Header header, Integer number, Integer count, ByteBuf byteBuf) {
|
public static MultiPacket getInstance(Header header, Integer number, Integer count, ByteBuf byteBuf) {
|
||||||
|
if (byteBuf == null) {
|
||||||
|
System.err.println("MultiPacket error byteBuf is null");
|
||||||
|
}
|
||||||
MultiPacket multiPacket = new MultiPacket();
|
MultiPacket multiPacket = new MultiPacket();
|
||||||
multiPacket.setHeader(header);
|
multiPacket.setHeader(header);
|
||||||
multiPacket.setNumber(number);
|
multiPacket.setNumber(number);
|
||||||
@@ -43,46 +53,6 @@ public class MultiPacket {
|
|||||||
return multiPacket;
|
return multiPacket;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Header getHeader() {
|
|
||||||
return header;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setHeader(Header header) {
|
|
||||||
this.header = header;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Integer getNumber() {
|
|
||||||
return number;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setNumber(Integer number) {
|
|
||||||
this.number = number;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Integer getCount() {
|
|
||||||
return count;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setCount(Integer count) {
|
|
||||||
this.count = count;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Long getCreateTime() {
|
|
||||||
return createTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setCreateTime(Long createTime) {
|
|
||||||
this.createTime = createTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ByteBuf getByteBuf() {
|
|
||||||
return byteBuf;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setByteBuf(ByteBuf byteBuf) {
|
|
||||||
this.byteBuf = byteBuf;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "MultiPacket{" +
|
return "MultiPacket{" +
|
||||||
|
|||||||
@@ -1,8 +1,7 @@
|
|||||||
package com.genersoft.iot.vmp.jt1078.codec.decode;
|
package com.genersoft.iot.vmp.jt1078.codec.decode;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import com.genersoft.iot.vmp.jt1078.proc.Header;
|
||||||
import io.netty.buffer.CompositeByteBuf;
|
import io.netty.buffer.*;
|
||||||
import io.netty.buffer.Unpooled;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@@ -14,7 +13,7 @@ public enum MultiPacketManager {
|
|||||||
private final static Logger logger = LoggerFactory.getLogger(MultiPacketManager.class);
|
private final static Logger logger = LoggerFactory.getLogger(MultiPacketManager.class);
|
||||||
|
|
||||||
// 用与消息的缓存
|
// 用与消息的缓存
|
||||||
private final Map<String, List<MultiPacket>> packetMap = new ConcurrentHashMap<>();
|
private final Map<String, CompositeByteBuf> packetMap = new ConcurrentHashMap<>();
|
||||||
private final Map<String, Long> packetTimeMap = new ConcurrentHashMap<>();
|
private final Map<String, Long> packetTimeMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
MultiPacketManager() {
|
MultiPacketManager() {
|
||||||
@@ -24,23 +23,17 @@ public enum MultiPacketManager {
|
|||||||
/**
|
/**
|
||||||
* 增加待合并的分包,如果分包接受完毕会返回完整的数据包
|
* 增加待合并的分包,如果分包接受完毕会返回完整的数据包
|
||||||
*/
|
*/
|
||||||
public ByteBuf add(MultiPacket packet) {
|
public ByteBuf add(Header header, Integer count, ByteBuf byteBuf) {
|
||||||
String key = packet.getHeader().getMsgId() + "/" + packet.getHeader().getPhoneNumber();
|
String key = header.getMsgId() + "/" + header.getPhoneNumber();
|
||||||
logger.info("分包消息: \n{}", packet);
|
CompositeByteBuf compositeBuf = packetMap.computeIfAbsent(key, k -> new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, count));
|
||||||
List<MultiPacket> multiPackets = packetMap.computeIfAbsent(key, k -> new ArrayList<>(packet.getCount()));
|
// compositeBuf.addComponent(true, byteBuf.readSlice(byteBuf.readableBytes()));
|
||||||
multiPackets.add(packet);
|
compositeBuf.addComponent(true, byteBuf);
|
||||||
packetTimeMap.put(key, System.currentTimeMillis());
|
packetTimeMap.put(key, System.currentTimeMillis());
|
||||||
if (packet.getCount() == multiPackets.size()) {
|
if (count == compositeBuf.numComponents()) {
|
||||||
logger.info("分包接受完毕: \n{}", packet.getHeader());
|
|
||||||
// 所有分包接收完毕,排序后返回
|
|
||||||
multiPackets.sort(Comparator.comparing(MultiPacket::getNumber));
|
|
||||||
ByteBuf byteBuf = Unpooled.buffer();
|
|
||||||
for (MultiPacket multiPacket : multiPackets) {
|
|
||||||
byteBuf.writeBytes(multiPacket.getByteBuf());
|
|
||||||
}
|
|
||||||
packetMap.remove(key);
|
packetMap.remove(key);
|
||||||
packetTimeMap.remove(key);
|
packetTimeMap.remove(key);
|
||||||
return byteBuf;
|
compositeBuf.retain();
|
||||||
|
return compositeBuf;
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ public interface JTTerminalMapper {
|
|||||||
")</if> " +
|
")</if> " +
|
||||||
" <if test='online == true' > AND jd.status= true</if>" +
|
" <if test='online == true' > AND jd.status= true</if>" +
|
||||||
" <if test='online == false' > AND jd.status= false</if>" +
|
" <if test='online == false' > AND jd.status= false</if>" +
|
||||||
"ORDER BY jd.update_time " +
|
"ORDER BY jd.create_time " +
|
||||||
" </script>"})
|
" </script>"})
|
||||||
List<JTDevice> getDeviceList(@Param("query") String query, @Param("online") Boolean online);
|
List<JTDevice> getDeviceList(@Param("query") String query, @Param("online") Boolean online);
|
||||||
|
|
||||||
|
|||||||
@@ -48,6 +48,10 @@ public class J0102 extends Re {
|
|||||||
j8001.setResult(J8001.FAIL);
|
j8001.setResult(J8001.FAIL);
|
||||||
}else {
|
}else {
|
||||||
j8001.setResult(J8001.SUCCESS);
|
j8001.setResult(J8001.SUCCESS);
|
||||||
|
if (!device.isStatus()) {
|
||||||
|
device.setStatus(true);
|
||||||
|
service.updateDevice(device);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return j8001;
|
return j8001;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,62 +33,20 @@ public class J0801 extends Re {
|
|||||||
protected Rs decode0(ByteBuf buf, Header header, Session session) {
|
protected Rs decode0(ByteBuf buf, Header header, Session session) {
|
||||||
JTMediaEventInfo mediaEventInfo = JTMediaEventInfo.decode(buf);
|
JTMediaEventInfo mediaEventInfo = JTMediaEventInfo.decode(buf);
|
||||||
log.info("[JT-多媒体数据上传]: {}", mediaEventInfo);
|
log.info("[JT-多媒体数据上传]: {}", mediaEventInfo);
|
||||||
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
int width = 800;
|
if (mediaEventInfo.getMediaData() != null) {
|
||||||
int height = 600;
|
File file = new File("/home/lin/source.jpg");
|
||||||
BufferedImage image1 = new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB);
|
if (file.exists()) {
|
||||||
File file1 = new File("/home/lin/1.jpg");
|
file.delete();
|
||||||
ImageIO.write(image1, "jpg", file1);
|
}
|
||||||
|
FileOutputStream fileOutputStream = new FileOutputStream(file);
|
||||||
|
fileOutputStream.write(mediaEventInfo.getMediaData());
|
||||||
BufferedImage image2 = new BufferedImage(width, height, 2);
|
fileOutputStream.flush();
|
||||||
File file2 = new File("/home/lin/2.jpg");
|
fileOutputStream.close();
|
||||||
ImageIO.write(image2, "jpg", file2);
|
}
|
||||||
|
|
||||||
|
|
||||||
BufferedImage image3 = new BufferedImage(width, height, 3);
|
|
||||||
File file3 = new File("/home/lin/3.jpg");
|
|
||||||
ImageIO.write(image3, "jpg", file3);
|
|
||||||
|
|
||||||
|
|
||||||
BufferedImage image4 = new BufferedImage(width, height, 4);
|
|
||||||
File file4 = new File("/home/lin/4.jpg");
|
|
||||||
ImageIO.write(image4, "jpg", file4);
|
|
||||||
|
|
||||||
|
|
||||||
BufferedImage image5 = new BufferedImage(width, height, 5);
|
|
||||||
File file5 = new File("/home/lin/5.jpg");
|
|
||||||
ImageIO.write(image5, "jpg", file5);
|
|
||||||
|
|
||||||
|
|
||||||
BufferedImage image6 = new BufferedImage(width, height, 6);
|
|
||||||
File file6 = new File("/home/lin/6.jpg");
|
|
||||||
ImageIO.write(image6, "jpg", file6);
|
|
||||||
|
|
||||||
|
|
||||||
BufferedImage image7 = new BufferedImage(width, height, 7);
|
|
||||||
File file7 = new File("/home/lin/7.jpg");
|
|
||||||
ImageIO.write(image7, "jpg", file7);
|
|
||||||
|
|
||||||
|
|
||||||
BufferedImage image8 = new BufferedImage(width, height, 8);
|
|
||||||
File file8 = new File("/home/lin/8.jpg");
|
|
||||||
ImageIO.write(image8, "jpg", file8);
|
|
||||||
|
|
||||||
File file = new File("/home/lin/source.jpg");
|
|
||||||
FileOutputStream fileOutputStream = new FileOutputStream(file);
|
|
||||||
// fileOutputStream.write(0xFF);
|
|
||||||
// fileOutputStream.write(0xD8);
|
|
||||||
// fileOutputStream.write(0xFF);
|
|
||||||
fileOutputStream.write(mediaEventInfo.getMediaData());
|
|
||||||
fileOutputStream.flush();
|
|
||||||
fileOutputStream.close();
|
|
||||||
}catch (Exception e) {
|
}catch (Exception e) {
|
||||||
log.error("[JT-多媒体数据上传] 写入文件异常", e);
|
log.error("[JT-多媒体数据上传] 写入文件异常", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
SessionManager.INSTANCE.response(header.getPhoneNumber(), "0801", null, mediaEventInfo);
|
SessionManager.INSTANCE.response(header.getPhoneNumber(), "0801", null, mediaEventInfo);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,7 +37,6 @@ public abstract class Re {
|
|||||||
if (rs != null) {
|
if (rs != null) {
|
||||||
rs.setHeader(header);
|
rs.setHeader(header);
|
||||||
}
|
}
|
||||||
|
|
||||||
return rs;
|
return rs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -108,7 +108,6 @@ public class J8103 extends Rs {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
System.out.println(ByteBufUtil.hexDump(buffer));
|
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -732,12 +732,13 @@ public class jt1078ServiceImpl implements Ijt1078Service {
|
|||||||
public void snap(String phoneNumber, int channelId, ServletOutputStream outputStream) {
|
public void snap(String phoneNumber, int channelId, ServletOutputStream outputStream) {
|
||||||
J8801 j8801 = new J8801();
|
J8801 j8801 = new J8801();
|
||||||
|
|
||||||
|
// 设置抓图默认参数
|
||||||
JTShootingCommand shootingCommand = new JTShootingCommand();
|
JTShootingCommand shootingCommand = new JTShootingCommand();
|
||||||
shootingCommand.setChanelId(channelId);
|
shootingCommand.setChanelId(channelId);
|
||||||
shootingCommand.setCommand(3);
|
shootingCommand.setCommand(1);
|
||||||
shootingCommand.setTime(0);
|
shootingCommand.setTime(0);
|
||||||
shootingCommand.setSave(0);
|
shootingCommand.setSave(0);
|
||||||
shootingCommand.setResolvingPower(0x01);
|
shootingCommand.setResolvingPower(0xFF);
|
||||||
shootingCommand.setQuality(1);
|
shootingCommand.setQuality(1);
|
||||||
shootingCommand.setBrightness(125);
|
shootingCommand.setBrightness(125);
|
||||||
shootingCommand.setContrastRatio(60);
|
shootingCommand.setContrastRatio(60);
|
||||||
@@ -761,13 +762,8 @@ public class jt1078ServiceImpl implements Ijt1078Service {
|
|||||||
}
|
}
|
||||||
log.info("[JT-抓图] 图片上传完成,抓图编号: {}, 设备编号: {}, 通道编号: {}", ids.get(0), phoneNumber, channelId);
|
log.info("[JT-抓图] 图片上传完成,抓图编号: {}, 设备编号: {}, 通道编号: {}", ids.get(0), phoneNumber, channelId);
|
||||||
try {
|
try {
|
||||||
if (outputStream.isReady()) {
|
outputStream.write(mediaEventInfo.getMediaData());
|
||||||
outputStream.write(mediaEventInfo.getMediaData());
|
outputStream.flush();
|
||||||
outputStream.flush();
|
|
||||||
}else {
|
|
||||||
log.info("[JT-抓图] 请求可能已经结束,抓图编号: {}, 设备编号: {}, 通道编号: {}", ids.get(0), phoneNumber, channelId);
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
log.info("[JT-抓图] 数据写入异常,抓图编号: {}, 设备编号: {}, 通道编号: {}", ids.get(0), phoneNumber, channelId, e);
|
log.info("[JT-抓图] 数据写入异常,抓图编号: {}, 设备编号: {}, 通道编号: {}", ids.get(0), phoneNumber, channelId, e);
|
||||||
throw new ControllerException(ErrorCode.ERROR100.getCode(), "数据写入异常");
|
throw new ControllerException(ErrorCode.ERROR100.getCode(), "数据写入异常");
|
||||||
|
|||||||
Reference in New Issue
Block a user