优化订阅信息的发送与取消订阅

This commit is contained in:
648540858
2022-04-08 14:44:49 +08:00
parent f1c8ca602d
commit f10b458fc9
16 changed files with 242 additions and 124 deletions

View File

@@ -0,0 +1,5 @@
package com.genersoft.iot.vmp.gb28181.task;
public interface ISubscribeTask extends Runnable{
void stop();
}

View File

@@ -0,0 +1,75 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.ResponseEvent;
/**
* 目录订阅任务
*/
public class CatalogSubscribeTask implements ISubscribeTask {
private final Logger logger = LoggerFactory.getLogger(CatalogSubscribeTask.class);
private Device device;
private final ISIPCommander sipCommander;
private Dialog dialog;
public CatalogSubscribeTask(Device device, ISIPCommander sipCommander) {
this.device = device;
this.sipCommander = sipCommander;
}
@Override
public void run() {
sipCommander.catalogSubscribe(device, dialog, eventResult -> {
if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
dialog = eventResult.dialog;
}
ResponseEvent event = (ResponseEvent) eventResult.event;
if (event.getResponse().getRawContent() != null) {
// 成功
logger.info("[目录订阅]成功: {}", device.getDeviceId());
}else {
// 成功
logger.info("[目录订阅]成功: {}", device.getDeviceId());
}
},eventResult -> {
dialog = null;
// 失败
logger.warn("[目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
}
@Override
public void stop() {
/**
* dialog 的各个状态
* EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息
* CONFIRMED-> Confirmed Dialog状态-已确认
* COMPLETED-> Completed Dialog状态-已完成
* TERMINATED-> Terminated Dialog状态-终止
*/
logger.info("取消目录订阅时dialog状态为{}", DialogState.CONFIRMED);
if (dialog != null && dialog.getState().equals(DialogState.CONFIRMED)) {
device.setSubscribeCycleForCatalog(0);
sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
if (event.getResponse().getRawContent() != null) {
// 成功
logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId());
}else {
// 成功
logger.info("[取消目录订阅订阅]成功: {}", device.getDeviceId());
}
},eventResult -> {
// 失败
logger.warn("[取消目录订阅订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
}
}
}

View File

@@ -1,10 +1,10 @@
package com.genersoft.iot.vmp.gb28181.task;
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.alibaba.fastjson.JSON;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -13,7 +13,10 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import java.text.SimpleDateFormat;
import java.util.List;
public class GPSSubscribeTask implements Runnable{
/**
* 向已经订阅(移动位置)的上级发送MobilePosition消息
*/
public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
private IRedisCatchStorage redisCatchStorage;
private IVideoManagerStorage storager;
@@ -25,7 +28,7 @@ public class GPSSubscribeTask implements Runnable{
private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorage storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) {
public MobilePositionSubscribeHandlerTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorage storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) {
this.redisCatchStorage = redisCatchStorage;
this.storager = storager;
this.platformId = platformId;
@@ -66,4 +69,9 @@ public class GPSSubscribeTask implements Runnable{
}
}
}
@Override
public void stop() {
}
}

View File

@@ -0,0 +1,77 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.ResponseEvent;
/**
* 移动位置订阅的定时更新
*/
public class MobilePositionSubscribeTask implements ISubscribeTask {
private final Logger logger = LoggerFactory.getLogger(MobilePositionSubscribeTask.class);
private Device device;
private ISIPCommander sipCommander;
private Dialog dialog;
public MobilePositionSubscribeTask(Device device, ISIPCommander sipCommander) {
this.device = device;
this.sipCommander = sipCommander;
}
@Override
public void run() {
sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> {
if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
dialog = eventResult.dialog;
}
ResponseEvent event = (ResponseEvent) eventResult.event;
if (event.getResponse().getRawContent() != null) {
// 成功
logger.info("[移动位置订阅]成功: {}", device.getDeviceId());
}else {
// 成功
logger.info("[移动位置订阅]成功: {}", device.getDeviceId());
}
},eventResult -> {
dialog = null;
// 失败
logger.warn("[移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
}
@Override
public void stop() {
/**
* dialog 的各个状态
* EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息
* CONFIRMED-> Confirmed Dialog状态-已确认
* COMPLETED-> Completed Dialog状态-已完成
* TERMINATED-> Terminated Dialog状态-终止
*/
logger.info("取消移动订阅时dialog状态为{}", dialog.getState());
if (dialog != null && dialog.getState().equals(DialogState.CONFIRMED)) {
device.setSubscribeCycleForMobilePosition(0);
sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
if (event.getResponse().getRawContent() != null) {
// 成功
logger.info("[取消移动位置订阅]成功: {}", device.getDeviceId());
}else {
// 成功
logger.info("[取消移动位置订阅]成功: {}", device.getDeviceId());
}
},eventResult -> {
// 失败
logger.warn("[取消移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
}
}
}

View File

@@ -8,6 +8,8 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import javax.sip.Dialog;
/**
* @description:设备能力接口,用于定义设备的控制、查询能力
* @author: swwheihei
@@ -304,7 +306,7 @@ public interface ISIPCommander {
* @param device 视频设备
* @return true = 命令发送成功
*/
boolean mobilePositionSubscribe(Device device, SipSubscribe.Event okEvent ,SipSubscribe.Event errorEvent);
boolean mobilePositionSubscribe(Device device, Dialog dialog, SipSubscribe.Event okEvent , SipSubscribe.Event errorEvent);
/**
* 订阅、取消订阅报警信息
@@ -324,7 +326,7 @@ public interface ISIPCommander {
* @param device 视频设备
* @return true = 命令发送成功
*/
boolean catalogSubscribe(Device device, SipSubscribe.Event okEvent ,SipSubscribe.Event errorEvent);
boolean catalogSubscribe(Device device, Dialog dialog, SipSubscribe.Event okEvent ,SipSubscribe.Event errorEvent);
/**
* 拉框控制命令

View File

@@ -36,6 +36,8 @@ import org.springframework.util.StringUtils;
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.ContentTypeHeader;
import javax.sip.header.ExpiresHeader;
import javax.sip.header.ViaHeader;
import javax.sip.message.Request;
import java.lang.reflect.Field;
@@ -56,6 +58,9 @@ public class SIPCommander implements ISIPCommander {
@Autowired
private SipConfig sipConfig;
@Autowired
private SipFactory sipFactory;
@Autowired
@Qualifier(value="tcpSipProvider")
private SipProviderImpl tcpSipProvider;
@@ -1453,7 +1458,7 @@ public class SIPCommander implements ISIPCommander {
* @param device 视频设备
* @return true = 命令发送成功
*/
public boolean mobilePositionSubscribe(Device device, SipSubscribe.Event okEvent ,SipSubscribe.Event errorEvent) {
public boolean mobilePositionSubscribe(Device device, Dialog dialog, SipSubscribe.Event okEvent ,SipSubscribe.Event errorEvent) {
try {
StringBuffer subscribePostitionXml = new StringBuffer(200);
String charset = device.getCharset();
@@ -1467,12 +1472,20 @@ public class SIPCommander implements ISIPCommander {
}
subscribePostitionXml.append("</Query>\r\n");
String tm = Long.toString(System.currentTimeMillis());
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
Request request = headerProvider.createSubscribeRequest(device, subscribePostitionXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForMobilePosition(), "presence" ,callIdHeader); //Position;id=" + tm.substring(tm.length() - 4));
Request request;
if (dialog != null) {
logger.info("发送移动位置订阅消息时 dialog的状态为 {}", dialog.getState());
request = dialog.createRequest(Request.SUBSCRIBE);
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
request.setContent(subscribePostitionXml.toString(), contentTypeHeader);
ExpiresHeader expireHeader = sipFactory.createHeaderFactory().createExpiresHeader(device.getSubscribeCycleForMobilePosition());
request.addHeader(expireHeader);
}else {
String tm = Long.toString(System.currentTimeMillis());
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
request = headerProvider.createSubscribeRequest(device, subscribePostitionXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForMobilePosition(), "presence" ,callIdHeader); //Position;id=" + tm.substring(tm.length() - 4));
}
transmitRequest(device, request, errorEvent, okEvent);
return true;
@@ -1542,7 +1555,7 @@ public class SIPCommander implements ISIPCommander {
}
@Override
public boolean catalogSubscribe(Device device, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
public boolean catalogSubscribe(Device device, Dialog dialog, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
try {
StringBuffer cmdXml = new StringBuffer(200);
String charset = device.getCharset();

View File

@@ -7,7 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.CmdType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.task.GPSSubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
@@ -147,7 +147,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
}
String sn = XmlUtil.getText(rootElement, "SN");
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_MobilePosition_" + platformId;
logger.info("接收到{}的MobilePosition订阅", platformId);
logger.info("[notify-MobilePosition]: {}", platformId);
StringBuilder resultXml = new StringBuilder(200);
resultXml.append("<?xml version=\"1.0\" ?>\r\n")
.append("<Response>\r\n")
@@ -162,7 +162,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
dynamicTask.stop(key);
}
String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval));
dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval) -1 );
subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
}else if (subscribeInfo.getExpires() == 0) {
dynamicTask.stop(key);
@@ -200,7 +200,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
}
String sn = XmlUtil.getText(rootElement, "SN");
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_Catalog_" + platformId;
logger.info("接收到{}的Catalog订阅", platformId);
logger.info("[notify-Catalog]: {}", platformId);
StringBuilder resultXml = new StringBuilder(200);
resultXml.append("<?xml version=\"1.0\" ?>\r\n")
.append("<Response>\r\n")

View File

@@ -81,7 +81,7 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
}
requestURI.setPort(event.getRemotePort());
reqAck.setRequestURI(requestURI);
logger.info("" + event.getRemoteIpAddress() + ":" + event.getRemotePort() + "回复ack");
logger.info("[回复ack] {}-> {}:{} ",requestURI, event.getRemoteIpAddress(), event.getRemotePort());
dialog.sendAck(reqAck);