预置位查询支持多段返回

This commit is contained in:
lin
2025-06-19 15:12:47 +08:00
parent 50df430831
commit 1c55bc3fd1
5 changed files with 147 additions and 29 deletions

View File

@@ -0,0 +1,43 @@
package com.genersoft.iot.vmp.gb28181.bean;
import lombok.Getter;
import lombok.Setter;
import org.dom4j.Element;
import org.jetbrains.annotations.NotNull;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class MessageResponseTask<T> implements Delayed {
@Getter
@Setter
private Element element;
@Getter
@Setter
private List<T> data;
@Getter
@Setter
private String key;
/**
* 超时时间(单位: 毫秒)
*/
@Getter
@Setter
private long delayTime;
@Override
public long getDelay(@NotNull TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(@NotNull Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}

View File

@@ -1136,9 +1136,9 @@ public class SIPCommander implements ISIPCommander {
}
cmdXml.append("</Query>\r\n");
MessageEvent<Object> messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 1000L, callback);
MessageEvent<Object> messageEvent = MessageEvent.getInstance(cmdType, sn + "", channelId, 4000L, callback);
messageSubscribe.addSubscribe(messageEvent);
log.info("[预置位查询] 设备编号: {} 通道编号: {} SN {}", device.getDeviceId(), channelId, sn);
Request request = headerProvider.createMessageRequest(device, cmdXml.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, eventResult -> {
messageSubscribe.removeSubscribe(messageEvent.getKey());

View File

@@ -2,9 +2,12 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe;
import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.query.cmd.CatalogQueryMessageHandler;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
@@ -28,6 +31,9 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i
@Autowired
private IPlatformService platformService;
@Autowired
private MessageSubscribe messageSubscribe;
public void addHandler(String cmdType, IMessageHandler messageHandler) {
messageHandlerMap.put(cmdType, messageHandler);
}
@@ -54,6 +60,8 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i
return;
}
messageHandler.handForDevice(evt, device, element);
}else {
handMessageEvent(element, null);
}
}
@@ -65,4 +73,21 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i
messageHandler.handForPlatform(evt, parentPlatform, element);
}
}
public void handMessageEvent(Element element, Object data) {
String cmd = getText(element, "CmdType");
String sn = getText(element, "SN");
MessageEvent<Object> subscribe = (MessageEvent<Object>)messageSubscribe.getSubscribe(cmd + sn);
if (subscribe != null && subscribe.getCallback() != null) {
String result = getText(element, "Result");
if (result == null || "OK".equalsIgnoreCase(result) || data != null) {
subscribe.getCallback().run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data);
}else {
subscribe.getCallback().run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), result);
}
messageSubscribe.removeSubscribe(cmd + sn);
}
}
}

View File

