From 1c55bc3fd16e78c77e8ad3877ecbd933ceb5c449 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Thu, 19 Jun 2025 15:12:47 +0800 Subject: [PATCH] =?UTF-8?q?=E9=A2=84=E7=BD=AE=E4=BD=8D=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=A4=9A=E6=AE=B5=E8=BF=94=E5=9B=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/gb28181/bean/MessageResponseTask.java | 43 ++++++++++ .../transmit/cmd/impl/SIPCommander.java | 4 +- .../impl/message/MessageHandlerAbstract.java | 25 ++++++ .../response/ResponseMessageHandler.java | 24 +----- .../PresetQueryResponseMessageHandler.java | 80 ++++++++++++++++++- 5 files changed, 147 insertions(+), 29 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/MessageResponseTask.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MessageResponseTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MessageResponseTask.java new file mode 100644 index 000000000..cb3dfb0d4 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MessageResponseTask.java @@ -0,0 +1,43 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import lombok.Getter; +import lombok.Setter; +import org.dom4j.Element; +import org.jetbrains.annotations.NotNull; + +import java.util.List; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +public class MessageResponseTask implements Delayed { + + @Getter + @Setter + private Element element; + + @Getter + @Setter + private List data; + + @Getter + @Setter + private String key; + + + /** + * 超时时间(单位: 毫秒) + */ + @Getter + @Setter + private long delayTime; + + @Override + public long getDelay(@NotNull TimeUnit unit) { + return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(@NotNull Delayed o) { + return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index fd53e18e9..2adc337f1 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1136,9 +1136,9 @@ public class SIPCommander implements ISIPCommander { } cmdXml.append("\r\n"); - MessageEvent messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 1000L, callback); + MessageEvent messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 4000L, callback); messageSubscribe.addSubscribe(messageEvent); - + log.info("[预置位查询] 设备编号: {}, 通道编号: {}, SN: {}", device.getDeviceId(), channelId, sn); Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, eventResult -> { messageSubscribe.removeSubscribe(messageEvent.getKey()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java index 9dbf4a4ed..7a743d14d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java @@ -2,9 +2,12 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Platform; +import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe; +import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent; import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.query.cmd.CatalogQueryMessageHandler; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import gov.nist.javax.sip.message.SIPRequest; import lombok.extern.slf4j.Slf4j; import org.dom4j.Element; @@ -28,6 +31,9 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i @Autowired private IPlatformService platformService; + @Autowired + private MessageSubscribe messageSubscribe; + public void addHandler(String cmdType, IMessageHandler messageHandler) { messageHandlerMap.put(cmdType, messageHandler); } @@ -54,6 +60,8 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i return; } messageHandler.handForDevice(evt, device, element); + }else { + handMessageEvent(element, null); } } @@ -65,4 +73,21 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i messageHandler.handForPlatform(evt, parentPlatform, element); } } + + + public void handMessageEvent(Element element, Object data) { + + String cmd = getText(element, "CmdType"); + String sn = getText(element, "SN"); + MessageEvent subscribe = (MessageEvent)messageSubscribe.getSubscribe(cmd + sn); + if (subscribe != null && subscribe.getCallback() != null) { + String result = getText(element, "Result"); + if (result == null || "OK".equalsIgnoreCase(result) || data != null) { + subscribe.getCallback().run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data); + }else { + subscribe.getCallback().run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), result); + } + messageSubscribe.removeSubscribe(cmd + sn); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java index 6a1349d44..163288ea7 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/ResponseMessageHandler.java @@ -1,11 +1,8 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response; import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe; -import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageHandlerAbstract; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageRequestProcessor; -import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; @@ -13,8 +10,6 @@ import org.springframework.stereotype.Component; import javax.sip.RequestEvent; -import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; - /** * 命令类型: 请求动作的应答 * 命令类型: 设备控制, 报警通知, 设备目录信息查询, 目录信息查询, 目录收到, 设备信息查询, 设备状态信息查询 ...... @@ -27,8 +22,7 @@ public class ResponseMessageHandler extends MessageHandlerAbstract implements In @Autowired private MessageRequestProcessor messageRequestProcessor; - @Autowired - private MessageSubscribe messageSubscribe; + @Override public void afterPropertiesSet() throws Exception { @@ -38,21 +32,5 @@ public class ResponseMessageHandler extends MessageHandlerAbstract implements In @Override public void handForDevice(RequestEvent evt, Device device, Element element) { super.handForDevice(evt, device, element); - handMessageEvent(element, null); - } - - public void handMessageEvent(Element element, Object data) { - String cmd = getText(element, "CmdType"); - String sn = getText(element, "SN"); - MessageEvent subscribe = (MessageEvent)messageSubscribe.getSubscribe(cmd + sn); - if (subscribe != null && subscribe.getCallback() != null) { - String result = getText(element, "Result"); - if (result == null || "OK".equalsIgnoreCase(result) || data != null) { - subscribe.getCallback().run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data); - }else { - subscribe.getCallback().run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), result); - } - messageSubscribe.removeSubscribe(cmd + sn); - } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java index 5e154f6e7..51ecafc1c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/PresetQueryResponseMessageHandler.java @@ -1,9 +1,9 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.MessageResponseTask; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.Preset; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; @@ -13,6 +13,7 @@ import org.dom4j.DocumentException; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; @@ -23,6 +24,12 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; + +import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; /** * 设备预置位查询应答 @@ -36,8 +43,9 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent @Autowired private ResponseMessageHandler responseMessageHandler; - @Autowired - private DeferredResultHolder deferredResultHolder; + private final Map> mesageMap = new ConcurrentHashMap<>(); + + private final DelayQueue> delayQueue = new DelayQueue<>(); @Override @@ -93,7 +101,14 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent presetQuerySipReqList.add(presetQuerySipReq); } } - responseMessageHandler.handMessageEvent(rootElement, presetQuerySipReqList); +// if (presetQuerySipReqList.size() == sumNum) { +// responseMessageHandler.handMessageEvent(rootElement, presetQuerySipReqList); +// }else { +// String sn = getText(element, "SN"); +// addCatch(cmdType + "_" + sn, rootElement, presetQuerySipReqList); +// } + String sn = getText(element, "SN"); + addCatch(cmdType + "_" + sn, sumNum, rootElement, presetQuerySipReqList); try { responseAck(request, Response.OK); } catch (InvalidArgumentException | ParseException | SipException e) { @@ -104,6 +119,63 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent } } + private void addCatch(String key, int sumNum, Element rootElement, List presetQuerySipReqList) { + if (presetQuerySipReqList.size() == sumNum) { + responseMessageHandler.handMessageEvent(rootElement, presetQuerySipReqList); + if (mesageMap.containsKey(key)) { + MessageResponseTask messageResponseTask = mesageMap.get(key); + mesageMap.remove(key); + boolean remove = delayQueue.remove(messageResponseTask); + if (!remove) { + log.info("[移除预置位查询任务] 从延时队列内移除失败: {}", key); + } + } + }else { + if (mesageMap.containsKey(key)) { + MessageResponseTask messageResponseTask = mesageMap.get(key); + List data = messageResponseTask.getData(); + data.addAll(presetQuerySipReqList); + if (data.size() == sumNum) { + responseMessageHandler.handMessageEvent(rootElement, presetQuerySipReqList); + mesageMap.remove(key); + boolean remove = delayQueue.remove(messageResponseTask); + if (!remove) { + log.info("[移除预置位查询任务] 从延时队列内移除失败: {}", key); + } + return; + } + messageResponseTask.setDelayTime(System.currentTimeMillis() + 1000); + }else { + MessageResponseTask messageResponseTask = new MessageResponseTask<>(); + messageResponseTask.setElement(rootElement); + messageResponseTask.setData(presetQuerySipReqList); + messageResponseTask.setDelayTime(System.currentTimeMillis() + 1000); + messageResponseTask.setKey(key); + mesageMap.put(key, messageResponseTask); + delayQueue.offer(messageResponseTask); + } + } + } + + // 处理过期的缓存 + @Scheduled(fixedDelay = 500, timeUnit = TimeUnit.MILLISECONDS) + public void expirationCheck(){ + while (!delayQueue.isEmpty()) { + MessageResponseTask take = null; + try { + take = delayQueue.take(); + try { + responseMessageHandler.handMessageEvent(take.getElement(), take.getData()); + mesageMap.remove(take.getKey()); + }catch (Exception e) { + log.error("[预置位查询到期] {} 到期处理时出现异常", take.getKey()); + } + } catch (InterruptedException e) { + log.error("[设备订阅任务] ", e); + } + } + } + @Override public void handForPlatform(RequestEvent evt, Platform parentPlatform, Element rootElement) {