@@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.alibaba.fastjson2.JSON ;
import com.alibaba.fastjson2.JSONObject ;
import com.genersoft.iot.vmp.common.InviteSessionType ;
import com.genersoft.iot.vmp.common.StreamInfo ;
import com.genersoft.iot.vmp.common.VideoManagerConstants ;
import com.genersoft.iot.vmp.conf.DynamicTask ;
import com.genersoft.iot.vmp.conf.SipConfig ;
@@ -29,19 +28,13 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType ;
import com.genersoft.iot.vmp.media.service.IMediaServerService ;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager ;
import com.genersoft.iot.vmp.service.bean.ErrorCallback ;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode ;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel ;
import com.genersoft.iot.vmp.service.bean.SSRCInfo ;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService ;
import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener ;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage ;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage ;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy ;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService ;
import com.genersoft.iot.vmp.streamPush.bean.StreamPush ;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService ;
import com.genersoft.iot.vmp.utils.DateUtil ;
import gov.nist.javax.sdp.TimeDescriptionImpl ;
import gov.nist.javax.sdp.fields.TimeField ;
import gov.nist.javax.sdp.fields.URIField ;
@@ -63,7 +56,6 @@ import javax.sip.header.CallIdHeader;
import javax.sip.message.Response ;
import java.text.ParseException ;
import java.time.Instant ;
import java.util.List ;
import java.util.Map ;
import java.util.Random ;
import java.util.Vector ;
@@ -168,6 +160,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if ( platform = = null ) {
inviteFromDeviceHandle ( request , inviteInfo . getRequesterId ( ) , inviteInfo . getChannelId ( ) ) ;
} else {
// 查询平台下是否有该通道
CommonGBChannel channel = channelService . queryOneWithPlatform ( platform . getId ( ) , inviteInfo . getChannelId ( ) ) ;
if ( channel = = null ) {
@@ -180,402 +173,433 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
return ;
}
log . info ( " [上级Invite] 平台:{}, 通道:{}({}), 收流地址:{}:{},收流方式:{}, 点播类型:{}, ssrc: {} " ,
platform . getName ( ) , channel . getGbName ( ) , channel . getGbDeviceDbId ( ) , inviteInfo . getIp ( ) ,
inviteInfo . getPort ( ) , inviteInfo . isTcp ( ) ? ( inviteInfo . isTcpActive ( ) ? " TCP主动 " : " TCP被动 " ) : " UDP " ,
inviteInfo . getSessionName ( ) , inviteInfo . getSsrc ( ) ) ;
// 通道存在, 发100, TRYING
try {
responseAck ( request , Response . TRYING ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log . error ( " [命令发送失败] i nvite TRYING: {} " , e . getMessage ( ) ) ;
log . error ( " [命令发送失败] 上级I nvite TRYING: {} " , e . getMessage ( ) ) ;
}
channelService . start ( channel , ( ( code , msg , data ) - > {
channelService . start ( channel , ( ( code , msg , commonChannelPlayInfo ) - > {
if ( code ! = Response . OK ) {
try {
responseAck ( request , code , msg ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log . error ( " [命令发送失败] 点播失败: {} " , e . getMessage ( ) ) ;
log . error ( " [命令发送失败] 上级Invite 点播失败: {} " , e . getMessage ( ) ) ;
}
} else {
// 点播成功, TODO 可以在此处检测cancel命令是否存在, 存在则不发送
// 构建sendRTP内容
SendRtpItem sendRtpItem = mediaServerService . createSendRtpItem ( commonChannelPlayInfo . getMediaServer ( ) ,
inviteInfo . getIp ( ) , inviteInfo . getPort ( ) , inviteInfo . getSsrc ( ) , platform . getServerGBId ( ) ,
commonChannelPlayInfo . getStreamInfo ( ) . getApp ( ) , commonChannelPlayInfo . getStreamInfo ( ) . getStream ( ) ,
channel . getGbDeviceId ( ) , inviteInfo . isTcp ( ) , platform . isRtcp ( ) ) ;
if ( inviteInfo . isTcp ( ) & & inviteInfo . isTcpActive ( ) ) {
sendRtpItem . setTcpActive ( true ) ;
}
sendRtpItem . setStatus ( 1 ) ;
sendRtpItem . setCallId ( inviteInfo . getCallId ( ) ) ;
sendRtpItem . setPlayType ( " Play " . equalsIgnoreCase ( inviteInfo . getSessionName ( ) ) ? InviteStreamType . PLAY : InviteStreamType . PLAYBACK ) ;
redisCatchStorage . updateSendRTPSever ( sendRtpItem ) ;
String sdpIp = commonChannelPlayInfo . getMediaServer ( ) . getSdpIp ( ) ;
if ( ! ObjectUtils . isEmpty ( platform . getSendStreamIp ( ) ) ) {
sdpIp = platform . getSendStreamIp ( ) ;
}
String content = createSendSdp ( sendRtpItem , inviteInfo , sdpIp ) ;
// 超时未收到Ack应该回复bye,当前等待时间为10秒
dynamicTask . startDelay ( inviteInfo . getCallId ( ) , ( ) - > {
log . info ( " Ack 等待超时 " ) ;
mediaServerService . releaseSsrc ( commonChannelPlayInfo . getMediaServer ( ) . getId ( ) , sendRtpItem . getSsrc ( ) ) ;
// 回复bye
sendBye ( platform , inviteInfo . getCallId ( ) ) ;
} , 60 * 1000 ) ;
try {
responseSdpAck ( request , content , platform ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log . error ( " [命令发送失败] 上级Invite 发送 200( SDP) : {} " , e . getMessage ( ) ) ;
}
// tcp主动模式, 回复sdp后开启监听
if ( sendRtpItem . isTcpActive ( ) ) {
MediaServer mediaServer = mediaServerService . getOne ( sendRtpItem . getMediaServerId ( ) ) ;
try {
mediaServerService . startSendRtpPassive ( mediaServer , sendRtpItem , 5 ) ;
redisCatchStorage . sendPlatformStartPlayMsg ( sendRtpItem , platform ) ;
} catch ( ControllerException e ) {
log . warn ( " [上级Invite] tcp主动模式 发流失败 " , e ) ;
sendBye ( platform , inviteInfo . getCallId ( ) ) ;
}
}
}
} ) ) ;
if ( channel . getGbDeviceDbId ( ) > 0 ) {
Device device = deviceService . getDevice ( channel . getGbDeviceDbId ( ) ) ;
if ( device = = null ) {
log . warn ( " 点播平台{}的通道{}时未找到设备信息 " , requesterId , channel ) ;
try {
responseAck ( request , Response . SERVER_INTERNAL_ERROR ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log . error ( " [命令发送失败] invite 未找到设备信息: {} " , e . getMessage ( ) ) ;
}
return ;
}
}
}
} catch ( SdpException e ) {
// 参数不全, 发400, 请求错误
try {
responseAck ( request , Response . BAD_REQUEST ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log . error ( " [命令发送失败] invite BAD_REQUEST: {} " , e . getMessage ( ) ) ;
} catch ( SipException | InvalidArgumentException | ParseException sendException ) {
log . error ( " [命令发送失败] invite BAD_REQUEST: {} " , sendException . getMessage ( ) ) ;
}
return ;
} catch ( InviteDecodeException e ) {
try {
responseAck ( request , e . getCode ( ) , e . getMsg ( ) ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log . error ( " [命令发送失败] invite BAD_REQUEST: {} " , e . getMessage ( ) ) ;
} catch ( SipException | InvalidArgumentException | ParseException sendException ) {
log . error ( " [命令发送失败] invite BAD_REQUEST: {} " , sendException . getMessage ( ) ) ;
}
} catch ( PlayException e ) {
try {
responseAck ( request , e . getCode ( ) , e . getMsg ( ) ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log . error ( " [命令发送失败] invite 点播失败: {} " , e . getMessage ( ) ) ;
} catch ( SipException | InvalidArgumentException | ParseException sendException ) {
log . error ( " [命令发送失败] invite 点播失败: {} " , sendException . getMessage ( ) ) ;
}
}
// Invite Request消息实现, 此消息一般为级联消息, 上级给下级发送请求视频指令
try {
// 查询请求是否来自上级平台\设备
Platform platform = storager. queryParentPlatByServerGBId( requesterId) ;
if ( platform = = null ) {
inviteFromDeviceHandle( request, requesterId, channelId) ;
} else {
// 查询平台下是否有该通道
CommonGBChannel channel= channelService. queryOneWithPlatform( platform. getId ( ) , channelId) ;
MediaServer mediaServerItem = null ;
StreamPush streamPushItem = null ;
StreamProxy proxyByAppAndStream = null ;
if ( channel = = null ) {
log. info ( " [上级INVITE] 通道不存在, 返回404: {}" , channelId) ;
try {
// 通道不存在, 发404, 资源不存在
responseAck( request, Response. NOT_FOUND) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log. error ( " [命令发送失败] invite 通道不存在: {}" , e . getMessage ( ) ) ;
}
return ;
}
// 通道存在, 发100, TRYING
try {
responseAck( request, Response. TRYING) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log. error ( " [命令发送失败] invite TRYING: {}" , e . getMessage ( ) ) ;
}
Device device = null ;
// 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
if ( channel ! = null ) {
device = storager. queryVideoDeviceByPlatformIdAndChannelId( requesterId, channelId) ;
if ( device = = null ) {
log. warn ( " 点播平台{}的通道{}时未找到设备信息" , requesterId, channel) ;
try {
responseAck( request, Response. SERVER_INTERNAL_ERROR) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log. error ( " [命令发送失败] invite 未找到设备信息: {}" , e . getMessage ( ) ) ;
}
return ;
}
mediaServerItem = playService. getNewMediaServerItem( device) ;
if ( mediaServerItem = = null ) {
log. warn ( " 未找到可用的zlm" ) ;
try {
responseAck( request, Response. BUSY_HERE) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log. error ( " [命令发送失败] invite BUSY_HERE: {}" , e . getMessage ( ) ) ;
}
return ;
}
String ssrc ;
if ( userSetting. getUseCustomSsrcForParentInvite( ) | | gb28181Sdp. getSsrc( ) = = null ) {
// 上级平台点播时不使用上级平台指定的ssrc, 使用自定义的ssrc, 参考国标文档-点播外域设备媒体流SSRC处理方式
ssrc = " Play " . equalsIgnoreCase( sessionName) ? ssrcFactory. getPlaySsrc( mediaServerItem. getId ( ) ) : ssrcFactory. getPlayBackSsrc( mediaServerItem. getId ( ) ) ;
} else {
ssrc = gb28181Sdp. getSsrc( ) ;
}
String streamTypeStr = null ;
if ( mediaTransmissionTCP) {
if ( tcpActive) {
streamTypeStr = " TCP-ACTIVE" ;
} else {
streamTypeStr = " TCP-PASSIVE" ;
}
} else {
streamTypeStr = " UDP " ;
}
log . info ( " [上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc: {} " ,
sessionName , username , channelId , addressStr , port , streamTypeStr , ssrc ) ;
SendRtpItem sendRtpItem = mediaServerService . createSendRtpItem ( mediaServerItem , addressStr , port , ssrc , requesterId ,
device . getDeviceId ( ) , channelId , mediaTransmissionTCP , platform . isRtcp ( ) ) ;
if ( tcpActive ! = null ) {
sendRtpItem . setTcpActive ( tcpActive ) ;
}
if ( sendRtpItem = = null ) {
log . warn ( " 服务器端口资源不足 " ) ;
try {
responseAck ( request , Response . BUSY_HERE ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log . error ( " [命令发送失败] invite 服务器端口资源不足: {} " , e . getMessage ( ) ) ;
}
return ;
}
sendRtpItem. setCallId ( callIdHeader . getCallId ( ) ) ;
sendRtpItem . setPlayType ( " Play " . equalsIgnoreCase ( sessionName ) ? InviteStreamType . PLAY : InviteStreamType . PLAYBACK ) ;
Long finalStart Time = start Time ;
Long finalStopTime = stopTime ;
ErrorCallback < Object > hookEvent = ( code , msg , data ) - > {
StreamInfo streamInfo = ( StreamInfo ) data ;
MediaServer mediaServerItemInUSe = mediaServerService . getOne ( streamInfo . getMediaServerId ( ) ) ;
log . info ( " [上级Invite]下级已经开始推流。 回复200OK(SDP), {}/{} " , streamInfo . getApp ( ) , streamInfo . getStream ( ) ) ;
// * 0 等待设备推流上来
// * 1 下级已经推流, 等待上级平台回复ack
// * 2 推流中
sendRtpItem . setStatus ( 1 ) ;
redisCatchStorage . updateSendRTPSever ( sendRtpItem ) ;
String sdpIp = mediaServerItemInUSe . getSdpIp ( ) ;
if ( ! ObjectUtils . isEmpty ( platform. getSendStreamIp( ) ) ) {
sdpIp = platform . getSendStreamIp ( ) ;
}
StringBuffer content = new StringBuffer ( 200 ) ;
content. append( " v=0 \ r \ n " ) ;
content. append( " o= " + channelId + " 0 0 IN IP4 " + sdpIp + " \ r \ n " ) ;
content. append( " s= " + sessionName + " \ r \ n " ) ;
content . append ( " c=IN IP4 " + sdpIp + " \ r \ n " ) ;
if ( " Playback " . equalsIgnoreCase ( sessionName ) ) {
content . append ( " t= " + finalStartTime + " " + finalStopTime + " \ r \ n " ) ;
} else {
content . append ( " t=0 0 \ r \ n " ) ;
}
int localPort = sendRtpItem . getLocalPort ( ) ;
if ( localPort = = 0 ) {
// 非严格模式端口不统一, 增加兼容性, 修改为一个不为0的端口
localPort = new Random ( ) . nextInt ( 65535 ) + 1 ;
}
if ( sendRtpItem . isTcp ( ) ) {
content . append ( " m=video " + localPort + " TCP/RTP/AVP 96 \ r \ n " ) ;
if ( ! sendRtpItem . isTcpActive ( ) ) {
content . append ( " a=setup:active \ r \ n " ) ;
} else {
content . append ( " a=setup:passive \ r \ n " ) ;
}
} else {
content . append ( " m=video " + localPort + " RTP/AVP 96 \ r \ n " ) ;
}
content. append( " a=sendonly \ r \ n " ) ;
content. append( " a=rtpmap:96 PS/90000 \ r \ n " ) ;
content. append( " y= " + sendRtpItem . getSsrc ( ) + " \ r \ n " ) ;
content . append ( " f= \ r \ n " ) ;
try {
// 超时未收到Ack应该回复bye,当前等待时间为10秒
dynamicTask . startDelay ( callIdHeader . getCallId ( ) , ( ) - > {
log . info ( " Ack 等待超时 " ) ;
mediaServerService . releaseSsrc ( mediaServerItemInUSe . getId ( ) , sendRtpItem . getSsrc ( ) ) ;
// 回复bye
try {
cmderFroPlatform . streamByeCmd ( platform , callIdHeader . getCallId ( ) ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log . error ( " [命令发送失败] 国标级联 发送BYE: {} " , e . getMessage ( ) ) ;
}
} , 60 * 1000 ) ;
responseSdpAck ( request , content . toString ( ) , platform ) ;
// tcp主动模式, 回复sdp后开启监听
if ( sendRtpItem . isTcpActive ( ) ) {
MediaServer mediaServer = mediaServerService . getOne ( sendRtpItem . getMediaServerId ( ) ) ;
try {
mediaServerService . startSendRtpPassive ( mediaServer , sendRtpItem , 5 ) ;
redisCatchStorage . sendPlatformStartPlayMsg ( sendRtpItem , platform ) ;
} catch ( ControllerException e ) { }
}
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log . error ( " [命令发送失败] 国标级联 回复SdpAck " , e ) ;
}
} ;
ErrorCallback < Object > errorEvent = ( ( statusCode , msg , data ) - > {
log . info ( " [上级Invite] {}, 失败, 平台:{}, 通道:{}, code: {}, msg; {} " , sessionName , username , channelId , statusCode , msg ) ;
// 未知错误。直接转发设备点播的错误
try {
Response response = getMessageFactory ( ) . createResponse ( statusCode , evt . getRequest ( ) ) ;
sipSender . transmitRequest ( request . getLocalAddress ( ) . getHostAddress ( ) , response ) ;
} catch ( ParseException | SipException e ) {
log . error ( " 未处理的异常 " , e ) ;
}
} ) ;
sendRtpItem . setApp ( " rtp " ) ;
if ( " Playback " . equalsIgnoreCase ( sessionName ) ) {
sendRtpItem . setPlayType ( InviteStreamType . PLAYBACK ) ;
String startTimeStr = DateUtil. urlFormatter. format( start ) ;
String endTimeStr = DateUtil . urlFormatter . format ( end ) ;
String stream = device . getDeviceId ( ) + " _ " + channelId + " _ " + startTimeStr + " _ " + endTimeStr ;
int tcpMode = device . getStreamMode ( ) . equals ( " TCP-ACTIVE " ) ? 2 : ( device . getStreamMode ( ) . equals ( " TCP-PASSIVE " ) ? 1 : 0 ) ;
SSRCInfo ssrcInfo = mediaServerService . openRTPServer ( mediaServerItem , stream , null ,
device . isSsrcCheck ( ) , true , 0 , false , ! channel . isHasAudio ( ) , false , tcpMode ) ;
sendRtpItem . setStream ( stream ) ;
// 写入redis, 超时时回复
redisCatchStorage . updateSendRTP Sever( sendRtpItem ) ;
playService . playBack ( mediaServerItem , ssrcInfo , device . getDeviceId ( ) , channelId , DateUtil. formatter. format( start ) ,
DateUtil . formatter . format ( end ) ,
( code , msg , data ) - > {
if ( code = = InviteErrorCode . SUCCESS . getCode ( ) ) {
hookEvent . run ( code , msg , data ) ;
} else if ( code = = InviteErrorCode . ERROR_FOR_SIGNALLING_TIMEOUT . getCode ( ) | | code = = InviteErrorCode . ERROR_FOR_STREAM_TIMEOUT . getCode ( ) ) {
log . info ( " [录像回放]超时, 用户:{}, 通道:{} " , username , channelId ) ;
redisCatchStorage . deleteSendRTPServer ( platform . getServerGBId ( ) , channelId , callIdHeader . getCallId ( ) , null ) ;
errorEvent . run ( code , msg , data ) ;
} else {
errorEvent . run ( code , msg , data ) ;
}
} ) ;
} else if ( " Download " . equalsIgnoreCase ( sessionName ) ) {
// 获取指定的下载速度
Vector sdpMediaDescriptions = sdp . getMediaDescriptions ( true ) ;
MediaDescription mediaDescription = null ;
String downloadSpeed = " 1 " ;
if ( sdpMediaDescriptions . size ( ) > 0 ) {
mediaDescription = ( MediaDescription ) sdpMediaDescriptions . get ( 0 ) ;
}
if ( mediaDescription ! = null ) {
downloadSpeed = mediaDescription . getAttribute ( " downloadspeed " ) ;
}
sendRtpItem . setPlayType ( InviteStreamType . DOWNLOAD ) ;
int tcpMode = device . getStreamMode ( ) . equals ( " TCP-ACTIVE " ) ? 2 : ( device . getStreamMode ( ) . equals ( " TCP-PASSIVE " ) ? 1 : 0 ) ;
SSRCInfo ssrcInfo = mediaServerService . openRTPServer ( mediaServerItem , null , null ,
device . isSsrcCheck ( ) , true , 0 , false , ! channel . isHasAudio ( ) , false , tcpMode ) ;
sendRtpItem . setStream ( ssrcInfo . getStream ( ) ) ;
// 写入redis, 超时时回复
redisCatchStorage . updateSendRTP Sever( sendRtpItem ) ;
playService . download ( mediaServerItem , ssrcInfo , device . getDeviceId ( ) , channelId , DateUtil. formatter. format( start ) ,
DateUtil . formatter . format ( end ) , Integer . parseInt ( downloadSpeed ) ,
( code , msg , data ) - > {
if ( code = = InviteErrorCode . SUCCESS . getCode ( ) ) {
hookEvent . run ( code , msg , data ) ;
} else if ( code = = InviteErrorCode . ERROR_FOR_SIGNALLING_TIMEOUT . getCode ( ) | | code = = InviteErrorCode . ERROR_FOR_STREAM_TIMEOUT . getCode ( ) ) {
log . info ( " [录像下载]超时, 用户:{}, 通道:{} " , username , channelId ) ;
redisCatchStorage . deleteSendRTPServer ( platform . getServerGBId ( ) , channelId , callIdHeader . getCallId ( ) , null ) ;
errorEvent . run ( code , msg , data ) ;
} else {
errorEvent . run ( code , msg , data ) ;
}
} ) ;
} else {
sendRtpItem . setPlayType ( InviteStreamType . PLAY ) ;
String streamId = String . format ( " %s_%s " , device . getDeviceId ( ) , channelId ) ;
sendRtpItem . setStream ( streamId ) ;
redisCatchStorage . updateSendRTP Sever( sendRtpItem ) ;
SSRCInfo ssrcInfo = playService . play ( mediaServerItem , device . getDeviceId ( ) , channelId , ssrc , ( ( code , msg , data ) - > {
if ( code = = InviteErrorCode . SUCCESS . getCode ( ) ) {
hookEvent . run ( code , msg , data ) ;
} else if ( code = = InviteErrorCode . ERROR_FOR_SIGNALLING_TIMEOUT . getCode ( ) | | code = = InviteErrorCode . ERROR_FOR_STREAM_TIMEOUT . getCode ( ) ) {
log . info ( " [上级点播]超时, 用户:{}, 通道:{} " , username , channelId ) ;
redisCatchStorage . deleteSendRTPServer ( platform . getServerGBId ( ) , channelId , callIdHeader . getCallId ( ) , null ) ;
errorEvent . run ( code , msg , data ) ;
} else {
errorEvent . run ( code , msg , data ) ;
}
} ) ) ;
sendRtpItem . setSsrc ( ssrcInfo . getSsrc ( ) ) ;
redisCatchStorage . updateSendRTPSever ( sendRtpItem ) ;
}
} else if ( gbStream ! = null ) {
SendRtpItem sendRtpItem = new SendRtpItem ( ) ;
if ( ! userSetting . getUseCustomSsrcForParentInvite ( ) & & gb28181Sdp. getSsrc( ) ! = null ) {
sendRtpItem . setSsrc ( gb28181Sdp . getSsrc ( ) ) ;
}
if ( tcpActive ! = null ) {
sendRtpItem . setTcpActive ( tcpActive ) ;
}
sendRtpItem. setTcp ( mediaTransmissionTCP ) ;
sendRtpItem. setRtcp ( platform . isRtcp ( ) ) ;
sendRtpItem. setPlatformName ( platform . getName ( ) ) ;
sendRtpItem. setPlatformId ( platform . getServerGBId ( ) ) ;
sendRtpItem. setMediaServerId ( mediaServerItem . getId ( ) ) ;
sendRtpItem. setChannelId ( channelId ) ;
sendRtpItem. setIp ( addressStr ) ;
sendRtpItem. setPort ( port ) ;
sendRtpItem. setUsePs ( true ) ;
sendRtpItem. setApp ( gbStream . getApp ( ) ) ;
sendRtpItem. setStream ( gbStream . getStream ( ) ) ;
sendRtpItem. setCallId ( callIdHeader . getCallId ( ) ) ;
sendRtpItem. setFromTag ( request . getFromTag ( ) ) ;
sendRtpItem. setOnlyAudio ( false ) ;
sendRtpItem. setStatus ( 0 ) ;
sendRtpItem . setSessionName ( sessionName ) ;
// 清理可能存在的缓存避免用到旧的数据
List < SendRtpItem > sendRtpItemList = redisCatchStorage . querySendRTPServer ( platform . getServerGBId ( ) , channelId , gbStream . getStream ( ) ) ;
if ( ! sendRtpItemList . isEmpty ( ) ) {
for ( SendRtpItem rtpItem : sendRtpItemList ) {
redisCatchStorage . deleteSendRTPServer ( rtpItem ) ;
}
}
if ( " push " . equals ( gbStream . getStreamType ( ) ) ) {
sendRtpItem . setPlayType ( InviteStreamType . PUSH ) ;
if ( streamPushItem ! = null ) {
// 从redis查询是否正在接收这个推流
MediaInfo mediaInfo = redisCatchStorage . getPushListItem ( gbStream . getApp ( ) , gbStream . getStream ( ) ) ;
if ( mediaInfo ! = null ) {
sendRtpItem. set ServerId( mediaInfo. getServerId ( ) ) ;
sendRtpItem . setMediaServerId ( mediaInfo . getMediaServer ( ) . getId ( ) ) ;
redisCatchStorage . updateSendRTPSever ( sendRtpItem ) ;
// 开始推流
sendPushStream ( sendRtpItem , mediaServerItem , platform , request ) ;
} else {
if ( ! platform . isStartOfflinePush ( ) ) {
// 平台设置中关闭了拉起离线的推流则直接回复
try {
log . info ( " [上级点播] 失败, 推流设备未推流, channel: {}, app: {}, stream: {} " , sendRtpItem . getChannelId ( ) , sendRtpItem . getApp ( ) , sendRtpItem . getStream ( ) ) ;
responseAck ( request , Response . TEMPORARILY_UNAVAILABLE , " channel stream not pushing " ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log . error ( " [命令发送失败] invite 通道未推流: {} " , e . getMessage ( ) ) ;
}
return ;
}
notifyPushStreamOnline ( sendRtpItem , mediaServerItem , platform , request ) ;
}
}
} else if ( " proxy " . equals ( gbStream . getStreamType ( ) ) ) {
if ( null ! = proxyByAppAndStream ) {
sendRtpItem. setServerId ( userSetting . getServerId ( ) ) ;
if ( sendRtpItem . getSsrc ( ) = = null ) {
// 上级平台点播时不使用上级平台指定的ssrc, 使用自定义的ssrc, 参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = " Play " . equalsIgnoreCase ( sessionName ) ? ssrcFactory . getPlaySsrc ( mediaServerItem . getId ( ) ) : ssrcFactory . getPlayBackSsrc ( mediaServerItem . getId ( ) ) ;
sendRtpItem . setSsrc ( ssrc ) ;
}
MediaInfo mediaInfo = redisCatchStorage . getProxyStream ( gbStream . getApp ( ) , gbStream . getStream ( ) ) ;
if ( mediaInfo ! = null ) {
sendProxyStream ( sendRtpItem , mediaServerItem , platform , request ) ;
} else {
//开启代理拉流
notifyProxyStreamOnline ( sendRtpItem , mediaServerItem , platform , request ) ;
}
}
}
}
}
} catch ( SdpParseException e ) {
log . error ( " sdp解析错误 " , e ) ;
} catch ( SdpException e ) {
log . error ( " 未处理的异常 " , e ) ;
}
// // Invite Request消息实现, 此消息一般为级联消息, 上级给下级发送请求视频指令
// try {
//
//
//
//
//
//
//
// // 查询请求是否来自上级平台\设备
// Platform platform = storager. queryParentPlatByServerGBId( requesterId) ;
//
// if (platform == null) {
// inviteFromDeviceHandle( request, requesterId, channelId) ;
//
// } else {
// // 查询平台下是否有该通道
// CommonGBChannel channel= channelService. queryOneWithPlatform( platform.getId(), channelId) ;
// MediaServer mediaServerItem = null;
// StreamPush streamPushItem = null;
// StreamProxy proxyByAppAndStream = null;
// if (channel == null) {
// log.info(" [上级INVITE] 通道不存在, 返回404: {}", channelId) ;
// try {
// // 通道不存在, 发404, 资源不存在
// responseAck( request, Response. NOT_FOUND) ;
// } catch ( SipException | InvalidArgumentException | ParseException e) {
// log.error(" [命令发送失败] invite 通道不存在: {}", e.getMessage()) ;
// }
// return;
// }
// // 通道存在, 发100, TRYING
// try {
// responseAck( request, Response. TRYING) ;
// } catch ( SipException | InvalidArgumentException | ParseException e) {
// log.error(" [命令发送失败] invite TRYING: {}", e.getMessage()) ;
// }
//
//
//
// Device device = null;
// // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
// if (channel != null) {
// device = storager. queryVideoDeviceByPlatformIdAndChannelId( requesterId, channelId) ;
// if (device == null) {
// log.warn(" 点播平台{}的通道{}时未找到设备信息", requesterId, channel) ;
// try {
// responseAck( request, Response. SERVER_INTERNAL_ERROR) ;
// } catch ( SipException | InvalidArgumentException | ParseException e) {
// log.error(" [命令发送失败] invite 未找到设备信息: {}", e.getMessage()) ;
// }
// return;
// }
// mediaServerItem = playService. getNewMediaServerItem( device) ;
// if ( mediaServerItem == null) {
// log.warn(" 未找到可用的zlm") ;
// try {
// responseAck( request, Response. BUSY_HERE) ;
// } catch ( SipException | InvalidArgumentException | ParseException e) {
// log.error(" [命令发送失败] invite BUSY_HERE: {}", e.getMessage()) ;
// }
// return;
// }
//
// String ssrc;
// if ( userSetting. getUseCustomSsrcForParentInvite() || gb28181Sdp. getSsrc() == null) {
// // 上级平台点播时不使用上级平台指定的ssrc, 使用自定义的ssrc, 参考国标文档-点播外域设备媒体流SSRC处理方式
// ssrc = "Play". equalsIgnoreCase( sessionName) ? ssrcFactory. getPlaySsrc( mediaServerItem.getId()) : ssrcFactory. getPlayBackSsrc( mediaServerItem.getId()) ;
// }else {
// ssrc = gb28181Sdp. getSsrc() ;
// }
// String streamTypeStr = null;
// if ( mediaTransmissionTCP) {
// if ( tcpActive) {
// streamTypeStr = " TCP-ACTIVE" ;
// } else {
// streamTypeStr = " TCP-PASSIVE" ;
// }
// } else {
// streamTypeStr = "UDP" ;
// }
//
// SendRtpItem sendRtpItem = mediaServerService.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
// device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
//
// if (tcpActive != null) {
// sendRtpItem.setTcpActive(tcpActive);
// }
// if (sendRtpItem == null) {
// log.warn("服务器端口资源不足");
// try {
// responseAck(request, Response.BUSY_HERE);
// } catch (SipException | InvalidArgumentException | ParseException e) {
// log.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
// }
// return;
// }
// sendRtpItem.setCallId(callIdHeader.getCallId());
// sendRtpItem.setPlayType("Play".equalsIgnoreCase(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK) ;
//
// Long finalStartTime = startTime;
// Long finalStop Time = stop Time;
// ErrorCallback<Object> hookEvent = (code, msg, data) -> {
// StreamInfo streamInfo = (StreamInfo)data;
// MediaServer mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId()) ;
// log.info("[上级Invite]下级已经开始推流。 回复200OK(SDP), {}/{}", streamInfo.getApp(), streamInfo.getStream()) ;
// // * 0 等待设备推流上来
// // * 1 下级已经推流, 等待上级平台回复ack
// // * 2 推流中
// sendRtpItem.setStatus(1);
// redisCatchStorage.updateSendRTPSever(sendRtpItem) ;
// String sdpIp = mediaServerItemInUSe.getSdpIp() ;
// if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
// sdpIp = platform. getSendStreamIp();
// }
// StringBuffer content = new StringBuffer(200);
// content.append("v=0\r\n") ;
// content. append("o=" + channelId + " 0 0 IN IP4 " + sdpIp + "\r\n") ;
// content. append("s=" + sessionName + "\r\n") ;
// content. append("c=IN IP4 " + sdpIp + "\r\n") ;
// if ("Playback".equalsIgnoreCase(sessionName)) {
// content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n");
// } else {
// content.append("t=0 0\r\n");
// }
// int localPort = sendRtpItem.getLocalPort();
// if (localPort == 0) {
// // 非严格模式端口不统一, 增加兼容性, 修改为一个不为0的端口
// localPort = new Random().nextInt(65535) + 1;
// }
// if (sendRtpItem.isTcp()) {
// content.append("m=video " + localPort + " TCP/RTP/AVP 96\r\n");
// if (!sendRtpItem.isTcpActive()) {
// content.append("a=setup:active\r\n");
// } else {
// content.append("a=setup:passive\r\n");
// }
// }else {
// content.append("m=video " + localPort + " RTP/AVP 96\r\n");
// }
// content.append("a=sendonly\r\n");
// content. append("a=rtpmap:96 PS/90000\r\n") ;
// content. append("y=" + sendRtpItem.getSsrc() + "\r\n") ;
// content. append("f=\r\n") ;
//
//
// try {
// // 超时未收到Ack应该回复bye,当前等待时间为10秒
// dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
// log.info("Ack 等待超时");
// mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), sendRtpItem.getSsrc()) ;
// // 回复bye
// try {
// cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId());
// } catch (SipException | InvalidArgumentException | ParseException e) {
// log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
// }
// }, 60 * 1000);
// responseSdpAck(request, content.toString(), platform) ;
// // tcp主动模式, 回复sdp后开启监听
// if (sendRtpItem.isTcpActive()) {
// MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
// try {
// mediaServerService.startSendRtpPassive(mediaServer, sendRtpItem, 5);
// redisCatchStorage.sendPlatformStartPlayMsg(sendRtpItem, platform) ;
// }catch (ControllerException e) {}
// }
// } catch (SipException | InvalidArgumentException | ParseException e) {
// log.error("[命令发送失败] 国标级联 回复SdpAck", e);
// }
// };
// ErrorCallback<Object> errorEvent = ((statusCode, msg, data) -> {
// log.info("[上级Invite] {}, 失败, 平台:{}, 通道:{}, code: {}, msg; {}", sessionName, username, channelId, statusCode, msg);
// // 未知错误。直接转发设备点播的错误
// try {
// Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
// sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response) ;
// } catch (ParseException | SipException e) {
// log.error("未处理的异常 ", e);
// }
// });
// sendRtpItem.setApp("rtp") ;
// if ("Playback".equalsIgnoreCase(sessionName)) {
// sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
// String startTimeStr = DateUtil.urlFormatter.format(start) ;
// String endTimeStr = DateUtil. urlFormatter. format(end) ;
// String stream = device.getDeviceId() + "_" + channelId + "_" + startTimeStr + "_" + endTimeStr ;
// int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0) ;
// SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null,
// device.isSsrcCheck(), true, 0,false,!channel.isHasAudio(), false, tcpMode);
// sendRtpItem.setStream(stream) ;
// // 写入redis, 超时时回复
// redisCatchStorage.updateSendRTPSever(sendRtpItem);
// playService.playBack(media Ser verItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
// DateUtil. formatter. format(end) ,
// (code, msg, data) -> {
// if (code == InviteErrorCode.SUCCESS.getCode()) {
// hookEvent.run(code, msg, data);
// } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
// log.info("[录像回放]超时, 用户:{}, 通道:{}", username, channelId);
// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null) ;
// errorEvent.run(code, msg, data) ;
// } else {
// errorEvent.run(code, msg, data);
// }
// });
// } else if ("Download".equalsIgnoreCase(sessionName)) {
// // 获取指定的下载速度
// Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true);
// MediaDescription mediaDescription = null ;
// String downloadSpeed = "1" ;
// if (sdpMediaDescriptions.size() > 0) {
// mediaDescription = (MediaDescription) sdpMediaDescriptions.get(0);
// }
// if (mediaDescription != null) {
// downloadSpeed = mediaDescription.getAttribute("downloadspeed");
// }
//
// sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD);
// int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0) ;
// SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null,
// device.isSsrcCheck(), true, 0, false,!channel.isHasAudio(), false, tcpMode);
// sendRtpItem.setStream(ssrcInfo.getStream()) ;
// // 写入redis, 超时时回复
// redisCatchStorage.updateSendRTPSever(sendRtpItem);
// playService.download(media Ser verItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
// DateUtil. formatter. format(end), Integer.parseInt(downloadSpeed) ,
// (code, msg, data) -> {
// if (code == InviteErrorCode.SUCCESS.getCode()) {
// hookEvent.run(code, msg, data);
// } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
// log.info("[录像下载]超时, 用户:{}, 通道:{}", username, channelId);
// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null) ;
// errorEvent.run(code, msg, data) ;
// } else {
// errorEvent.run(code, msg, data);
// }
// });
// } else {
// sendRtpItem.setPlayType(InviteStreamType.PLAY);
// String streamId = String.format("%s_%s", device.getDeviceId(), channelId) ;
// sendRtpItem.setStream(streamId) ;
// redisCatchStorage.updateSendRTPSever(sendRtpItem) ;
// SSRCInfo ssrcInfo = playService.play(media Ser verItem, device.getDeviceId(), channelId, ssrc, ((code, msg, data) -> {
// if (code == InviteErrorCode.SUCCESS.getCode()) {
// hookEvent.run(code, msg, data);
// } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
// log.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId);
// redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null) ;
// errorEvent.run(code, msg, data) ;
// } else {
// errorEvent.run(code, msg, data);
// }
// }));
// sendRtpItem.setSsrc(ssrcInfo.getSsrc()) ;
// redisCatchStorage.updateSendRTPSever(sendRtpItem) ;
//
// }
// } else if (gbStream != null) {
// SendRtpItem sendRtpItem = new SendRtpItem();
// if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) {
// sendRtpItem.setSsrc( gb28181Sdp. getSsrc());
// }
//
// if (tcpActive != null) {
// sendRtpItem.setTcpActive(tcpActive);
// }
// sendRtpItem.setTcp(mediaTransmissionTCP);
// sendRtpItem.setRtcp(platform.isRtcp()) ;
// sendRtpItem.setPlatformName(platform.getName()) ;
// sendRtpItem. setPlatformId(platform.getServerGBId()) ;
// sendRtpItem.setMediaServerId(mediaServerItem.getId()) ;
// sendRtpItem.setChannelId(channelId) ;
// sendRtpItem.setIp(addressStr) ;
// sendRtpItem.setPort(port) ;
// sendRtpItem.setUsePs(true) ;
// sendRtpItem.setApp(gbStream.getApp()) ;
// sendRtpItem.setStream(gbStream.getStream()) ;
// sendRtpItem.setCallId(callIdHeader.getCallId()) ;
// sendRtpItem.setFromTag(request.getFromTag()) ;
// sendRtpItem.setOnlyAudio(false) ;
// sendRtpItem.setStatus(0) ;
// sendRtpItem.setSessionName(sessionName) ;
// // 清理可能存在的缓存避免用到旧的数据
// List<SendRtpItem> sendRtpItemList = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, gbStream.getStream());
// if (!sendRtpItemList.isEmpty()) {
// for (SendRtpItem rtpItem : sendRtpItemList) {
// redisCatchStorage.deleteSendRTPServer(rtpItem);
// }
// }
// if ("push".equals(gbStream.getStreamType())) {
// sendRtpItem.setPlayType(InviteStreamType.PUSH);
// if (streamPushItem != null) {
// // 从redis查询是否正在接收这个推流
// MediaInfo mediaInfo = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
// if (mediaInfo != null) {
// sendRtpItem.setServerId(mediaInfo.getServerId());
// sendRtpItem.setMedia ServerId( mediaInfo.getMediaServer().getId()) ;
//
// redisCatchStorage.updateSendRTPSever(sendRtpItem);
// // 开始推流
// sendPushStream(sendRtpItem, mediaServerItem, platform, request);
// }else {
// if (!platform.isStartOfflinePush()) {
// // 平台设置中关闭了拉起离线的推流则直接回复
// try {
// log.info("[上级点播] 失败, 推流设备未推流, channel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream());
// responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing") ;
// } catch (SipException | InvalidArgumentException | ParseException e) {
// log.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
// }
// return;
// }
// notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
// }
// }
// } else if ("proxy".equals(gbStream.getStreamType())) {
// if (null != proxyByAppAndStream) {
// sendRtpItem.setServerId(userSetting.getServerId());
// if ( sendRtpItem.getSsrc() == null) {
// // 上级平台点播时不使用上级平台指定的ssrc, 使用自定义的ssrc, 参考国标文档-点播外域设备媒体流SSRC处理方式
// String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
// sendRtpItem.setSsrc(ssrc) ;
// }
// MediaInfo mediaInfo = redisCatchStorage.getProxyStream(gbStream.getApp(), gbStream.getStream());
// if (mediaInfo != null) {
// sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
// } else {
// //开启代理拉流
// notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request);
// }
// }
// }
// }
// }
// } catch (SdpParseException e) {
// log.error("sdp解析错误", e);
// } catch (SdpException e) {
// log.error("未处理的异常 ", e);
// }
}
private InviteInfo decode ( RequestEvent evt ) throws SdpException {
@@ -688,6 +712,42 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
private String createSendSdp ( SendRtpItem sendRtpItem , InviteInfo inviteInfo , String sdpIp ) {
StringBuilder content = new StringBuilder ( 200 ) ;
content . append ( " v=0 \ r \ n " ) ;
content . append ( " o= " + inviteInfo . getChannelId ( ) + " 0 0 IN IP4 " + sdpIp + " \ r \ n " ) ;
content . append ( " s= " + inviteInfo . getSessionName ( ) + " \ r \ n " ) ;
content . append ( " c=IN IP4 " + sdpIp + " \ r \ n " ) ;
if ( " Playback " . equalsIgnoreCase ( inviteInfo . getSessionName ( ) ) ) {
content . append ( " t= " + inviteInfo . getStartTime ( ) + " " + inviteInfo . getStopTime ( ) + " \ r \ n " ) ;
} else {
content . append ( " t=0 0 \ r \ n " ) ;
}
if ( sendRtpItem . isTcp ( ) ) {
content . append ( " m=video " + sendRtpItem . getLocalPort ( ) + " TCP/RTP/AVP 96 \ r \ n " ) ;
if ( ! sendRtpItem . isTcpActive ( ) ) {
content . append ( " a=setup:active \ r \ n " ) ;
} else {
content . append ( " a=setup:passive \ r \ n " ) ;
}
} else {
content . append ( " m=video " + sendRtpItem . getLocalPort ( ) + " RTP/AVP 96 \ r \ n " ) ;
}
content . append ( " a=sendonly \ r \ n " ) ;
content . append ( " a=rtpmap:96 PS/90000 \ r \ n " ) ;
content . append ( " y= " + sendRtpItem . getSsrc ( ) + " \ r \ n " ) ;
content . append ( " f= \ r \ n " ) ;
return content . toString ( ) ;
}
private void sendBye ( Platform platform , String callId ) {
try {
cmderFroPlatform . streamByeCmd ( platform , callId ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
log . error ( " [命令发送失败] 上级Invite 发送BYE: {} " , e . getMessage ( ) ) ;
}
}
private void startSendRtpStreamHand ( RequestEvent evt , SendRtpItem sendRtpItem , Platform parentPlatform ,
JSONObject jsonObject , Map < String , Object > param , CallIdHeader callIdHeader ) {
if ( jsonObject = = null ) {