diff --git a/README.md b/README.md
index df71b6c0d..4f09aba7c 100644
--- a/README.md
+++ b/README.md
@@ -25,7 +25,6 @@ WEB VIDEO PLATFORM是一个基于GB28181-2016标准实现的开箱即用的网
# 文档
wvp使用文档 [https://doc.wvp-pro.cn](https://doc.wvp-pro.cn)
ZLM使用文档 [https://github.com/ZLMediaKit/ZLMediaKit](https://github.com/ZLMediaKit/ZLMediaKit)
-> wvp文档由gitee提供服务,如果遇到打不开请多刷新几次。
# 付费社群
[](https://t.zsxq.com/0d8VAD3Dm)
diff --git a/doc/_content/ability/cloud_record.md b/doc/_content/ability/cloud_record.md
index 4ac4f8f12..0aa55e1e5 100644
--- a/doc/_content/ability/cloud_record.md
+++ b/doc/_content/ability/cloud_record.md
@@ -1,6 +1,6 @@
# 云端录像
-云端录像是对录制在zlm服务下的录像文件的管理,录像的文件路径默认在ZLM/www/record下,使用云端录像功能必须部署wvp-pro-assist,主要通过调用wvp-pro-assist的接口完成各种功能。
+云端录像是对录制在zlm服务下的录像文件的管理,录像的文件路径默认在ZLM/www/record下。
如果你需要24小时的录像,目前有一个这种方案,可以参考[7*24不间断录像](./_content/ability/continuous_recording.md)。
1. 云段录像支持录像文件的查看,播放(可能因为编码的原因导致无法播放);
2. 支持录像的下载;
diff --git a/doc/_content/introduction/compile.md b/doc/_content/introduction/compile.md
index 1a9d58b45..66693bd9c 100644
--- a/doc/_content/introduction/compile.md
+++ b/doc/_content/introduction/compile.md
@@ -16,7 +16,6 @@ WVP-PRO使用Spring boot开发,maven管理依赖。对于熟悉spring开发的
|----------------|------------------------------------------|-------------------------|
| WVP-PRO | 实现国标28181的信令以及视频平台相关的功能 | 是 |
| ZLMediaKit | 为WVP-PRO提供国标28181的媒体部分的实现,以及各种视频流格式的分发支持 | 是 |
-| wvp-pro-assist | wvp的辅助录像程序,也可单独跟zlm一起使用,提供录像控制,录像合并下载接口 | 否(不安装只是影响云端录像功能和国标录像下载) |
## 2 安装依赖
| 依赖 | 版本 | 用途 | 开发环境需要 | 生产环境需要 |
diff --git a/doc/_content/introduction/deployment.md b/doc/_content/introduction/deployment.md
index 45c1a83a6..3883842a7 100644
--- a/doc/_content/introduction/deployment.md
+++ b/doc/_content/introduction/deployment.md
@@ -2,7 +2,7 @@
# 部署
**请仔细阅读以下内容**
-1. WVP-PRO与ZLM支持分开部署,但是wvp-pro-assist必须与zlm部署在同一台主机;
+1. WVP-PRO与ZLM支持分开部署;
2. 需要开放的端口
| 服务 | 端口 | 类型 | 必选 |
|-----|:-------------------------|-------------|-------|
@@ -18,11 +18,10 @@
| zlm | rtp.port-range(在wvp中配置) | udp and tcp | 多端口开放 |
3. 测试环境部署建议所有服务部署在一台主机,关闭防火墙,减少因网络出现问题的可能;
-4. WVP-PRO与ZLM支持分开部署,但是wvp-pro-assist必须与zlm部署在同一台主机;
-5. 生产环境按需开放端口,但是建议修改默认端口,尤其是5060端口,易受到攻击;
-6. zlm使用docker部署的情况,要求端口映射一致,比如映射5060,应将外部端口也映射为5060端口;
-7. zlm与wvp会保持高频率的通信,所以不要去将wvp与zlm分属在两个网络,比如wvp在内网,zlm却在公网的情况。
-8. 启动服务,以linux为例
+4. 生产环境按需开放端口,但是建议修改默认端口,尤其是5060端口,易受到攻击;
+5. zlm使用docker部署的情况,要求端口映射一致,比如映射5060,应将外部端口也映射为5060端口;
+6. zlm与wvp会保持高频率的通信,所以不要去将wvp与zlm分属在两个网络,比如wvp在内网,zlm却在公网的情况。
+7. 启动服务,以linux为例
**启动WVP-PRO**
```shell
nohup java -jar wvp-pro-*.jar &
diff --git a/pom.xml b/pom.xml
index df841d521..e129b7842 100644
--- a/pom.xml
+++ b/pom.xml
@@ -99,6 +99,10 @@
org.springframework.boot
spring-boot-starter-data-redis
+
+ org.springframework.boot
+ spring-boot-starter-cache
+
org.springframework.boot
spring-boot-starter-web
diff --git a/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java b/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java
index be573166b..262910b0b 100644
--- a/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java
+++ b/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java
@@ -9,6 +9,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
+import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableScheduling;
@@ -24,6 +25,7 @@ import java.util.Collections;
@ServletComponentScan("com.genersoft.iot.vmp.conf")
@SpringBootApplication
@EnableScheduling
+@EnableCaching
public class VManageBootstrap extends SpringBootServletInitializer {
private final static Logger logger = LoggerFactory.getLogger(VManageBootstrap.class);
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/MybatisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/MybatisConfig.java
index 03ef09945..7f25a3629 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/MybatisConfig.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/MybatisConfig.java
@@ -1,6 +1,8 @@
package com.genersoft.iot.vmp.conf;
import org.apache.ibatis.logging.stdout.StdOutImpl;
+import org.apache.ibatis.mapping.DatabaseIdProvider;
+import org.apache.ibatis.mapping.VendorDatabaseIdProvider;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
@@ -9,6 +11,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import javax.sql.DataSource;
+import java.util.Properties;
/**
* 配置mybatis
@@ -21,7 +24,29 @@ public class MybatisConfig {
private UserSetting userSetting;
@Bean
- public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
+ public DatabaseIdProvider databaseIdProvider() {
+ VendorDatabaseIdProvider databaseIdProvider = new VendorDatabaseIdProvider();
+ Properties properties = new Properties();
+ properties.setProperty("Oracle", "oracle");
+ properties.setProperty("MySQL", "mysql");
+ properties.setProperty("DB2", "db2");
+ properties.setProperty("Derby", "derby");
+ properties.setProperty("H2", "h2");
+ properties.setProperty("HSQL", "hsql");
+ properties.setProperty("Informix", "informix");
+ properties.setProperty("MS-SQL", "ms-sql");
+ properties.setProperty("PostgreSQL", "postgresql");
+ properties.setProperty("Sybase", "sybase");
+ properties.setProperty("Hana", "hana");
+ properties.setProperty("DM", "dm");
+ properties.setProperty("KingbaseES", "kingbase");
+ properties.setProperty("KingBase8", "kingbase");
+ databaseIdProvider.setProperties(properties);
+ return databaseIdProvider;
+ }
+
+ @Bean
+ public SqlSessionFactory sqlSessionFactory(DataSource dataSource, DatabaseIdProvider databaseIdProvider) throws Exception {
final SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();
sqlSessionFactory.setDataSource(dataSource);
org.apache.ibatis.session.Configuration config = new org.apache.ibatis.session.Configuration();
@@ -30,6 +55,7 @@ public class MybatisConfig {
}
config.setMapUnderscoreToCamelCase(true);
sqlSessionFactory.setConfiguration(config);
+ sqlSessionFactory.setDatabaseIdProvider(databaseIdProvider);
return sqlSessionFactory.getObject();
}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/security/JwtAuthenticationFilter.java b/src/main/java/com/genersoft/iot/vmp/conf/security/JwtAuthenticationFilter.java
index f45f89a13..274a19f8a 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/security/JwtAuthenticationFilter.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/security/JwtAuthenticationFilter.java
@@ -51,8 +51,11 @@ public class JwtAuthenticationFilter extends OncePerRequestFilter {
if (StringUtils.isBlank(jwt)) {
jwt = request.getParameter(JwtUtils.getHeader());
if (StringUtils.isBlank(jwt)) {
- chain.doFilter(request, response);
- return;
+ jwt = request.getHeader(JwtUtils.getApiKeyHeader());
+ if (StringUtils.isBlank(jwt)) {
+ chain.doFilter(request, response);
+ return;
+ }
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/security/JwtUtils.java b/src/main/java/com/genersoft/iot/vmp/conf/security/JwtUtils.java
index fcd194614..eacff1888 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/security/JwtUtils.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/security/JwtUtils.java
@@ -1,8 +1,12 @@
package com.genersoft.iot.vmp.conf.security;
import com.genersoft.iot.vmp.conf.security.dto.JwtUser;
+import com.genersoft.iot.vmp.service.IUserApiKeyService;
import com.genersoft.iot.vmp.service.IUserService;
import com.genersoft.iot.vmp.storager.dao.dto.User;
+import com.genersoft.iot.vmp.storager.dao.dto.UserApiKey;
+import org.jose4j.jwk.JsonWebKey;
+import org.jose4j.jwk.JsonWebKeySet;
import org.jose4j.jwk.RsaJsonWebKey;
import org.jose4j.jwk.RsaJwkGenerator;
import org.jose4j.jws.AlgorithmIdentifiers;
@@ -20,8 +24,13 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Map;
@Component
public class JwtUtils implements InitializingBean {
@@ -30,6 +39,8 @@ public class JwtUtils implements InitializingBean {
public static final String HEADER = "access-token";
+ public static final String API_KEY_HEADER = "api-key";
+
private static final String AUDIENCE = "Audience";
private static final String keyId = "3e79646c4dbc408383a9eed09f2b85ae";
@@ -37,17 +48,28 @@ public class JwtUtils implements InitializingBean {
/**
* token过期时间(分钟)
*/
- public static final long expirationTime = 30 * 24 * 60;
+ public static final long EXPIRATION_TIME = 30 * 24 * 60;
private static RsaJsonWebKey rsaJsonWebKey;
private static IUserService userService;
+ private static IUserApiKeyService userApiKeyService;
+
+ public static String getApiKeyHeader() {
+ return API_KEY_HEADER;
+ }
+
@Resource
public void setUserService(IUserService userService) {
JwtUtils.userService = userService;
}
+ @Resource
+ public void setUserApiKeyService(IUserApiKeyService userApiKeyService) {
+ JwtUtils.userApiKeyService = userApiKeyService;
+ }
+
@Override
public void afterPropertiesSet() {
try {
@@ -59,17 +81,34 @@ public class JwtUtils implements InitializingBean {
/**
* 创建密钥对
+ *
* @throws JoseException JoseException
*/
private RsaJsonWebKey generateRsaJsonWebKey() throws JoseException {
- // 生成一个RSA密钥对,该密钥对将用于JWT的签名和验证,包装在JWK中
- RsaJsonWebKey rsaJsonWebKey = RsaJwkGenerator.generateJwk(2048);
- // 给JWK一个密钥ID
- rsaJsonWebKey.setKeyId(keyId);
+ RsaJsonWebKey rsaJsonWebKey = null;
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(getClass().getClassLoader().getResourceAsStream("/jwk.json"), StandardCharsets.UTF_8))) {
+ String jwkJson = reader.readLine();
+ JsonWebKeySet jsonWebKeySet = new JsonWebKeySet(jwkJson);
+ List jsonWebKeys = jsonWebKeySet.getJsonWebKeys();
+ if (!jsonWebKeys.isEmpty()) {
+ JsonWebKey jsonWebKey = jsonWebKeys.get(0);
+ if (jsonWebKey instanceof RsaJsonWebKey) {
+ rsaJsonWebKey = (RsaJsonWebKey) jsonWebKey;
+ }
+ }
+ } catch (Exception e) {
+ // ignored
+ }
+ if (rsaJsonWebKey == null) {
+ // 生成一个RSA密钥对,该密钥对将用于JWT的签名和验证,包装在JWK中
+ rsaJsonWebKey = RsaJwkGenerator.generateJwk(2048);
+ // 给JWK一个密钥ID
+ rsaJsonWebKey.setKeyId(keyId);
+ }
return rsaJsonWebKey;
}
- public static String createToken(String username) {
+ public static String createToken(String username, Long expirationTime, Map extra) {
try {
/*
* “iss” (issuer) 发行人
@@ -83,13 +122,17 @@ public class JwtUtils implements InitializingBean {
claims.setGeneratedJwtId();
claims.setIssuedAtToNow();
// 令牌将过期的时间 分钟
- claims.setExpirationTimeMinutesInTheFuture(expirationTime);
+ if (expirationTime != null) {
+ claims.setExpirationTimeMinutesInTheFuture(expirationTime);
+ }
claims.setNotBeforeMinutesInThePast(0);
claims.setSubject("login");
claims.setAudience(AUDIENCE);
//添加自定义参数,必须是字符串类型
claims.setClaim("userName", username);
-
+ if (extra != null) {
+ extra.forEach(claims::setClaim);
+ }
//jws
JsonWebSignature jws = new JsonWebSignature();
//签名算法RS256
@@ -104,10 +147,17 @@ public class JwtUtils implements InitializingBean {
} catch (JoseException e) {
logger.error("[Token生成失败]: {}", e.getMessage());
}
-
return null;
}
+ public static String createToken(String username, Long expirationTime) {
+ return createToken(username, expirationTime, null);
+ }
+
+ public static String createToken(String username) {
+ return createToken(username, EXPIRATION_TIME);
+ }
+
public static String getHeader() {
return HEADER;
}
@@ -118,8 +168,8 @@ public class JwtUtils implements InitializingBean {
try {
JwtConsumer consumer = new JwtConsumerBuilder()
- .setRequireExpirationTime()
- .setMaxFutureValidityInMinutes(5256000)
+ //.setRequireExpirationTime()
+ //.setMaxFutureValidityInMinutes(5256000)
.setAllowedClockSkewInSeconds(30)
.setRequireSubject()
//.setExpectedIssuer("")
@@ -129,15 +179,27 @@ public class JwtUtils implements InitializingBean {
JwtClaims claims = consumer.processToClaims(token);
NumericDate expirationTime = claims.getExpirationTime();
- // 判断是否即将过期, 默认剩余时间小于5分钟未即将过期
- // 剩余时间 (秒)
- long timeRemaining = LocalDateTime.now().toEpochSecond(ZoneOffset.ofHours(8)) - expirationTime.getValue();
- if (timeRemaining < 5 * 60) {
- jwtUser.setStatus(JwtUser.TokenStatus.EXPIRING_SOON);
+ if (expirationTime != null) {
+ // 判断是否即将过期, 默认剩余时间小于5分钟未即将过期
+ // 剩余时间 (秒)
+ long timeRemaining = LocalDateTime.now().toEpochSecond(ZoneOffset.ofHours(8)) - expirationTime.getValue();
+ if (timeRemaining < 5 * 60) {
+ jwtUser.setStatus(JwtUser.TokenStatus.EXPIRING_SOON);
+ } else {
+ jwtUser.setStatus(JwtUser.TokenStatus.NORMAL);
+ }
} else {
jwtUser.setStatus(JwtUser.TokenStatus.NORMAL);
}
+ Long apiKeyId = claims.getClaimValue("apiKeyId", Long.class);
+ if (apiKeyId != null) {
+ UserApiKey userApiKey = userApiKeyService.getUserApiKeyById(apiKeyId.intValue());
+ if (userApiKey == null || !userApiKey.isEnable()) {
+ jwtUser.setStatus(JwtUser.TokenStatus.EXPIRED);
+ }
+ }
+
String username = (String) claims.getClaimValue("userName");
User user = userService.getUserByUsername(username);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
index 30193d275..c133c8222 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.gb28181.bean;
+import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
+
public class SendRtpItem {
/**
@@ -122,6 +124,39 @@ public class SendRtpItem {
*/
private String receiveStream;
+ public static SendRtpItem getInstance(RequestPushStreamMsg requestPushStreamMsg) {
+ SendRtpItem sendRtpItem = new SendRtpItem();
+ sendRtpItem.setMediaServerId(requestPushStreamMsg.getMediaServerId());
+ sendRtpItem.setApp(requestPushStreamMsg.getApp());
+ sendRtpItem.setStream(requestPushStreamMsg.getStream());
+ sendRtpItem.setIp(requestPushStreamMsg.getIp());
+ sendRtpItem.setPort(requestPushStreamMsg.getPort());
+ sendRtpItem.setSsrc(requestPushStreamMsg.getSsrc());
+ sendRtpItem.setTcp(requestPushStreamMsg.isTcp());
+ sendRtpItem.setLocalPort(requestPushStreamMsg.getSrcPort());
+ sendRtpItem.setPt(requestPushStreamMsg.getPt());
+ sendRtpItem.setUsePs(requestPushStreamMsg.isPs());
+ sendRtpItem.setOnlyAudio(requestPushStreamMsg.isOnlyAudio());
+ return sendRtpItem;
+
+ }
+
+ public static SendRtpItem getInstance(String app, String stream, String ssrc, String dstIp, Integer dstPort, boolean tcp, int sendLocalPort, Integer pt) {
+ SendRtpItem sendRtpItem = new SendRtpItem();
+ sendRtpItem.setApp(app);
+ sendRtpItem.setStream(stream);
+ sendRtpItem.setSsrc(ssrc);
+ sendRtpItem.setTcp(tcp);
+ sendRtpItem.setLocalPort(sendLocalPort);
+ sendRtpItem.setIp(dstIp);
+ sendRtpItem.setPort(dstPort);
+ if (pt != null) {
+ sendRtpItem.setPt(pt);
+ }
+
+ return sendRtpItem;
+ }
+
public String getIp() {
return ip;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java
index 167b9f201..e6cfac3d5 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java
@@ -37,15 +37,14 @@ public class RecordEndEventListener implements ApplicationListener {
+ RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
+ redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader);
});
} else {
@@ -134,7 +115,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
if (sendRtpItem.isTcpActive()) {
mediaServerService.startSendRtpPassive(mediaInfo, parentPlatform, sendRtpItem, null);
} else {
- mediaServerService.startSendRtpStream(mediaInfo, parentPlatform, sendRtpItem);
+ mediaServerService.startSendRtp(mediaInfo, parentPlatform, sendRtpItem);
}
}catch (ControllerException e) {
logger.error("RTP推流失败: {}", e.getMessage());
@@ -159,7 +140,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
if (sendRtpItem.isTcpActive()) {
mediaServerService.startSendRtpPassive(mediaInfo, null, sendRtpItem, null);
} else {
- mediaServerService.startSendRtpStream(mediaInfo, null, sendRtpItem);
+ mediaServerService.startSendRtp(mediaInfo, null, sendRtpItem);
}
}catch (ControllerException e) {
logger.error("RTP推流失败: {}", e.getMessage());
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
index b2d14a0f1..302b6941c 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
@@ -6,16 +6,15 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
-import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
-import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
+import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
+import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -36,8 +35,6 @@ import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.text.ParseException;
-import java.util.HashMap;
-import java.util.Map;
/**
* SIP命令类型: BYE请求
@@ -75,12 +72,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired
private IVideoManagerStorage storager;
- @Autowired
- private ZLMServerFactory zlmServerFactory;
-
- @Autowired
- private SSRCFactory ssrcFactory;
-
@Autowired
private IMediaServerService mediaServerService;
@@ -110,7 +101,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
/**
* 处理BYE请求
- * @param evt
*/
@Override
public void process(RequestEvent evt) {
@@ -128,11 +118,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
logger.info("[收到bye] 来自{},停止通道:{}, 类型: {}, callId: {}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getPlayType(), callIdHeader.getCallId());
String streamId = sendRtpItem.getStream();
- Map param = new HashMap<>();
- param.put("vhost","__defaultVhost__");
- param.put("app",sendRtpItem.getApp());
- param.put("stream",streamId);
- param.put("ssrc",sendRtpItem.getSsrc());
logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId());
if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
@@ -149,7 +134,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
callIdHeader.getCallId(), null);
- zlmServerFactory.stopSendRtpStream(mediaInfo, param);
+ mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (userSetting.getUseCustomSsrcForParentInvite()) {
mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
}
@@ -169,13 +154,13 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
callIdHeader.getCallId(), null);
- zlmServerFactory.stopSendRtpStream(mediaInfo, param);
+ mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (userSetting.getUseCustomSsrcForParentInvite()) {
mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
}
}
- MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
- if (mediaInfo != null) {
+ MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+ if (mediaServer != null) {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
// 来自上级平台的停止对讲
@@ -183,8 +168,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
}
- int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
- if (totalReaderCount <= 0) {
+ MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), streamId);
+
+ if (mediaInfo.getReaderCount() <= 0) {
logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
Device device = deviceService.getDevice(sendRtpItem.getDeviceId());
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
index b4d183ed6..46e779d92 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
@@ -24,7 +25,6 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IInviteStreamService;
@@ -61,7 +61,6 @@ import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
-import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Vector;
@@ -113,9 +112,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private AudioBroadcastManager audioBroadcastManager;
- @Autowired
- private ZLMServerFactory zlmServerFactory;
-
@Autowired
private IMediaServerService mediaServerService;
@@ -382,8 +378,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} else {
streamTypeStr = "UDP";
}
- logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
- SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
+ logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}",
+ sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
+ SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
if (tcpActive != null) {
@@ -462,30 +459,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseSdpAck(request, content.toString(), platform);
// tcp主动模式,回复sdp后开启监听
if (sendRtpItem.isTcpActive()) {
- MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
- Map param = new HashMap<>(12);
- param.put("vhost","__defaultVhost__");
- param.put("app",sendRtpItem.getApp());
- param.put("stream",sendRtpItem.getStream());
- param.put("ssrc", sendRtpItem.getSsrc());
- if (!sendRtpItem.isTcpActive()) {
- param.put("dst_url",sendRtpItem.getIp());
- param.put("dst_port", sendRtpItem.getPort());
- }
- String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
- param.put("is_udp", is_Udp);
- param.put("src_port", localPort);
- param.put("pt", sendRtpItem.getPt());
- param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
- param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
- if (!sendRtpItem.isTcp()) {
- // 开启rtcp保活
- param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
- }
- JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
- if (startSendRtpStreamResult != null) {
- startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader);
- }
+ MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+ try {
+ mediaServerService.startSendRtpPassive(mediaServer, platform, sendRtpItem, 5);
+ }catch (ControllerException e) {}
}
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
@@ -638,13 +615,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
* 安排推流
*/
private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform,
- CallIdHeader callIdHeader, MediaServer mediaServerItem,
+ CallIdHeader callIdHeader, MediaServer mediaServer,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
String channelId, String addressStr, String ssrc, String requesterId) {
- Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
+ Boolean streamReady = mediaServerService.isStreamReady(mediaServer, gbStream.getApp(), gbStream.getStream());
if (streamReady != null && streamReady) {
+
// 自平台内容
- SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
+ SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServer, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
if (sendRtpItem == null) {
@@ -665,7 +643,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setCallId(callIdHeader.getCallId());
sendRtpItem.setFromTag(request.getFromTag());
- SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
+ SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt);
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
@@ -684,7 +662,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
if (streamReady != null && streamReady) {
// 自平台内容
- SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
+ SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
if (sendRtpItem == null) {
@@ -794,7 +772,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
dynamicTask.stop(callIdHeader.getCallId());
redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
if (serverId.equals(userSetting.getServerId())) {
- SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
+ SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
if (sendRtpItem == null) {
@@ -1074,7 +1052,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue());
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
- SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
+ SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
device.getDeviceId(), broadcastCatch.getChannelId(),
mediaTransmissionTCP, false);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java
index 08be1f366..46f9642cd 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/BroadcastNotifyMessageHandler.java
@@ -1,16 +1,15 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd;
-import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.bean.MediaServer;
-import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -62,9 +61,6 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
@Autowired
private AudioBroadcastManager audioBroadcastManager;
- @Autowired
- private ZLMServerFactory zlmServerFactory;
-
@Autowired
private IRedisCatchStorage redisCatchStorage;
@@ -155,12 +151,13 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
}
}else {
// 发流
- JSONObject jsonObject = zlmServerFactory.startSendRtp(hookData.getMediaServer(), sendRtpItem);
- if (jsonObject != null && jsonObject.getInteger("code") == 0 ) {
- logger.info("[语音喊话] 自动推流成功, device: {}, channel: {}", device.getDeviceId(), targetId);
- }else {
- logger.info("[语音喊话] 推流失败, 结果: {}", jsonObject);
+ try {
+ mediaServerService.startSendRtp(hookData.getMediaServer(),null, sendRtpItem);
+ }catch (ControllerException e) {
+ logger.info("[语音喊话] 推流失败, 结果: {}", e.getMessage());
+ return;
}
+ logger.info("[语音喊话] 自动推流成功, device: {}, channel: {}", device.getDeviceId(), targetId);
}
}
}else {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java b/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java
index d0ea368fb..4803b1c58 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/bean/MediaInfo.java
@@ -47,6 +47,8 @@ public class MediaInfo {
private Long aliveSecond;
@Schema(description = "数据产生速度,单位byte/s")
private Long bytesSpeed;
+ @Schema(description = "鉴权参数")
+ private String callId;
public static MediaInfo getInstance(JSONObject jsonObject, MediaServer mediaServer) {
MediaInfo mediaInfo = new MediaInfo();
@@ -345,4 +347,12 @@ public class MediaInfo {
public void setSchema(String schema) {
this.schema = schema;
}
+
+ public String getCallId() {
+ return callId;
+ }
+
+ public void setCallId(String callId) {
+ this.callId = callId;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/Hook.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/Hook.java
index a0dc7c32c..341e3c361 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/Hook.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/event/hook/Hook.java
@@ -14,7 +14,8 @@ public class Hook {
private String mediaServerId;
- private Long createTime;
+ private Long expireTime;
+
public static Hook getInstance(HookType hookType, String app, String stream, String mediaServerId) {
Hook hookSubscribe = new Hook();
@@ -22,7 +23,7 @@ public class Hook {
hookSubscribe.setStream(stream);
hookSubscribe.setHookType(hookType);
hookSubscribe.setMediaServerId(mediaServerId);
- hookSubscribe.setCreateTime(System.currentTimeMillis());
+ hookSubscribe.setExpireTime(System.currentTimeMillis() + 5 * 60 * 1000);
return hookSubscribe;
}
@@ -50,12 +51,13 @@ public class Hook {
this.stream = stream;
}
- public Long getCreateTime() {
- return createTime;
+
+ public Long getExpireTime() {
+ return expireTime;
}
- public void setCreateTime(Long createTime) {
- this.createTime = createTime;
+ public void setExpireTime(Long expireTime) {
+ this.expireTime = expireTime;
}
public String getMediaServerId() {
diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java
index 907e904c0..58e376195 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java
@@ -58,7 +58,7 @@ public class HookSubscribe {
sendNotify(HookType.on_publish, event);
}
/**
- * 推流鉴权事件
+ * 生成录像文件事件
*/
@Async("taskExecutor")
@EventListener
@@ -79,8 +79,8 @@ public class HookSubscribe {
}
public void addSubscribe(Hook hook, HookSubscribe.Event event) {
- if (hook.getCreateTime() == null) {
- hook.setCreateTime(System.currentTimeMillis());
+ if (hook.getExpireTime() == null) {
+ hook.setExpireTime(System.currentTimeMillis() + subscribeExpire);
}
allSubscribes.put(hook.toString(), event);
allHook.put(hook.toString(), hook);
@@ -96,9 +96,9 @@ public class HookSubscribe {
*/
@Scheduled(fixedRate=subscribeExpire) //每5分钟执行一次
public void execute(){
- long expireTime = System.currentTimeMillis() - subscribeExpire;
+ long expireTime = System.currentTimeMillis();
for (Hook hook : allHook.values()) {
- if (hook.getCreateTime() < expireTime) {
+ if (hook.getExpireTime() < expireTime) {
allSubscribes.remove(hook.toString());
allHook.remove(hook.toString());
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java
index eea19f0ee..42f36928c 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java
@@ -141,5 +141,10 @@ public interface IMediaServerService {
void startSendRtpPassive(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem, Integer timeout);
- void startSendRtpStream(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem);
+ void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem);
+
+ SendRtpItem createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean mediaTransmissionTCP, boolean rtcp);
+
+ SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
+ String app, String stream, String channelId, boolean tcp, boolean rtcp);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
index 92925e433..919d04e0e 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
@@ -12,14 +12,16 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent;
import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.media.bean.MediaServer;
+import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -83,6 +85,9 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired
private MediaConfig mediaConfig;
+ @Autowired
+ private SendRtpPortManager sendRtpPortManager;
+
/**
@@ -94,6 +99,8 @@ public class MediaServerServiceImpl implements IMediaServerService {
if ("rtsp".equals(event.getSchema())) {
logger.info("流变化:注册 app->{}, stream->{}", event.getApp(), event.getStream());
addCount(event.getMediaServer().getId());
+ String type = OriginType.values()[event.getMediaInfo().getOriginType()].getType();
+ redisCatchStorage.addStream(event.getMediaServer(), type, event.getApp(), event.getStream(), event.getMediaInfo());
}
}
@@ -106,7 +113,12 @@ public class MediaServerServiceImpl implements IMediaServerService {
if ("rtsp".equals(event.getSchema())) {
logger.info("流变化:注销, app->{}, stream->{}", event.getApp(), event.getStream());
removeCount(event.getMediaServer().getId());
+ MediaInfo mediaInfo = redisCatchStorage.getStreamInfo(
+ event.getApp(), event.getStream(), event.getMediaServer().getId());
+ String type = OriginType.values()[mediaInfo.getOriginType()].getType();
+ redisCatchStorage.removeStream(mediaInfo.getMediaServer().getId(), type, event.getApp(), event.getStream());
}
+
}
@@ -812,7 +824,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
- public void startSendRtpStream(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem) {
+ public void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
logger.info("[startSendRtpStream] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType());
@@ -821,7 +833,10 @@ public class MediaServerServiceImpl implements IMediaServerService {
logger.info("[开始推流] rtp/{}, 目标={}:{},SSRC={}, RTCP={}", sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem);
- sendPlatformStartPlayMsg(platform, sendRtpItem);
+ if (platform != null) {
+ sendPlatformStartPlayMsg(platform, sendRtpItem);
+ }
+
}
@@ -834,4 +849,50 @@ public class MediaServerServiceImpl implements IMediaServerService {
redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
}
}
+
+ @Override
+ public SendRtpItem createSendRtpItem(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean isTcp, boolean rtcp) {
+ int localPort = sendRtpPortManager.getNextPort(mediaServer);
+ if (localPort == 0) {
+ return null;
+ }
+ SendRtpItem sendRtpItem = new SendRtpItem();
+ sendRtpItem.setIp(ip);
+ sendRtpItem.setPort(port);
+ sendRtpItem.setSsrc(ssrc);
+ sendRtpItem.setPlatformId(deviceId);
+ sendRtpItem.setDeviceId(deviceId);
+ sendRtpItem.setChannelId(channelId);
+ sendRtpItem.setTcp(isTcp);
+ sendRtpItem.setRtcp(rtcp);
+ sendRtpItem.setApp("rtp");
+ sendRtpItem.setLocalPort(localPort);
+ sendRtpItem.setServerId(userSetting.getServerId());
+ sendRtpItem.setMediaServerId(mediaServer.getId());
+ return sendRtpItem;
+ }
+
+ @Override
+ public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
+ String app, String stream, String channelId, boolean tcp, boolean rtcp){
+
+ int localPort = sendRtpPortManager.getNextPort(serverItem);
+ if (localPort == 0) {
+ return null;
+ }
+ SendRtpItem sendRtpItem = new SendRtpItem();
+ sendRtpItem.setIp(ip);
+ sendRtpItem.setPort(port);
+ sendRtpItem.setSsrc(ssrc);
+ sendRtpItem.setApp(app);
+ sendRtpItem.setStream(stream);
+ sendRtpItem.setPlatformId(platformId);
+ sendRtpItem.setChannelId(channelId);
+ sendRtpItem.setTcp(tcp);
+ sendRtpItem.setLocalPort(localPort);
+ sendRtpItem.setServerId(userSetting.getServerId());
+ sendRtpItem.setMediaServerId(serverItem.getId());
+ sendRtpItem.setRtcp(rtcp);
+ return sendRtpItem;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
index 84df2e785..80699d20e 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
@@ -3,15 +3,13 @@ package com.genersoft.iot.vmp.media.zlm;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.bean.MediaServer;
-import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.*;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamProxyService;
+import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
@@ -20,7 +18,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.ParseException;
-import java.util.*;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -37,26 +35,16 @@ public class ZLMMediaListManager {
@Autowired
private GbStreamMapper gbStreamMapper;
- @Autowired
- private PlatformGbStreamMapper platformGbStreamMapper;
-
@Autowired
private IStreamPushService streamPushService;
- @Autowired
- private IStreamProxyService streamProxyService;
@Autowired
private StreamPushMapper streamPushMapper;
- @Autowired
- private HookSubscribe subscribe;
-
@Autowired
private UserSetting userSetting;
- @Autowired
- private ZLMServerFactory zlmServerFactory;
@Autowired
private IMediaServerService mediaServerService;
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java
index c1bf27531..7d39605a2 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java
@@ -322,11 +322,23 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
if (timeout != null) {
param.put("close_delay_ms", timeout);
}
+ if (!sendRtpItem.isTcp()) {
+ // 开启rtcp保活
+ param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
+ }
+ if (!sendRtpItem.isTcpActive()) {
+ param.put("dst_url",sendRtpItem.getIp());
+ param.put("dst_port", sendRtpItem.getPort());
+ }
JSONObject jsonObject = zlmServerFactory.startSendRtpPassive(mediaServer, param, null);
if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
+ logger.error("启动监听TCP被动推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param));
throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg"));
}
+ logger.info("调用ZLM-TCP被动推流接口, 结果: {}", jsonObject);
+ logger.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " , sendRtpItem.getApp(), sendRtpItem.getStream(),
+ jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
}
@Override
@@ -347,7 +359,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
}
param.put("dst_url", sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
- JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, param);
+ JSONObject jsonObject = zlmresTfulUtils.startSendRtp(mediaServer, param);
if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg"));
}
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java
index d744850dc..cdf1e3f27 100644
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManger.java
@@ -55,6 +55,9 @@ public class ZLMMediaServerStatusManger {
@Value("${server.port}")
private Integer serverPort;
+ @Value("${server.servlet.context-path:}")
+ private String serverServletContextPath;
+
@Autowired
private UserSetting userSetting;
@@ -239,7 +242,7 @@ public class ZLMMediaServerStatusManger {
logger.info("[媒体服务节点] 正在设置 :{} -> {}:{}",
mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
String protocol = sslEnabled ? "https" : "http";
- String hookPrefix = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort);
+ String hookPrefix = String.format("%s://%s:%s%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort, (serverServletContextPath == null || "/".equals(serverServletContextPath)) ? "" : serverServletContextPath);
Map param = new HashMap<>();
param.put("api.secret",mediaServerItem.getSecret()); // -profile:v Baseline
diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
index 14025cecc..3ba48d3b9 100755
--- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
+++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
@@ -5,7 +5,6 @@ import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,9 +25,6 @@ public class ZLMServerFactory {
@Autowired
private UserSetting userSetting;
- @Autowired
- private HookSubscribe hookSubscribe;
-
@Autowired
private SendRtpPortManager sendRtpPortManager;
@@ -156,72 +152,6 @@ public class ZLMServerFactory {
}
- /**
- * 创建一个国标推流
- * @param ip 推流ip
- * @param port 推流端口
- * @param ssrc 推流唯一标识
- * @param platformId 平台id
- * @param channelId 通道id
- * @param tcp 是否为tcp
- * @return SendRtpItem
- */
- public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
- String deviceId, String channelId, boolean tcp, boolean rtcp){
-
- int localPort = sendRtpPortManager.getNextPort(serverItem);
- if (localPort == 0) {
- return null;
- }
- SendRtpItem sendRtpItem = new SendRtpItem();
- sendRtpItem.setIp(ip);
- sendRtpItem.setPort(port);
- sendRtpItem.setSsrc(ssrc);
- sendRtpItem.setPlatformId(platformId);
- sendRtpItem.setDeviceId(deviceId);
- sendRtpItem.setChannelId(channelId);
- sendRtpItem.setTcp(tcp);
- sendRtpItem.setRtcp(rtcp);
- sendRtpItem.setApp("rtp");
- sendRtpItem.setLocalPort(localPort);
- sendRtpItem.setServerId(userSetting.getServerId());
- sendRtpItem.setMediaServerId(serverItem.getId());
- return sendRtpItem;
- }
-
- /**
- * 创建一个直播推流
- * @param ip 推流ip
- * @param port 推流端口
- * @param ssrc 推流唯一标识
- * @param platformId 平台id
- * @param channelId 通道id
- * @param tcp 是否为tcp
- * @return SendRtpItem
- */
- public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
- String app, String stream, String channelId, boolean tcp, boolean rtcp){
-
- int localPort = sendRtpPortManager.getNextPort(serverItem);
- if (localPort == 0) {
- return null;
- }
- SendRtpItem sendRtpItem = new SendRtpItem();
- sendRtpItem.setIp(ip);
- sendRtpItem.setPort(port);
- sendRtpItem.setSsrc(ssrc);
- sendRtpItem.setApp(app);
- sendRtpItem.setStream(stream);
- sendRtpItem.setPlatformId(platformId);
- sendRtpItem.setChannelId(channelId);
- sendRtpItem.setTcp(tcp);
- sendRtpItem.setLocalPort(localPort);
- sendRtpItem.setServerId(userSetting.getServerId());
- sendRtpItem.setMediaServerId(serverItem.getId());
- sendRtpItem.setRtcp(rtcp);
- return sendRtpItem;
- }
-
/**
* 调用zlm RESTFUL API —— startSendRtp
*/
@@ -240,17 +170,6 @@ public class ZLMServerFactory {
return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param, callback);
}
- /**
- * 查询待转推的流是否就绪
- */
- public Boolean isRtpReady(MediaServer mediaServerItem, String streamId) {
- JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem,"rtp", "rtsp", streamId);
- if (mediaInfo.getInteger("code") == -2) {
- return null;
- }
- return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"));
- }
-
/**
* 查询待转推的流是否就绪
*/
@@ -286,27 +205,6 @@ public class ZLMServerFactory {
return mediaInfo.getInteger("totalReaderCount");
}
- /**
- * 调用zlm RESTful API —— stopSendRtp
- */
- public Boolean stopSendRtpStream(MediaServer mediaServerItem, Mapparam) {
- if (mediaServerItem == null) {
- logger.error("[停止RTP推流] 失败: 媒体节点为NULL");
- return false;
- }
- Boolean result = false;
- JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaServerItem, param);
- if (jsonObject == null) {
- logger.error("[停止RTP推流] 失败: 请检查ZLM服务");
- } else if (jsonObject.getInteger("code") == 0) {
- result= true;
- logger.info("[停止RTP推流] 成功");
- } else {
- logger.warn("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"), JSON.toJSON(param), jsonObject);
- }
- return result;
- }
-
public JSONObject startSendRtp(MediaServer mediaInfo, SendRtpItem sendRtpItem) {
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
logger.info("rtp/{}开始推流, 目标={}:{},SSRC={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc());
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IUserApiKeyService.java b/src/main/java/com/genersoft/iot/vmp/service/IUserApiKeyService.java
new file mode 100644
index 000000000..b3cc58053
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/IUserApiKeyService.java
@@ -0,0 +1,25 @@
+package com.genersoft.iot.vmp.service;
+
+import com.genersoft.iot.vmp.storager.dao.dto.UserApiKey;
+import com.github.pagehelper.PageInfo;
+
+public interface IUserApiKeyService {
+ int addApiKey(UserApiKey userApiKey);
+
+ boolean isApiKeyExists(String apiKey);
+
+ PageInfo getUserApiKeys(int page, int count);
+
+ int enable(Integer id);
+
+ int disable(Integer id);
+
+ int remark(Integer id, String remark);
+
+ int delete(Integer id);
+
+ UserApiKey getUserApiKeyById(Integer id);
+
+ int reset(Integer id, String apiKey);
+
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/IUserService.java b/src/main/java/com/genersoft/iot/vmp/service/IUserService.java
index 7e2a8395b..1e9b7247c 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/IUserService.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/IUserService.java
@@ -11,6 +11,8 @@ public interface IUserService {
boolean changePassword(int id, String password);
+ User getUserById(int id);
+
User getUserByUsername(String username);
int addUser(User user);
diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java
index 5827d0132..9b446f68c 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java
@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.service.bean;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+
/**
* redis消息:请求下级推送流信息
* @author lin
@@ -80,6 +82,22 @@ public class RequestPushStreamMsg {
return requestPushStreamMsg;
}
+ public static RequestPushStreamMsg getInstance(SendRtpItem sendRtpItem) {
+ RequestPushStreamMsg requestPushStreamMsg = new RequestPushStreamMsg();
+ requestPushStreamMsg.setMediaServerId(sendRtpItem.getMediaServerId());
+ requestPushStreamMsg.setApp(sendRtpItem.getApp());
+ requestPushStreamMsg.setStream(sendRtpItem.getStream());
+ requestPushStreamMsg.setIp(sendRtpItem.getIp());
+ requestPushStreamMsg.setPort(sendRtpItem.getPort());
+ requestPushStreamMsg.setSsrc(sendRtpItem.getSsrc());
+ requestPushStreamMsg.setTcp(sendRtpItem.isTcp());
+ requestPushStreamMsg.setSrcPort(sendRtpItem.getLocalPort());
+ requestPushStreamMsg.setPt(sendRtpItem.getPt());
+ requestPushStreamMsg.setPs(sendRtpItem.isUsePs());
+ requestPushStreamMsg.setOnlyAudio(sendRtpItem.isOnlyAudio());
+ return requestPushStreamMsg;
+ }
+
public String getMediaServerId() {
return mediaServerId;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
index aa39f4186..6554817b3 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java
@@ -1,7 +1,9 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
-import com.genersoft.iot.vmp.common.*;
+import com.genersoft.iot.vmp.common.InviteInfo;
+import com.genersoft.iot.vmp.common.InviteSessionStatus;
+import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
@@ -11,13 +13,12 @@ import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
+import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.HookData;
+import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
-import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.IPlayService;
@@ -41,7 +42,9 @@ import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import java.text.ParseException;
-import java.util.*;
+import java.util.List;
+import java.util.UUID;
+import java.util.Vector;
/**
* @author lin
@@ -75,9 +78,6 @@ public class PlatformServiceImpl implements IPlatformService {
@Autowired
private DynamicTask dynamicTask;
- @Autowired
- private ZLMServerFactory zlmServerFactory;
-
@Autowired
private SubscribeHolder subscribeHolder;
@@ -87,9 +87,6 @@ public class PlatformServiceImpl implements IPlatformService {
@Autowired
private UserSetting userSetting;
- @Autowired
- private HookSubscribe subscribe;
-
@Autowired
private VideoStreamSessionManager streamSession;
@@ -437,11 +434,7 @@ public class PlatformServiceImpl implements IPlatformService {
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
- Map param = new HashMap<>(3);
- param.put("vhost", "__defaultVhost__");
- param.put("app", sendRtpItem.getApp());
- param.put("stream", sendRtpItem.getStream());
- zlmServerFactory.stopSendRtpStream(mediaInfo, param);
+ mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
}
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
index a6a9b7751..8027c7e94 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
@@ -30,6 +30,9 @@ import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IPlayService;
+import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
+import com.genersoft.iot.vmp.media.bean.MediaServer;
+import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -1044,9 +1047,7 @@ public class PlayServiceImpl implements IPlayService {
};
Hook hook = Hook.getInstance(HookType.on_record_mp4, "rtp", ssrcInfo.getStream(), mediaServerItem.getId());
// 设置过期时间,下载失败时自动处理订阅数据
-// long difference = DateUtil.getDifference(startTime, endTime)/1000;
-// Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(difference * 2));
-// hookSubscribe.setExpires(expiresInstant);
+ hook.setExpireTime(System.currentTimeMillis() + 24 * 60 * 60 * 1000);
subscribe.addSubscribe(hook, hookEventForRecord);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
@@ -1419,11 +1420,8 @@ public class PlayServiceImpl implements IPlayService {
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaInfo == null) {
- RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
- sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
- sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
- sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
- redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
+ RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
+ redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
});
} else {
@@ -1431,7 +1429,7 @@ public class PlayServiceImpl implements IPlayService {
if (sendRtpItem.isTcpActive()) {
mediaServerService.startSendRtpPassive(mediaInfo, platform, sendRtpItem, null);
} else {
- mediaServerService.startSendRtpStream(mediaInfo, platform, sendRtpItem);
+ mediaServerService.startSendRtp(mediaInfo, platform, sendRtpItem);
}
}catch (ControllerException e) {
logger.error("RTP推流失败: {}", e.getMessage());
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
index 6692aa8f9..8ddf4f878 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
@@ -9,17 +9,15 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
+import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
-import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -64,9 +62,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Autowired
private IVideoManagerStorage videoManagerStorager;
- @Autowired
- private ZLMServerFactory zlmServerFactory;
-
@Autowired
private StreamProxyMapper streamProxyMapper;
@@ -509,18 +504,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
String type = "PULL";
// 发送redis消息
- List onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, type);
- if (onStreamChangedHookParams.size() > 0) {
- for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) {
+ List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type);
+ if (mediaInfoList.size() > 0) {
+ for (MediaInfo mediaInfo : mediaInfoList) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
- jsonObject.put("app", onStreamChangedHookParam.getApp());
- jsonObject.put("stream", onStreamChangedHookParam.getStream());
+ jsonObject.put("app", mediaInfo.getApp());
+ jsonObject.put("stream", mediaInfo.getStream());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
// 移除redis内流的信息
- redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
+ redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream());
}
}
}
@@ -538,8 +533,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
private void syncPullStream(String mediaServerId){
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
if (mediaServer != null) {
- List allPullStream = redisCatchStorage.getStreams(mediaServerId, "PULL");
- if (!allPullStream.isEmpty()) {
+ List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PULL");
+ if (!mediaInfoList.isEmpty()) {
List mediaList = mediaServerService.getMediaList(mediaServer, null, null, null);
Map stringStreamInfoMap = new HashMap<>();
if (mediaList != null && !mediaList.isEmpty()) {
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
index 652bcdd69..c849358b4 100755
--- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -156,10 +156,10 @@ public class StreamPushServiceImpl implements IStreamPushService {
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
// 兼容流注销时类型从redis记录获取
- OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(
+ MediaInfo mediaInfo = redisCatchStorage.getStreamInfo(
event.getApp(), event.getStream(), event.getMediaServer().getId());
- if (onStreamChangedHookParam != null) {
- String type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType();
+ if (mediaInfo != null) {
+ String type = OriginType.values()[mediaInfo.getOriginType()].getType();
redisCatchStorage.removeStream(event.getMediaServer().getId(), type, event.getApp(), event.getStream());
if ("PUSH".equalsIgnoreCase(type)) {
// 冗余数据,自己系统中自用
@@ -302,8 +302,8 @@ public class StreamPushServiceImpl implements IStreamPushService {
List pushList = getPushList(mediaServerId);
Map pushItemMap = new HashMap<>();
// redis记录
- List onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH");
- Map streamInfoPushItemMap = new HashMap<>();
+ List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PUSH");
+ Map streamInfoPushItemMap = new HashMap<>();
if (pushList.size() > 0) {
for (StreamPushItem streamPushItem : pushList) {
if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
@@ -311,9 +311,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
}
}
- if (onStreamChangedHookParams.size() > 0) {
- for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) {
- streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam);
+ if (mediaInfoList.size() > 0) {
+ for (MediaInfo mediaInfo : mediaInfoList) {
+ streamInfoPushItemMap.put(mediaInfo.getApp() + mediaInfo.getStream(), mediaInfo);
}
}
// 获取所有推流鉴权信息,清理过期的
@@ -352,21 +352,21 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
}
- Collection offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values();
- if (offlineOnStreamChangedHookParamList.size() > 0) {
+ Collection mediaInfos = streamInfoPushItemMap.values();
+ if (mediaInfos.size() > 0) {
String type = "PUSH";
- for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) {
+ for (MediaInfo mediaInfo : mediaInfos) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
- jsonObject.put("app", offlineOnStreamChangedHookParam.getApp());
- jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream());
+ jsonObject.put("app", mediaInfo.getApp());
+ jsonObject.put("stream", mediaInfo.getStream());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
// 移除redis内流的信息
- redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream());
+ redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", mediaInfo.getApp(), mediaInfo.getStream());
// 冗余数据,自己系统中自用
- redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId());
+ redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerItem.getId());
}
}
@@ -391,21 +391,21 @@ public class StreamPushServiceImpl implements IStreamPushService {
// 发送流停止消息
String type = "PUSH";
// 发送redis消息
- List streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
- if (streamInfoList.size() > 0) {
- for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) {
+ List mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type);
+ if (mediaInfoList.size() > 0) {
+ for (MediaInfo mediaInfo : mediaInfoList) {
// 移除redis内流的信息
- redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
+ redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream());
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
- jsonObject.put("app", onStreamChangedHookParam.getApp());
- jsonObject.put("stream", onStreamChangedHookParam.getStream());
+ jsonObject.put("app", mediaInfo.getApp());
+ jsonObject.put("stream", mediaInfo.getStream());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
// 冗余数据,自己系统中自用
- redisCatchStorage.removePushListItem(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), mediaServerId);
+ redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerId);
}
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/UserApiKeyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/UserApiKeyServiceImpl.java
new file mode 100644
index 000000000..85ee4f0f5
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/UserApiKeyServiceImpl.java
@@ -0,0 +1,80 @@
+package com.genersoft.iot.vmp.service.impl;
+
+import com.baomidou.dynamic.datasource.annotation.DS;
+import com.genersoft.iot.vmp.service.IUserApiKeyService;
+import com.genersoft.iot.vmp.storager.dao.UserApiKeyMapper;
+import com.genersoft.iot.vmp.storager.dao.dto.UserApiKey;
+import com.github.pagehelper.PageHelper;
+import com.github.pagehelper.PageInfo;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+@Service
+@DS("master")
+public class UserApiKeyServiceImpl implements IUserApiKeyService {
+
+ @Autowired
+ UserApiKeyMapper userApiKeyMapper;
+
+ @Autowired
+ private RedisTemplate