优化级联移动位置订阅位置更新

This commit is contained in:
648540858
2022-04-14 16:52:48 +08:00
parent efc4a7bc8e
commit e6ee7fe747
13 changed files with 227 additions and 121 deletions

View File

@@ -1,5 +1,12 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
@@ -9,12 +16,32 @@ import java.util.concurrent.ConcurrentHashMap;
@Component
public class SubscribeHolder {
@Autowired
private DynamicTask dynamicTask;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private ISIPCommanderForPlatform sipCommanderForPlatform;
@Autowired
private IVideoManagerStorage storager;
private final String taskOverduePrefix = "subscribe_overdue_";
private static ConcurrentHashMap<String, SubscribeInfo> catalogMap = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, SubscribeInfo> mobilePositionMap = new ConcurrentHashMap<>();
public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) {
catalogMap.put(platformId, subscribeInfo);
// 添加订阅到期
String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId;
dynamicTask.stop(taskOverdueKey);
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()),
subscribeInfo.getExpires() * 1000);
}
public SubscribeInfo getCatalogSubscribe(String platformId) {
@@ -23,10 +50,24 @@ public class SubscribeHolder {
public void removeCatalogSubscribe(String platformId) {
catalogMap.remove(platformId);
String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId;
// 添加任务处理订阅过期
dynamicTask.stop(taskOverdueKey);
}
public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo) {
mobilePositionMap.put(platformId, subscribeInfo);
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + "MobilePosition_" + platformId;
// 添加任务处理GPS定时推送
dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, subscribeInfo.getSn(), key, this), subscribeInfo.getGpsInterval());
String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId;
dynamicTask.stop(taskOverdueKey);
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> {
System.out.println("订阅过期");
removeMobilePositionSubscribe(subscribeInfo.getId());
},
subscribeInfo.getExpires() * 1000);
}
public SubscribeInfo getMobilePositionSubscribe(String platformId) {
@@ -35,6 +76,12 @@ public class SubscribeHolder {
public void removeMobilePositionSubscribe(String platformId) {
mobilePositionMap.remove(platformId);
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + "MobilePosition_" + platformId;
// 结束任务处理GPS定时推送
dynamicTask.stop(key);
String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId;
// 添加任务处理订阅过期
dynamicTask.stop(taskOverdueKey);
}
public List<String> getAllCatalogSubscribePlatform() {
@@ -48,7 +95,7 @@ public class SubscribeHolder {
}
public void removeAllSubscribe(String platformId) {
mobilePositionMap.remove(platformId);
catalogMap.remove(platformId);
removeMobilePositionSubscribe(platformId);
removeCatalogSubscribe(platformId);
}
}

View File

@@ -33,6 +33,14 @@ public class SubscribeInfo {
private ServerTransaction transaction;
private Dialog dialog;
/**
* 以下为可选字段
* @return
*/
private String sn;
private int gpsInterval;
public String getId() {
return id;
}
@@ -88,4 +96,20 @@ public class SubscribeInfo {
public void setDialog(Dialog dialog) {
this.dialog = dialog;
}
public String getSn() {
return sn;
}
public void setSn(String sn) {
this.sn = sn;
}
public int getGpsInterval() {
return gpsInterval;
}
public void setGpsInterval(int gpsInterval) {
this.gpsInterval = gpsInterval;
}
}

View File

@@ -1,50 +0,0 @@
package com.genersoft.iot.vmp.gb28181.event.subscribe;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
import com.genersoft.iot.vmp.conf.UserSetting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/**
* 平台订阅到期事件
*/
@Component
public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessageListener {
private Logger logger = LoggerFactory.getLogger(SubscribeListenerForPlatform.class);
@Autowired
private UserSetting userSetting;
@Autowired
private DynamicTask dynamicTask;
public SubscribeListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetting userSetting) {
super(listenerContainer, userSetting);
}
/**
* 监听失效的key
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取失效的key
String expiredKey = message.toString();
logger.debug(expiredKey);
// 订阅到期
String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_";
if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
// 取消定时任务
dynamicTask.stop(expiredKey);
}
}
}

View File

@@ -61,8 +61,6 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
if (event.getPlatformId() != null) {
parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId());
if (parentPlatform != null && !parentPlatform.isStatus())return;
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_Catalog_" + event.getPlatformId();
// subscribe = redisCatchStorage.getSubscribe(key);
subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
if (subscribe == null) {

View File

@@ -46,7 +46,6 @@ public class CatalogSubscribeTask implements ISubscribeTask {
});
}
@Async
@Override
public void stop() {
/**

View File

@@ -24,52 +24,50 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
private IVideoManagerStorage storager;
private ISIPCommanderForPlatform sipCommanderForPlatform;
private SubscribeHolder subscribeHolder;
private String platformId;
private ParentPlatform platform;
private String sn;
private String key;
public MobilePositionSubscribeHandlerTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorage storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) {
System.out.println("MobilePositionSubscribeHandlerTask 初始化");
this.redisCatchStorage = redisCatchStorage;
this.storager = storager;
this.platformId = platformId;
this.platform = storager.queryParentPlatByServerGBId(platformId);
this.sn = sn;
this.key = key;
this.sipCommanderForPlatform = sipCommanderForPlatform;
this.subscribeHolder = subscribeInfo;
}
@Async
@Override
public void run() {
logger.info("执行MobilePositionSubscribeHandlerTask");
SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platformId);
if (platform == null) return;
SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId());
if (subscribe != null) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
if (parentPlatform == null ) {
logger.info("发送订阅时未找到平台信息:{}", platformId);
return;
}
if (!parentPlatform.isStatus()) {
logger.info("发送订阅时发现平台已经离线:{}", platformId);
return;
}
// if (!parentPlatform.isStatus()) {
// logger.info("发送订阅时发现平台已经离线:{}", platformId);
// return;
// }
// TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platformId);
List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platform.getServerGBId());
if (gbStreams.size() == 0) {
logger.info("发送订阅时发现平台已经没有关联的直播流:{}", platformId);
logger.info("发送订阅时发现平台已经没有关联的直播流:{}", platform.getServerGBId());
return;
}
for (GbStream gbStream : gbStreams) {
String gbId = gbStream.getGbId();
GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
if (gpsMsgInfo != null) { // 无最新位置不发送
logger.info("无最新位置不发送");
// 经纬度都为0不发送
if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) {
continue;
}
// 发送GPS消息
sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
sipCommanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, subscribe);
}
}
}

View File

@@ -11,6 +11,8 @@ import org.springframework.scheduling.annotation.Async;
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.ResponseEvent;
import java.util.Timer;
import java.util.TimerTask;
/**
* 移动位置订阅的定时更新
@@ -21,18 +23,23 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
private ISIPCommander sipCommander;
private Dialog dialog;
private Timer timer ;
public MobilePositionSubscribeTask(Device device, ISIPCommander sipCommander) {
this.device = device;
this.sipCommander = sipCommander;
}
@Async
@Override
public void run() {
if (timer != null ) {
timer.cancel();
timer = null;
}
sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> {
if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
dialog = eventResult.dialog;
}
// if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
// dialog = eventResult.dialog;
// }
ResponseEvent event = (ResponseEvent) eventResult.event;
if (event.getResponse().getRawContent() != null) {
// 成功
@@ -45,6 +52,13 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
dialog = null;
// 失败
logger.warn("[移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
MobilePositionSubscribeTask.this.run();
}
}, 2000);
});
}
@@ -58,6 +72,10 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
* COMPLETED-> Completed Dialog状态-已完成
* TERMINATED-> Terminated Dialog状态-终止
*/
if (timer != null ) {
timer.cancel();
timer = null;
}
if (dialog != null && dialog.getState().equals(DialogState.CONFIRMED)) {
logger.info("取消移动订阅时dialog状态为{}", dialog.getState());
device.setSubscribeCycleForMobilePosition(0);

View File

@@ -385,7 +385,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
if (parentPlatform == null) {
return false;
}
logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
try {
String characterSet = parentPlatform.getCharacterSet();
StringBuffer deviceStatusXml = new StringBuffer(600);
@@ -405,7 +405,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
callIdHeader.setCallId(subscribeInfo.getCallId());
logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> {
logger.error("发送NOTIFY通知消息失败。错误{} {}", eventResult.statusCode, eventResult.msg);
}, null);
@@ -459,7 +459,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
// 设置编码, 防止中文乱码
messageFactory.setDefaultContentEncodingCharset(characterSet);
Dialog dialog = subscribeInfo.getDialog();
if (dialog == null) return;
if (dialog == null || !dialog.getState().equals(DialogState.CONFIRMED)) return;
SIPRequest notifyRequest = (SIPRequest)dialog.createRequest(Request.NOTIFY);
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
notifyRequest.setContent(catalogXmlContent, contentTypeHeader);

View File

@@ -149,7 +149,6 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
subscribeInfo.setDialog(dialog);
}
String sn = XmlUtil.getText(rootElement, "SN");
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_MobilePosition_" + platformId;
logger.info("[回复 移动位置订阅]: {}", platformId);
StringBuilder resultXml = new StringBuilder(200);
resultXml.append("<?xml version=\"1.0\" ?>\r\n")
@@ -161,23 +160,25 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
.append("</Response>\r\n");
if (subscribeInfo.getExpires() > 0) {
if (subscribeHolder.getMobilePositionSubscribe(platformId) == null ) {
String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval));
String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
if (interval == null) {
subscribeInfo.setGpsInterval(5);
}else {
if (subscribeHolder.getMobilePositionSubscribe(platformId).getDialog() != null
&& subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState() != null
&& !subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState().equals(DialogState.CONFIRMED)) {
dynamicTask.stop(key);
String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval));
}
subscribeInfo.setGpsInterval(Integer.parseInt(interval));
}
subscribeInfo.setSn(sn);
subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
// if (subscribeHolder.getMobilePositionSubscribe(platformId) == null ) {
// subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
// }else {
// if (subscribeHolder.getMobilePositionSubscribe(platformId).getDialog() != null
// && subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState() != null
// && !subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState().equals(DialogState.CONFIRMED)) {
// subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
// }
// }
}else if (subscribeInfo.getExpires() == 0) {
dynamicTask.stop(key);
subscribeHolder.removeMobilePositionSubscribe(platformId);
}
@@ -211,7 +212,6 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
subscribeInfo.setDialog(dialog);
}
String sn = XmlUtil.getText(rootElement, "SN");
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_Catalog_" + platformId;
logger.info("[回复 目录订阅]: {}/{}", platformId, deviceID);
StringBuilder resultXml = new StringBuilder(200);
resultXml.append("<?xml version=\"1.0\" ?>\r\n")

View File

@@ -145,9 +145,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
}
// 回复200 OK
responseAck(evt, Response.OK);
if (offLineDetector.isOnline(device.getDeviceId())) {
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
}
}
} catch (DocumentException e) {
e.printStackTrace();