Merge branch 'refs/heads/master' into dev/abl支持

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
This commit is contained in:
648540858
2024-04-11 22:47:03 +08:00
61 changed files with 1740 additions and 617 deletions

View File

@@ -9,6 +9,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableScheduling;
@@ -24,6 +25,7 @@ import java.util.Collections;
@ServletComponentScan("com.genersoft.iot.vmp.conf")
@SpringBootApplication
@EnableScheduling
@EnableCaching
public class VManageBootstrap extends SpringBootServletInitializer {
private final static Logger logger = LoggerFactory.getLogger(VManageBootstrap.class);

View File

@@ -1,6 +1,8 @@
package com.genersoft.iot.vmp.conf;
import org.apache.ibatis.logging.stdout.StdOutImpl;
import org.apache.ibatis.mapping.DatabaseIdProvider;
import org.apache.ibatis.mapping.VendorDatabaseIdProvider;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
@@ -9,6 +11,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import javax.sql.DataSource;
import java.util.Properties;
/**
* 配置mybatis
@@ -21,7 +24,29 @@ public class MybatisConfig {
private UserSetting userSetting;
@Bean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
public DatabaseIdProvider databaseIdProvider() {
VendorDatabaseIdProvider databaseIdProvider = new VendorDatabaseIdProvider();
Properties properties = new Properties();
properties.setProperty("Oracle", "oracle");
properties.setProperty("MySQL", "mysql");
properties.setProperty("DB2", "db2");
properties.setProperty("Derby", "derby");
properties.setProperty("H2", "h2");
properties.setProperty("HSQL", "hsql");
properties.setProperty("Informix", "informix");
properties.setProperty("MS-SQL", "ms-sql");
properties.setProperty("PostgreSQL", "postgresql");
properties.setProperty("Sybase", "sybase");
properties.setProperty("Hana", "hana");
properties.setProperty("DM", "dm");
properties.setProperty("KingbaseES", "kingbase");
properties.setProperty("KingBase8", "kingbase");
databaseIdProvider.setProperties(properties);
return databaseIdProvider;
}
@Bean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource, DatabaseIdProvider databaseIdProvider) throws Exception {
final SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();
sqlSessionFactory.setDataSource(dataSource);
org.apache.ibatis.session.Configuration config = new org.apache.ibatis.session.Configuration();
@@ -30,6 +55,7 @@ public class MybatisConfig {
}
config.setMapUnderscoreToCamelCase(true);
sqlSessionFactory.setConfiguration(config);
sqlSessionFactory.setDatabaseIdProvider(databaseIdProvider);
return sqlSessionFactory.getObject();
}

View File

@@ -51,8 +51,11 @@ public class JwtAuthenticationFilter extends OncePerRequestFilter {
if (StringUtils.isBlank(jwt)) {
jwt = request.getParameter(JwtUtils.getHeader());
if (StringUtils.isBlank(jwt)) {
chain.doFilter(request, response);
return;
jwt = request.getHeader(JwtUtils.getApiKeyHeader());
if (StringUtils.isBlank(jwt)) {
chain.doFilter(request, response);
return;
}
}
}

View File

@@ -1,8 +1,12 @@
package com.genersoft.iot.vmp.conf.security;
import com.genersoft.iot.vmp.conf.security.dto.JwtUser;
import com.genersoft.iot.vmp.service.IUserApiKeyService;
import com.genersoft.iot.vmp.service.IUserService;
import com.genersoft.iot.vmp.storager.dao.dto.User;
import com.genersoft.iot.vmp.storager.dao.dto.UserApiKey;
import org.jose4j.jwk.JsonWebKey;
import org.jose4j.jwk.JsonWebKeySet;
import org.jose4j.jwk.RsaJsonWebKey;
import org.jose4j.jwk.RsaJwkGenerator;
import org.jose4j.jws.AlgorithmIdentifiers;
@@ -20,8 +24,13 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Map;
@Component
public class JwtUtils implements InitializingBean {
@@ -30,6 +39,8 @@ public class JwtUtils implements InitializingBean {
public static final String HEADER = "access-token";
public static final String API_KEY_HEADER = "api-key";
private static final String AUDIENCE = "Audience";
private static final String keyId = "3e79646c4dbc408383a9eed09f2b85ae";
@@ -37,17 +48,28 @@ public class JwtUtils implements InitializingBean {
/**
* token过期时间(分钟)
*/
public static final long expirationTime = 30 * 24 * 60;
public static final long EXPIRATION_TIME = 30 * 24 * 60;
private static RsaJsonWebKey rsaJsonWebKey;
private static IUserService userService;
private static IUserApiKeyService userApiKeyService;
public static String getApiKeyHeader() {
return API_KEY_HEADER;
}
@Resource
public void setUserService(IUserService userService) {
JwtUtils.userService = userService;
}
@Resource
public void setUserApiKeyService(IUserApiKeyService userApiKeyService) {
JwtUtils.userApiKeyService = userApiKeyService;
}
@Override
public void afterPropertiesSet() {
try {
@@ -59,17 +81,34 @@ public class JwtUtils implements InitializingBean {
/**
* 创建密钥对
*
* @throws JoseException JoseException
*/
private RsaJsonWebKey generateRsaJsonWebKey() throws JoseException {
// 生成一个RSA密钥对该密钥对将用于JWT的签名和验证包装在JWK中
RsaJsonWebKey rsaJsonWebKey = RsaJwkGenerator.generateJwk(2048);
// 给JWK一个密钥ID
rsaJsonWebKey.setKeyId(keyId);
RsaJsonWebKey rsaJsonWebKey = null;
try (BufferedReader reader = new BufferedReader(new InputStreamReader(getClass().getClassLoader().getResourceAsStream("/jwk.json"), StandardCharsets.UTF_8))) {
String jwkJson = reader.readLine();
JsonWebKeySet jsonWebKeySet = new JsonWebKeySet(jwkJson);
List<JsonWebKey> jsonWebKeys = jsonWebKeySet.getJsonWebKeys();
if (!jsonWebKeys.isEmpty()) {
JsonWebKey jsonWebKey = jsonWebKeys.get(0);
if (jsonWebKey instanceof RsaJsonWebKey) {
rsaJsonWebKey = (RsaJsonWebKey) jsonWebKey;
}
}
} catch (Exception e) {
// ignored
}
if (rsaJsonWebKey == null) {
// 生成一个RSA密钥对该密钥对将用于JWT的签名和验证包装在JWK中
rsaJsonWebKey = RsaJwkGenerator.generateJwk(2048);
// 给JWK一个密钥ID
rsaJsonWebKey.setKeyId(keyId);
}
return rsaJsonWebKey;
}
public static String createToken(String username) {
public static String createToken(String username, Long expirationTime, Map<String, Object> extra) {
try {
/*
* “iss” (issuer) 发行人
@@ -83,13 +122,17 @@ public class JwtUtils implements InitializingBean {
claims.setGeneratedJwtId();
claims.setIssuedAtToNow();
// 令牌将过期的时间 分钟
claims.setExpirationTimeMinutesInTheFuture(expirationTime);
if (expirationTime != null) {
claims.setExpirationTimeMinutesInTheFuture(expirationTime);
}
claims.setNotBeforeMinutesInThePast(0);
claims.setSubject("login");
claims.setAudience(AUDIENCE);
//添加自定义参数,必须是字符串类型
claims.setClaim("userName", username);
if (extra != null) {
extra.forEach(claims::setClaim);
}
//jws
JsonWebSignature jws = new JsonWebSignature();
//签名算法RS256
@@ -104,10 +147,17 @@ public class JwtUtils implements InitializingBean {
} catch (JoseException e) {
logger.error("[Token生成失败] {}", e.getMessage());
}
return null;
}
public static String createToken(String username, Long expirationTime) {
return createToken(username, expirationTime, null);
}
public static String createToken(String username) {
return createToken(username, EXPIRATION_TIME);
}
public static String getHeader() {
return HEADER;
}
@@ -118,8 +168,8 @@ public class JwtUtils implements InitializingBean {
try {
JwtConsumer consumer = new JwtConsumerBuilder()
.setRequireExpirationTime()
.setMaxFutureValidityInMinutes(5256000)
//.setRequireExpirationTime()
//.setMaxFutureValidityInMinutes(5256000)
.setAllowedClockSkewInSeconds(30)
.setRequireSubject()
//.setExpectedIssuer("")
@@ -129,15 +179,27 @@ public class JwtUtils implements InitializingBean {
JwtClaims claims = consumer.processToClaims(token);
NumericDate expirationTime = claims.getExpirationTime();
// 判断是否即将过期, 默认剩余时间小于5分钟未即将过期
// 剩余时间 (秒)
long timeRemaining = LocalDateTime.now().toEpochSecond(ZoneOffset.ofHours(8)) - expirationTime.getValue();
if (timeRemaining < 5 * 60) {
jwtUser.setStatus(JwtUser.TokenStatus.EXPIRING_SOON);
if (expirationTime != null) {
// 判断是否即将过期, 默认剩余时间小于5分钟未即将过期
// 剩余时间 (秒)
long timeRemaining = LocalDateTime.now().toEpochSecond(ZoneOffset.ofHours(8)) - expirationTime.getValue();
if (timeRemaining < 5 * 60) {
jwtUser.setStatus(JwtUser.TokenStatus.EXPIRING_SOON);
} else {
jwtUser.setStatus(JwtUser.TokenStatus.NORMAL);
}
} else {
jwtUser.setStatus(JwtUser.TokenStatus.NORMAL);
}
Long apiKeyId = claims.getClaimValue("apiKeyId", Long.class);
if (apiKeyId != null) {
UserApiKey userApiKey = userApiKeyService.getUserApiKeyById(apiKeyId.intValue());
if (userApiKey == null || !userApiKey.isEnable()) {
jwtUser.setStatus(JwtUser.TokenStatus.EXPIRED);
}
}
String username = (String) claims.getClaimValue("userName");
User user = userService.getUserByUsername(username);

View File

@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
public class SendRtpItem {
/**
@@ -122,6 +124,39 @@ public class SendRtpItem {
*/
private String receiveStream;
public static SendRtpItem getInstance(RequestPushStreamMsg requestPushStreamMsg) {
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setMediaServerId(requestPushStreamMsg.getMediaServerId());
sendRtpItem.setApp(requestPushStreamMsg.getApp());
sendRtpItem.setStream(requestPushStreamMsg.getStream());
sendRtpItem.setIp(requestPushStreamMsg.getIp());
sendRtpItem.setPort(requestPushStreamMsg.getPort());
sendRtpItem.setSsrc(requestPushStreamMsg.getSsrc());
sendRtpItem.setTcp(requestPushStreamMsg.isTcp());
sendRtpItem.setLocalPort(requestPushStreamMsg.getSrcPort());
sendRtpItem.setPt(requestPushStreamMsg.getPt());
sendRtpItem.setUsePs(requestPushStreamMsg.isPs());
sendRtpItem.setOnlyAudio(requestPushStreamMsg.isOnlyAudio());
return sendRtpItem;
}
public static SendRtpItem getInstance(String app, String stream, String ssrc, String dstIp, Integer dstPort, boolean tcp, int sendLocalPort, Integer pt) {
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setApp(app);
sendRtpItem.setStream(stream);
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setTcp(tcp);
sendRtpItem.setLocalPort(sendLocalPort);
sendRtpItem.setIp(dstIp);
sendRtpItem.setPort(dstPort);
if (pt != null) {
sendRtpItem.setPt(pt);
}
return sendRtpItem;
}
public String getIp() {
return ip;
}

View File

@@ -37,15 +37,14 @@ public class RecordEndEventListener implements ApplicationListener<RecordEndEven
event.getRecordInfo().getChannelId(), count,sumNum);
if (!handlerMap.isEmpty()) {
RecordEndEventHandler handler = handlerMap.get(deviceId + channelId);
logger.info("录像查询完成事件触发, 发送订阅deviceId{}, channelId: {}",
event.getRecordInfo().getDeviceId(), event.getRecordInfo().getChannelId());
if (handler !=null){
handler.handler(event.getRecordInfo());
if (count ==sumNum){
handlerMap.remove(deviceId + channelId);
}
}
}else {
logger.info("录像查询完成事件触发, 但是订阅为空取消发送deviceId{}, channelId: {}",
event.getRecordInfo().getDeviceId(), event.getRecordInfo().getChannelId());
}
}

View File

@@ -18,7 +18,6 @@ import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -75,8 +74,6 @@ public class SIPCommander implements ISIPCommander {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private ZLMServerFactory zlmServerFactory;
/**

View File

@@ -13,11 +13,10 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -65,9 +64,6 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Autowired
private SipSubscribe sipSubscribe;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private SipLayer sipLayer;
@@ -846,7 +842,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
zlmServerFactory.closeRtpServer(mediaServerItem, sendRtpItem.getStream());
mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream());
}
SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem);
if (byeRequest == null) {

View File

@@ -1,22 +1,17 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.AudioBroadcastCatch;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
@@ -28,17 +23,12 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
/**
* SIP命令类型 ACK请求
@@ -71,12 +61,6 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired
private IDeviceService deviceService;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private HookSubscribe hookSubscribe;
@Autowired
private IMediaServerService mediaServerService;
@@ -122,11 +106,8 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
if (parentPlatform != null) {
if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
playService.startSendRtpStreamFailHand(sendRtpItem, parentPlatform, callIdHeader);
});
} else {
@@ -134,7 +115,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
if (sendRtpItem.isTcpActive()) {
mediaServerService.startSendRtpPassive(mediaInfo, parentPlatform, sendRtpItem, null);
} else {
mediaServerService.startSendRtpStream(mediaInfo, parentPlatform, sendRtpItem);
mediaServerService.startSendRtp(mediaInfo, parentPlatform, sendRtpItem);
}
}catch (ControllerException e) {
logger.error("RTP推流失败: {}", e.getMessage());
@@ -159,7 +140,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
if (sendRtpItem.isTcpActive()) {
mediaServerService.startSendRtpPassive(mediaInfo, null, sendRtpItem, null);
} else {
mediaServerService.startSendRtpStream(mediaInfo, null, sendRtpItem);
mediaServerService.startSendRtp(mediaInfo, null, sendRtpItem);
}
}catch (ControllerException e) {
logger.error("RTP推流失败: {}", e.getMessage());

View File

@@ -6,16 +6,15 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -36,8 +35,6 @@ import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
/**
* SIP命令类型 BYE请求
@@ -75,12 +72,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired
private IVideoManagerStorage storager;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private SSRCFactory ssrcFactory;
@Autowired
private IMediaServerService mediaServerService;
@@ -110,7 +101,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
/**
* 处理BYE请求
* @param evt
*/
@Override
public void process(RequestEvent evt) {
@@ -128,11 +118,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
logger.info("[收到bye] 来自{},停止通道:{}, 类型: {}, callId: {}", sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getPlayType(), callIdHeader.getCallId());
String streamId = sendRtpItem.getStream();
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",streamId);
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("[收到bye] 停止推流:{}, 媒体节点: {}", streamId, sendRtpItem.getMediaServerId());
if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
@@ -149,7 +134,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
callIdHeader.getCallId(), null);
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (userSetting.getUseCustomSsrcForParentInvite()) {
mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
}
@@ -169,13 +154,13 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),
callIdHeader.getCallId(), null);
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (userSetting.getUseCustomSsrcForParentInvite()) {
mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc());
}
}
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaInfo != null) {
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaServer != null) {
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) {
// 来自上级平台的停止对讲
@@ -183,8 +168,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
audioBroadcastManager.del(sendRtpItem.getDeviceId(), sendRtpItem.getChannelId());
}
int totalReaderCount = zlmServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId);
if (totalReaderCount <= 0) {
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), streamId);
if (mediaInfo.getReaderCount() <= 0) {
logger.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId);
if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) {
Device device = deviceService.getDevice(sendRtpItem.getDeviceId());

View File

@@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
@@ -24,7 +25,6 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IInviteStreamService;
@@ -61,7 +61,6 @@ import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Vector;
@@ -113,9 +112,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private AudioBroadcastManager audioBroadcastManager;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private IMediaServerService mediaServerService;
@@ -382,8 +378,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} else {
streamTypeStr = "UDP";
}
logger.info("[上级Invite] {}, 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
logger.info("[上级Invite] {}, 平台:{} 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc{}",
sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
if (tcpActive != null) {
@@ -462,30 +459,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseSdpAck(request, content.toString(), platform);
// tcp主动模式回复sdp后开启监听
if (sendRtpItem.isTcpActive()) {
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Map<String, Object> param = new HashMap<>(12);
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",sendRtpItem.getStream());
param.put("ssrc", sendRtpItem.getSsrc());
if (!sendRtpItem.isTcpActive()) {
param.put("dst_url",sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
}
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
param.put("is_udp", is_Udp);
param.put("src_port", localPort);
param.put("pt", sendRtpItem.getPt());
param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
if (!sendRtpItem.isTcp()) {
// 开启rtcp保活
param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
}
JSONObject startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
if (startSendRtpStreamResult != null) {
startSendRtpStreamHand(evt, sendRtpItem, null, startSendRtpStreamResult, param, callIdHeader);
}
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
try {
mediaServerService.startSendRtpPassive(mediaServer, platform, sendRtpItem, 5);
}catch (ControllerException e) {}
}
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
@@ -638,13 +615,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
* 安排推流
*/
private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServer mediaServerItem,
CallIdHeader callIdHeader, MediaServer mediaServer,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
String channelId, String addressStr, String ssrc, String requesterId) {
Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, gbStream.getApp(), gbStream.getStream());
if (streamReady != null && streamReady) {
// 自平台内容
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServer, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
if (sendRtpItem == null) {
@@ -665,7 +643,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setCallId(callIdHeader.getCallId());
sendRtpItem.setFromTag(request.getFromTag());
SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
SIPResponse response = sendStreamAck(mediaServer, request, sendRtpItem, platform, evt);
if (response != null) {
sendRtpItem.setToTag(response.getToTag());
}
@@ -684,7 +662,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
if (streamReady != null && streamReady) {
// 自平台内容
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
if (sendRtpItem == null) {
@@ -794,7 +772,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
dynamicTask.stop(callIdHeader.getCallId());
redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
if (serverId.equals(userSetting.getServerId())) {
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
app, stream, channelId, mediaTransmissionTCP, platform.isRtcp());
if (sendRtpItem == null) {
@@ -1074,7 +1052,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue());
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, gb28181Sdp.getSsrc(), requesterId,
device.getDeviceId(), broadcastCatch.getChannelId(),
mediaTransmissionTCP, false);

View File

@@ -1,16 +1,15 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -62,9 +61,6 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
@Autowired
private AudioBroadcastManager audioBroadcastManager;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@@ -155,12 +151,13 @@ public class BroadcastNotifyMessageHandler extends SIPRequestProcessorParent imp
}
}else {
// 发流
JSONObject jsonObject = zlmServerFactory.startSendRtp(hookData.getMediaServer(), sendRtpItem);
if (jsonObject != null && jsonObject.getInteger("code") == 0 ) {
logger.info("[语音喊话] 自动推流成功, device: {}, channel: {}", device.getDeviceId(), targetId);
}else {
logger.info("[语音喊话] 推流失败, 结果: {}", jsonObject);
try {
mediaServerService.startSendRtp(hookData.getMediaServer(),null, sendRtpItem);
}catch (ControllerException e) {
logger.info("[语音喊话] 推流失败, 结果: {}", e.getMessage());
return;
}
logger.info("[语音喊话] 自动推流成功, device: {}, channel: {}", device.getDeviceId(), targetId);
}
}
}else {

View File

@@ -47,6 +47,8 @@ public class MediaInfo {
private Long aliveSecond;
@Schema(description = "数据产生速度单位byte/s")
private Long bytesSpeed;
@Schema(description = "鉴权参数")
private String callId;
public static MediaInfo getInstance(JSONObject jsonObject, MediaServer mediaServer) {
MediaInfo mediaInfo = new MediaInfo();
@@ -345,4 +347,12 @@ public class MediaInfo {
public void setSchema(String schema) {
this.schema = schema;
}
public String getCallId() {
return callId;
}
public void setCallId(String callId) {
this.callId = callId;
}
}

View File

@@ -14,7 +14,8 @@ public class Hook {
private String mediaServerId;
private Long createTime;
private Long expireTime;
public static Hook getInstance(HookType hookType, String app, String stream, String mediaServerId) {
Hook hookSubscribe = new Hook();
@@ -22,7 +23,7 @@ public class Hook {
hookSubscribe.setStream(stream);
hookSubscribe.setHookType(hookType);
hookSubscribe.setMediaServerId(mediaServerId);
hookSubscribe.setCreateTime(System.currentTimeMillis());
hookSubscribe.setExpireTime(System.currentTimeMillis() + 5 * 60 * 1000);
return hookSubscribe;
}
@@ -50,12 +51,13 @@ public class Hook {
this.stream = stream;
}
public Long getCreateTime() {
return createTime;
public Long getExpireTime() {
return expireTime;
}
public void setCreateTime(Long createTime) {
this.createTime = createTime;
public void setExpireTime(Long expireTime) {
this.expireTime = expireTime;
}
public String getMediaServerId() {

View File

@@ -58,7 +58,7 @@ public class HookSubscribe {
sendNotify(HookType.on_publish, event);
}
/**
* 推流鉴权事件
* 生成录像文件事件
*/
@Async("taskExecutor")
@EventListener
@@ -79,8 +79,8 @@ public class HookSubscribe {
}
public void addSubscribe(Hook hook, HookSubscribe.Event event) {
if (hook.getCreateTime() == null) {
hook.setCreateTime(System.currentTimeMillis());
if (hook.getExpireTime() == null) {
hook.setExpireTime(System.currentTimeMillis() + subscribeExpire);
}
allSubscribes.put(hook.toString(), event);
allHook.put(hook.toString(), hook);
@@ -96,9 +96,9 @@ public class HookSubscribe {
*/
@Scheduled(fixedRate=subscribeExpire) //每5分钟执行一次
public void execute(){
long expireTime = System.currentTimeMillis() - subscribeExpire;
long expireTime = System.currentTimeMillis();
for (Hook hook : allHook.values()) {
if (hook.getCreateTime() < expireTime) {
if (hook.getExpireTime() < expireTime) {
allSubscribes.remove(hook.toString());
allHook.remove(hook.toString());
}

View File

@@ -141,5 +141,10 @@ public interface IMediaServerService {
void startSendRtpPassive(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem, Integer timeout);
void startSendRtpStream(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem);
void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem);
SendRtpItem createSendRtpItem(MediaServer mediaServerItem, String addressStr, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean mediaTransmissionTCP, boolean rtcp);
SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
String app, String stream, String channelId, boolean tcp, boolean rtcp);
}

View File

@@ -12,14 +12,16 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerChangeEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerDeleteEvent;
import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -83,6 +85,9 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired
private MediaConfig mediaConfig;
@Autowired
private SendRtpPortManager sendRtpPortManager;
/**
@@ -94,6 +99,8 @@ public class MediaServerServiceImpl implements IMediaServerService {
if ("rtsp".equals(event.getSchema())) {
logger.info("流变化:注册 app->{}, stream->{}", event.getApp(), event.getStream());
addCount(event.getMediaServer().getId());
String type = OriginType.values()[event.getMediaInfo().getOriginType()].getType();
redisCatchStorage.addStream(event.getMediaServer(), type, event.getApp(), event.getStream(), event.getMediaInfo());
}
}
@@ -106,7 +113,12 @@ public class MediaServerServiceImpl implements IMediaServerService {
if ("rtsp".equals(event.getSchema())) {
logger.info("流变化:注销, app->{}, stream->{}", event.getApp(), event.getStream());
removeCount(event.getMediaServer().getId());
MediaInfo mediaInfo = redisCatchStorage.getStreamInfo(
event.getApp(), event.getStream(), event.getMediaServer().getId());
String type = OriginType.values()[mediaInfo.getOriginType()].getType();
redisCatchStorage.removeStream(mediaInfo.getMediaServer().getId(), type, event.getApp(), event.getStream());
}
}
@@ -812,7 +824,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public void startSendRtpStream(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem) {
public void startSendRtp(MediaServer mediaServer, ParentPlatform platform, SendRtpItem sendRtpItem) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
logger.info("[startSendRtpStream] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
@@ -821,7 +833,10 @@ public class MediaServerServiceImpl implements IMediaServerService {
logger.info("[开始推流] rtp/{}, 目标={}:{}SSRC={}, RTCP={}", sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isRtcp());
mediaNodeServerService.startSendRtpStream(mediaServer, sendRtpItem);
sendPlatformStartPlayMsg(platform, sendRtpItem);
if (platform != null) {
sendPlatformStartPlayMsg(platform, sendRtpItem);
}
}
@@ -834,4 +849,50 @@ public class MediaServerServiceImpl implements IMediaServerService {
redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
}
}
@Override
public SendRtpItem createSendRtpItem(MediaServer mediaServer, String ip, int port, String ssrc, String requesterId, String deviceId, String channelId, boolean isTcp, boolean rtcp) {
int localPort = sendRtpPortManager.getNextPort(mediaServer);
if (localPort == 0) {
return null;
}
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setIp(ip);
sendRtpItem.setPort(port);
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setPlatformId(deviceId);
sendRtpItem.setDeviceId(deviceId);
sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(isTcp);
sendRtpItem.setRtcp(rtcp);
sendRtpItem.setApp("rtp");
sendRtpItem.setLocalPort(localPort);
sendRtpItem.setServerId(userSetting.getServerId());
sendRtpItem.setMediaServerId(mediaServer.getId());
return sendRtpItem;
}
@Override
public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
String app, String stream, String channelId, boolean tcp, boolean rtcp){
int localPort = sendRtpPortManager.getNextPort(serverItem);
if (localPort == 0) {
return null;
}
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setIp(ip);
sendRtpItem.setPort(port);
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setApp(app);
sendRtpItem.setStream(stream);
sendRtpItem.setPlatformId(platformId);
sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(tcp);
sendRtpItem.setLocalPort(localPort);
sendRtpItem.setServerId(userSetting.getServerId());
sendRtpItem.setMediaServerId(serverItem.getId());
sendRtpItem.setRtcp(rtcp);
return sendRtpItem;
}
}

View File

@@ -3,15 +3,13 @@ package com.genersoft.iot.vmp.media.zlm;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
@@ -20,7 +18,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.util.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -37,26 +35,16 @@ public class ZLMMediaListManager {
@Autowired
private GbStreamMapper gbStreamMapper;
@Autowired
private PlatformGbStreamMapper platformGbStreamMapper;
@Autowired
private IStreamPushService streamPushService;
@Autowired
private IStreamProxyService streamProxyService;
@Autowired
private StreamPushMapper streamPushMapper;
@Autowired
private HookSubscribe subscribe;
@Autowired
private UserSetting userSetting;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private IMediaServerService mediaServerService;

View File

@@ -322,11 +322,23 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
if (timeout != null) {
param.put("close_delay_ms", timeout);
}
if (!sendRtpItem.isTcp()) {
// 开启rtcp保活
param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
}
if (!sendRtpItem.isTcpActive()) {
param.put("dst_url",sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
}
JSONObject jsonObject = zlmServerFactory.startSendRtpPassive(mediaServer, param, null);
if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
logger.error("启动监听TCP被动推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param));
throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg"));
}
logger.info("调用ZLM-TCP被动推流接口, 结果: {}", jsonObject);
logger.info("启动监听TCP被动推流成功[ {}/{} ]{}->{}:{}, " , sendRtpItem.getApp(), sendRtpItem.getStream(),
jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
}
@Override
@@ -347,7 +359,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
}
param.put("dst_url", sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, param);
JSONObject jsonObject = zlmresTfulUtils.startSendRtp(mediaServer, param);
if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg"));
}

View File

@@ -55,6 +55,9 @@ public class ZLMMediaServerStatusManger {
@Value("${server.port}")
private Integer serverPort;
@Value("${server.servlet.context-path:}")
private String serverServletContextPath;
@Autowired
private UserSetting userSetting;
@@ -239,7 +242,7 @@ public class ZLMMediaServerStatusManger {
logger.info("[媒体服务节点] 正在设置 {} -> {}:{}",
mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
String protocol = sslEnabled ? "https" : "http";
String hookPrefix = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort);
String hookPrefix = String.format("%s://%s:%s%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort, (serverServletContextPath == null || "/".equals(serverServletContextPath)) ? "" : serverServletContextPath);
Map<String, Object> param = new HashMap<>();
param.put("api.secret",mediaServerItem.getSecret()); // -profile:v Baseline

View File

@@ -5,7 +5,6 @@ import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,9 +25,6 @@ public class ZLMServerFactory {
@Autowired
private UserSetting userSetting;
@Autowired
private HookSubscribe hookSubscribe;
@Autowired
private SendRtpPortManager sendRtpPortManager;
@@ -156,72 +152,6 @@ public class ZLMServerFactory {
}
/**
* 创建一个国标推流
* @param ip 推流ip
* @param port 推流端口
* @param ssrc 推流唯一标识
* @param platformId 平台id
* @param channelId 通道id
* @param tcp 是否为tcp
* @return SendRtpItem
*/
public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
String deviceId, String channelId, boolean tcp, boolean rtcp){
int localPort = sendRtpPortManager.getNextPort(serverItem);
if (localPort == 0) {
return null;
}
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setIp(ip);
sendRtpItem.setPort(port);
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setPlatformId(platformId);
sendRtpItem.setDeviceId(deviceId);
sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(tcp);
sendRtpItem.setRtcp(rtcp);
sendRtpItem.setApp("rtp");
sendRtpItem.setLocalPort(localPort);
sendRtpItem.setServerId(userSetting.getServerId());
sendRtpItem.setMediaServerId(serverItem.getId());
return sendRtpItem;
}
/**
* 创建一个直播推流
* @param ip 推流ip
* @param port 推流端口
* @param ssrc 推流唯一标识
* @param platformId 平台id
* @param channelId 通道id
* @param tcp 是否为tcp
* @return SendRtpItem
*/
public SendRtpItem createSendRtpItem(MediaServer serverItem, String ip, int port, String ssrc, String platformId,
String app, String stream, String channelId, boolean tcp, boolean rtcp){
int localPort = sendRtpPortManager.getNextPort(serverItem);
if (localPort == 0) {
return null;
}
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setIp(ip);
sendRtpItem.setPort(port);
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setApp(app);
sendRtpItem.setStream(stream);
sendRtpItem.setPlatformId(platformId);
sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(tcp);
sendRtpItem.setLocalPort(localPort);
sendRtpItem.setServerId(userSetting.getServerId());
sendRtpItem.setMediaServerId(serverItem.getId());
sendRtpItem.setRtcp(rtcp);
return sendRtpItem;
}
/**
* 调用zlm RESTFUL API —— startSendRtp
*/
@@ -240,17 +170,6 @@ public class ZLMServerFactory {
return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param, callback);
}
/**
* 查询待转推的流是否就绪
*/
public Boolean isRtpReady(MediaServer mediaServerItem, String streamId) {
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem,"rtp", "rtsp", streamId);
if (mediaInfo.getInteger("code") == -2) {
return null;
}
return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"));
}
/**
* 查询待转推的流是否就绪
*/
@@ -286,27 +205,6 @@ public class ZLMServerFactory {
return mediaInfo.getInteger("totalReaderCount");
}
/**
* 调用zlm RESTful API —— stopSendRtp
*/
public Boolean stopSendRtpStream(MediaServer mediaServerItem, Map<String, Object>param) {
if (mediaServerItem == null) {
logger.error("[停止RTP推流] 失败: 媒体节点为NULL");
return false;
}
Boolean result = false;
JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaServerItem, param);
if (jsonObject == null) {
logger.error("[停止RTP推流] 失败: 请检查ZLM服务");
} else if (jsonObject.getInteger("code") == 0) {
result= true;
logger.info("[停止RTP推流] 成功");
} else {
logger.warn("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"), JSON.toJSON(param), jsonObject);
}
return result;
}
public JSONObject startSendRtp(MediaServer mediaInfo, SendRtpItem sendRtpItem) {
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
logger.info("rtp/{}开始推流, 目标={}:{}SSRC={}", sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc());

View File

@@ -0,0 +1,25 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.storager.dao.dto.UserApiKey;
import com.github.pagehelper.PageInfo;
public interface IUserApiKeyService {
int addApiKey(UserApiKey userApiKey);
boolean isApiKeyExists(String apiKey);
PageInfo<UserApiKey> getUserApiKeys(int page, int count);
int enable(Integer id);
int disable(Integer id);
int remark(Integer id, String remark);
int delete(Integer id);
UserApiKey getUserApiKeyById(Integer id);
int reset(Integer id, String apiKey);
}

View File

@@ -11,6 +11,8 @@ public interface IUserService {
boolean changePassword(int id, String password);
User getUserById(int id);
User getUserByUsername(String username);
int addUser(User user);

View File

@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
/**
* redis消息请求下级推送流信息
* @author lin
@@ -80,6 +82,22 @@ public class RequestPushStreamMsg {
return requestPushStreamMsg;
}
public static RequestPushStreamMsg getInstance(SendRtpItem sendRtpItem) {
RequestPushStreamMsg requestPushStreamMsg = new RequestPushStreamMsg();
requestPushStreamMsg.setMediaServerId(sendRtpItem.getMediaServerId());
requestPushStreamMsg.setApp(sendRtpItem.getApp());
requestPushStreamMsg.setStream(sendRtpItem.getStream());
requestPushStreamMsg.setIp(sendRtpItem.getIp());
requestPushStreamMsg.setPort(sendRtpItem.getPort());
requestPushStreamMsg.setSsrc(sendRtpItem.getSsrc());
requestPushStreamMsg.setTcp(sendRtpItem.isTcp());
requestPushStreamMsg.setSrcPort(sendRtpItem.getLocalPort());
requestPushStreamMsg.setPt(sendRtpItem.getPt());
requestPushStreamMsg.setPs(sendRtpItem.isUsePs());
requestPushStreamMsg.setOnlyAudio(sendRtpItem.isOnlyAudio());
return requestPushStreamMsg;
}
public String getMediaServerId() {
return mediaServerId;
}

View File

@@ -1,7 +1,9 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
@@ -11,13 +13,12 @@ import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IPlatformService;
import com.genersoft.iot.vmp.service.IPlayService;
@@ -41,7 +42,9 @@ import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.*;
import java.util.List;
import java.util.UUID;
import java.util.Vector;
/**
* @author lin
@@ -75,9 +78,6 @@ public class PlatformServiceImpl implements IPlatformService {
@Autowired
private DynamicTask dynamicTask;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private SubscribeHolder subscribeHolder;
@@ -87,9 +87,6 @@ public class PlatformServiceImpl implements IPlatformService {
@Autowired
private UserSetting userSetting;
@Autowired
private HookSubscribe subscribe;
@Autowired
private VideoStreamSessionManager streamSession;
@@ -437,11 +434,7 @@ public class PlatformServiceImpl implements IPlatformService {
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null);
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Map<String, Object> param = new HashMap<>(3);
param.put("vhost", "__defaultVhost__");
param.put("app", sendRtpItem.getApp());
param.put("stream", sendRtpItem.getStream());
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
}
}
}

View File

@@ -30,6 +30,9 @@ import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -1044,9 +1047,7 @@ public class PlayServiceImpl implements IPlayService {
};
Hook hook = Hook.getInstance(HookType.on_record_mp4, "rtp", ssrcInfo.getStream(), mediaServerItem.getId());
// 设置过期时间,下载失败时自动处理订阅数据
// long difference = DateUtil.getDifference(startTime, endTime)/1000;
// Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(difference * 2));
// hookSubscribe.setExpires(expiresInstant);
hook.setExpireTime(System.currentTimeMillis() + 24 * 60 * 60 * 1000);
subscribe.addSubscribe(hook, hookEventForRecord);
});
} catch (InvalidArgumentException | SipException | ParseException e) {
@@ -1419,11 +1420,8 @@ public class PlayServiceImpl implements IPlayService {
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (mediaInfo == null) {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(sendRtpItem);
redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, () -> {
startSendRtpStreamFailHand(sendRtpItem, platform, callIdHeader);
});
} else {
@@ -1431,7 +1429,7 @@ public class PlayServiceImpl implements IPlayService {
if (sendRtpItem.isTcpActive()) {
mediaServerService.startSendRtpPassive(mediaInfo, platform, sendRtpItem, null);
} else {
mediaServerService.startSendRtpStream(mediaInfo, platform, sendRtpItem);
mediaServerService.startSendRtp(mediaInfo, platform, sendRtpItem);
}
}catch (ControllerException e) {
logger.error("RTP推流失败: {}", e.getMessage());

View File

@@ -9,17 +9,15 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -64,9 +62,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Autowired
private IVideoManagerStorage videoManagerStorager;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private StreamProxyMapper streamProxyMapper;
@@ -509,18 +504,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
String type = "PULL";
// 发送redis消息
List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, type);
if (onStreamChangedHookParams.size() > 0) {
for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) {
List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type);
if (mediaInfoList.size() > 0) {
for (MediaInfo mediaInfo : mediaInfoList) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", onStreamChangedHookParam.getApp());
jsonObject.put("stream", onStreamChangedHookParam.getStream());
jsonObject.put("app", mediaInfo.getApp());
jsonObject.put("stream", mediaInfo.getStream());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream());
}
}
}
@@ -538,8 +533,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
private void syncPullStream(String mediaServerId){
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
if (mediaServer != null) {
List<OnStreamChangedHookParam> allPullStream = redisCatchStorage.getStreams(mediaServerId, "PULL");
if (!allPullStream.isEmpty()) {
List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PULL");
if (!mediaInfoList.isEmpty()) {
List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServer, null, null, null);
Map<String, StreamInfo> stringStreamInfoMap = new HashMap<>();
if (mediaList != null && !mediaList.isEmpty()) {

View File

@@ -156,10 +156,10 @@ public class StreamPushServiceImpl implements IStreamPushService {
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
// 兼容流注销时类型从redis记录获取
OnStreamChangedHookParam onStreamChangedHookParam = redisCatchStorage.getStreamInfo(
MediaInfo mediaInfo = redisCatchStorage.getStreamInfo(
event.getApp(), event.getStream(), event.getMediaServer().getId());
if (onStreamChangedHookParam != null) {
String type = OriginType.values()[onStreamChangedHookParam.getOriginType()].getType();
if (mediaInfo != null) {
String type = OriginType.values()[mediaInfo.getOriginType()].getType();
redisCatchStorage.removeStream(event.getMediaServer().getId(), type, event.getApp(), event.getStream());
if ("PUSH".equalsIgnoreCase(type)) {
// 冗余数据,自己系统中自用
@@ -302,8 +302,8 @@ public class StreamPushServiceImpl implements IStreamPushService {
List<StreamPushItem> pushList = getPushList(mediaServerId);
Map<String, StreamPushItem> pushItemMap = new HashMap<>();
// redis记录
List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH");
Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>();
List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PUSH");
Map<String, MediaInfo> streamInfoPushItemMap = new HashMap<>();
if (pushList.size() > 0) {
for (StreamPushItem streamPushItem : pushList) {
if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
@@ -311,9 +311,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
}
}
if (onStreamChangedHookParams.size() > 0) {
for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) {
streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam);
if (mediaInfoList.size() > 0) {
for (MediaInfo mediaInfo : mediaInfoList) {
streamInfoPushItemMap.put(mediaInfo.getApp() + mediaInfo.getStream(), mediaInfo);
}
}
// 获取所有推流鉴权信息,清理过期的
@@ -352,21 +352,21 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
}
Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values();
if (offlineOnStreamChangedHookParamList.size() > 0) {
Collection<MediaInfo> mediaInfos = streamInfoPushItemMap.values();
if (mediaInfos.size() > 0) {
String type = "PUSH";
for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) {
for (MediaInfo mediaInfo : mediaInfos) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", offlineOnStreamChangedHookParam.getApp());
jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream());
jsonObject.put("app", mediaInfo.getApp());
jsonObject.put("stream", mediaInfo.getStream());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream());
redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", mediaInfo.getApp(), mediaInfo.getStream());
// 冗余数据,自己系统中自用
redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId());
redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerItem.getId());
}
}
@@ -391,21 +391,21 @@ public class StreamPushServiceImpl implements IStreamPushService {
// 发送流停止消息
String type = "PUSH";
// 发送redis消息
List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
if (streamInfoList.size() > 0) {
for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) {
List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type);
if (mediaInfoList.size() > 0) {
for (MediaInfo mediaInfo : mediaInfoList) {
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream());
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", onStreamChangedHookParam.getApp());
jsonObject.put("stream", onStreamChangedHookParam.getStream());
jsonObject.put("app", mediaInfo.getApp());
jsonObject.put("stream", mediaInfo.getStream());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
// 冗余数据,自己系统中自用
redisCatchStorage.removePushListItem(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), mediaServerId);
redisCatchStorage.removePushListItem(mediaInfo.getApp(), mediaInfo.getStream(), mediaServerId);
}
}
}

View File

@@ -0,0 +1,80 @@
package com.genersoft.iot.vmp.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.service.IUserApiKeyService;
import com.genersoft.iot.vmp.storager.dao.UserApiKeyMapper;
import com.genersoft.iot.vmp.storager.dao.dto.UserApiKey;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@DS("master")
public class UserApiKeyServiceImpl implements IUserApiKeyService {
@Autowired
UserApiKeyMapper userApiKeyMapper;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Override
public int addApiKey(UserApiKey userApiKey) {
return userApiKeyMapper.add(userApiKey);
}
@Override
public boolean isApiKeyExists(String apiKey) {
return userApiKeyMapper.isApiKeyExists(apiKey);
}
@Override
public PageInfo<UserApiKey> getUserApiKeys(int page, int count) {
PageHelper.startPage(page, count);
List<UserApiKey> userApiKeys = userApiKeyMapper.getUserApiKeys();
return new PageInfo<>(userApiKeys);
}
@Cacheable(cacheNames = "userApiKey", key = "#id", sync = true)
@Override
public UserApiKey getUserApiKeyById(Integer id) {
return userApiKeyMapper.selectById(id);
}
@CacheEvict(cacheNames = "userApiKey", key = "#id")
@Override
public int enable(Integer id) {
return userApiKeyMapper.enable(id);
}
@CacheEvict(cacheNames = "userApiKey", key = "#id")
@Override
public int disable(Integer id) {
return userApiKeyMapper.disable(id);
}
@CacheEvict(cacheNames = "userApiKey", key = "#id")
@Override
public int remark(Integer id, String remark) {
return userApiKeyMapper.remark(id, remark);
}
@CacheEvict(cacheNames = "userApiKey", key = "#id")
@Override
public int delete(Integer id) {
return userApiKeyMapper.delete(id);
}
@CacheEvict(cacheNames = "userApiKey", key = "#id")
@Override
public int reset(Integer id, String apiKey) {
return userApiKeyMapper.apiKey(id, apiKey);
}
}

View File

@@ -31,6 +31,11 @@ public class UserServiceImpl implements IUserService {
return userMapper.update(user) > 0;
}
@Override
public User getUserById(int id) {
return userMapper.selectById(id);
}
@Override
public User getUserByUsername(String username) {
return userMapper.getUserByUsername(username);

View File

@@ -5,12 +5,12 @@ import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -27,7 +27,6 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -72,9 +71,6 @@ public class RedisGbPlayMsgListener implements MessageListener {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private IMediaServerService mediaServerService;
@@ -101,7 +97,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
}
public interface PlayMsgCallbackForStartSendRtpStream{
void handler(JSONObject jsonObject);
void handler();
}
public interface PlayMsgErrorCallback{
@@ -181,11 +177,10 @@ public class RedisGbPlayMsgListener implements MessageListener {
String serial = wvpRedisMsg.getSerial();
switch (wvpResult.getCode()) {
case 0:
JSONObject jsonObject = (JSONObject)wvpResult.getData();
PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
if (playMsgCallback != null) {
callbacksForError.remove(serial);
playMsgCallback.handler(jsonObject);
playMsgCallback.handler();
}
break;
case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
@@ -219,36 +214,24 @@ public class RedisGbPlayMsgListener implements MessageListener {
* 处理收到的请求推流的请求
*/
private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) {
MediaServer mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
if (mediaInfo == null) {
MediaServer mediaServer = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
if (mediaServer == null) {
// TODO 回复错误
return;
}
String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1";
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",requestPushStreamMsg.getApp());
param.put("stream",requestPushStreamMsg.getStream());
param.put("ssrc", requestPushStreamMsg.getSsrc());
param.put("dst_url",requestPushStreamMsg.getIp());
param.put("dst_port", requestPushStreamMsg.getPort());
param.put("is_udp", is_Udp);
param.put("src_port", requestPushStreamMsg.getSrcPort());
param.put("pt", requestPushStreamMsg.getPt());
param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0");
param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0");
JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaInfo, param);
SendRtpItem sendRtpItem = SendRtpItem.getInstance(requestPushStreamMsg);
try {
mediaServerService.startSendRtp(mediaServer, null, sendRtpItem);
}catch (ControllerException e) {
return;
}
// 回复消息
responsePushStream(jsonObject, fromId, serial);
}
private void responsePushStream(JSONObject content, String toId, String serial) {
WVPResult<JSONObject> result = new WVPResult<>();
result.setCode(0);
result.setData(content);
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), fromId,
WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, JSON.toJSONString(result));
JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
@@ -317,7 +300,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
* 将获取到的sendItem发送出去
*/
private void responseSendItem(MediaServer mediaServerItem, RequestSendItemMsg content, String toId, String serial) {
SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, content.getIp(),
content.getPort(), content.getSsrc(), content.getPlatformId(),
content.getApp(), content.getStream(), content.getChannelId(),
content.getTcp(), content.getRtcp());
@@ -453,13 +436,8 @@ public class RedisGbPlayMsgListener implements MessageListener {
// TODO 回复错误
return;
}
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",sendRtpItem.getStream());
param.put("ssrc", sendRtpItem.getSsrc());
if (zlmServerFactory.stopSendRtpStream(mediaInfo, param)) {
if (mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc())) {
logger.info("[REDIS 执行其他平台的请求停止推流] 成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream());
// 发送redis消息
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,

View File

@@ -8,7 +8,6 @@ import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
@@ -25,7 +24,6 @@ import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -57,9 +55,6 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private ZLMServerFactory zlmServerFactory;
private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
@@ -88,16 +83,10 @@ public class RedisPushStreamCloseResponseListener implements MessageListener {
}
if (push.isSelf()) {
// 停止向上级推流
String streamId = sendRtpItem.getStream();
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",streamId);
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", streamId);
logger.info("[REDIS消息-推流结束] 停止向上级推流:{}", sendRtpItem.getStream());
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
zlmServerFactory.stopSendRtpStream(mediaInfo, param);
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0,
sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(),

View File

@@ -1,7 +1,11 @@
package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -11,6 +15,7 @@ import org.springframework.data.redis.connection.MessageListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -37,52 +42,56 @@ public class RedisStreamMsgListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
// boolean isEmpty = taskQueue.isEmpty();
// taskQueue.offer(message);
// if (isEmpty) {
// taskExecutor.execute(() -> {
// while (!taskQueue.isEmpty()) {
// Message msg = taskQueue.poll();
// try {
// JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
// if (steamMsgJson == null) {
// logger.warn("[收到redis 流变化]消息解析失败");
// continue;
// }
// String serverId = steamMsgJson.getString("serverId");
//
// if (userSetting.getServerId().equals(serverId)) {
// // 自己发送的消息忽略即可
// continue;
// }
// logger.info("[收到redis 流变化] {}", new String(message.getBody()));
// String app = steamMsgJson.getString("app");
// String stream = steamMsgJson.getString("stream");
// boolean register = steamMsgJson.getBoolean("register");
// String mediaServerId = steamMsgJson.getString("mediaServerId");
// OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
// onStreamChangedHookParam.setSeverId(serverId);
// onStreamChangedHookParam.setApp(app);
// onStreamChangedHookParam.setStream(stream);
// onStreamChangedHookParam.setRegist(register);
// onStreamChangedHookParam.setMediaServerId(mediaServerId);
// onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
// onStreamChangedHookParam.setAliveSecond(0L);
// onStreamChangedHookParam.setTotalReaderCount("0");
// onStreamChangedHookParam.setOriginType(0);
// onStreamChangedHookParam.setOriginTypeStr("0");
// onStreamChangedHookParam.setOriginTypeStr("unknown");
// if (register) {
// zlmMediaListManager.addPush(onStreamChangedHookParam);
// }else {
// zlmMediaListManager.removeMedia(app, stream);
// }
// }catch (Exception e) {
// logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
// logger.error("[REDIS消息-流变化] 异常内容: ", e);
// }
// }
// });
// }
boolean isEmpty = taskQueue.isEmpty();
taskQueue.offer(message);
if (isEmpty) {
taskExecutor.execute(() -> {
while (!taskQueue.isEmpty()) {
Message msg = taskQueue.poll();
try {
JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
if (steamMsgJson == null) {
logger.warn("[收到redis 流变化]消息解析失败");
continue;
}
String serverId = steamMsgJson.getString("serverId");
if (userSetting.getServerId().equals(serverId)) {
// 自己发送的消息忽略即可
continue;
}
logger.info("[收到redis 流变化] {}", new String(message.getBody()));
String app = steamMsgJson.getString("app");
String stream = steamMsgJson.getString("stream");
boolean register = steamMsgJson.getBoolean("register");
String mediaServerId = steamMsgJson.getString("mediaServerId");
OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
onStreamChangedHookParam.setSeverId(serverId);
onStreamChangedHookParam.setApp(app);
onStreamChangedHookParam.setStream(stream);
onStreamChangedHookParam.setRegist(register);
onStreamChangedHookParam.setMediaServerId(mediaServerId);
onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
onStreamChangedHookParam.setAliveSecond(0L);
onStreamChangedHookParam.setTotalReaderCount(0);
onStreamChangedHookParam.setOriginType(0);
onStreamChangedHookParam.setOriginTypeStr("0");
onStreamChangedHookParam.setOriginTypeStr("unknown");
ChannelOnlineEvent channelOnlineEventLister = zlmMediaListManager.getChannelOnlineEventLister(app, stream);
if ( channelOnlineEventLister != null) {
try {
channelOnlineEventLister.run(app, stream, serverId);;
} catch (ParseException e) {
logger.error("addPush: ", e);
}
zlmMediaListManager.removedChannelOnlineEventLister(app, stream);
}
}catch (Exception e) {
logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
logger.error("[REDIS消息-流变化] 异常内容: ", e);
}
}
});
}
}
}

View File

@@ -6,11 +6,11 @@ import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo;
@@ -91,7 +91,7 @@ public interface IRedisCatchStorage {
* @param app
* @param streamId
*/
void addStream(MediaServer mediaServerItem, String type, String app, String streamId, OnStreamChangedHookParam item);
void addStream(MediaServer mediaServerItem, String type, String app, String streamId, MediaInfo item);
/**
* 移除流信息从redis
@@ -108,7 +108,7 @@ public interface IRedisCatchStorage {
*/
void removeStream(String mediaServerId, String type);
List<OnStreamChangedHookParam> getStreams(String mediaServerId, String pull);
List<MediaInfo> getStreams(String mediaServerId, String pull);
/**
* 将device信息写入redis
@@ -134,7 +134,7 @@ public interface IRedisCatchStorage {
void resetAllSN();
OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId);
MediaInfo getStreamInfo(String app, String streamId, String mediaServerId);
void addCpuInfo(double cpuInfo);

View File

@@ -0,0 +1,61 @@
package com.genersoft.iot.vmp.storager.dao;
import com.genersoft.iot.vmp.storager.dao.dto.UserApiKey;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
import java.util.List;
@Mapper
@Repository
public interface UserApiKeyMapper {
@SelectKey(databaseId = "postgresql", statement = "SELECT currval('wvp_user_api_key_id_seq'::regclass) AS id", keyProperty = "id", before = false, resultType = Integer.class)
@SelectKey(databaseId = "mysql", statement = "SELECT LAST_INSERT_ID() AS id", keyProperty = "id", before = false, resultType = Integer.class)
@Insert("INSERT INTO wvp_user_api_key (user_id, app, api_key, expired_at, remark, enable, create_time, update_time) VALUES" +
"(#{userId}, #{app}, #{apiKey}, #{expiredAt}, #{remark}, #{enable}, #{createTime}, #{updateTime})")
int add(UserApiKey userApiKey);
@Update(value = {"<script>" +
"UPDATE wvp_user_api_key " +
"SET update_time = #{updateTime} " +
"<if test=\"app != null\">, app = #{app}</if>" +
"<if test=\"apiKey != null\">, api_key = #{apiKey}</if>" +
"<if test=\"expiredAt != null\">, expired_at = #{expiredAt}</if>" +
"<if test=\"remark != null\">, username = #{remark}</if>" +
"<if test=\"enable != null\">, enable = #{enable}</if>" +
"WHERE id = #{id}" +
" </script>"})
int update(UserApiKey userApiKey);
@Update("UPDATE wvp_user_api_key SET enable = true WHERE id = #{id}")
int enable(@Param("id") int id);
@Update("UPDATE wvp_user_api_key SET enable = false WHERE id = #{id}")
int disable(@Param("id") int id);
@Update("UPDATE wvp_user_api_key SET api_key = #{apiKey} WHERE id = #{id}")
int apiKey(@Param("id") int id, @Param("apiKey") String apiKey);
@Update("UPDATE wvp_user_api_key SET remark = #{remark} WHERE id = #{id}")
int remark(@Param("id") int id, @Param("remark") String remark);
@Delete("DELETE FROM wvp_user_api_key WHERE id = #{id}")
int delete(@Param("id") int id);
@Select("SELECT uak.id, uak.user_id, uak.app, uak.api_key, uak.expired_at, uak.remark, uak.enable, uak.create_time, uak.update_time, u.username AS username FROM wvp_user_api_key uak LEFT JOIN wvp_user u on u.id = uak.user_id WHERE uak.id = #{id}")
UserApiKey selectById(@Param("id") int id);
@Select("SELECT uak.id, uak.user_id, uak.app, uak.api_key, uak.expired_at, uak.remark, uak.enable, uak.create_time, uak.update_time, u.username AS username FROM wvp_user_api_key uak LEFT JOIN wvp_user u on u.id = uak.user_id WHERE uak.api_key = #{apiKey}")
UserApiKey selectByApiKey(@Param("apiKey") String apiKey);
@Select("SELECT uak.id, uak.user_id, uak.app, uak.api_key, uak.expired_at, uak.remark, uak.enable, uak.create_time, uak.update_time, u.username AS username FROM wvp_user_api_key uak LEFT JOIN wvp_user u on u.id = uak.user_id")
List<UserApiKey> selectAll();
@Select("SELECT uak.id, uak.user_id, uak.app, uak.api_key, uak.expired_at, uak.remark, uak.enable, uak.create_time, uak.update_time, u.username AS username FROM wvp_user_api_key uak LEFT JOIN wvp_user u on u.id = uak.user_id")
List<UserApiKey> getUserApiKeys();
@Select("SELECT COUNT(0) FROM wvp_user_api_key WHERE api_key = #{apiKey}")
boolean isApiKeyExists(@Param("apiKey") String apiKey);
}

View File

@@ -0,0 +1,151 @@
package com.genersoft.iot.vmp.storager.dao.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
/**
* 用户信息
*/
@Schema(description = "用户ApiKey信息")
public class UserApiKey implements Serializable {
/**
* Id
*/
@Schema(description = "Id")
private int id;
/**
* 用户Id
*/
@Schema(description = "用户Id")
private int userId;
/**
* 应用名
*/
@Schema(description = "应用名")
private String app;
/**
* ApiKey
*/
@Schema(description = "ApiKey")
private String apiKey;
/**
* 过期时间null=永不过期)
*/
@Schema(description = "过期时间null=永不过期)")
private long expiredAt;
/**
* 备注信息
*/
@Schema(description = "备注信息")
private String remark;
/**
* 是否启用
*/
@Schema(description = "是否启用")
private boolean enable;
/**
* 创建时间
*/
@Schema(description = "创建时间")
private String createTime;
/**
* 更新时间
*/
@Schema(description = "更新时间")
private String updateTime;
/**
* 用户名
*/
private String username;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getApiKey() {
return apiKey;
}
public void setApiKey(String apiKey) {
this.apiKey = apiKey;
}
public long getExpiredAt() {
return expiredAt;
}
public void setExpiredAt(long expiredAt) {
this.expiredAt = expiredAt;
}
public String getRemark() {
return remark;
}
public void setRemark(String remark) {
this.remark = remark;
}
public boolean isEnable() {
return enable;
}
public void setEnable(boolean enable) {
this.enable = enable;
}
public String getCreateTime() {
return createTime;
}
public void setCreateTime(String createTime) {
this.createTime = createTime;
}
public String getUpdateTime() {
return updateTime;
}
public void setUpdateTime(String updateTime) {
this.updateTime = updateTime;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
}

View File

@@ -9,11 +9,11 @@ import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -315,14 +315,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
@Override
public void addStream(MediaServer mediaServerItem, String type, String app, String streamId, OnStreamChangedHookParam onStreamChangedHookParam) {
public void addStream(MediaServer mediaServerItem, String type, String app, String streamId, MediaInfo mediaInfo) {
// 查找是否使用了callID
StreamAuthorityInfo streamAuthorityInfo = getStreamAuthorityInfo(app, streamId);
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId();
if (streamAuthorityInfo != null) {
onStreamChangedHookParam.setCallId(streamAuthorityInfo.getCallId());
mediaInfo.setCallId(streamAuthorityInfo.getCallId());
}
redisTemplate.opsForValue().set(key, onStreamChangedHookParam);
redisTemplate.opsForValue().set(key, mediaInfo);
}
@Override
@@ -341,13 +341,13 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
@Override
public List<OnStreamChangedHookParam> getStreams(String mediaServerId, String type) {
List<OnStreamChangedHookParam> result = new ArrayList<>();
public List<MediaInfo> getStreams(String mediaServerId, String type) {
List<MediaInfo> result = new ArrayList<>();
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type + "_*_*_" + mediaServerId;
List<Object> streams = RedisUtil.scan(redisTemplate, key);
for (Object stream : streams) {
OnStreamChangedHookParam onStreamChangedHookParam = (OnStreamChangedHookParam)redisTemplate.opsForValue().get(stream);
result.add(onStreamChangedHookParam);
MediaInfo mediaInfo = (MediaInfo)redisTemplate.opsForValue().get(stream);
result.add(mediaInfo);
}
return result;
}
@@ -466,14 +466,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override
public OnStreamChangedHookParam getStreamInfo(String app, String streamId, String mediaServerId) {
public MediaInfo getStreamInfo(String app, String streamId, String mediaServerId) {
String scanKey = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_*_" + app + "_" + streamId + "_" + mediaServerId;
OnStreamChangedHookParam result = null;
MediaInfo result = null;
List<Object> keys = RedisUtil.scan(redisTemplate, scanKey);
if (keys.size() > 0) {
String key = (String) keys.get(0);
result = JsonUtil.redisJsonToObject(redisTemplate, key, OnStreamChangedHookParam.class);
result = JsonUtil.redisJsonToObject(redisTemplate, key, MediaInfo.class);
}
return result;

View File

@@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
@@ -13,6 +14,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo;
@@ -44,9 +46,6 @@ public class PsController {
private final static Logger logger = LoggerFactory.getLogger(PsController.class);
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private HookSubscribe hookSubscribe;
@@ -81,8 +80,8 @@ public class PsController {
logger.info("[第三方PS服务对接->开启收流和获取发流信息] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}",
isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP被动", callBack);
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
if (mediaServerItem == null) {
MediaServer mediaServer = mediaServerService.getDefaultMediaServer();
if (mediaServer == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer");
}
if (stream == null) {
@@ -100,13 +99,14 @@ public class PsController {
}
}
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + callId + "_" + stream;
int localPort = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, false, tcpMode);
if (localPort == 0) {
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, ssrcInt + "", false, false, null, false, false, false, tcpMode);
if (ssrcInfo.getPort() == 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败");
}
// 注册回调如果rtp收流超时则通过回调发送通知
if (callBack != null) {
Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServerItem.getId());
Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServer.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hook,
(hookData)->{
@@ -128,8 +128,8 @@ public class PsController {
});
}
OtherPsSendInfo otherPsSendInfo = new OtherPsSendInfo();
otherPsSendInfo.setReceiveIp(mediaServerItem.getSdpIp());
otherPsSendInfo.setReceivePort(localPort);
otherPsSendInfo.setReceiveIp(mediaServer.getSdpIp());
otherPsSendInfo.setReceivePort(ssrcInfo.getPort());
otherPsSendInfo.setCallId(callId);
otherPsSendInfo.setStream(stream);
@@ -138,9 +138,9 @@ public class PsController {
if (isSend != null && isSend) {
String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId;
// 预创建发流信息
int port = sendRtpPortManager.getNextPort(mediaServerItem);
int port = sendRtpPortManager.getNextPort(mediaServer);
otherPsSendInfo.setSendLocalIp(mediaServerItem.getSdpIp());
otherPsSendInfo.setSendLocalIp(mediaServer.getSdpIp());
otherPsSendInfo.setSendLocalPort(port);
// 将信息写入redis中以备后用
redisTemplate.opsForValue().set(key, otherPsSendInfo, 300, TimeUnit.SECONDS);
@@ -156,7 +156,7 @@ public class PsController {
public void closeRtpServer(String stream) {
logger.info("[第三方PS服务对接->关闭收流] stream->{}", stream);
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
zlmServerFactory.closeRtpServer(mediaServerItem,stream);
mediaServerService.closeRTPServer(mediaServerItem, stream);
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_*_" + stream;
List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
if (!scan.isEmpty()) {
@@ -198,7 +198,7 @@ public class PsController {
app,
stream,
callId);
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
MediaServer mediaServer = mediaServerService.getDefaultMediaServer();
String key = VideoManagerConstants.WVP_OTHER_SEND_PS_INFO + userSetting.getServerId() + "_" + callId;
OtherPsSendInfo sendInfo = (OtherPsSendInfo)redisTemplate.opsForValue().get(key);
if (sendInfo == null) {
@@ -207,38 +207,16 @@ public class PsController {
sendInfo.setPushApp(app);
sendInfo.setPushStream(stream);
sendInfo.setPushSSRC(ssrc);
Map<String, Object> param;
param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",app);
param.put("stream",stream);
param.put("ssrc", ssrc);
param.put("dst_url", dstIp);
param.put("dst_port", dstPort);
String is_Udp = isUdp ? "1" : "0";
param.put("is_udp", is_Udp);
param.put("src_port", sendInfo.getSendLocalPort());
Boolean streamReady = mediaServerService.isStreamReady(mediaServerItem, app, stream);
SendRtpItem sendRtpItem = SendRtpItem.getInstance(app, stream, ssrc, dstIp, dstPort, !isUdp, sendInfo.getSendLocalPort(), null);
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, app, stream);
if (streamReady) {
JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
if (jsonObject.getInteger("code") == 0) {
logger.info("[第三方PS服务对接->发送流] 视频流发流成功callId->{}param->{}", callId, param);
redisTemplate.opsForValue().set(key, sendInfo);
}else {
redisTemplate.delete(key);
logger.info("[第三方PS服务对接->发送流] 视频流发流失败callId->{}, {}", callId, jsonObject.getString("msg"));
throw new ControllerException(ErrorCode.ERROR100.getCode(), "[视频流发流失败] " + jsonObject.getString("msg"));
}
mediaServerService.startSendRtp(mediaServer, null, sendRtpItem);
logger.info("[第三方PS服务对接->发送流] 视频流发流成功callId->{}param->{}", callId, sendRtpItem);
redisTemplate.opsForValue().set(key, sendInfo);
}else {
logger.info("[第三方PS服务对接->发送流] 流不存在等待流上线callId->{}", callId);
String uuid = UUID.randomUUID().toString();
Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServerItem.getId());
Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServer.getId());
dynamicTask.startDelay(uuid, ()->{
logger.info("[第三方PS服务对接->发送流] 等待流上线超时 callId->{}", callId);
redisTemplate.delete(key);
@@ -257,15 +235,9 @@ public class PsController {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServerItem, param);
if (jsonObject.getInteger("code") == 0) {
logger.info("[第三方PS服务对接->发送流] 视频流发流成功callId->{}param->{}", callId, param);
redisTemplate.opsForValue().set(key, finalSendInfo);
}else {
redisTemplate.delete(key);
logger.info("[第三方PS服务对接->发送流] 视频流发流失败callId->{}, {}", callId, jsonObject.getString("msg"));
throw new ControllerException(ErrorCode.ERROR100.getCode(), "[视频流发流失败] " + jsonObject.getString("msg"));
}
mediaServerService.startSendRtp(mediaServer, null, sendRtpItem);
logger.info("[第三方PS服务对接->发送流] 视频流发流成功callId->{}param->{}", callId, sendRtpItem);
redisTemplate.opsForValue().set(key, finalSendInfo);
hookSubscribe.removeSubscribe(hook);
});
}
@@ -288,7 +260,7 @@ public class PsController {
param.put("stream",sendInfo.getPushStream());
param.put("ssrc",sendInfo.getPushSSRC());
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
Boolean result = zlmServerFactory.stopSendRtpStream(mediaServerItem, param);
boolean result = mediaServerService.stopSendRtp(mediaServerItem, sendInfo.getPushApp(), sendInfo.getStream(), sendInfo.getPushSSRC());
if (!result) {
logger.info("[第三方PS服务对接->关闭发送流] 失败 callId->{}", callId);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "停止发流失败");

View File

@@ -6,13 +6,14 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
@@ -43,9 +44,6 @@ import java.util.concurrent.TimeUnit;
@RequestMapping("/api/rtp")
public class RtpController {
@Autowired
private ZLMServerFactory zlmServerFactory;
@Autowired
private SendRtpPortManager sendRtpPortManager;
@@ -81,8 +79,8 @@ public class RtpController {
logger.info("[第三方服务对接->开启收流和获取发流信息] isSend->{}, ssrc->{}, callId->{}, stream->{}, tcpMode->{}, callBack->{}",
isSend, ssrc, callId, stream, tcpMode==0?"UDP":"TCP被动", callBack);
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
if (mediaServerItem == null) {
MediaServer mediaServer = mediaServerService.getDefaultMediaServer();
if (mediaServer == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(),"没有可用的MediaServer");
}
if (stream == null) {
@@ -100,14 +98,14 @@ public class RtpController {
}
}
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + callId + "_" + stream;
int localPortForVideo = zlmServerFactory.createRTPServer(mediaServerItem, stream, ssrcInt, null, false, false, tcpMode);
int localPortForAudio = zlmServerFactory.createRTPServer(mediaServerItem, stream + "_a" , ssrcInt, null, false, false, tcpMode);
if (localPortForVideo == 0 || localPortForAudio == 0) {
SSRCInfo ssrcInfoForVideo = mediaServerService.openRTPServer(mediaServer, stream, ssrcInt + "",false,false, null, false, false, false, tcpMode);
SSRCInfo ssrcInfoForAudio = mediaServerService.openRTPServer(mediaServer, stream + "_a", ssrcInt + "", false, false, null, false,false,false, tcpMode);
if (ssrcInfoForVideo.getPort() == 0 || ssrcInfoForAudio.getPort() == 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败");
}
// 注册回调如果rtp收流超时则通过回调发送通知
if (callBack != null) {
Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServerItem.getId());
Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServer.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hook,
(hookData)->{
@@ -128,9 +126,9 @@ public class RtpController {
}
String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId;
OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo();
otherRtpSendInfo.setReceiveIp(mediaServerItem.getSdpIp());
otherRtpSendInfo.setReceivePortForVideo(localPortForVideo);
otherRtpSendInfo.setReceivePortForAudio(localPortForAudio);
otherRtpSendInfo.setReceiveIp(mediaServer.getSdpIp());
otherRtpSendInfo.setReceivePortForVideo(ssrcInfoForVideo.getPort());
otherRtpSendInfo.setReceivePortForAudio(ssrcInfoForAudio.getPort());
otherRtpSendInfo.setCallId(callId);
otherRtpSendInfo.setStream(stream);
@@ -138,10 +136,10 @@ public class RtpController {
redisTemplate.opsForValue().set(receiveKey, otherRtpSendInfo);
if (isSend != null && isSend) {
// 预创建发流信息
int portForVideo = sendRtpPortManager.getNextPort(mediaServerItem);
int portForAudio = sendRtpPortManager.getNextPort(mediaServerItem);
int portForVideo = sendRtpPortManager.getNextPort(mediaServer);
int portForAudio = sendRtpPortManager.getNextPort(mediaServer);
otherRtpSendInfo.setSendLocalIp(mediaServerItem.getSdpIp());
otherRtpSendInfo.setSendLocalIp(mediaServer.getSdpIp());
otherRtpSendInfo.setSendLocalPortForVideo(portForVideo);
otherRtpSendInfo.setSendLocalPortForAudio(portForAudio);
// 将信息写入redis中以备后用
@@ -160,8 +158,8 @@ public class RtpController {
public void closeRtpServer(String stream) {
logger.info("[第三方服务对接->关闭收流] stream->{}", stream);
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
zlmServerFactory.closeRtpServer(mediaServerItem,stream);
zlmServerFactory.closeRtpServer(mediaServerItem,stream + "_a");
mediaServerService.closeRTPServer(mediaServerItem, stream);
mediaServerService.closeRTPServer(mediaServerItem, stream+ "_a");
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_*_" + stream;
List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
if (scan.size() > 0) {
@@ -232,72 +230,31 @@ public class RtpController {
sendInfo.setPushStream(stream);
sendInfo.setPushSSRC(ssrc);
Map<String, Object> paramForAudio;
Map<String, Object> paramForVideo;
SendRtpItem sendRtpItemForVideo;
SendRtpItem sendRtpItemForAudio;
if (!ObjectUtils.isEmpty(dstIpForAudio) && dstPortForAudio > 0) {
paramForAudio = new HashMap<>();
paramForAudio.put("vhost","__defaultVhost__");
paramForAudio.put("app",app);
paramForAudio.put("stream",stream);
paramForAudio.put("ssrc", ssrc);
paramForAudio.put("dst_url", dstIpForAudio);
paramForAudio.put("dst_port", dstPortForAudio);
String is_Udp = isUdp ? "1" : "0";
paramForAudio.put("is_udp", is_Udp);
paramForAudio.put("src_port", sendInfo.getSendLocalPortForAudio());
paramForAudio.put("only_audio", "1");
if (ptForAudio != null) {
paramForAudio.put("pt", ptForAudio);
}
sendRtpItemForAudio = SendRtpItem.getInstance(app, stream, ssrc, dstIpForAudio, dstPortForAudio, !isUdp, sendInfo.getSendLocalPortForAudio(), ptForAudio);
} else {
paramForAudio = null;
sendRtpItemForAudio = null;
}
if (!ObjectUtils.isEmpty(dstIpForVideo) && dstPortForVideo > 0) {
paramForVideo = new HashMap<>();
paramForVideo.put("vhost","__defaultVhost__");
paramForVideo.put("app",app);
paramForVideo.put("stream",stream);
paramForVideo.put("ssrc", ssrc);
paramForVideo.put("dst_url", dstIpForVideo);
paramForVideo.put("dst_port", dstPortForVideo);
String is_Udp = isUdp ? "1" : "0";
paramForVideo.put("is_udp", is_Udp);
paramForVideo.put("src_port", sendInfo.getSendLocalPortForVideo());
paramForVideo.put("only_audio", "0");
if (ptForVideo != null) {
paramForVideo.put("pt", ptForVideo);
}
sendRtpItemForVideo = SendRtpItem.getInstance(app, stream, ssrc, dstIpForAudio, dstPortForAudio, !isUdp, sendInfo.getSendLocalPortForVideo(), ptForVideo);
} else {
paramForVideo = null;
sendRtpItemForVideo = null;
}
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, app, stream);
if (streamReady) {
if (paramForVideo != null) {
JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, paramForVideo);
if (jsonObject.getInteger("code") == 0) {
logger.info("[第三方服务对接->发送流] 视频流发流成功callId->{}param->{}", callId, paramForVideo);
redisTemplate.opsForValue().set(key, sendInfo);
}else {
redisTemplate.delete(key);
logger.info("[第三方服务对接->发送流] 视频流发流失败callId->{}, {}", callId, jsonObject.getString("msg"));
throw new ControllerException(ErrorCode.ERROR100.getCode(), "[视频流发流失败] " + jsonObject.getString("msg"));
}
if (sendRtpItemForVideo != null) {
mediaServerService.startSendRtp(mediaServer, null, sendRtpItemForVideo);
logger.info("[第三方服务对接->发送流] 视频流发流成功callId->{}param->{}", callId, sendRtpItemForVideo);
redisTemplate.opsForValue().set(key, sendInfo);
}
if(paramForAudio != null) {
JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, paramForAudio);
if (jsonObject.getInteger("code") == 0) {
logger.info("[第三方服务对接->发送流] 音频流发流成功callId->{}param->{}", callId, paramForAudio);
redisTemplate.opsForValue().set(key, sendInfo);
}else {
redisTemplate.delete(key);
logger.info("[第三方服务对接->发送流] 音频流发流失败callId->{}, {}", callId, jsonObject.getString("msg"));
throw new ControllerException(ErrorCode.ERROR100.getCode(), "[音频流发流失败] " + jsonObject.getString("msg"));
}
if(sendRtpItemForAudio != null) {
mediaServerService.startSendRtp(mediaServer, null, sendRtpItemForAudio);
logger.info("[第三方服务对接->发送流] 音频流发流成功callId->{}param->{}", callId, sendRtpItemForAudio);
redisTemplate.opsForValue().set(key, sendInfo);
}
}else {
logger.info("[第三方服务对接->发送流] 流不存在等待流上线callId->{}", callId);
@@ -310,8 +267,8 @@ public class RtpController {
}, 10000);
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
OtherRtpSendInfo finalSendInfo = sendInfo;
hookSubscribe.removeSubscribe(hook);
OtherRtpSendInfo finalSendInfo = sendInfo;
hookSubscribe.addSubscribe(hook,
(hookData)->{
dynamicTask.stop(uuid);
@@ -321,27 +278,15 @@ public class RtpController {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (paramForVideo != null) {
JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, paramForVideo);
if (jsonObject.getInteger("code") == 0) {
logger.info("[第三方服务对接->发送流] 视频流发流成功callId->{}param->{}", callId, paramForVideo);
redisTemplate.opsForValue().set(key, finalSendInfo);
}else {
redisTemplate.delete(key);
logger.info("[第三方服务对接->发送流] 视频流发流失败callId->{}, {}", callId, jsonObject.getString("msg"));
throw new ControllerException(ErrorCode.ERROR100.getCode(), "[视频流发流失败] " + jsonObject.getString("msg"));
}
if (sendRtpItemForVideo != null) {
mediaServerService.startSendRtp(mediaServer, null, sendRtpItemForVideo);
logger.info("[第三方服务对接->发送流] 视频流发流成功callId->{}param->{}", callId, sendRtpItemForVideo);
redisTemplate.opsForValue().set(key, finalSendInfo);
}
if(paramForAudio != null) {
JSONObject jsonObject = zlmServerFactory.startSendRtpStream(mediaServer, paramForAudio);
if (jsonObject.getInteger("code") == 0) {
logger.info("[第三方服务对接->发送流] 音频流发流成功callId->{}param->{}", callId, paramForAudio);
redisTemplate.opsForValue().set(key, finalSendInfo);
}else {
redisTemplate.delete(key);
logger.info("[第三方服务对接->发送流] 音频流发流失败callId->{}, {}", callId, jsonObject.getString("msg"));
throw new ControllerException(ErrorCode.ERROR100.getCode(), "[音频流发流失败] " + jsonObject.getString("msg"));
}
if(sendRtpItemForAudio != null) {
mediaServerService.startSendRtp(mediaServer, null, sendRtpItemForAudio);
logger.info("[第三方服务对接->发送流] 音频流发流成功callId->{}param->{}", callId, sendRtpItemForAudio);
redisTemplate.opsForValue().set(key, finalSendInfo);
}
hookSubscribe.removeSubscribe(hook);
});
@@ -359,19 +304,9 @@ public class RtpController {
if (sendInfo == null){
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未开启发流");
}
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",sendInfo.getPushApp());
param.put("stream",sendInfo.getPushStream());
param.put("ssrc",sendInfo.getPushSSRC());
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
Boolean result = zlmServerFactory.stopSendRtpStream(mediaServerItem, param);
if (!result) {
logger.info("[第三方服务对接->关闭发送流] 失败 callId->{}", callId);
throw new ControllerException(ErrorCode.ERROR100.getCode(), "停止发流失败");
}else {
logger.info("[第三方服务对接->关闭发送流] 成功 callId->{}", callId);
}
mediaServerService.stopSendRtp(mediaServerItem, sendInfo.getPushApp(), sendInfo.getPushStream(), sendInfo.getPushSSRC());
logger.info("[第三方服务对接->关闭发送流] 成功 callId->{}", callId);
redisTemplate.delete(key);
}

View File

@@ -0,0 +1,251 @@
package com.genersoft.iot.vmp.vmanager.user;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.conf.security.SecurityUtils;
import com.genersoft.iot.vmp.service.IUserApiKeyService;
import com.genersoft.iot.vmp.service.IUserService;
import com.genersoft.iot.vmp.storager.dao.dto.User;
import com.genersoft.iot.vmp.storager.dao.dto.UserApiKey;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.github.pagehelper.PageInfo;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
@Tag(name = "用户ApiKey管理")
@RestController
@RequestMapping("/api/userApiKey")
public class UserApiKeyController {
public static final int EXPIRATION_TIME = Integer.MAX_VALUE;
@Autowired
private IUserService userService;
@Autowired
private IUserApiKeyService userApiKeyService;
/**
* 添加用户ApiKey
*
* @param userId
* @param app
* @param remark
* @param expiresAt
* @param enable
*/
@PostMapping("/add")
@Operation(summary = "添加用户ApiKey", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "userId", description = "用户Id", required = true)
@Parameter(name = "app", description = "应用名称", required = false)
@Parameter(name = "remark", description = "备注信息", required = false)
@Parameter(name = "expiredAt", description = "过期时间(不传代表永不过期)", required = false)
@Transactional
public synchronized void add(
@RequestParam(required = true) int userId,
@RequestParam(required = false) String app,
@RequestParam(required = false) String remark,
@RequestParam(required = false) String expiresAt,
@RequestParam(required = false) Boolean enable
) {
User user = userService.getUserById(userId);
if (user == null) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "用户不存在");
}
Long expirationTime = null;
if (expiresAt != null) {
long timestamp = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(expiresAt);
expirationTime = (timestamp - System.currentTimeMillis()) / (60 * 1000);
if (expirationTime < 0) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "过期时间不能早于当前时间");
}
}
UserApiKey userApiKey = new UserApiKey();
userApiKey.setUserId(userId);
userApiKey.setApp(app);
userApiKey.setApiKey(null);
userApiKey.setRemark(remark);
userApiKey.setExpiredAt(expirationTime != null ? expirationTime : 0);
userApiKey.setEnable(enable != null ? enable : false);
userApiKey.setCreateTime(DateUtil.getNow());
userApiKey.setUpdateTime(DateUtil.getNow());
int addResult = userApiKeyService.addApiKey(userApiKey);
if (addResult <= 0) {
throw new ControllerException(ErrorCode.ERROR100);
}
String apiKey;
do {
Map<String, Object> extra = new HashMap<>(1);
extra.put("apiKeyId", userApiKey.getId());
apiKey = JwtUtils.createToken(user.getUsername(), expirationTime, extra);
} while (userApiKeyService.isApiKeyExists(apiKey));
int resetResult = userApiKeyService.reset(userApiKey.getId(), apiKey);
if (resetResult <= 0) {
throw new ControllerException(ErrorCode.ERROR100);
}
}
/**
* 分页查询ApiKey
*
* @param page 当前页
* @param count 每页查询数量
* @return 分页ApiKey列表
*/
@GetMapping("/userApiKeys")
@Operation(summary = "分页查询用户", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "page", description = "当前页", required = true)
@Parameter(name = "count", description = "每页查询数量", required = true)
@Transactional
public PageInfo<UserApiKey> userApiKeys(@RequestParam(required = true) int page, @RequestParam(required = true) int count) {
return userApiKeyService.getUserApiKeys(page, count);
}
@PostMapping("/enable")
@Operation(summary = "启用用户ApiKey", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "id", description = "用户ApiKeyId", required = true)
@Transactional
public void enable(@RequestParam(required = true) Integer id) {
// 获取当前登录用户id
int currenRoleId = SecurityUtils.getUserInfo().getRole().getId();
if (currenRoleId != 1) {
// 只用角色id为1才可以管理UserApiKey
throw new ControllerException(ErrorCode.ERROR403);
}
UserApiKey userApiKey = userApiKeyService.getUserApiKeyById(id);
if (userApiKey == null) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "ApiKey不存在");
}
int enableResult = userApiKeyService.enable(id);
if (enableResult <= 0) {
throw new ControllerException(ErrorCode.ERROR100);
}
}
@PostMapping("/disable")
@Operation(summary = "停用用户ApiKey", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "id", description = "用户ApiKeyId", required = true)
@Transactional
public void disable(@RequestParam(required = true) Integer id) {
// 获取当前登录用户id
int currenRoleId = SecurityUtils.getUserInfo().getRole().getId();
if (currenRoleId != 1) {
// 只用角色id为1才可以管理UserApiKey
throw new ControllerException(ErrorCode.ERROR403);
}
UserApiKey userApiKey = userApiKeyService.getUserApiKeyById(id);
if (userApiKey == null) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "ApiKey不存在");
}
int disableResult = userApiKeyService.disable(id);
if (disableResult <= 0) {
throw new ControllerException(ErrorCode.ERROR100);
}
}
@PostMapping("/reset")
@Operation(summary = "重置用户ApiKey", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "id", description = "用户ApiKeyId", required = true)
@Transactional
public void reset(@RequestParam(required = true) Integer id) {
// 获取当前登录用户id
int currenRoleId = SecurityUtils.getUserInfo().getRole().getId();
if (currenRoleId != 1) {
// 只用角色id为1才可以管理UserApiKey
throw new ControllerException(ErrorCode.ERROR403);
}
UserApiKey userApiKey = userApiKeyService.getUserApiKeyById(id);
if (userApiKey == null) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "ApiKey不存在");
}
User user = userService.getUserById(userApiKey.getUserId());
if (user == null) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "用户不存在");
}
Long expirationTime = null;
if (userApiKey.getExpiredAt() > 0) {
long timestamp = userApiKey.getExpiredAt();
expirationTime = (timestamp - System.currentTimeMillis()) / (60 * 1000);
if (expirationTime < 0) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "ApiKey已失效");
}
}
String apiKey;
do {
Map<String, Object> extra = new HashMap<>(1);
extra.put("apiKeyId", userApiKey.getId());
apiKey = JwtUtils.createToken(user.getUsername(), expirationTime, extra);
} while (userApiKeyService.isApiKeyExists(apiKey));
int resetResult = userApiKeyService.reset(id, apiKey);
if (resetResult <= 0) {
throw new ControllerException(ErrorCode.ERROR100);
}
}
@PostMapping("/remark")
@Operation(summary = "备注用户ApiKey", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "id", description = "用户ApiKeyId", required = true)
@Parameter(name = "remark", description = "用户ApiKey备注", required = false)
@Transactional
public void remark(@RequestParam(required = true) Integer id, @RequestParam(required = false) String remark) {
// 获取当前登录用户id
int currenRoleId = SecurityUtils.getUserInfo().getRole().getId();
if (currenRoleId != 1) {
// 只用角色id为1才可以管理UserApiKey
throw new ControllerException(ErrorCode.ERROR403);
}
UserApiKey userApiKey = userApiKeyService.getUserApiKeyById(id);
if (userApiKey == null) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "ApiKey不存在");
}
int remarkResult = userApiKeyService.remark(id, remark);
if (remarkResult <= 0) {
throw new ControllerException(ErrorCode.ERROR100);
}
}
@DeleteMapping("/delete")
@Operation(summary = "删除用户ApiKey", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "id", description = "用户ApiKeyId", required = true)
@Transactional
public void delete(@RequestParam(required = true) Integer id) {
// 获取当前登录用户id
int currenRoleId = SecurityUtils.getUserInfo().getRole().getId();
if (currenRoleId != 1) {
// 只用角色id为1才可以管理UserApiKey
throw new ControllerException(ErrorCode.ERROR403);
}
UserApiKey userApiKey = userApiKeyService.getUserApiKeyById(id);
if (userApiKey == null) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "ApiKey不存在");
}
int deleteResult = userApiKeyService.delete(id);
if (deleteResult <= 0) {
throw new ControllerException(ErrorCode.ERROR100);
}
}
}

View File

@@ -10,6 +10,8 @@ spring:
multipart:
max-file-size: 10MB
max-request-size: 100MB
cache:
type: redis
# REDIS数据库配置
redis:
# [必须修改] Redis服务器IP, REDIS安装在本机的,使用127.0.0.1

View File

@@ -4,6 +4,8 @@ spring:
multipart:
max-file-size: 10MB
max-request-size: 100MB
cache:
type: redis
# REDIS数据库配置
redis:
# [必须修改] Redis服务器IP, REDIS安装在本机的,使用127.0.0.1

View File

@@ -0,0 +1 @@
{"keys":[{"kty":"RSA","kid":"3e79646c4dbc408383a9eed09f2b85ae","n":"rThRAlbMRceko3NkymeSoN2ICVaDlNBLWv3cyLUeixjWcmuhnPv2JpXmgoxezKZfhH_0sChBof--BaaqSUukl9wWMW1bWCyFyU5qNczhQk3ANlhaLiSgXsqD-NKI3ObJjB-26fnOZb9QskCqrPW1lEtwgb9-skMAfGlh5kaDOKjYKI64DPSMMXpSiJEDM-7DK-TFfm0QfPcoH-k-1C02NHlGWehVUn9FUJ0TAiDxpKj28qOmYh7s1M7OU_h-Sso7LM-5zbftpcO6SINe81Gw9JPd7rKPCRxkw8ROSCCq-JH_zshM80kTK2nWcseGvhQ_4vKQIBp9PrAgCrGJHM160w","e":"AQAB","d":"AwS2NKo6iQS_k7GREg3X-kGh-zest00h4wYFcOHnFFlsczX47PlfArEeASxdAofrpi1soB0zd5UzRHnxAbH1vkexg076hoDQG__nzeQyEKu2K7xCZgdxW_V_cziH9gF3hZ-P2mfl9tPsng6OatElRt5BqaEingyY15ImiJK1-qi_LTx4gfwRfquKLbUgqJR4Tf6eKlwOzEo41Ilo26gnojNzWryB_XHG7lj6SngPDBJp7ty32je4Fv3A3hXt7JHDwloww6-xiRtUflDpSec4A-o-PHgbfoYLyM7mM4BDt4PM54EHm4u8WzypG0wNKDTiq4KSapei5xDbiG3RpngvAQ","p":"5kUHkGxnZvZT762Ex-0De2nYodAbbZNVR-eIPx2ng2VZmEbAU3cp_DxigpXWyQ0FwJ2Me8GvxnlbxJ7k7d-4AV2X8q6Q-UqXajHdudRU_QX05kPEgZ3xtPk5ekI0-u1BEQT7pY_gxlZC2mzXAcVLd-LwbVPuQEba5S4JMsjcHUE","q":"wJNa06-qZ2tWncGl7cfJdO-SJ_H3taowMhh-RsJmeVefjjN3pfVjjE0wG_rIP-BjjCB9OhvSnI8LDjoNu8uIg090DYnA6IUfZpWo3zjgedeyqQyXFVjjVQkn98zgp5NFLpuitZsl9-EHhh7JaZDCwaJ527MN3VCoQxeI75ggjxM","dp":"HQTH_kBbC5OxYjwIxrUswinFnia-viFaFvSrq-CN0rY8Az-vTxVuWhY2B-TgK3gTqIFyScpP34A9u1qW2Q9fffSQiInNRU1MJZrhKWED0NsmULprkjYYVsktoCWlzZWGpKFvIR8voW8Pf71FnziA2TvlNrHkDX-gaE9T422Cp8E","dq":"owJYqMWS1dYLTKBlx0ANbHl6W2u7xb_Y6h7HjTfzLBWazvEL_6QW7uVLqvN-XGuheDTsK6rvfWyr7BACHgvsc1JnJyqK64f8C4b1mnZ3tUt7RROONBi43ftRJLX9GHxV3F0LvvQkkI2gI8ydq0lJQkU5J1qKiuNCewBJ_p3kOZc","qi":"hNAZV6aWEEWfB1HkrfdtO6sjq9ceEod55ez82I1ZNgoKle8gpRkh3vw2EIJ_5lcw57s5rw8G-sCQPG1AQSZ6u9aURwHkIXjpIhLAlv6gvKkCh0smPPvnSiltJKOJsuHkrD6rGkV1f-MlCS51lKlk9xShQzkRidkNd4BUh0a7ktA"}]}