@@ -3,21 +3,22 @@ package com.genersoft.iot.vmp.vmanager.rtp;
import com.alibaba.fastjson2.JSONObject ;
import com.genersoft.iot.vmp.common.VideoManagerConstants ;
import com.genersoft.iot.vmp.conf.DynamicTask ;
import com.genersoft.iot.vmp.conf.SipConfig ;
import com.genersoft.iot.vmp.conf.UserSetting ;
import com.genersoft.iot.vmp.conf.VersionInfo ;
import com.genersoft.iot.vmp.conf.exception.ControllerException ;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager ;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory ;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager ;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory ;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe ;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory ;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout ;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange ;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem ;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam ;
import com.genersoft.iot.vmp.service.IDeviceChannelService ;
import com.genersoft.iot.vmp.service.IDeviceService ;
import com.genersoft.iot.vmp.service.IMediaServerService ;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage ;
import com.genersoft.iot.vmp.utils.redis.RedisUtil ;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode ;
import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo ;
import io.swagger.v3.oas.annotations.Operation ;
@@ -28,14 +29,16 @@ import okhttp3.Request;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.beans.factory.annotation.Value ;
import org.springframework.data.redis.core.RedisTemplate ;
import org.springframework.util.ObjectUtils ;
import org.springframework.web.bind.annotation.* ;
import java.io.IOException ;
import java.util.HashMap ;
import java.util.List ;
import java.util.Map ;
import java.util.UUID ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.TimeUnit ;
@SuppressWarnings ( " rawtypes " )
@@ -60,20 +63,11 @@ public class RtpController {
private IMediaServerService mediaServerService ;
@Autowired
private VersionInfo versionInfo ;
@Autowired
private SipConfig sipConfig ;
private SendRtpPortManager sendRtpPortManager ;
@Autowired
private UserSetting userSetting ;
@Autowired
private IDeviceService deviceService ;
@Autowired
private IDeviceChannelService channelService ;
@Autowired
private DynamicTask dynamicTask ;
@@ -82,14 +76,6 @@ public class RtpController {
private RedisTemplate < Object , Object > redisTemplate ;
@Value ( " ${server.port} " )
private int serverPort ;
@Autowired
private IRedisCatchStorage redisCatchStorage ;
@GetMapping ( value = " /receive/open " )
@ResponseBody
@Operation ( summary = " 开启收流和获取发流信息 " )
@@ -121,12 +107,16 @@ public class RtpController {
} catch ( NumberFormatException e ) {
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " ssrc格式错误 " ) ;
}
}
int localPort = zlmServerFactory . createRTPServer ( mediaServerItem , stream , ssrcInt , null , false , tcpMode ) ;
Str ing receiveKey = VideoManagerConstants . WVP_OTHER_RECEIVE_RTP_INFO + userSetting . getServerId ( ) + " _ " + callId + " _ " + stream ;
int localPortForVideo = zlmServerFactory . createRTPServer ( mediaServerItem , stream , ssrcInt , null , false , tcpMode ) ;
int localPortForAudio = zlmServerFactory . createRTPServer ( mediaServerItem , stream + " _a " , ssrcInt , null , false , tcpMode ) ;
if ( localPortForVideo = = 0 | | localPortForAudio = = 0 ) {
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " 获取端口失败 " ) ;
}
// 注册回调如果rtp收流超时则通过回调发送通知
if ( callBack ! = null ) {
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory . on_rtp_server_timeout ( ssrc , null , mediaServerItem . getId ( ) ) ;
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory . on_rtp_server_timeout ( stream , String . valueOf ( ssrcInt ) , mediaServerItem . getId ( ) ) ;
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe . addSubscribe ( hookSubscribeForRtpServerTimeout ,
( mediaServerItemInUse , hookParam ) - > {
@@ -140,22 +130,33 @@ public class RtpController {
try {
client . newCall ( request ) . execute ( ) ;
} catch ( IOException e ) {
logger . error ( " [开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败 " , callId , e ) ;
logger . error ( " [第三方服务对接-> 开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败 " , callId , e ) ;
}
hookSubscribe . removeSubscribe ( hookSubscribeForRtpServerTimeout ) ;
}
} ) ;
}
String key = VideoManagerConstants . WVP_OTHER_SEND_RTP_INFO + userSetting . getServerId ( ) + callId ;
OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo ( ) ;
otherRtpSendInfo . setReceiveIp ( mediaServerItem . getSdpIp ( ) ) ;
otherRtpSendInfo . setReceivePort ( localPort ) ;
otherRtpSendInfo . setReceivePortForVideo ( localPortForVideo ) ;
otherRtpSendInfo . setReceivePortForAudio ( localPortForAudio ) ;
otherRtpSendInfo . setCallId ( callId ) ;
otherRtpSendInfo . setStream ( stream ) ;
// 将信息写入redis中, 以备后用
redisTemplate . opsForValue ( ) . set ( receiveKey , otherRtpSendInfo ) ;
if ( isSend ! = null & & isSend ) {
int port = sendRtpPortManager . getNextPort ( mediaServerItem . getId ( ) ) ;
otherRtpSendInfo . setIp ( mediaServerItem . getSdpIp ( ) ) ;
otherRtpSendInfo . s etPort( port ) ;
logger . info ( " [开启收流和获取发流信息] 结果, callId->{}, {} " , callId , otherRtpSendInfo ) ;
Str ing key = VideoManagerConstants . WVP_OTHER_SEND_RTP_INFO + userSetting . getServer Id ( ) + " _ " + callId ;
// 预创建发流信息
int portForVideo = sendRtpPortManager . g etNext Port( mediaServerItem . getId ( ) ) ;
int portForAudio = sendRtpPortManager . getNextPort ( mediaServerItem . getId ( ) ) ;
// 将信息写入redis中, 以备后用
redisTemplate . opsForValue ( ) . set ( key , otherRtpSendInfo , 300 , TimeUnit . SECONDS ) ;
otherRtpSendInfo . setSendLocalIp ( mediaServerItem . getSdpIp ( ) ) ;
otherRtpSendInfo . setSendLocalPortForVideo ( portForVideo ) ;
otherRtpSendInfo . setSendLocalPortForAudio ( portForAudio ) ;
logger . info ( " [第三方服务对接->开启收流和获取发流信息] 结果, callId->{}, {} " , callId , otherRtpSendInfo ) ;
}
// 将信息写入redis中, 以备后用
redisTemplate . opsForValue ( ) . set ( key , otherRtpSendInfo , 300 , TimeUnit . SECONDS ) ;
@@ -170,28 +171,69 @@ public class RtpController {
logger . info ( " [第三方服务对接->关闭收流] stream->{} " , stream ) ;
MediaServerItem mediaServerItem = mediaServerService . getDefaultMediaServer ( ) ;
zlmServerFactory . closeRtpServer ( mediaServerItem , stream ) ;
zlmServerFactory . closeRtpServer ( mediaServerItem , stream + " _a " ) ;
String receiveKey = VideoManagerConstants . WVP_OTHER_RECEIVE_RTP_INFO + userSetting . getServerId ( ) + " _*_ " + stream ;
List < Object > scan = RedisUtil . scan ( redisTemplate , receiveKey ) ;
if ( scan . size ( ) > 0 ) {
for ( Object key : scan ) {
// 将信息写入redis中, 以备后用
redisTemplate . delete ( key ) ;
}
}
}
@GetMapping ( value = " /send/start " )
@ResponseBody
@Operation ( summary = " 发送流 " )
@Parameter ( name = " ssrc " , description = " 发送流的SSRC " , required = true )
@Parameter ( name = " ip " , description = " 目标IP " , required = tru e)
@Parameter ( name = " port " , description = " 目标端口 " , required = tru e)
@Parameter ( name = " dstIpForAudio " , description = " 目标音频收流 IP " , required = fals e)
@Parameter ( name = " dstIpForVideo " , description = " 目标视频收流IP " , required = fals e)
@Parameter ( name = " dstPortForAudio " , description = " 目标音频收流端口 " , required = false )
@Parameter ( name = " dstPortForVideo " , description = " 目标视频收流端口 " , required = false )
@Parameter ( name = " app " , description = " 待发送应用名 " , required = true )
@Parameter ( name = " stream " , description = " 待发送流Id " , required = true )
@Parameter ( name = " callId " , description = " 整个过程的唯一标识,不传则使用随机端口发流 " , required = true )
@Parameter ( name = " onlyAudio " , description = " 是否只有音频 " , required = true )
@Parameter ( name = " isUdp " , description = " 是否为UDP " , required = true )
@Parameter ( name = " streamType " , description = " 流类型, 1为es流, 2为ps流, 默认es流 " , required = false )
public void sendRTP ( String ssrc , String ip , Integer port , String app , String stream , String callId , Boolean onlyAudio , Boolean isUdp , @RequestParam ( required = false ) Integer streamType ) {
logger . info ( " [第三方服务对接->发送流] ssrc->{}, ip->{}, port->{}, app->{}, stream->{}, callId->{}, onlyAudio->{}, streamType->{} " ,
ssrc , ip , port , app , s tream , callId , onlyAudio , streamType = = 1 ? " ES " : " PS " ) ;
if ( ObjectUtils . isEmpty ( streamType ) ) {
streamType = 1 ;
@Parameter ( name = " ptForAudio " , description = " rtp的音频pt " , required = false )
@Parameter ( name = " ptForVideo " , description = " rtp的视频pt " , required = false )
public void sendRTP ( String ssrc ,
@RequestParam ( required = false ) S tring dstIpForAudio ,
@RequestParam ( required = false ) String dstIpForVideo ,
@RequestParam ( required = false ) Integer dstPortForAudio ,
@RequestParam ( required = false ) Integer dstPortForVideo ,
String app ,
String stream ,
String callId ,
Boolean isUdp ,
@RequestParam ( required = false ) Integer ptForAudio ,
@RequestParam ( required = false ) Integer ptForVideo
) {
logger . info ( " [第三方服务对接->发送流] " +
" ssrc->{}, \ r \ n " +
" dstIpForAudio->{}, \ n " +
" dstIpForAudio->{}, \ n " +
" dstPortForAudio->{}, \ n " +
" dstPortForVideo->{}, \ n " +
" app->{}, \ n " +
" stream->{}, \ n " +
" callId->{}, \ n " +
" ptForAudio->{}, \ n " +
" ptForVideo->{} " ,
ssrc ,
dstIpForAudio ,
dstIpForVideo ,
dstPortForAudio ,
dstPortForVideo ,
app ,
stream ,
callId ,
ptForAudio ,
ptForVideo ) ;
if ( ! ( ( dstPortForAudio > 0 & & ! ObjectUtils . isEmpty ( dstPortForAudio ) | | ( dstPortForVideo > 0 & & ! ObjectUtils . isEmpty ( dstIpForVideo ) ) ) ) ) {
throw new ControllerException ( ErrorCode . ERROR400 . getCode ( ) , " 至少应该存在一组音频或视频发送参数 " ) ;
}
MediaServerItem mediaServerItem = mediaServerService . getDefaultMediaServer ( ) ;
String key = VideoManagerConstants . WVP_OTHER_SEND_RTP_INFO + userSetting . getServerId ( ) + callId ;
String key = VideoManagerConstants . WVP_OTHER_SEND_RTP_INFO + userSetting . getServerId ( ) + " _ " + callId ;
OtherRtpSendInfo sendInfo = ( OtherRtpSendInfo ) redisTemplate . opsForValue ( ) . get ( key ) ;
if ( sendInfo = = null ) {
sendInfo = new OtherRtpSendInfo ( ) ;
@@ -200,40 +242,131 @@ public class RtpController {
sendInfo . setPushStream ( stream ) ;
sendInfo . setPushSSRC ( ssrc ) ;
Map < String , Object > param = new HashMap < > ( 12 ) ;
param . put ( " vhost " , " __defaultVhost__ " ) ;
param . put ( " app " , app ) ;
param. put ( " stream " , stream ) ;
param . put ( " ssrc " , ssrc ) ;
Map < String , Object > paramForAudio ;
Map < String , Object > paramForVideo ;
if ( ! ObjectUtils . isEmpty ( dstIpForAudio ) & & dstPortForAudio > 0 ) {
paramForAudio = new HashMap < > ( ) ;
paramForAudio . put ( " vhost " , " __defaultVhost__ " ) ;
paramForAudio . put ( " app " , app ) ;
paramForAudio . put ( " stream " , stream ) ;
paramForAudio . put ( " ssrc " , ssrc ) ;
param . put ( " dst_url " , ip ) ;
param . put ( " dst_port " , port ) ;
String is_Udp = isUdp ? " 1 " : " 0 " ;
param . put ( " is_udp " , is_Udp ) ;
param . put ( " src_port " , sendInfo . getPort ( ) ) ;
param . put ( " use_ps " , streamType = = 2 ? " 1 " : " 0 " ) ;
param . put ( " only_audio " , onlyAudio ? " 1 " : " 0 " ) ;
paramForAudio . put ( " dst_url " , dstIpForAudio ) ;
paramForAudio . put ( " dst_port " , dstPortForAudio ) ;
String is_Udp = isUdp ? " 1 " : " 0 " ;
paramForAudio . put ( " is_udp " , is_Udp ) ;
paramForAudio . put ( " src_port " , sendInfo . getSendLocalPortForAudio ( ) ) ;
paramForAudio . put ( " use_ps " , " 0 " ) ;
paramForAudio . put ( " only_audio " , " 1 " ) ;
if ( ptForAudio ! = null ) {
paramForAudio . put ( " pt " , ptForAudio ) ;
}
JSONObject jsonObject = zlmServerFactory . startSendRtpStream ( mediaServerItem , param ) ;
if ( jsonObject . getInteger ( " code " ) = = 0 ) {
logger . info ( " [第三方服务对接->发送流] 发流成功, callId->{} " , callId ) ;
redisTemplate . opsForValue ( ) . set ( key , sendInfo ) ;
} else {
paramForAudio = null ;
}
if ( ! ObjectUtils . isEmpty ( dstIpForVideo ) & & dstPortForVideo > 0 ) {
paramForVideo = new HashMap < > ( ) ;
paramForVideo . put ( " vhost " , " __defaultVhost__ " ) ;
paramForVideo . put ( " app " , app ) ;
paramForVideo . put ( " stream " , stream ) ;
paramForVideo . put ( " ssrc " , ssrc ) ;
paramForVideo . put ( " dst_url " , dstIpForVideo ) ;
paramForVideo . put ( " dst_port " , dstPortForVideo ) ;
String is_Udp = isUdp ? " 1 " : " 0 " ;
paramForVideo . put ( " is_udp " , is_Udp ) ;
paramForVideo . put ( " src_port " , sendInfo . getSendLocalPortForVideo ( ) ) ;
paramForVideo . put ( " use_ps " , " 0 " ) ;
paramForVideo . put ( " only_audio " , " 0 " ) ;
if ( ptForVideo ! = null ) {
paramForVideo . put ( " pt " , ptForVideo ) ;
}
} else {
paramForVideo = null ;
}
Boolean streamReady = zlmServerFactory . isStreamReady ( mediaServerItem , app , stream ) ;
if ( streamReady ) {
if ( paramForVideo ! = null ) {
JSONObject jsonObject = zlmServerFactory . startSendRtpStream ( mediaServerItem , paramForVideo ) ;
if ( jsonObject . getInteger ( " code " ) = = 0 ) {
logger . info ( " [第三方服务对接->发送流] 视频流发流成功, callId->{}, param->{} " , callId , paramForVideo ) ;
redisTemplate . opsForValue ( ) . set ( key , sendInfo ) ;
} else {
redisTemplate . delete ( key ) ;
logger . info ( " [第三方服务对接->发送流] 视频流发流失败, callId->{}, {} " , callId , jsonObject . getString ( " msg " ) ) ;
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " [视频流发流失败] " + jsonObject . getString ( " msg " ) ) ;
}
}
if ( paramForAudio ! = null ) {
JSONObject jsonObject = zlmServerFactory . startSendRtpStream ( mediaServerItem , paramForAudio ) ;
if ( jsonObject . getInteger ( " code " ) = = 0 ) {
logger . info ( " [第三方服务对接->发送流] 音频流发流成功, callId->{}, param->{} " , callId , paramForAudio ) ;
redisTemplate . opsForValue ( ) . set ( key , sendInfo ) ;
} else {
redisTemplate . delete ( key ) ;
logger . info ( " [第三方服务对接->发送流] 音频流发流失败, callId->{}, {} " , callId , jsonObject . getString ( " msg " ) ) ;
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " [音频流发流失败] " + jsonObject . getString ( " msg " ) ) ;
}
}
} else {
redisTemplate . delete ( key ) ;
logger . info ( " [第三方服务对接->发送流] 发流失败, callId->{}, {} " , callId , jsonObject . ge tString( " msg " ) ) ;
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " [发流失败] " + jsonObject . getString ( " msg " ) ) ;
logger . info ( " [第三方服务对接->发送流] 流不存在, 等待流上线, callId->{} " , callId ) ;
String uuid = UUID . randomUUID ( ) . to String ( ) ;
HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory . on_stream_changed ( app , stream , true , " rtsp " , mediaServerItem . getId ( ) ) ;
dynamicTask . startDelay ( uuid , ( ) - > {
logger . info ( " [第三方服务对接->发送流] 等待流上线超时 callId->{} " , callId ) ;
redisTemplate . delete ( key ) ;
hookSubscribe . removeSubscribe ( hookSubscribeForStreamChange ) ;
} , 10000 ) ;
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
OtherRtpSendInfo finalSendInfo = sendInfo ;
hookSubscribe . removeSubscribe ( hookSubscribeForStreamChange ) ;
hookSubscribe . addSubscribe ( hookSubscribeForStreamChange ,
( mediaServerItemInUse , response ) - > {
dynamicTask . stop ( uuid ) ;
logger . info ( " [第三方服务对接->发送流] 流上线,开始发流 callId->{} " , callId ) ;
try {
Thread . sleep ( 400 ) ;
} catch ( InterruptedException e ) {
throw new RuntimeException ( e ) ;
}
if ( paramForVideo ! = null ) {
JSONObject jsonObject = zlmServerFactory . startSendRtpStream ( mediaServerItem , paramForVideo ) ;
if ( jsonObject . getInteger ( " code " ) = = 0 ) {
logger . info ( " [第三方服务对接->发送流] 视频流发流成功, callId->{}, param->{} " , callId , paramForVideo ) ;
redisTemplate . opsForValue ( ) . set ( key , finalSendInfo ) ;
} else {
redisTemplate . delete ( key ) ;
logger . info ( " [第三方服务对接->发送流] 视频流发流失败, callId->{}, {} " , callId , jsonObject . getString ( " msg " ) ) ;
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " [视频流发流失败] " + jsonObject . getString ( " msg " ) ) ;
}
}
if ( paramForAudio ! = null ) {
JSONObject jsonObject = zlmServerFactory . startSendRtpStream ( mediaServerItem , paramForAudio ) ;
if ( jsonObject . getInteger ( " code " ) = = 0 ) {
logger . info ( " [第三方服务对接->发送流] 音频流发流成功, callId->{}, param->{} " , callId , paramForAudio ) ;
redisTemplate . opsForValue ( ) . set ( key , finalSendInfo ) ;
} else {
redisTemplate . delete ( key ) ;
logger . info ( " [第三方服务对接->发送流] 音频流发流失败, callId->{}, {} " , callId , jsonObject . getString ( " msg " ) ) ;
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " [音频流发流失败] " + jsonObject . getString ( " msg " ) ) ;
}
}
hookSubscribe . removeSubscribe ( hookSubscribeForStreamChange ) ;
} ) ;
}
}
@GetMapping ( value = " /send/stop " )
@ResponseBody
@Operation ( summary = " 关闭发送流 " )
@Parameter ( name = " callId " , description = " 整个过程的唯一标识,不传则使用随机端口发流 " , required = true )
public void closeSendRTP ( String callId ) {
logger . info ( " [第三方服务对接->关闭发送流] callId->{} " , callId ) ;
String key = VideoManagerConstants . WVP_OTHER_SEND_RTP_INFO + userSetting . getServerId ( ) + callId ;
String key = VideoManagerConstants . WVP_OTHER_SEND_RTP_INFO + userSetting . getServerId ( ) + " _ " + callId ;
OtherRtpSendInfo sendInfo = ( OtherRtpSendInfo ) redisTemplate . opsForValue ( ) . get ( key ) ;
if ( sendInfo = = null ) {
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " 未开启发流 " ) ;
@@ -251,6 +384,7 @@ public class RtpController {
} else {
logger . info ( " [第三方服务对接->关闭发送流] 成功 callId->{} " , callId ) ;
}
redisTemplate . delete ( key ) ;
}
}