@@ -1,11 +1,8 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.event.MessageSubscribe;
import com.genersoft.iot.vmp.gb28181.event.sip.MessageEvent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageHandlerAbstract;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageRequestProcessor;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
@@ -13,8 +10,6 @@ import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
/**
* 命令类型: 请求动作的应答
* 命令类型: 设备控制, 报警通知, 设备目录信息查询, 目录信息查询, 目录收到, 设备信息查询, 设备状态信息查询 ......
@@ -27,8 +22,7 @@ public class ResponseMessageHandler extends MessageHandlerAbstract implements In
@Autowired
private MessageRequestProcessor messageRequestProcessor;
@Autowired
private MessageSubscribe messageSubscribe;
@Override
public void afterPropertiesSet() throws Exception {
@@ -38,21 +32,5 @@ public class ResponseMessageHandler extends MessageHandlerAbstract implements In
@Override
public void handForDevice(RequestEvent evt, Device device, Element element) {
super.handForDevice(evt, device, element);
handMessageEvent(element, null);
}
public void handMessageEvent(Element element, Object data) {
String cmd = getText(element, "CmdType");
String sn = getText(element, "SN");
MessageEvent<Object> subscribe = (MessageEvent<Object>)messageSubscribe.getSubscribe(cmd + sn);
if (subscribe != null && subscribe.getCallback() != null) {
String result = getText(element, "Result");
if (result == null || "OK".equalsIgnoreCase(result) || data != null) {
subscribe.getCallback().run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), data);
}else {
subscribe.getCallback().run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), result);
}
messageSubscribe.removeSubscribe(cmd + sn);
}
}
}

View File

@@ -1,9 +1,9 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.MessageResponseTask;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.Preset;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
@@ -13,6 +13,7 @@ import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
@@ -23,6 +24,12 @@ import java.text.ParseException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
/**
* 设备预置位查询应答
@@ -36,8 +43,9 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
@Autowired
private ResponseMessageHandler responseMessageHandler;
@Autowired
private DeferredResultHolder deferredResultHolder;
private final Map<String, MessageResponseTask<Preset>> mesageMap = new ConcurrentHashMap<>();
private final DelayQueue<MessageResponseTask<Preset>> delayQueue = new DelayQueue<>();
@Override
@@ -93,7 +101,14 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
presetQuerySipReqList.add(presetQuerySipReq);
}
}
responseMessageHandler.handMessageEvent(rootElement, presetQuerySipReqList);
// if (presetQuerySipReqList.size() == sumNum) {
// responseMessageHandler.handMessageEvent(rootElement, presetQuerySipReqList);
// }else {
// String sn = getText(element, "SN");
// addCatch(cmdType + "_" + sn, rootElement, presetQuerySipReqList);
// }
String sn = getText(element, "SN");
addCatch(cmdType + "_" + sn, sumNum, rootElement, presetQuerySipReqList);
try {
responseAck(request, Response.OK);
} catch (InvalidArgumentException | ParseException | SipException e) {
@@ -104,6 +119,63 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
}
}
private void addCatch(String key, int sumNum, Element rootElement, List<Preset> presetQuerySipReqList) {
if (presetQuerySipReqList.size() == sumNum) {
responseMessageHandler.handMessageEvent(rootElement, presetQuerySipReqList);
if (mesageMap.containsKey(key)) {
MessageResponseTask<Preset> messageResponseTask = mesageMap.get(key);
mesageMap.remove(key);
boolean remove = delayQueue.remove(messageResponseTask);
if (!remove) {
log.info("[移除预置位查询任务] 从延时队列内移除失败: {}", key);
}
}
}else {
if (mesageMap.containsKey(key)) {
MessageResponseTask<Preset> messageResponseTask = mesageMap.get(key);
List<Preset> data = messageResponseTask.getData();
data.addAll(presetQuerySipReqList);
if (data.size() == sumNum) {
responseMessageHandler.handMessageEvent(rootElement, presetQuerySipReqList);
mesageMap.remove(key);
boolean remove = delayQueue.remove(messageResponseTask);
if (!remove) {
log.info("[移除预置位查询任务] 从延时队列内移除失败: {}", key);
}
return;
}
messageResponseTask.setDelayTime(System.currentTimeMillis() + 1000);
}else {
MessageResponseTask<Preset> messageResponseTask = new MessageResponseTask<>();
messageResponseTask.setElement(rootElement);
messageResponseTask.setData(presetQuerySipReqList);
messageResponseTask.setDelayTime(System.currentTimeMillis() + 1000);
messageResponseTask.setKey(key);
mesageMap.put(key, messageResponseTask);
delayQueue.offer(messageResponseTask);
}
}
}
// 处理过期的缓存
@Scheduled(fixedDelay = 500, timeUnit = TimeUnit.MILLISECONDS)
public void expirationCheck(){
while (!delayQueue.isEmpty()) {
MessageResponseTask<Preset> take = null;
try {
take = delayQueue.take();
try {
responseMessageHandler.handMessageEvent(take.getElement(), take.getData());
mesageMap.remove(take.getKey());
}catch (Exception e) {
log.error("[预置位查询到期] {} 到期处理时出现异常", take.getKey());
}
} catch (InterruptedException e) {
log.error("[设备订阅任务] ", e);
}
}
}
@Override
public void handForPlatform(RequestEvent evt, Platform parentPlatform, Element rootElement) {