Merge branch 'wvp-28181-2.0' into main-dev

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
#	src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
This commit is contained in:
648540858
2023-10-11 01:36:12 +08:00
28 changed files with 314 additions and 75 deletions

View File

@@ -186,6 +186,9 @@ public class ParentPlatform {
@Schema(description = "是否作为消息通道")
private boolean asMessageChannel;
@Schema(description = "是否作为消息通道")
private boolean autoPushChannel;
public Integer getId() {
return id;
}
@@ -425,4 +428,12 @@ public class ParentPlatform {
public void setAsMessageChannel(boolean asMessageChannel) {
this.asMessageChannel = asMessageChannel;
}
public boolean isAutoPushChannel() {
return autoPushChannel;
}
public void setAutoPushChannel(boolean autoPushChannel) {
this.autoPushChannel = autoPushChannel;
}
}

View File

@@ -32,11 +32,13 @@ public class SubscribeHolder {
public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) {
catalogMap.put(platformId, subscribeInfo);
// 添加订阅到期
String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId;
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()),
subscribeInfo.getExpires() * 1000);
if (subscribeInfo.getExpires() > 0) {
// 添加订阅到期
String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId;
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()),
subscribeInfo.getExpires() * 1000);
}
}
public SubscribeInfo getCatalogSubscribe(String platformId) {
@@ -63,11 +65,13 @@ public class SubscribeHolder {
dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(platformId),
subscribeInfo.getGpsInterval() * 1000);
String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId;
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> {
removeMobilePositionSubscribe(subscribeInfo.getId());
},
subscribeInfo.getExpires() * 1000);
if (subscribeInfo.getExpires() > 0) {
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> {
removeMobilePositionSubscribe(subscribeInfo.getId());
},
subscribeInfo.getExpires() * 1000);
}
}
public SubscribeInfo getMobilePositionSubscribe(String platformId) {

View File

@@ -18,6 +18,9 @@ public class SubscribeInfo {
}
public SubscribeInfo() {
}
private String id;
private SIPRequest request;
@@ -33,6 +36,21 @@ public class SubscribeInfo {
private String sn;
private int gpsInterval;
/**
* 模拟的FromTag
*/
private String simulatedFromTag;
/**
* 模拟的ToTag
*/
private String simulatedToTag;
/**
* 模拟的CallID
*/
private String simulatedCallId;
public String getId() {
return id;
}
@@ -96,4 +114,28 @@ public class SubscribeInfo {
public void setGpsInterval(int gpsInterval) {
this.gpsInterval = gpsInterval;
}
public String getSimulatedFromTag() {
return simulatedFromTag;
}
public void setSimulatedFromTag(String simulatedFromTag) {
this.simulatedFromTag = simulatedFromTag;
}
public String getSimulatedCallId() {
return simulatedCallId;
}
public void setSimulatedCallId(String simulatedCallId) {
this.simulatedCallId = simulatedCallId;
}
public String getSimulatedToTag() {
return simulatedToTag;
}
public void setSimulatedToTag(String simulatedToTag) {
this.simulatedToTag = simulatedToTag;
}
}

View File

@@ -93,7 +93,10 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
}
if (event.getGbStreams() != null && event.getGbStreams().size() > 0){
for (GbStream gbStream : event.getGbStreams()) {
if (gbStream.getStreamType().equals("push") && !userSetting.isUsePushingAsStatus()) {
if (gbStream != null
&& gbStream.getStreamType() != null
&& gbStream.getStreamType().equals("push")
&& !userSetting.isUsePushingAsStatus()) {
continue;
}
DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform);

View File

@@ -228,11 +228,11 @@ public class SIPRequestHeaderPlarformProvider {
SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(),
parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort());
Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI);
FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, subscribeInfo.getResponse().getToTag());
FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, subscribeInfo.getResponse() != null ? subscribeInfo.getResponse().getToTag(): subscribeInfo.getSimulatedToTag());
// to
SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain());
Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI);
ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress, subscribeInfo.getRequest().getFromTag());
ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress, subscribeInfo.getRequest() != null ?subscribeInfo.getRequest().getFromTag(): subscribeInfo.getSimulatedFromTag());
// Forwards
MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70);
@@ -242,7 +242,7 @@ public class SIPRequestHeaderPlarformProvider {
// 设置编码, 防止中文乱码
messageFactory.setDefaultContentEncodingCharset("gb2312");
CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(subscribeInfo.getRequest().getCallIdHeader().getCallId());
CallIdHeader callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(subscribeInfo.getRequest() != null ? subscribeInfo.getRequest().getCallIdHeader().getCallId(): subscribeInfo.getSimulatedCallId());
request = (SIPRequest) messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader,
toHeader, viaHeaders, maxForwards);

View File

@@ -169,13 +169,13 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport());
Request request = headerProviderPlatformProvider.createMessageRequest(
parentPlatform,
keepaliveXml.toString(),
SipUtils.getNewFromTag(),
SipUtils.getNewViaTag(),
callIdHeader);
sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, errorEvent, okEvent);
Request request = headerProviderPlatformProvider.createMessageRequest(
parentPlatform,
keepaliveXml.toString(),
SipUtils.getNewFromTag(),
SipUtils.getNewViaTag(),
callIdHeader);
sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, errorEvent, okEvent);
return callIdHeader.getCallId();
}

View File

@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.google.common.primitives.Bytes;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import org.apache.commons.lang3.ArrayUtils;
@@ -203,15 +204,14 @@ public abstract class SIPRequestProcessorParent {
result.add(rawContent[i]);
}
}
Byte[] bytes = new Byte[0];
byte[] bytesResult = ArrayUtils.toPrimitive(result.toArray(bytes));
byte[] bytesResult = Bytes.toArray(result);
Document xml;
try {
xml = reader.read(new ByteArrayInputStream(bytesResult));
}catch (DocumentException e) {
logger.warn("[xml解析异常] 文如下: \r\n{}", new String(bytesResult));
logger.warn("[xml解析异常] 文如下: 尝试兼容性处理");
logger.warn("[xml解析异常] 文如下: \r\n{}", new String(bytesResult));
logger.warn("[xml解析异常] 文如下: 尝试兼容性处理");
String[] xmlLineArray = new String(bytesResult).split("\\r?\\n");
// 兼容海康的address字段带有<破换xml结构导致无法解析xml的问题

View File

@@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
@@ -53,6 +54,10 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
@Autowired
private SIPSender sipSender;
@Autowired
private IPlatformService platformService;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
@@ -194,5 +199,8 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("未处理的异常 ", e);
}
if (subscribeHolder.getCatalogSubscribe(platformId) == null && platform.isAutoPushChannel()) {
platformService.addSimulatedSubscribeInfo(platform);
}
}
}

View File

@@ -78,7 +78,6 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
}
taskExecutor.execute(()->{
try {
String sn = getText(rootElement, "SN");
String channelId = getText(rootElement, "DeviceID");
RecordInfo recordInfo = new RecordInfo();