diff --git a/doc/_content/donation.md b/doc/_content/donation.md
deleted file mode 100644
index 959eee819..000000000
--- a/doc/_content/donation.md
+++ /dev/null
@@ -1,7 +0,0 @@
-# 捐赠
-项目目前仍在积极开发。大家的捐赠以及start可以让我看到大家的支持于关注。更加有动力把项目维护下去。
-
-
-

-

-
\ No newline at end of file
diff --git a/doc/_sidebar.md b/doc/_sidebar.md
index eb0c96c01..3b10bae84 100644
--- a/doc/_sidebar.md
+++ b/doc/_sidebar.md
@@ -29,5 +29,4 @@
- [设备注册不上来的解决办法](_content/qa/regiser_error.md)
- [点播超时/报错的解决办法](_content/qa/play_error.md)
* [**免责声明**](_content/disclaimers.md)
-* [**捐赠**](_content/donation.md)
* [**关于本文档**](_content/about_doc.md)
diff --git a/pom.xml b/pom.xml
index 2b9a838c6..71dd94850 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,7 +11,7 @@
com.genersoft
wvp-pro
- 2.3.1
+ 2.3.2
web video platform
国标28181视频平台
diff --git a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java
index 4f717f05e..41a56cd61 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java
@@ -31,6 +31,8 @@ public class StreamInfo {
private String rtsp;
private String rtsps;
private String rtc;
+
+ private String rtcs;
private String mediaServerId;
private Object tracks;
private String startTime;
@@ -302,4 +304,12 @@ public class StreamInfo {
public void setIp(String ip) {
this.ip = ip;
}
+
+ public String getRtcs() {
+ return rtcs;
+ }
+
+ public void setRtcs(String rtcs) {
+ this.rtcs = rtcs;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
index bbbfce971..7a122c774 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -14,8 +14,6 @@ public class VideoManagerConstants {
public static final String MEDIA_SERVER_PREFIX = "VMP_MEDIA_SERVER_";
- public static final String MEDIA_SERVER_KEEPALIVE_PREFIX = "VMP_MEDIA_SERVER_KEEPALIVE_";
-
public static final String MEDIA_SERVERS_ONLINE_PREFIX = "VMP_MEDIA_ONLINE_SERVERS_";
public static final String MEDIA_STREAM_PREFIX = "VMP_MEDIA_STREAM";
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/RedisKeyExpirationEventMessageListener.java b/src/main/java/com/genersoft/iot/vmp/conf/RedisKeyExpirationEventMessageListener.java
deleted file mode 100644
index ef4a6172e..000000000
--- a/src/main/java/com/genersoft/iot/vmp/conf/RedisKeyExpirationEventMessageListener.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.genersoft.iot.vmp.conf;
-
-import org.springframework.data.redis.connection.RedisConnection;
-import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
-import org.springframework.data.redis.listener.RedisMessageListenerContainer;
-
-import java.util.Properties;
-
-public class RedisKeyExpirationEventMessageListener extends KeyExpirationEventMessageListener {
-
- private UserSetting userSetting;
- private RedisMessageListenerContainer listenerContainer;
- private String keyspaceNotificationsConfigParameter = "EA";
-
- public RedisKeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer, UserSetting userSetting) {
- super(listenerContainer);
- this.listenerContainer = listenerContainer;
- this.userSetting = userSetting;
- }
-
- @Override
- public void init() {
- if (!userSetting.getRedisConfig()) {
- // 配置springboot默认Config为空,即不让应用去修改redis的默认配置,因为Redis服务出于安全会禁用CONFIG命令给远程用户使用
- setKeyspaceNotificationsConfigParameter("");
- }else {
-
- RedisConnection connection = this.listenerContainer.getConnectionFactory().getConnection();
- Properties config = connection.getConfig("notify-keyspace-events");
- try {
- if (!keyspaceNotificationsConfigParameter.equals(config.getProperty("notify-keyspace-events"))) {
- connection.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter);
- }
- } finally {
- connection.close();
- }
- }
- super.init();
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java
index cf16f8642..93674f61c 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java
@@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
+import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.springframework.beans.factory.annotation.Autowired;
@@ -15,6 +16,7 @@ import java.util.List;
/**
* 系统启动时控制上级平台重新注册
+ * @author lin
*/
@Component
@Order(value=3)
@@ -27,7 +29,7 @@ public class SipPlatformRunner implements CommandLineRunner {
private IRedisCatchStorage redisCatchStorage;
@Autowired
- private EventPublisher publisher;
+ private IPlatformService platformService;
@Autowired
private ISIPCommanderForPlatform sipCommanderForPlatform;
@@ -35,33 +37,26 @@ public class SipPlatformRunner implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
- // 设置所有平台离线
- storager.outlineForAllParentPlatform();
-
- // 清理所有平台注册缓存
- redisCatchStorage.cleanPlatformRegisterInfos();
-
- // 停止所有推流
-// zlmrtpServerFactory.closeAllSendRtpStream();
-
+ // 获取所有启用的平台
List parentPlatforms = storager.queryEnableParentPlatformList(true);
for (ParentPlatform parentPlatform : parentPlatforms) {
- redisCatchStorage.updatePlatformRegister(parentPlatform);
-
- redisCatchStorage.updatePlatformKeepalive(parentPlatform);
-
+ // 更新缓存
ParentPlatformCatch parentPlatformCatch = new ParentPlatformCatch();
-
parentPlatformCatch.setParentPlatform(parentPlatform);
parentPlatformCatch.setId(parentPlatform.getServerGBId());
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
+ if (parentPlatform.isStatus()) {
+ // 设置所有平台离线
+ platformService.offline(parentPlatform);
+ // 取消订阅
+ sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{
+ platformService.login(parentPlatform);
+ });
+ }else {
+ platformService.login(parentPlatform);
+ }
- // 取消订阅
- sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{
- // 发送平台未注册消息
- publisher.platformNotRegisterEventPublish(parentPlatform.getServerGBId());
- });
}
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
index d28ddebc8..017b39db4 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
@@ -31,8 +31,6 @@ public class UserSetting {
private Boolean logInDatebase = Boolean.TRUE;
- private Boolean redisConfig = Boolean.TRUE;
-
private String serverId = "000000";
private String thirdPartyGBIdReg = "[\\s\\S]*";
@@ -123,14 +121,6 @@ public class UserSetting {
this.thirdPartyGBIdReg = thirdPartyGBIdReg;
}
- public Boolean getRedisConfig() {
- return redisConfig;
- }
-
- public void setRedisConfig(Boolean redisConfig) {
- this.redisConfig = redisConfig;
- }
-
public Boolean getRecordSip() {
return recordSip;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/security/AnonymousAuthenticationEntryPoint.java b/src/main/java/com/genersoft/iot/vmp/conf/security/AnonymousAuthenticationEntryPoint.java
index 95b226253..9cdd2a49c 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/security/AnonymousAuthenticationEntryPoint.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/security/AnonymousAuthenticationEntryPoint.java
@@ -1,6 +1,8 @@
package com.genersoft.iot.vmp.conf.security;
import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
+import org.apache.poi.hssf.eventmodel.ERFListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.core.AuthenticationException;
@@ -28,8 +30,8 @@ public class AnonymousAuthenticationEntryPoint implements AuthenticationEntryPoi
response.setHeader("Access-Control-Allow-Headers", "token, Accept, Origin, X-Requested-With, Content-Type, Last-Modified");
response.setHeader("Content-type", "application/json;charset=UTF-8");
JSONObject jsonObject = new JSONObject();
- jsonObject.put("code", "-1");
- jsonObject.put("msg", "请登录后重新请求");
+ jsonObject.put("code", ErrorCode.ERROR401.getCode());
+ jsonObject.put("msg", ErrorCode.ERROR401.getMsg());
String logUri = "api/user/login";
if (request.getRequestURI().contains(logUri)){
jsonObject.put("msg", e.getMessage());
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/security/UrlTokenHandler.java b/src/main/java/com/genersoft/iot/vmp/conf/security/UrlTokenHandler.java
new file mode 100644
index 000000000..e63aca4a5
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/conf/security/UrlTokenHandler.java
@@ -0,0 +1,24 @@
+package com.genersoft.iot.vmp.conf.security;
+
+import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.SessionCookieConfig;
+import javax.servlet.SessionTrackingMode;
+import java.util.Collections;
+
+public class UrlTokenHandler extends SpringBootServletInitializer {
+
+ @Override
+ public void onStartup(ServletContext servletContext) throws ServletException {
+ super.onStartup(servletContext);
+
+ servletContext.setSessionTrackingModes(
+ Collections.singleton(SessionTrackingMode.COOKIE)
+ );
+ SessionCookieConfig sessionCookieConfig = servletContext.getSessionCookieConfig();
+ sessionCookieConfig.setHttpOnly(true);
+
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
index c1811bf43..f32bd26a5 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -10,14 +10,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
-import org.springframework.stereotype.Component;
import javax.sip.*;
import java.util.Properties;
import java.util.TooManyListenersException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
@Configuration
public class SipLayer{
@@ -52,7 +48,9 @@ public class SipLayer{
* 完整配置参考 gov.nist.javax.sip.SipStackImpl,需要下载源码
* gov/nist/javax/sip/SipStackImpl.class
*/
- properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "true");
+ if (logger.isDebugEnabled()) {
+ properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "false");
+ }
// 接收所有notify请求,即使没有订阅
properties.setProperty("gov.nist.javax.sip.DELIVER_UNSOLICITED_NOTIFY", "true");
// 为_NULL _对话框传递_终止的_事件
@@ -63,13 +61,13 @@ public class SipLayer{
properties.setProperty("gov.nist.javax.sip.RELIABLE_CONNECTION_KEEP_ALIVE_TIMEOUT", "60");
/**
- * sip_server_log.log 和 sip_debug_log.log public static final int TRACE_NONE =
- * 0; public static final int TRACE_MESSAGES = 16; public static final int
- * TRACE_EXCEPTION = 17; public static final int TRACE_DEBUG = 32;
+ * sip_server_log.log 和 sip_debug_log.log ERROR, INFO, WARNING, OFF, DEBUG, TRACE
*/
- properties.setProperty("gov.nist.javax.sip.TRACE_LEVEL", "0");
- properties.setProperty("gov.nist.javax.sip.SERVER_LOG", "sip_server_log");
- properties.setProperty("gov.nist.javax.sip.DEBUG_LOG", "sip_debug_log");
+ properties.setProperty("gov.nist.javax.sip.TRACE_LEVEL", "ERROR");
+// if (logger.isDebugEnabled()) {
+// properties.setProperty("gov.nist.javax.sip.TRACE_LEVEL", "DEBUG");
+// }
+
sipStack = (SipStackImpl) sipFactory.createSipStack(properties);
return sipStack;
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java
index ef2eecd5a..ade5d0eef 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java
@@ -84,7 +84,7 @@ public class ParentPlatform {
* 注册周期 (秒)
*/
@Schema(description = "注册周期 (秒)")
- private String expires;
+ private int expires;
/**
* 心跳周期(秒)
@@ -286,11 +286,11 @@ public class ParentPlatform {
this.password = password;
}
- public String getExpires() {
+ public int getExpires() {
return expires;
}
- public void setExpires(String expires) {
+ public void setExpires(int expires) {
this.expires = expires;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatformCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatformCatch.java
index 6c429f265..a53d26e48 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatformCatch.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatformCatch.java
@@ -4,7 +4,9 @@ public class ParentPlatformCatch {
private String id;
- // 心跳未回复次数
+ /**
+ * 心跳未回复次数
+ */
private int keepAliveReply;
// 注册未回复次数
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
index 4a900c164..441dff3fd 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
@@ -14,6 +14,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+/**
+ * @author lin
+ */
@Component
public class SubscribeHolder {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
index 8a4dd3da4..26ababd4c 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
@@ -2,9 +2,6 @@ package com.genersoft.iot.vmp.gb28181.event;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.device.RequestTimeoutEvent;
-import com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire.PlatformKeepaliveExpireEvent;
-import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformCycleRegisterEvent;
-import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent;
import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.event.ZLMOfflineEvent;
@@ -31,36 +28,6 @@ public class EventPublisher {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
-
- /**
- * 平台心跳到期事件
- * @param platformGbId
- */
- public void platformKeepaliveExpireEventPublish(String platformGbId){
- PlatformKeepaliveExpireEvent platformKeepaliveExpireEvent = new PlatformKeepaliveExpireEvent(this);
- platformKeepaliveExpireEvent.setPlatformGbID(platformGbId);
- applicationEventPublisher.publishEvent(platformKeepaliveExpireEvent);
- }
-
- /**
- * 平台未注册事件
- * @param platformGbId
- */
- public void platformNotRegisterEventPublish(String platformGbId){
- PlatformNotRegisterEvent platformNotRegisterEvent = new PlatformNotRegisterEvent(this);
- platformNotRegisterEvent.setPlatformGbID(platformGbId);
- applicationEventPublisher.publishEvent(platformNotRegisterEvent);
- }
-
- /**
- * 平台周期注册事件
- * @param paltformGbId
- */
- public void platformRegisterCycleEventPublish(String paltformGbId) {
- PlatformCycleRegisterEvent platformCycleRegisterEvent = new PlatformCycleRegisterEvent(this);
- platformCycleRegisterEvent.setPlatformGbID(paltformGbId);
- applicationEventPublisher.publishEvent(platformCycleRegisterEvent);
- }
/**
* 设备报警事件
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java
index c6cfc7a04..b3fd82e8d 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java
@@ -59,9 +59,25 @@ public class SipSubscribe {
void response(EventResult eventResult);
}
+ /**
+ *
+ */
+ public enum EventResultType{
+ // 超时
+ timeout,
+ // 回复
+ response,
+ // 事务已结束
+ transactionTerminated,
+ // 会话已结束
+ dialogTerminated,
+ // 设备未找到
+ deviceNotFoundEvent
+ }
+
public static class EventResult{
public int statusCode;
- public String type;
+ public EventResultType type;
public String msg;
public String callId;
public Dialog dialog;
@@ -76,7 +92,7 @@ public class SipSubscribe {
ResponseEvent responseEvent = (ResponseEvent)event;
Response response = responseEvent.getResponse();
this.dialog = responseEvent.getDialog();
- this.type = "response";
+ this.type = EventResultType.response;
if (response != null) {
this.msg = response.getReasonPhrase();
this.statusCode = response.getStatusCode();
@@ -85,28 +101,28 @@ public class SipSubscribe {
}else if (event instanceof TimeoutEvent) {
TimeoutEvent timeoutEvent = (TimeoutEvent)event;
- this.type = "timeout";
+ this.type = EventResultType.timeout;
this.msg = "消息超时未回复";
this.statusCode = -1024;
this.dialog = timeoutEvent.getClientTransaction().getDialog();
this.callId = this.dialog != null?timeoutEvent.getClientTransaction().getDialog().getCallId().getCallId(): null;
}else if (event instanceof TransactionTerminatedEvent) {
TransactionTerminatedEvent transactionTerminatedEvent = (TransactionTerminatedEvent)event;
- this.type = "transactionTerminated";
+ this.type = EventResultType.transactionTerminated;
this.msg = "事务已结束";
this.statusCode = -1024;
this.callId = transactionTerminatedEvent.getClientTransaction().getDialog().getCallId().getCallId();
this.dialog = transactionTerminatedEvent.getClientTransaction().getDialog();
}else if (event instanceof DialogTerminatedEvent) {
DialogTerminatedEvent dialogTerminatedEvent = (DialogTerminatedEvent)event;
- this.type = "dialogTerminated";
+ this.type = EventResultType.dialogTerminated;
this.msg = "会话已结束";
this.statusCode = -1024;
this.callId = dialogTerminatedEvent.getDialog().getCallId().getCallId();
this.dialog = dialogTerminatedEvent.getDialog();
}else if (event instanceof DeviceNotFoundEvent) {
DeviceNotFoundEvent deviceNotFoundEvent = (DeviceNotFoundEvent)event;
- this.type = "deviceNotFoundEvent";
+ this.type = EventResultType.deviceNotFoundEvent;
this.msg = "设备未找到";
this.statusCode = -1024;
this.dialog = deviceNotFoundEvent.getDialog();
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEvent.java
deleted file mode 100644
index 1e9a2c4b4..000000000
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEvent.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire;
-
-import org.springframework.context.ApplicationEvent;
-
-/**
- * 平台心跳超时事件
- */
-public class PlatformKeepaliveExpireEvent extends ApplicationEvent {
-
- /**
- * Add default serial version ID
- */
- private static final long serialVersionUID = 1L;
-
- private String platformGbID;
-
- public PlatformKeepaliveExpireEvent(Object source) {
- super(source);
- }
-
- public String getPlatformGbID() {
- return platformGbID;
- }
-
- public void setPlatformGbID(String platformGbID) {
- this.platformGbID = platformGbID;
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java
deleted file mode 100644
index 67b297c3c..000000000
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire;
-
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
-import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
-import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import org.jetbrains.annotations.NotNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationListener;
-import org.springframework.stereotype.Component;
-
-import javax.sip.message.Response;
-
-/**
- * @description: 平台心跳超时事件
- * @author: panll
- * @date: 2020年11月5日 10:00
- */
-@Component
-public class PlatformKeepaliveExpireEventLister implements ApplicationListener {
-
-
- private final static Logger logger = LoggerFactory.getLogger(PlatformKeepaliveExpireEventLister.class);
-
- @Autowired
- private IVideoManagerStorage storager;
-
- @Autowired
- private IRedisCatchStorage redisCatchStorage;
-
- @Autowired
- private ISIPCommanderForPlatform sipCommanderForPlatform;
-
- @Autowired
- private SipSubscribe sipSubscribe;
-
- @Autowired
- private EventPublisher publisher;
-
- @Override
- public void onApplicationEvent(@NotNull PlatformKeepaliveExpireEvent event) {
-
- if (logger.isDebugEnabled()) {
- logger.debug("平台心跳到期事件事件触发,平台国标ID:" + event.getPlatformGbID());
- }
- ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformGbID());
- ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(event.getPlatformGbID());
- if (parentPlatformCatch == null) {
- return;
- }
- if (parentPlatform == null) {
- logger.debug("平台心跳到期事件事件触发,但平台已经删除!!! 平台国标ID:" + event.getPlatformGbID());
- return;
- }
- parentPlatformCatch.setParentPlatform(parentPlatform);
- // 发送心跳
- if (parentPlatformCatch.getKeepAliveReply() >= 3) {
- // 有3次未收到心跳回复, 设置平台状态为离线, 开始重新注册
- logger.warn("有3次未收到心跳回复,标记设置平台状态为离线, 并重新注册 平台国标ID:" + event.getPlatformGbID());
- storager.updateParentPlatformStatus(event.getPlatformGbID(), false);
- publisher.platformNotRegisterEventPublish(event.getPlatformGbID());
- parentPlatformCatch.setKeepAliveReply(0);
- redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
- }else {
- // 再次发送心跳
- String callId = sipCommanderForPlatform.keepalive(parentPlatform);
-
- parentPlatformCatch.setKeepAliveReply( parentPlatformCatch.getKeepAliveReply() + 1);
- // 存储心跳信息, 并设置状态为未回复, 如果多次过期仍未收到回复,则认为上级平台已经离线
- redisCatchStorage.updatePlatformKeepalive(parentPlatform);
- redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
-
- sipSubscribe.addOkSubscribe(callId, (SipSubscribe.EventResult eventResult) ->{
- if (eventResult.statusCode == Response.OK) {
- // 收到心跳响应信息,
- parentPlatformCatch.setKeepAliveReply(0);
- redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
- }
- } );
- }
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformCycleRegisterEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformCycleRegisterEvent.java
deleted file mode 100644
index c2ff61f34..000000000
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformCycleRegisterEvent.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package com.genersoft.iot.vmp.gb28181.event.platformNotRegister;
-
-import org.springframework.context.ApplicationEvent;
-
-public class PlatformCycleRegisterEvent extends ApplicationEvent {
- /**
- * Add default serial version ID
- */
- private static final long serialVersionUID = 1L;
-
- private String platformGbID;
-
- public String getPlatformGbID() {
- return platformGbID;
- }
-
- public void setPlatformGbID(String platformGbID) {
- this.platformGbID = platformGbID;
- }
-
- public PlatformCycleRegisterEvent(Object source) {
- super(source);
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformCycleRegisterEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformCycleRegisterEventLister.java
deleted file mode 100644
index d2a9246f4..000000000
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformCycleRegisterEventLister.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.genersoft.iot.vmp.gb28181.event.platformNotRegister;
-
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationListener;
-import org.springframework.stereotype.Component;
-
-import java.util.Timer;
-import java.util.TimerTask;
-
-@Component
-public class PlatformCycleRegisterEventLister implements ApplicationListener {
-
- private final static Logger logger = LoggerFactory.getLogger(PlatformCycleRegisterEventLister.class);
-
- @Autowired
- private IVideoManagerStorage storager;
- @Autowired
- private ISIPCommanderForPlatform sipCommanderFroPlatform;
- @Autowired
- private DynamicTask dynamicTask;
-
- @Override
- public void onApplicationEvent(PlatformCycleRegisterEvent event) {
- logger.info("上级平台周期注册事件");
- ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformGbID());
- if (parentPlatform == null) {
- logger.info("[ 平台未注册事件 ] 平台已经删除!!! 平台国标ID:" + event.getPlatformGbID());
- return;
- }
- String taskKey = "platform-cycle-register" + parentPlatform.getServerGBId();;
- SipSubscribe.Event okEvent = (responseEvent)->{
- dynamicTask.stop(taskKey);
- };
- dynamicTask.startCron(taskKey, ()->{
- logger.info("[平台注册]再次向平台注册,平台国标ID:" + event.getPlatformGbID());
- sipCommanderFroPlatform.register(parentPlatform, null, okEvent);
- }, Integer.parseInt(parentPlatform.getExpires())* 1000);
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEvent.java
deleted file mode 100644
index c9369754a..000000000
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEvent.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.genersoft.iot.vmp.gb28181.event.platformNotRegister;
-
-import org.springframework.context.ApplicationEvent;
-
-public class PlatformNotRegisterEvent extends ApplicationEvent {
-
- /**
- * Add default serial version ID
- */
- private static final long serialVersionUID = 1L;
-
- private String platformGbID;
-
- public PlatformNotRegisterEvent(Object source) {
- super(source);
- }
-
- public String getPlatformGbID() {
- return platformGbID;
- }
-
- public void setPlatformGbID(String platformGbID) {
- this.platformGbID = platformGbID;
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java
deleted file mode 100644
index 56bdeb58b..000000000
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package com.genersoft.iot.vmp.gb28181.event.platformNotRegister;
-
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.conf.SipConfig;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
-import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationListener;
-import org.springframework.stereotype.Component;
-
-import java.util.*;
-
-/**
- * @description: 平台未注册事件,来源有二:
- * 1、平台新添加
- * 2、平台心跳超时
- * @author: panll
- * @date: 2020年11月24日 10:00
- */
-@Component
-public class PlatformNotRegisterEventLister implements ApplicationListener {
-
- private final static Logger logger = LoggerFactory.getLogger(PlatformNotRegisterEventLister.class);
-
- @Autowired
- private IVideoManagerStorage storager;
- @Autowired
- private IRedisCatchStorage redisCatchStorage;
- @Autowired
- private IMediaServerService mediaServerService;
-
- @Autowired
- private SIPCommanderFroPlatform sipCommanderFroPlatform;
-
- @Autowired
- private ZLMRTPServerFactory zlmrtpServerFactory;
-
- @Autowired
- private SipConfig config;
-
- @Autowired
- private DynamicTask dynamicTask;
-
- // @Autowired
- // private RedisUtil redis;
-
- @Override
- public void onApplicationEvent(PlatformNotRegisterEvent event) {
-
- logger.info("[ 平台未注册事件 ]平台国标ID:" + event.getPlatformGbID());
-
- ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformGbID());
- if (parentPlatform == null) {
- logger.info("[ 平台未注册事件 ] 平台已经删除!!! 平台国标ID:" + event.getPlatformGbID());
- return;
- }
- // 查询是否有推流, 如果有则都停止
- List sendRtpItems = redisCatchStorage.querySendRTPServer(event.getPlatformGbID());
- if (sendRtpItems != null && sendRtpItems.size() > 0) {
- logger.info("[ 平台未注册事件 ] 停止[ {} ]的所有推流", event.getPlatformGbID());
- for (SendRtpItem sendRtpItem : sendRtpItems) {
- redisCatchStorage.deleteSendRTPServer(event.getPlatformGbID(), sendRtpItem.getChannelId(), null, null);
- MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
- Map param = new HashMap<>();
- param.put("vhost", "__defaultVhost__");
- param.put("app", sendRtpItem.getApp());
- param.put("stream", sendRtpItem.getStreamId());
- zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
- }
-
- }
- String taskKey = "platform-not-register-" + parentPlatform.getServerGBId();
- SipSubscribe.Event okEvent = (responseEvent)->{
- dynamicTask.stop(taskKey);
- };
- dynamicTask.startCron(taskKey, ()->{
- logger.info("[平台注册]再次向平台注册,平台国标ID:" + event.getPlatformGbID());
- sipCommanderFroPlatform.register(parentPlatform, null, okEvent);
- }, config.getRegisterTimeInterval()* 1000);
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
index 79bb4cad1..734d60077 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -30,23 +30,10 @@ public class CatalogEventLister implements ApplicationListener {
@Autowired
private IVideoManagerStorage storager;
- @Autowired
- private IRedisCatchStorage redisCatchStorage;
- @Autowired
- private IMediaServerService mediaServerService;
@Autowired
private SIPCommanderFroPlatform sipCommanderFroPlatform;
- @Autowired
- private ZLMRTPServerFactory zlmrtpServerFactory;
-
- @Autowired
- private SipConfig config;
-
- @Autowired
- private UserSetting userSetting;
-
@Autowired
private IGbStreamService gbStreamService;
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
index 7edee4dd7..2ee103763 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
@@ -60,7 +60,6 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
// TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
List gbStreams = storager.queryGbStreamListInPlatform(platform.getServerGBId());
if (gbStreams.size() == 0) {
- logger.info("发送订阅时发现平台已经没有关联的直播流:{}", platform.getServerGBId());
return;
}
for (DeviceChannel deviceChannel : gbStreams) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
index 319016c9c..13f04b6d4 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
@@ -71,7 +71,6 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
@Override
@Async
public void processRequest(RequestEvent requestEvent) {
- logger.debug("\n收到请求:\n{}", requestEvent.getRequest());
String method = requestEvent.getRequest().getMethod();
ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
if (sipRequestProcessor == null) {
@@ -90,7 +89,6 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
@Async
public void processResponse(ResponseEvent responseEvent) {
Response response = responseEvent.getResponse();
- logger.debug("\n收到响应:\n{}", responseEvent.getResponse());
int status = response.getStatusCode();
if (((status >= 200) && (status < 300)) || status == Response.UNAUTHORIZED) { // Success!
@@ -114,7 +112,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
} else if ((status >= 100) && (status < 200)) {
// 增加其它无需回复的响应,如101、180等
} else {
- logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
+ logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase());
if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
if (callIdHeader != null) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
index 154071e45..b5cc51421 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
@@ -3,7 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
-import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import gov.nist.javax.sip.message.SIPRequest;
@@ -98,7 +98,7 @@ public interface ISIPCommander {
* @param device 视频设备
* @param channelId 预览通道
*/
- void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent);
+ void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent);
/**
* 请求回放视频流
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java
index d000f5afb..351505f39 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java
@@ -15,7 +15,7 @@ public interface ISIPCommanderForPlatform {
* @return
*/
boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent);
- boolean register(ParentPlatform parentPlatform, String callId, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain);
+ boolean register(ParentPlatform parentPlatform, String callId, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister);
/**
* 向上级平台注销
@@ -30,7 +30,7 @@ public interface ISIPCommanderForPlatform {
* @param parentPlatform
* @return callId(作为接受回复的判定)
*/
- String keepalive(ParentPlatform parentPlatform);
+ String keepalive(ParentPlatform parentPlatform,SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent);
/**
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
index ad8043f15..a75e806cb 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
+import com.genersoft.iot.vmp.gb28181.utils.HeaderUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import gov.nist.javax.sip.message.MessageFactoryImpl;
import org.springframework.beans.factory.annotation.Autowired;
@@ -75,7 +76,7 @@ public class SIPRequestHeaderPlarformProvider {
}
- public Request createRegisterRequest(@NotNull ParentPlatform platform, long CSeq, String fromTag, String viaTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
+ public Request createRegisterRequest(@NotNull ParentPlatform platform, long CSeq, String fromTag, String viaTag, CallIdHeader callIdHeader, boolean isRegister) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null;
String sipAddress = sipConfig.getIp() + ":" + sipConfig.getPort();
//请求行
@@ -109,18 +110,20 @@ public class SIPRequestHeaderPlarformProvider {
.createSipURI(platform.getDeviceGBId(), sipAddress));
request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
- ExpiresHeader expires = sipFactory.createHeaderFactory().createExpiresHeader(Integer.parseInt(platform.getExpires()));
+ ExpiresHeader expires = sipFactory.createHeaderFactory().createExpiresHeader(isRegister ? platform.getExpires() : 0);
request.addHeader(expires);
+ UserAgentHeader userAgentHeader = HeaderUtils.createUserAgentHeader(sipFactory);
+ request.addHeader(userAgentHeader);
return request;
}
public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, String fromTag, String viaTag,
- String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader) throws ParseException, PeerUnavailableException, InvalidArgumentException {
+ String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader, boolean isRegister) throws ParseException, PeerUnavailableException, InvalidArgumentException {
- Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, viaTag, callIdHeader);
+ Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, viaTag, callIdHeader, isRegister);
SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort());
if (www == null) {
AuthorizationHeader authorizationHeader = sipFactory.createHeaderFactory().createAuthorizationHeader("Digest");
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
index b89fd8ea5..aee6d4e05 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
@@ -12,6 +12,7 @@ import javax.sip.message.Request;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
+import com.genersoft.iot.vmp.gb28181.utils.HeaderUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
@@ -266,15 +267,7 @@ public class SIPRequestHeaderProvider {
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory()
.createSipURI(sipConfig.getId(), sipConfig.getIp() + ":" + sipConfig.getPort()));
infoRequest.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
- List agentParam = new ArrayList<>();
- agentParam.add("wvp-pro");
- // TODO 添加版本信息以及日期
- UserAgentHeader userAgentHeader = null;
- try {
- userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
- } catch (ParseException e) {
- throw new RuntimeException(e);
- }
+ UserAgentHeader userAgentHeader = HeaderUtils.createUserAgentHeader(sipFactory);
infoRequest.addHeader(userAgentHeader);
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application",
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
index 811637ad8..9c31e2a15 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
@@ -10,12 +10,12 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
+import com.genersoft.iot.vmp.gb28181.utils.HeaderUtils;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
-import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
-import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -33,19 +33,15 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
-import org.springframework.util.StringUtils;
import javax.sip.*;
import javax.sip.address.Address;
import javax.sip.address.SipURI;
-import javax.sip.address.URI;
import javax.sip.header.*;
import javax.sip.message.Request;
import java.lang.reflect.Field;
import java.text.ParseException;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
/**
* @description:设备能力接口,用于定义设备的控制、查询能力
@@ -88,7 +84,7 @@ public class SIPCommander implements ISIPCommander {
private UserSetting userSetting;
@Autowired
- private ZLMHttpHookSubscribe subscribe;
+ private ZlmHttpHookSubscribe subscribe;
@Autowired
private SipSubscribe sipSubscribe;
@@ -351,7 +347,7 @@ public class SIPCommander implements ISIPCommander {
*/
@Override
public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
- ZLMHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
+ ZlmHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
String stream = ssrcInfo.getStream();
try {
if (device == null) {
@@ -640,7 +636,7 @@ public class SIPCommander implements ISIPCommander {
hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
subscribe.removeSubscribe(hookSubscribe);
hookSubscribe.getContent().put("regist", false);
- hookSubscribe.getContent().put("schema", "rtmp");
+ hookSubscribe.getContent().put("schema", "rtsp");
// 添加流注销的订阅,注销了后向设备发送bye
subscribe.addSubscribe(hookSubscribe,
(MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd)->{
@@ -780,15 +776,7 @@ public class SIPCommander implements ISIPCommander {
// 增加Contact header
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort()));
byeRequest.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
- List agentParam = new ArrayList<>();
- agentParam.add("wvp-pro");
- // TODO 添加版本信息以及日期
- UserAgentHeader userAgentHeader = null;
- try {
- userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
- } catch (ParseException e) {
- throw new RuntimeException(e);
- }
+ UserAgentHeader userAgentHeader = HeaderUtils.createUserAgentHeader(sipFactory);
byeRequest.addHeader(userAgentHeader);
ClientTransaction clientTransaction = null;
if("TCP".equals(protocol)) {
@@ -1680,14 +1668,11 @@ public class SIPCommander implements ISIPCommander {
clientTransaction = udpSipProvider.getNewClientTransaction(request);
}
if (request.getHeader(UserAgentHeader.NAME) == null) {
- List agentParam = new ArrayList<>();
- agentParam.add("wvp-pro");
- // TODO 添加版本信息以及日期
UserAgentHeader userAgentHeader = null;
try {
- userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
+ userAgentHeader = HeaderUtils.createUserAgentHeader(sipFactory);
} catch (ParseException e) {
- throw new RuntimeException(e);
+ logger.error("添加UserAgentHeader失败", e);
}
request.addHeader(userAgentHeader);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
index e767b4f53..55a00a82e 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
@@ -4,6 +4,8 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
+import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
+import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
@@ -75,28 +77,21 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Override
public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
- return register(parentPlatform, null, null, errorEvent, okEvent, false);
+ return register(parentPlatform, null, null, errorEvent, okEvent, false, true);
}
@Override
public boolean unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
- ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId());
- parentPlatform.setExpires("0");
- if (parentPlatformCatch != null) {
- parentPlatformCatch.setParentPlatform(parentPlatform);
- redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
- }
- return register(parentPlatform, null, null, errorEvent, okEvent, false);
+ return register(parentPlatform, null, null, errorEvent, okEvent, false, false);
}
@Override
public boolean register(ParentPlatform parentPlatform, @Nullable String callId, @Nullable WWWAuthenticateHeader www,
- SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain) {
+ SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) {
try {
Request request;
String tm = Long.toString(System.currentTimeMillis());
if (!registerAgain ) {
- // //callid
CallIdHeader callIdHeader = null;
if(parentPlatform.getTransport().equals("TCP")) {
callIdHeader = tcpSipProvider.getNewCallId();
@@ -107,10 +102,10 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform,
redisCatchStorage.getCSEQ(), "FromRegister" + tm,
- "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), callIdHeader);
+ "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), callIdHeader, isRegister);
// 将 callid 写入缓存, 等注册成功可以更新状态
String callIdFromHeader = callIdHeader.getCallId();
- redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, parentPlatform.getServerGBId());
+ redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, PlatformRegisterInfo.getInstance(parentPlatform.getServerGBId(), isRegister));
sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (event)->{
if (event != null) {
@@ -127,7 +122,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
}else {
CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
- request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, "FromRegister" + tm, null, callId, www, callIdHeader);
+ request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, "FromRegister" + tm, null, callId, www, callIdHeader, isRegister);
}
transmitRequest(parentPlatform, request, null, okEvent);
@@ -145,7 +140,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
}
@Override
- public String keepalive(ParentPlatform parentPlatform) {
+ public String keepalive(ParentPlatform parentPlatform,SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
String callId = null;
try {
String characterSet = parentPlatform.getCharacterSet();
@@ -168,7 +163,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
UUID.randomUUID().toString().replace("-", ""),
null,
callIdHeader);
- transmitRequest(parentPlatform, request);
+ transmitRequest(parentPlatform, request, errorEvent, okEvent);
callId = callIdHeader.getCallId();
} catch (ParseException | InvalidArgumentException | SipException e) {
e.printStackTrace();
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
index 8f3ba0a23..8977d8a70 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java
@@ -59,6 +59,9 @@ public abstract class SIPRequestProcessorParent {
public ServerTransaction getServerTransaction(RequestEvent evt) {
Request request = evt.getRequest();
ServerTransaction serverTransaction = evt.getServerTransaction();
+ if (serverTransaction != null) {
+ System.out.println(serverTransaction.getState().toString());
+ }
// 判断TCP还是UDP
boolean isTcp = false;
ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
@@ -86,6 +89,8 @@ public abstract class SIPRequestProcessorParent {
logger.error(e.getMessage());
} catch (TransactionUnavailableException e) {
logger.error(e.getMessage());
+ }finally {
+
}
}
return serverTransaction;
@@ -182,6 +187,10 @@ public abstract class SIPRequestProcessorParent {
sipFactory.createAddressFactory().createSipURI(sipURI.getUser(), sipURI.getHost()+":"+sipURI.getPort()
));
response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
+ ServerTransaction serverTransaction = getServerTransaction(evt);
+ if (serverTransaction == null) {
+
+ }
getServerTransaction(evt).sendResponse(response);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
index 6eb4bc959..2b7fec2ca 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
@@ -4,12 +4,14 @@ import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
-import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
@@ -66,7 +68,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
private IMediaServerService mediaServerService;
@Autowired
- private ZLMHttpHookSubscribe subscribe;
+ private ZlmHttpHookSubscribe subscribe;
@Autowired
private DynamicTask dynamicTask;
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index cb7faf249..cf413883c 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -19,7 +19,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
-import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
@@ -50,7 +50,6 @@ import org.springframework.stereotype.Component;
import javax.sdp.*;
import javax.sip.*;
-import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
@@ -337,7 +336,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
Long finalStartTime = startTime;
Long finalStopTime = stopTime;
- ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> {
+ ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> {
String app = responseJSON.getString("app");
String stream = responseJSON.getString("stream");
logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", app, stream);
@@ -440,6 +439,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
streamId = String.format("%s_%s", device.getDeviceId(), channelId);
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false);
+ logger.info(JSONObject.toJSONString(ssrcInfo));
sendRtpItem.setStreamId(ssrcInfo.getStream());
// 写入redis, 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
index 13cc1b662..6ce2ce053 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
@@ -180,7 +180,6 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
private void processNotifyCatalogList(RequestEvent evt, Element rootElement) throws SipException {
- System.out.println(evt.getRequest().toString());
String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
String deviceId = XmlUtil.getText(rootElement, "DeviceID");
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
index 4ea5d92b9..98fd7a7c1 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
@@ -164,7 +164,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
}
}
- if (channelId.equals(sipConfig.getId())) {
+ if ("7".equals(deviceAlarm.getAlarmMethod()) ) {
// 发送给平台的报警信息。 发送redis通知
AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage();
alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod()));
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java
index 04a11b930..1a396353b 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java
@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
+import com.genersoft.iot.vmp.gb28181.utils.HeaderUtils;
import gov.nist.javax.sip.ResponseEventExt;
import gov.nist.javax.sip.message.SIPResponse;
import gov.nist.javax.sip.stack.SIPDialog;
@@ -103,15 +104,7 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
}
requestURI.setPort(event.getRemotePort());
reqAck.setRequestURI(requestURI);
- List agentParam = new ArrayList<>();
- agentParam.add("wvp-pro");
- // TODO 添加版本信息以及日期
- UserAgentHeader userAgentHeader = null;
- try {
- userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
- } catch (ParseException e) {
- throw new RuntimeException(e);
- }
+ UserAgentHeader userAgentHeader = HeaderUtils.createUserAgentHeader(sipFactory);
reqAck.addHeader(userAgentHeader);
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort()));
reqAck.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
index a48dd203b..a5cddaedd 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
@@ -6,8 +6,10 @@ import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
+import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -44,6 +46,9 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
@Autowired
private SubscribeHolder subscribeHolder;
+ @Autowired
+ private IPlatformService platformService;
+
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
@@ -60,48 +65,39 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
Response response = evt.getResponse();
CallIdHeader callIdHeader = (CallIdHeader) response.getHeader(CallIdHeader.NAME);
String callId = callIdHeader.getCallId();
-
- String platformGBId = redisCatchStorage.queryPlatformRegisterInfo(callId);
- if (platformGBId == null) {
- logger.info(String.format("未找到callId: %s 的注册/注销平台id", callId ));
+ PlatformRegisterInfo platformRegisterInfo = redisCatchStorage.queryPlatformRegisterInfo(callId);
+ if (platformRegisterInfo == null) {
+ logger.info(String.format("[国标级联]未找到callId: %s 的注册/注销平台id", callId ));
return;
}
- ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformGBId);
+ ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformRegisterInfo.getPlatformId());
if (parentPlatformCatch == null) {
- logger.warn(String.format("[收到注册/注销%S请求]平台:%s,但是平台缓存信息未查询到!!!", response.getStatusCode(),platformGBId));
+ logger.warn(String.format("[国标级联]收到注册/注销%S请求,平台:%s,但是平台缓存信息未查询到!!!", response.getStatusCode(),platformRegisterInfo.getPlatformId()));
return;
}
- String action = parentPlatformCatch.getParentPlatform().getExpires().equals("0") ? "注销" : "注册";
- logger.info(String.format("[%s %S响应]%s ", action, response.getStatusCode(), platformGBId ));
+
+ String action = platformRegisterInfo.isRegister() ? "注册" : "注销";
+ logger.info(String.format("[国标级联]%s %S响应,%s ", action, response.getStatusCode(), platformRegisterInfo.getPlatformId() ));
ParentPlatform parentPlatform = parentPlatformCatch.getParentPlatform();
if (parentPlatform == null) {
- logger.warn(String.format("收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformGBId, action, response.getStatusCode()));
+ logger.warn(String.format("[国标级联]收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformRegisterInfo.getPlatformId(), action, response.getStatusCode()));
return;
}
- if (response.getStatusCode() == 401) {
+ if (response.getStatusCode() == Response.UNAUTHORIZED) {
WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME);
- sipCommanderForPlatform.register(parentPlatform, callId, www, null, null, true);
- }else if (response.getStatusCode() == 200){
- // 注册/注销成功
- logger.info(String.format("%s %s成功", platformGBId, action));
+ sipCommanderForPlatform.register(parentPlatform, callId, www, null, null, true, platformRegisterInfo.isRegister());
+ }else if (response.getStatusCode() == Response.OK){
+
+ if (platformRegisterInfo.isRegister()) {
+ platformService.online(parentPlatform);
+ }else {
+ platformService.offline(parentPlatform);
+ }
+
+ // 注册/注销成功移除缓存的信息
redisCatchStorage.delPlatformRegisterInfo(callId);
- redisCatchStorage.delPlatformCatchInfo(platformGBId);
- // 取回Expires设置,避免注销过程中被置为0
- ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
- if (parentPlatformTmp != null) {
- parentPlatformTmp.setStatus("注册".equals(action));
- redisCatchStorage.updatePlatformRegister(parentPlatformTmp);
- redisCatchStorage.updatePlatformKeepalive(parentPlatformTmp);
- parentPlatformCatch.setParentPlatform(parentPlatformTmp);
- }
- redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
- storager.updateParentPlatformStatus(platformGBId, "注册".equals(action));
- if ("注销".equals(action)) {
- subscribeHolder.removeCatalogSubscribe(platformGBId);
- subscribeHolder.removeMobilePositionSubscribe(platformGBId);
- }
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/HeaderUtils.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/HeaderUtils.java
new file mode 100644
index 000000000..86112672a
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/HeaderUtils.java
@@ -0,0 +1,22 @@
+package com.genersoft.iot.vmp.gb28181.utils;
+
+import javax.sip.PeerUnavailableException;
+import javax.sip.SipFactory;
+import javax.sip.header.UserAgentHeader;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 生成header的工具类
+ * @author lin
+ */
+public class HeaderUtils {
+
+ public static UserAgentHeader createUserAgentHeader(SipFactory sipFactory) throws PeerUnavailableException, ParseException {
+ List agentParam = new ArrayList<>();
+ agentParam.add("WVP PRO");
+ // TODO 添加版本信息以及日期
+ return sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java
index 36ae1b819..2d117543f 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java
@@ -50,7 +50,7 @@ public class AssistRESTfulUtils {
if (mediaServerItem == null) {
return null;
}
- if (ObjectUtils.isEmpty(mediaServerItem.getRecordAssistPort())) {
+ if (mediaServerItem.getRecordAssistPort() > 0) {
logger.warn("未启用Assist服务");
return null;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
index e3ad0bd8b..ac57c3361 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
@@ -7,12 +7,10 @@ import java.util.Map;
import com.alibaba.fastjson.JSON;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.Device;
-import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
-import com.genersoft.iot.vmp.gb28181.bean.GbStream;
-import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
+import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -20,10 +18,11 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ObjectUtils;
-import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -49,6 +48,9 @@ public class ZLMHttpHookListener {
@Autowired
private SIPCommander cmder;
+ @Autowired
+ private SIPCommanderFroPlatform commanderFroPlatform;
+
@Autowired
private IPlayService playService;
@@ -77,7 +79,7 @@ public class ZLMHttpHookListener {
private ZLMMediaListManager zlmMediaListManager;
@Autowired
- private ZLMHttpHookSubscribe subscribe;
+ private ZlmHttpHookSubscribe subscribe;
@Autowired
private UserSetting userSetting;
@@ -91,6 +93,10 @@ public class ZLMHttpHookListener {
@Autowired
private AssistRESTfulUtils assistRESTfulUtils;
+ @Qualifier("taskExecutor")
+ @Autowired
+ private ThreadPoolTaskExecutor taskExecutor;
+
/**
* 服务器定时上报时间,上报间隔可配置,默认10s上报一次
*
@@ -101,9 +107,9 @@ public class ZLMHttpHookListener {
logger.info("[ ZLM HOOK ] on_server_keepalive API调用,参数:" + json.toString());
String mediaServerId = json.getString("mediaServerId");
- List subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
+ List subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
if (subscribes != null && subscribes.size() > 0) {
- for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
+ for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, json);
}
}
@@ -167,7 +173,7 @@ public class ZLMHttpHookListener {
logger.debug("[ ZLM HOOK ]on_play API调用,参数:" + JSON.toJSONString(param));
}
String mediaServerId = param.getMediaServerId();
- ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json);
+ ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json);
if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) {
@@ -237,9 +243,11 @@ public class ZLMHttpHookListener {
// 鉴权通过
redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
// 通知assist新的callId
- if (mediaInfo != null && mediaInfo.getRecordAssistPort() > 0) {
- assistRESTfulUtils.addStreamCallInfo(mediaInfo, param.getApp(), param.getStream(), callId, null);
- }
+ taskExecutor.execute(()->{
+ if (mediaInfo != null && mediaInfo.getRecordAssistPort() > 0) {
+ assistRESTfulUtils.addStreamCallInfo(mediaInfo, param.getApp(), param.getStream(), callId, null);
+ }
+ });
}else {
zlmMediaListManager.sendStreamEvent(param.getApp(),param.getStream(), param.getMediaServerId());
}
@@ -252,7 +260,7 @@ public class ZLMHttpHookListener {
}
- ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
+ ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
if (subscribe != null) {
if (mediaInfo != null) {
subscribe.response(mediaInfo, json);
@@ -376,7 +384,7 @@ public class ZLMHttpHookListener {
logger.debug("[ ZLM HOOK ]on_shell_login API调用,参数:" + json.toString());
}
String mediaServerId = json.getString("mediaServerId");
- ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json);
+ ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json);
if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) {
@@ -402,7 +410,7 @@ public class ZLMHttpHookListener {
logger.info("[ ZLM HOOK ]on_stream_changed API调用,参数:" + JSONObject.toJSONString(item));
String mediaServerId = item.getMediaServerId();
JSONObject json = (JSONObject) JSON.toJSON(item);
- ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
+ ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) {
@@ -415,19 +423,24 @@ public class ZLMHttpHookListener {
String schema = item.getSchema();
List tracks = item.getTracks();
boolean regist = item.isRegist();
- if (regist) {
- StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
- if (streamAuthorityInfo == null) {
- streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(item);
+ if (item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
+ || item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
+ || item.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
+ if (regist) {
+ StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
+ if (streamAuthorityInfo == null) {
+ streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(item);
+ }else {
+ streamAuthorityInfo.setOriginType(item.getOriginType());
+ streamAuthorityInfo.setOriginTypeStr(item.getOriginTypeStr());
+ }
+ redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo);
}else {
- streamAuthorityInfo.setOriginType(item.getOriginType());
- streamAuthorityInfo.setOriginTypeStr(item.getOriginTypeStr());
+ redisCatchStorage.removeStreamAuthorityInfo(app, stream);
}
- redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo);
- }else {
- redisCatchStorage.removeStreamAuthorityInfo(app, stream);
}
- if ("rtmp".equals(schema)){
+
+ if ("rtsp".equals(schema)){
logger.info("on_stream_changed:注册->{}, app->{}, stream->{}", regist, app, stream);
if (regist) {
mediaServerService.addCount(mediaServerId);
@@ -523,17 +536,21 @@ public class ZLMHttpHookListener {
if ("rtp".equals(app)){
ret.put("close", true);
StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(streamId);
- SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransaction(null, null, null, streamId);
if (streamInfoForPlayCatch != null) {
- // 如果在给上级推流,也不停止。
+ // 收到无人观看说明流也没有在往上级推送
if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) {
- ret.put("close", false);
- } else {
- cmder.streamByeCmd(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId(),
- streamInfoForPlayCatch.getStream(), null);
- redisCatchStorage.stopPlay(streamInfoForPlayCatch);
- storager.stopPlay(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId());
+ List sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(streamInfoForPlayCatch.getChannelId());
+ if (sendRtpItems.size() > 0) {
+ for (SendRtpItem sendRtpItem : sendRtpItems) {
+ ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
+ commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
+ }
+ }
}
+ cmder.streamByeCmd(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId(),
+ streamInfoForPlayCatch.getStream(), null);
+ redisCatchStorage.stopPlay(streamInfoForPlayCatch);
+ storager.stopPlay(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId());
}else{
StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (streamInfoForPlayBackCatch != null) {
@@ -615,9 +632,9 @@ public class ZLMHttpHookListener {
}
String remoteAddr = request.getRemoteAddr();
jsonObject.put("ip", remoteAddr);
- List subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
+ List subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
if (subscribes != null && subscribes.size() > 0) {
- for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
+ for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, jsonObject);
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
index a8b4a8d97..50a1fa59a 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
@@ -1,30 +1,24 @@
package com.genersoft.iot.vmp.media.zlm;
-import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
-import org.checkerframework.checker.units.qual.C;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import org.springframework.util.StringUtils;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
/**
* @author lin
@@ -59,7 +53,7 @@ public class ZLMMediaListManager {
private StreamPushMapper streamPushMapper;
@Autowired
- private ZLMHttpHookSubscribe subscribe;
+ private ZlmHttpHookSubscribe subscribe;
@Autowired
private UserSetting userSetting;
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
index c9e2b3484..65bebc947 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
@@ -92,6 +92,7 @@ public class ZLMRTPServerFactory {
int result = -1;
// 查询此rtp server 是否已经存在
JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, streamId);
+ logger.info(JSONObject.toJSONString(rtpInfo));
if(rtpInfo.getInteger("code") == 0){
if (rtpInfo.getBoolean("exist")) {
result = rtpInfo.getInteger("local_port");
@@ -113,7 +114,7 @@ public class ZLMRTPServerFactory {
}
param.put("ssrc", ssrc);
JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
-
+ logger.info(JSONObject.toJSONString(openRtpServerResultJson));
if (openRtpServerResultJson != null) {
if (openRtpServerResultJson.getInteger("code") == 0) {
result= openRtpServerResultJson.getInteger("port");
@@ -277,7 +278,7 @@ public class ZLMRTPServerFactory {
* 查询待转推的流是否就绪
*/
public Boolean isRtpReady(MediaServerItem mediaServerItem, String streamId) {
- JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem,"rtp", "rtmp", streamId);
+ JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem,"rtp", "rtsp", streamId);
return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"));
}
@@ -297,7 +298,7 @@ public class ZLMRTPServerFactory {
* @return
*/
public int totalReaderCount(MediaServerItem mediaServerItem, String app, String streamId) {
- JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtmp", streamId);
+ JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtsp", streamId);
if (mediaInfo == null) {
return 0;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
index b24d0a1a4..da4bb76c3 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
@@ -8,7 +8,6 @@ import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import org.slf4j.Logger;
@@ -19,9 +18,7 @@ import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
-import java.time.Instant;
import java.util.*;
-import java.util.concurrent.TimeUnit;
@Component
@Order(value=1)
@@ -35,7 +32,7 @@ public class ZLMRunner implements CommandLineRunner {
private ZLMRESTfulUtils zlmresTfulUtils;
@Autowired
- private ZLMHttpHookSubscribe hookSubscribe;
+ private ZlmHttpHookSubscribe hookSubscribe;
@Autowired
private EventPublisher publisher;
@@ -62,8 +59,6 @@ public class ZLMRunner implements CommandLineRunner {
}
mediaServerService.syncCatchFromDatabase();
HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started();
-// Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.SECONDS.toSeconds(60));
-// hookSubscribeForStreamChange.setExpires(expiresInstant);
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hookSubscribeForServerStarted,
(MediaServerItem mediaServerItem, JSONObject response)->{
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java
similarity index 69%
rename from src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java
rename to src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java
index 57b6d81f9..823bdabb4 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZlmHttpHookSubscribe.java
@@ -4,6 +4,9 @@ import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@@ -13,21 +16,22 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
- * @description:针对 ZLMediaServer的hook事件订阅
- * @author: pan
- * @date: 2020年12月2日 21:17:32
+ * ZLMediaServer的hook事件订阅
+ * @author lin
*/
@Component
-public class ZLMHttpHookSubscribe {
+public class ZlmHttpHookSubscribe {
+
+ private final static Logger logger = LoggerFactory.getLogger(ZlmHttpHookSubscribe.class);
@FunctionalInterface
public interface Event{
void response(MediaServerItem mediaServerItem, JSONObject response);
}
- private Map> allSubscribes = new ConcurrentHashMap<>();
+ private Map> allSubscribes = new ConcurrentHashMap<>();
- public void addSubscribe(IHookSubscribe hookSubscribe, ZLMHttpHookSubscribe.Event event) {
+ public void addSubscribe(IHookSubscribe hookSubscribe, ZlmHttpHookSubscribe.Event event) {
if (hookSubscribe.getExpires() == null) {
// 默认5分钟过期
Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5));
@@ -36,8 +40,8 @@ public class ZLMHttpHookSubscribe {
allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
}
- public ZLMHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
- ZLMHttpHookSubscribe.Event event= null;
+ public ZlmHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
+ ZlmHttpHookSubscribe.Event event= null;
Map eventMap = allSubscribes.get(type);
if (eventMap == null) {
return null;
@@ -69,8 +73,8 @@ public class ZLMHttpHookSubscribe {
Set> entries = eventMap.entrySet();
if (entries.size() > 0) {
- List> entriesToRemove = new ArrayList<>();
- for (Map.Entry entry : entries) {
+ List> entriesToRemove = new ArrayList<>();
+ for (Map.Entry entry : entries) {
JSONObject content = entry.getKey().getContent();
if (content == null || content.size() == 0) {
entriesToRemove.add(entry);
@@ -87,13 +91,13 @@ public class ZLMHttpHookSubscribe {
result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s));
}
}
- if (null != result && result){
+ if (result){
entriesToRemove.add(entry);
}
}
if (!CollectionUtils.isEmpty(entriesToRemove)) {
- for (Map.Entry entry : entriesToRemove) {
+ for (Map.Entry entry : entriesToRemove) {
entries.remove(entry);
}
}
@@ -106,12 +110,12 @@ public class ZLMHttpHookSubscribe {
* @param type
* @return
*/
- public List getSubscribes(HookType type) {
+ public List getSubscribes(HookType type) {
Map eventMap = allSubscribes.get(type);
if (eventMap == null) {
return null;
}
- List result = new ArrayList<>();
+ List result = new ArrayList<>();
for (IHookSubscribe key : eventMap.keySet()) {
result.add(eventMap.get(key));
}
@@ -127,5 +131,28 @@ public class ZLMHttpHookSubscribe {
return result;
}
+ /**
+ * 对订阅数据进行过期清理
+ */
+ @Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次
+ public void execute(){
+ logger.info("[hook订阅] 清理");
+
+ Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
+ int total = 0;
+ for (HookType hookType : allSubscribes.keySet()) {
+ Map hookSubscribeEventMap = allSubscribes.get(hookType);
+ if (hookSubscribeEventMap.size() > 0) {
+ for (IHookSubscribe hookSubscribe : hookSubscribeEventMap.keySet()) {
+ if (hookSubscribe.getExpires().isBefore(instant)) {
+ // 过期的
+ hookSubscribeEventMap.remove(hookSubscribe);
+ total ++;
+ }
+ }
+ }
+ }
+ logger.info("[hook订阅] 清理结束,共清理{}条过期数据", total);
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java
deleted file mode 100644
index d3af23c0b..000000000
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package com.genersoft.iot.vmp.media.zlm.event;
-
-import com.alibaba.fastjson.JSONObject;
-import com.genersoft.iot.vmp.common.VideoManagerConstants;
-import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
-import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.listener.RedisMessageListenerContainer;
-import org.springframework.stereotype.Component;
-
-/**
- * @description:设备心跳超时监听,借助redis过期特性,进行监听,监听到说明设备心跳超时,发送离线事件
- * @author: swwheihei
- * @date: 2020年5月6日 上午11:35:46
- */
-@Component
-public class ZLMKeepliveTimeoutListener extends RedisKeyExpirationEventMessageListener {
-
- private Logger logger = LoggerFactory.getLogger(ZLMKeepliveTimeoutListener.class);
-
- @Autowired
- private EventPublisher publisher;
-
- @Autowired
- private ZLMRESTfulUtils zlmresTfulUtils;
-
- @Autowired
- private UserSetting userSetting;
-
- @Autowired
- private IMediaServerService mediaServerService;
-
- public ZLMKeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer, UserSetting userSetting) {
- super(listenerContainer, userSetting);
- }
-
-
- /**
- * 监听失效的key,key格式为keeplive_deviceId
- * @param message
- * @param pattern
- */
- @Override
- public void onMessage(Message message, byte[] pattern) {
- // 获取失效的key
- String expiredKey = message.toString();
- String KEEPLIVEKEY_PREFIX = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_";
- if(!expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){
- return;
- }
-
- String mediaServerId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length());
- logger.info("[zlm心跳到期]:" + mediaServerId);
- // 发起http请求验证zlm是否确实无法连接,如果确实无法连接则发送离线事件,否则不作处理
- MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
- JSONObject mediaServerConfig = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
- if (mediaServerConfig != null && mediaServerConfig.getInteger("code") == 0) {
- logger.info("[zlm心跳到期]:{}验证后zlm仍在线,恢复心跳信息", mediaServerId);
- // 添加zlm信息
- mediaServerService.updateMediaServerKeepalive(mediaServerId, mediaServerConfig);
- }else {
- publisher.zlmOfflineEventPublish(mediaServerId);
- }
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java
new file mode 100644
index 000000000..b5f3c5b7c
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java
@@ -0,0 +1,45 @@
+package com.genersoft.iot.vmp.service;
+
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.github.pagehelper.PageInfo;
+
+/**
+ * 国标平台的业务类
+ * @author lin
+ */
+public interface IPlatformService {
+
+ ParentPlatform queryPlatformByServerGBId(String platformGbId);
+
+ /**
+ * 分页获取上级平台
+ * @param page
+ * @param count
+ * @return
+ */
+ PageInfo queryParentPlatformList(int page, int count);
+
+ /**
+ * 添加级联平台
+ * @param parentPlatform 级联平台
+ */
+ boolean add(ParentPlatform parentPlatform);
+
+ /**
+ * 平台上线
+ * @param parentPlatform 平台信息
+ */
+ void online(ParentPlatform parentPlatform);
+
+ /**
+ * 平台离线
+ * @param parentPlatform 平台信息
+ */
+ void offline(ParentPlatform parentPlatform);
+
+ /**
+ * 向上级平台发起注册
+ * @param parentPlatform
+ */
+ void login(ParentPlatform parentPlatform);
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
index 027eb97e9..9a2bb58e6 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java
@@ -6,7 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
-import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
@@ -14,7 +14,6 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
-import org.springframework.http.ResponseEntity;
import org.springframework.web.context.request.async.DeferredResult;
/**
@@ -25,9 +24,9 @@ public interface IPlayService {
void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid);
void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
- ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
+ ZlmHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
InviteTimeOutCallback timeoutCallback, String uuid);
- PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback);
+ PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZlmHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback);
MediaServerItem getNewMediaServerItem(Device device);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
index 867fcab0a..58f8741ea 100644
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
@@ -11,6 +11,7 @@ import java.util.Set;
import com.genersoft.iot.vmp.media.zlm.ZLMRunner;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import org.slf4j.Logger;
@@ -22,7 +23,6 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.ObjectUtils;
-import org.springframework.util.StringUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
@@ -42,7 +42,6 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
-import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@@ -58,6 +57,8 @@ public class MediaServerServiceImpl implements IMediaServerService {
private final static Logger logger = LoggerFactory.getLogger(MediaServerServiceImpl.class);
+ private final String zlmKeepaliveKeyPrefix = "zlm-keepalive_";
+
@Autowired
private SipConfig sipConfig;
@@ -88,10 +89,12 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired
private ZLMRTPServerFactory zlmrtpServerFactory;
-
@Autowired
private EventPublisher publisher;
+ @Autowired
+ private DynamicTask dynamicTask;
+
/**
* 初始化
*/
@@ -135,7 +138,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
logger.info("media server [ {} ] ssrcConfig is null", mediaServerItem.getId());
return null;
}else {
- String ssrc = null;
+ String ssrc;
if (presetSsrc != null) {
ssrc = presetSsrc;
}else {
@@ -404,15 +407,43 @@ public class MediaServerServiceImpl implements IMediaServerService {
if (serverItem.isAutoConfig()) {
setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable()));
}
+ final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + serverItem.getId();
+ dynamicTask.stop(zlmKeepaliveKey);
+ dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), (serverItem.getHookAliveInterval() + 5) * 1000);
publisher.zlmOnlineEventPublish(serverItem.getId());
logger.info("[ZLM] 连接成功 {} - {}:{} ",
zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
}
+ class KeepAliveTimeoutRunnable implements Runnable{
+
+ private MediaServerItem serverItem;
+
+ public KeepAliveTimeoutRunnable(MediaServerItem serverItem) {
+ this.serverItem = serverItem;
+ }
+
+ @Override
+ public void run() {
+ logger.info("[zlm心跳到期]:" + serverItem.getId());
+ // 发起http请求验证zlm是否确实无法连接,如果确实无法连接则发送离线事件,否则不作处理
+ JSONObject mediaServerConfig = zlmresTfulUtils.getMediaServerConfig(serverItem);
+ if (mediaServerConfig != null && mediaServerConfig.getInteger("code") == 0) {
+ logger.info("[zlm心跳到期]:{}验证后zlm仍在线,恢复心跳信息,请检查zlm是否可以正常向wvp发送心跳", serverItem.getId());
+ // 添加zlm信息
+ updateMediaServerKeepalive(serverItem.getId(), mediaServerConfig);
+ }else {
+ publisher.zlmOfflineEventPublish(serverItem.getId());
+ }
+ }
+ }
+
@Override
public void zlmServerOffline(String mediaServerId) {
delete(mediaServerId);
+ final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + mediaServerId;
+ dynamicTask.stop(zlmKeepaliveKey);
}
@Override
@@ -423,7 +454,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
if (RedisUtil.zScore(key, serverItem.getId()) == null) { // 不存在则设置默认值 已存在则重置
RedisUtil.zAdd(key, serverItem.getId(), 0L);
// 查询服务流数量
- zlmresTfulUtils.getMediaList(serverItem, null, null, "rtmp",(mediaList ->{
+ zlmresTfulUtils.getMediaList(serverItem, null, null, "rtsp",(mediaList ->{
Integer code = mediaList.getInteger("code");
if (code == 0) {
JSONArray data = mediaList.getJSONArray("data");
@@ -435,7 +466,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
}else {
clearRTPServer(serverItem);
}
-
}
@@ -471,7 +501,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
// 获取分数最低的,及并发最低的
- Set