From 29ac4850f46b25b4dd00d94d8ac0c4b21a885e41 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Wed, 21 May 2025 18:06:07 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=BB=E9=99=A4=E7=A7=BB=E9=99=A4=E8=AE=A2?= =?UTF-8?q?=E9=98=85=E6=97=B6=E8=AE=BE=E7=BD=AE=E8=AE=A2=E9=98=85=E5=91=A8?= =?UTF-8?q?=E6=9C=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/SubscribeHolder.java | 24 +++---- .../iot/vmp/gb28181/bean/SubscribeInfo.java | 31 ++++----- .../service/impl/DeviceServiceImpl.java | 3 - .../cmd/SIPRequestHeaderPlarformProvider.java | 8 +-- .../impl/SubscribeRequestProcessor.java | 64 +++++++++++-------- .../redisMsg/RedisAlarmMsgListener.java | 2 - 6 files changed, 65 insertions(+), 67 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index a03d5a706..85004db3a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -32,12 +32,14 @@ public class SubscribeHolder { public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) { log.info("[国标级联] 添加目录订阅,平台: {}, 有效期: {}", platformId, subscribeInfo.getExpires()); - if (subscribeInfo.getExpires() < 0) { - return; - } + String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId); - Duration duration = Duration.ofSeconds(subscribeInfo.getExpires()); - redisTemplate.opsForValue().set(key, subscribeInfo, duration); + if (subscribeInfo.getExpires() > 0) { + Duration duration = Duration.ofSeconds(subscribeInfo.getExpires()); + redisTemplate.opsForValue().set(key, subscribeInfo, duration); + }else { + redisTemplate.opsForValue().set(key, subscribeInfo); + } } public SubscribeInfo getCatalogSubscribe(String platformId) { @@ -52,13 +54,13 @@ public class SubscribeHolder { public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo, Runnable gpsTask) { log.info("[国标级联] 添加移动位置订阅,平台: {}, 有效期: {}", platformId, subscribeInfo.getExpires()); - if (subscribeInfo.getExpires() < 0) { - return; - } String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId); - Duration duration = Duration.ofSeconds(subscribeInfo.getExpires()); - redisTemplate.opsForValue().set(key, subscribeInfo, duration); - + if (subscribeInfo.getExpires() > 0) { + Duration duration = Duration.ofSeconds(subscribeInfo.getExpires()); + redisTemplate.opsForValue().set(key, subscribeInfo, duration); + }else { + redisTemplate.opsForValue().set(key, subscribeInfo); + } int cycleForCatalog; if (subscribeInfo.getGpsInterval() <= 0) { cycleForCatalog = 5; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java index a131ccb43..5820cb67c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java @@ -1,36 +1,19 @@ package com.genersoft.iot.vmp.gb28181.bean; -import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; import lombok.Data; -import javax.sip.header.*; +import javax.sip.header.EventHeader; import java.util.UUID; @Data public class SubscribeInfo { - - public SubscribeInfo(SIPRequest request, String id) { - this.id = id; - this.request = request; - this.expires = request.getExpires().getExpires(); - EventHeader eventHeader = (EventHeader)request.getHeader(EventHeader.NAME); - this.eventId = eventHeader.getEventId(); - this.eventType = eventHeader.getEventType(); - - } - - public SubscribeInfo() { - } - private String id; - - private SIPRequest request; private int expires; private String eventId; private String eventType; - private SIPResponse response; + private SipTransactionInfo transactionInfo; /** * 以下为可选字段 @@ -55,6 +38,16 @@ public class SubscribeInfo { private String simulatedCallId; + public static SubscribeInfo getInstance(SIPResponse response, String id, int expires, EventHeader eventHeader){ + SubscribeInfo subscribeInfo = new SubscribeInfo(); + subscribeInfo.id = id; + subscribeInfo.transactionInfo = new SipTransactionInfo(response); + + subscribeInfo.expires = expires; + subscribeInfo.eventId = eventHeader.getEventId(); + subscribeInfo.eventType = eventHeader.getEventType(); + return subscribeInfo; + } public static SubscribeInfo buildSimulated(String platFormServerId, String platFormServerIp){ SubscribeInfo subscribeInfo = new SubscribeInfo(); subscribeInfo.setId(platFormServerId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 46702ad65..2b5bdacf5 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -389,7 +389,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { return false; } log.info("[移除目录订阅]: {}", device.getDeviceId()); - device.setSubscribeCycleForCatalog(0); String key = SubscribeTaskForCatalog.getKey(device); if (subscribeTaskRunner.containsKey(key)) { SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key); @@ -728,8 +727,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { // 开启订阅 device.setSubscribeCycleForCatalog(cycle); addCatalogSubscribe(device, null); - deviceMapper.updateSubscribeCatalog(device); - redisCatchStorage.updateDevice(device); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index 4b7176590..8e3fb4b36 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -34,7 +34,7 @@ public class SIPRequestHeaderPlarformProvider { @Autowired private SipConfig sipConfig; - + @Autowired private SipLayer sipLayer; @@ -225,11 +225,11 @@ public class SIPRequestHeaderPlarformProvider { SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(), parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort()); Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI); - FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, subscribeInfo.getResponse() != null ? subscribeInfo.getResponse().getToTag(): subscribeInfo.getSimulatedToTag()); + FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, subscribeInfo.getTransactionInfo() != null ? subscribeInfo.getTransactionInfo() .getToTag(): subscribeInfo.getSimulatedToTag()); // to SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain()); Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI); - ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress, subscribeInfo.getRequest() != null ?subscribeInfo.getRequest().getFromTag(): subscribeInfo.getSimulatedFromTag()); + ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress, subscribeInfo.getTransactionInfo() != null ?subscribeInfo.getTransactionInfo().getFromTag(): subscribeInfo.getSimulatedFromTag()); // Forwards MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70); @@ -239,7 +239,7 @@ public class SIPRequestHeaderPlarformProvider { // 设置编码, 防止中文乱码 messageFactory.setDefaultContentEncodingCharset("gb2312"); - CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(subscribeInfo.getRequest() != null ? subscribeInfo.getRequest().getCallIdHeader().getCallId(): subscribeInfo.getSimulatedCallId()); + CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(subscribeInfo.getTransactionInfo() != null ? subscribeInfo.getTransactionInfo().getCallId(): subscribeInfo.getSimulatedCallId()); request = (SIPRequest) messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index acdff1ce3..f59091ccd 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -1,9 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.gb28181.bean.CmdType; -import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; -import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; @@ -23,6 +20,7 @@ import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; +import javax.sip.header.EventHeader; import javax.sip.header.ExpiresHeader; import javax.sip.message.Response; import java.text.ParseException; @@ -56,9 +54,9 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme sipProcessorObserver.addRequestProcessor(method, this); } - /** - * 处理SUBSCRIBE请求 - * + /** + * 处理SUBSCRIBE请求 + * * @param evt 事件 */ @Override @@ -106,7 +104,6 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme String platformId = SipUtils.getUserIdFromFromHeader(request); String deviceId = XmlUtil.getText(rootElement, "DeviceID"); Platform platform = platformService.queryPlatformByServerGBId(platformId); - SubscribeInfo subscribeInfo = new SubscribeInfo(request, platformId); if (platform == null) { return; } @@ -122,23 +119,28 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme .append("OK\r\n") .append("\r\n"); - if (subscribeInfo.getExpires() > 0) { - // GPS上报时间间隔 - String interval = XmlUtil.getText(rootElement, "Interval"); - if (interval == null) { - subscribeInfo.setGpsInterval(5); - }else { - subscribeInfo.setGpsInterval(Integer.parseInt(interval)); - } - subscribeInfo.setSn(sn); - } + try { - SIPResponse response = responseXmlAck(request, resultXml.toString(), platform, subscribeInfo.getExpires()); + int expires = request.getExpires().getExpires(); + SIPResponse response = responseXmlAck(request, resultXml.toString(), platform, expires); + + SubscribeInfo subscribeInfo = SubscribeInfo.getInstance(response, platformId, expires, + (EventHeader)request.getHeader(EventHeader.NAME)); + if (subscribeInfo.getExpires() > 0) { + // GPS上报时间间隔 + String interval = XmlUtil.getText(rootElement, "Interval"); + if (interval == null) { + subscribeInfo.setGpsInterval(5); + }else { + subscribeInfo.setGpsInterval(Integer.parseInt(interval)); + } + subscribeInfo.setSn(sn); + } if (subscribeInfo.getExpires() == 0) { subscribeHolder.removeMobilePositionSubscribe(platformId); }else { - subscribeInfo.setResponse(response); + subscribeInfo.setTransactionInfo(new SipTransactionInfo(response)); subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo, ()->{ platformService.sendNotifyMobilePosition(platformId); }); @@ -163,7 +165,6 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme if (platform == null){ return; } - SubscribeInfo subscribeInfo = new SubscribeInfo(request, platformId); String sn = XmlUtil.getText(rootElement, "SN"); log.info("[回复上级的目录订阅请求]: {}/{}", platformId, deviceId); @@ -176,18 +177,25 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme .append("OK\r\n") .append("\r\n"); - if (subscribeInfo.getExpires() > 0) { - subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo); - }else if (subscribeInfo.getExpires() == 0) { - subscribeHolder.removeCatalogSubscribe(platformId); - } + try { + int expires = request.getExpires().getExpires(); Platform parentPlatform = platformService.queryPlatformByServerGBId(platformId); - SIPResponse response = responseXmlAck(request, resultXml.toString(), parentPlatform, subscribeInfo.getExpires()); + SIPResponse response = responseXmlAck(request, resultXml.toString(), parentPlatform, expires); + + SubscribeInfo subscribeInfo = SubscribeInfo.getInstance(response, platformId, expires, + (EventHeader)request.getHeader(EventHeader.NAME)); + + if (subscribeInfo.getExpires() > 0) { + subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo); + }else if (subscribeInfo.getExpires() == 0) { + subscribeHolder.removeCatalogSubscribe(platformId); + } + if (subscribeInfo.getExpires() == 0) { subscribeHolder.removeCatalogSubscribe(platformId); }else { - subscribeInfo.setResponse(response); + subscribeInfo.setTransactionInfo(new SipTransactionInfo(response)); subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo); } } catch (SipException | InvalidArgumentException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java index dc20f64bd..894448d84 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java @@ -133,7 +133,6 @@ public class RedisAlarmMsgListener implements MessageListener { } } } - } // 获取开启了消息推送的设备和平台 List devices = channelService.queryDeviceWithAsMessageChannel(); @@ -147,7 +146,6 @@ public class RedisAlarmMsgListener implements MessageListener { } } } - } else { // 获取该通道ID是属于设备还是对应的上级平台 Device device = deviceService.getDeviceBySourceChannelDeviceId(gbId);