去除移除订阅时设置订阅周期

This commit is contained in:
lin
2025-05-21 18:06:07 +08:00
parent dd17462b68
commit 29ac4850f4
6 changed files with 65 additions and 67 deletions

View File

@@ -32,12 +32,14 @@ public class SubscribeHolder {
public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) {
log.info("[国标级联] 添加目录订阅,平台: {} 有效期: {}", platformId, subscribeInfo.getExpires());
if (subscribeInfo.getExpires() < 0) {
return;
}
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId);
Duration duration = Duration.ofSeconds(subscribeInfo.getExpires());
redisTemplate.opsForValue().set(key, subscribeInfo, duration);
if (subscribeInfo.getExpires() > 0) {
Duration duration = Duration.ofSeconds(subscribeInfo.getExpires());
redisTemplate.opsForValue().set(key, subscribeInfo, duration);
}else {
redisTemplate.opsForValue().set(key, subscribeInfo);
}
}
public SubscribeInfo getCatalogSubscribe(String platformId) {
@@ -52,13 +54,13 @@ public class SubscribeHolder {
public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo, Runnable gpsTask) {
log.info("[国标级联] 添加移动位置订阅,平台: {} 有效期: {}", platformId, subscribeInfo.getExpires());
if (subscribeInfo.getExpires() < 0) {
return;
}
String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId);
Duration duration = Duration.ofSeconds(subscribeInfo.getExpires());
redisTemplate.opsForValue().set(key, subscribeInfo, duration);
if (subscribeInfo.getExpires() > 0) {
Duration duration = Duration.ofSeconds(subscribeInfo.getExpires());
redisTemplate.opsForValue().set(key, subscribeInfo, duration);
}else {
redisTemplate.opsForValue().set(key, subscribeInfo);
}
int cycleForCatalog;
if (subscribeInfo.getGpsInterval() <= 0) {
cycleForCatalog = 5;

View File

@@ -1,36 +1,19 @@
package com.genersoft.iot.vmp.gb28181.bean;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.Data;
import javax.sip.header.*;
import javax.sip.header.EventHeader;
import java.util.UUID;
@Data
public class SubscribeInfo {
public SubscribeInfo(SIPRequest request, String id) {
this.id = id;
this.request = request;
this.expires = request.getExpires().getExpires();
EventHeader eventHeader = (EventHeader)request.getHeader(EventHeader.NAME);
this.eventId = eventHeader.getEventId();
this.eventType = eventHeader.getEventType();
}
public SubscribeInfo() {
}
private String id;
private SIPRequest request;
private int expires;
private String eventId;
private String eventType;
private SIPResponse response;
private SipTransactionInfo transactionInfo;
/**
* 以下为可选字段
@@ -55,6 +38,16 @@ public class SubscribeInfo {
private String simulatedCallId;
public static SubscribeInfo getInstance(SIPResponse response, String id, int expires, EventHeader eventHeader){
SubscribeInfo subscribeInfo = new SubscribeInfo();
subscribeInfo.id = id;
subscribeInfo.transactionInfo = new SipTransactionInfo(response);
subscribeInfo.expires = expires;
subscribeInfo.eventId = eventHeader.getEventId();
subscribeInfo.eventType = eventHeader.getEventType();
return subscribeInfo;
}
public static SubscribeInfo buildSimulated(String platFormServerId, String platFormServerIp){
SubscribeInfo subscribeInfo = new SubscribeInfo();
subscribeInfo.setId(platFormServerId);

View File

@@ -389,7 +389,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
return false;
}
log.info("[移除目录订阅]: {}", device.getDeviceId());
device.setSubscribeCycleForCatalog(0);
String key = SubscribeTaskForCatalog.getKey(device);
if (subscribeTaskRunner.containsKey(key)) {
SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key);
@@ -728,8 +727,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
// 开启订阅
device.setSubscribeCycleForCatalog(cycle);
addCatalogSubscribe(device, null);
deviceMapper.updateSubscribeCatalog(device);
redisCatchStorage.updateDevice(device);
}
}

View File

@@ -34,7 +34,7 @@ public class SIPRequestHeaderPlarformProvider {
@Autowired
private SipConfig sipConfig;
@Autowired
private SipLayer sipLayer;
@@ -225,11 +225,11 @@ public class SIPRequestHeaderPlarformProvider {
SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(),
parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort());
Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI);
FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, subscribeInfo.getResponse() != null ? subscribeInfo.getResponse().getToTag(): subscribeInfo.getSimulatedToTag());
FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, subscribeInfo.getTransactionInfo() != null ? subscribeInfo.getTransactionInfo() .getToTag(): subscribeInfo.getSimulatedToTag());
// to
SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain());
Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI);
ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress, subscribeInfo.getRequest() != null ?subscribeInfo.getRequest().getFromTag(): subscribeInfo.getSimulatedFromTag());
ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress, subscribeInfo.getTransactionInfo() != null ?subscribeInfo.getTransactionInfo().getFromTag(): subscribeInfo.getSimulatedFromTag());
// Forwards
MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70);
@@ -239,7 +239,7 @@ public class SIPRequestHeaderPlarformProvider {
// 设置编码, 防止中文乱码
messageFactory.setDefaultContentEncodingCharset("gb2312");
CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(subscribeInfo.getRequest() != null ? subscribeInfo.getRequest().getCallIdHeader().getCallId(): subscribeInfo.getSimulatedCallId());
CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(subscribeInfo.getTransactionInfo() != null ? subscribeInfo.getTransactionInfo().getCallId(): subscribeInfo.getSimulatedCallId());
request = (SIPRequest) messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader,
toHeader, viaHeaders, maxForwards);

View File

@@ -1,9 +1,6 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.gb28181.bean.CmdType;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
@@ -23,6 +20,7 @@ import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.EventHeader;
import javax.sip.header.ExpiresHeader;
import javax.sip.message.Response;
import java.text.ParseException;
@@ -56,9 +54,9 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
sipProcessorObserver.addRequestProcessor(method, this);
}
/**
* 处理SUBSCRIBE请求
*
/**
* 处理SUBSCRIBE请求
*
* @param evt 事件
*/
@Override
@@ -106,7 +104,6 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
String platformId = SipUtils.getUserIdFromFromHeader(request);
String deviceId = XmlUtil.getText(rootElement, "DeviceID");
Platform platform = platformService.queryPlatformByServerGBId(platformId);
SubscribeInfo subscribeInfo = new SubscribeInfo(request, platformId);
if (platform == null) {
return;
}
@@ -122,23 +119,28 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
.append("<Result>OK</Result>\r\n")
.append("</Response>\r\n");
if (subscribeInfo.getExpires() > 0) {
// GPS上报时间间隔
String interval = XmlUtil.getText(rootElement, "Interval");
if (interval == null) {
subscribeInfo.setGpsInterval(5);
}else {
subscribeInfo.setGpsInterval(Integer.parseInt(interval));
}
subscribeInfo.setSn(sn);
}
try {
SIPResponse response = responseXmlAck(request, resultXml.toString(), platform, subscribeInfo.getExpires());
int expires = request.getExpires().getExpires();
SIPResponse response = responseXmlAck(request, resultXml.toString(), platform, expires);
SubscribeInfo subscribeInfo = SubscribeInfo.getInstance(response, platformId, expires,
(EventHeader)request.getHeader(EventHeader.NAME));
if (subscribeInfo.getExpires() > 0) {
// GPS上报时间间隔
String interval = XmlUtil.getText(rootElement, "Interval");
if (interval == null) {
subscribeInfo.setGpsInterval(5);
}else {
subscribeInfo.setGpsInterval(Integer.parseInt(interval));
}
subscribeInfo.setSn(sn);
}
if (subscribeInfo.getExpires() == 0) {
subscribeHolder.removeMobilePositionSubscribe(platformId);
}else {
subscribeInfo.setResponse(response);
subscribeInfo.setTransactionInfo(new SipTransactionInfo(response));
subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo, ()->{
platformService.sendNotifyMobilePosition(platformId);
});
@@ -163,7 +165,6 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
if (platform == null){
return;
}
SubscribeInfo subscribeInfo = new SubscribeInfo(request, platformId);
String sn = XmlUtil.getText(rootElement, "SN");
log.info("[回复上级的目录订阅请求]: {}/{}", platformId, deviceId);
@@ -176,18 +177,25 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
.append("<Result>OK</Result>\r\n")
.append("</Response>\r\n");
if (subscribeInfo.getExpires() > 0) {
subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo);
}else if (subscribeInfo.getExpires() == 0) {
subscribeHolder.removeCatalogSubscribe(platformId);
}
try {
int expires = request.getExpires().getExpires();
Platform parentPlatform = platformService.queryPlatformByServerGBId(platformId);
SIPResponse response = responseXmlAck(request, resultXml.toString(), parentPlatform, subscribeInfo.getExpires());
SIPResponse response = responseXmlAck(request, resultXml.toString(), parentPlatform, expires);
SubscribeInfo subscribeInfo = SubscribeInfo.getInstance(response, platformId, expires,
(EventHeader)request.getHeader(EventHeader.NAME));
if (subscribeInfo.getExpires() > 0) {
subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo);
}else if (subscribeInfo.getExpires() == 0) {
subscribeHolder.removeCatalogSubscribe(platformId);
}
if (subscribeInfo.getExpires() == 0) {
subscribeHolder.removeCatalogSubscribe(platformId);
}else {
subscribeInfo.setResponse(response);
subscribeInfo.setTransactionInfo(new SipTransactionInfo(response));
subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo);
}
} catch (SipException | InvalidArgumentException | ParseException e) {

View File

@@ -133,7 +133,6 @@ public class RedisAlarmMsgListener implements MessageListener {
}
}
}
}
// 获取开启了消息推送的设备和平台
List<Device> devices = channelService.queryDeviceWithAsMessageChannel();
@@ -147,7 +146,6 @@ public class RedisAlarmMsgListener implements MessageListener {
}
}
}
} else {
// 获取该通道ID是属于设备还是对应的上级平台
Device device = deviceService.getDeviceBySourceChannelDeviceId(gbId);