@@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform ;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor ;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent ;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils ;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe ;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory ;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem ;
@@ -101,19 +102,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Override
public void process ( RequestEvent evt ) {
// Invite Request消息实现, 此消息一般为级联消息, 上级给下级发送请求视频指令
Long startTimeForInvite = System . currentTimeMillis ( ) ;
try {
Request request = evt . getRequest ( ) ;
SipURI sipURI = ( SipURI ) request . getRequestURI ( ) ;
String channelId = sipURI . getUser ( ) ;
String requesterId = null ;
FromHeader fromHeader = ( FromHeader ) request . getHeader ( FromHeader . NAME ) ;
String requesterId = SipUtils . getUserIdFromFromHeader ( request ) ;
CallIdHeader callIdHeader = ( CallIdHeader ) request . getHeader ( CallIdHeader . NAME ) ;
AddressImpl address = ( AddressImpl ) fromHeader . getAddress ( ) ;
SipUri uri = ( SipUri ) address . getURI ( ) ;
requesterId = uri . getUser ( ) ;
if ( requesterId = = null | | channelId = = null ) {
logger . info ( " 无法从FromHeader的Address中获取到平台id, 返回400 " ) ;
responseAck ( evt , Response . BAD_REQUEST ) ; // 参数不全, 发400, 请求错误
@@ -122,7 +116,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 查询请求是否来自上级平台\设备
ParentPlatform platform = storager . queryParentPlatByServerGBId ( requesterId ) ;
if ( platform ! = null ) {
if ( platform = = null ) {
inviteFromDeviceHandle ( evt , requesterId ) ;
} else {
// 查询平台下是否有该通道
DeviceChannel channel = storager . queryChannelInParentPlatform ( requesterId , channelId ) ;
GbStream gbStream = storager . queryStreamInParentPlatform ( requesterId , channelId ) ;
@@ -141,7 +137,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
mediaServerItem = mediaServerService . getOne ( mediaServerId ) ;
if ( mediaServerItem = = null ) {
logger . info ( " [ app={}, stream={} ]找不到zlm {}, 返回410 " , gbStream . getApp ( ) , gbStream . getStream ( ) , mediaServerId ) ;
responseAck ( evt , Response . GONE , " media server not found " );
responseAck ( evt , Response . GONE ) ;
return ;
}
Boolean streamReady = zlmrtpServerFactory . isStreamReady ( mediaServerItem , gbStream . getApp ( ) , gbStream . getStream ( ) ) ;
@@ -197,7 +193,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 查看是否支持PS 负载96
//String ip = null;
int port = - 1 ;
//boolean recvonly = false;
boolean mediaTransmissionTCP = false ;
Boolean tcpActive = null ;
for ( Object description : mediaDescriptions ) {
@@ -233,7 +228,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
String username = sdp . getOrigin ( ) . getUsername ( ) ;
String addressStr = sdp . getOrigin ( ) . getAddress ( ) ;
//String sessionName = sdp.getSessionName().getValue();
logger . info ( " [上级点播]用户:{}, 地址:{}:{}, ssrc: {} " , username , addressStr , port , ssrc ) ;
Device device = null ;
// 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
@@ -271,8 +265,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
Long finalStartTime = startTime ;
Long finalStopTime = stopTime ;
ZLMHttpHookSubscribe . Event hookEvent = ( mediaServerItemInUSe , responseJSON ) - > {
logger . info ( " [上级点播]收到下级开始点播订阅 , {}/{} " , sendRtpItem . getApp ( ) , sendRtpItem . getStreamId ( ) ) ;
// if (sendRtpItem == null) return;
logger . info ( " [上级点播]下级已经开始推流。 回复200OK(SDP) , {}/{} " , sendRtpItem . getApp ( ) , sendRtpItem . getStreamId ( ) ) ;
// * 0 等待设备推流上来
// * 1 下级已经推流, 等待上级平台回复ack
// * 2 推流中
sendRtpItem . setStatus ( 1 ) ;
redisCatchStorage . updateSendRTPSever ( sendRtpItem ) ;
@@ -301,9 +297,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} catch ( ParseException e ) {
e . printStackTrace ( ) ;
}
if ( " Playback " . equals ( sessionName ) & & responseJSON ! = null ) {
playService . onPublishHandlerForPlayBack ( finalMediaServerItem , responseJSON , finalDevice . getDeviceId ( ) , channelId , null ) ;
}
} ;
SipSubscribe . Event errorEvent = ( ( event ) - > {
// 未知错误。直接转发设备点播的错误
@@ -319,10 +312,29 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} ) ;
if ( " Playback " . equals ( sessionName ) ) {
sendRtpItem . setPlay ( false ) ;
SSRCInfo ssrcInfo = mediaServerService . openRTPServer ( mediaServerItem , sendRtpItem . getSsrc ( ) , true ) ;
sendRtpItem . setStreamId ( ssrc ) ;
SimpleDateFormat format = new SimpleDateFormat ( " yyyy-MM-dd HH:mm:ss " ) ;
commander . playb ackStreamCmd ( mediaServerItem , ssrcInfo , device , channelId , format . format ( start ) , format . format ( end ) , hookEvent , errorEvent ) ;
playService . playB ack( device . getDeviceId ( ) , channelId , format . format ( start ) , format . format ( end ) , result - > {
if ( result . getCode ( ) ! = 0 ) {
logger . warn ( " 录像回放失败 " ) ;
if ( result . getEvent ( ) ! = null ) {
errorEvent . response ( result . getEvent ( ) ) ;
}
try {
responseAck ( evt , Response . REQUEST_TIMEOUT ) ;
} catch ( SipException e ) {
e . printStackTrace ( ) ;
} catch ( InvalidArgumentException e ) {
e . printStackTrace ( ) ;
} catch ( ParseException e ) {
e . printStackTrace ( ) ;
}
} else {
if ( result . getMediaServerItem ( ) ! = null ) {
hookEvent . response ( result . getMediaServerItem ( ) , result . getResponse ( ) ) ;
}
}
} ) ;
} else {
sendRtpItem . setPlay ( true ) ;
StreamInfo streamInfo = redisCatchStorage . queryPlayByDevice ( device . getDeviceId ( ) , channelId ) ;
@@ -333,7 +345,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem . setPlay ( false ) ;
playService . play ( mediaServerItem , device . getDeviceId ( ) , channelId , hookEvent , errorEvent ) ;
} else {
sendRtpItem . setStreamId ( streamInfo . getStreamId ( ) ) ;
sendRtpItem . setStreamId ( streamInfo . getStream ( ) ) ;
hookEvent . response ( mediaServerItem , null ) ;
}
}
@@ -379,72 +391,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
}
} else {
// 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
Device device = redisCatchStorage . getDevice ( requesterId ) ;
if ( device ! = null ) {
logger . info ( " 收到设备 " + requesterId + " 的语音广播Invite请求 " ) ;
responseAck ( evt , Response . TRYING ) ;
String contentString = new String ( request . getRawContent ( ) ) ;
// jainSip不支持y=字段, 移除移除以解析。
String substring = contentString ;
String ssrc = " 0000000404 " ;
int ssrcIndex = contentString . indexOf ( " y= " ) ;
if ( ssrcIndex > 0 ) {
substring = contentString . substring ( 0 , ssrcIndex ) ;
ssrc = contentString . substring ( ssrcIndex + 2 , ssrcIndex + 12 ) ;
}
ssrcIndex = substring . indexOf ( " f= " ) ;
if ( ssrcIndex > 0 ) {
substring = contentString . substring ( 0 , ssrcIndex ) ;
}
SessionDescription sdp = SdpFactory . getInstance ( ) . createSessionDescription ( substring ) ;
// 获取支持的格式
Vector mediaDescriptions = sdp . getMediaDescriptions ( true ) ;
// 查看是否支持PS 负载96
int port = - 1 ;
//boolean recvonly = false;
boolean mediaTransmissionTCP = false ;
Boolean tcpActive = null ;
for ( int i = 0 ; i < mediaDescriptions . size ( ) ; i + + ) {
MediaDescription mediaDescription = ( MediaDescription ) mediaDescriptions . get ( i ) ;
Media media = mediaDescription . getMedia ( ) ;
Vector mediaFormats = media . getMediaFormats ( false ) ;
if ( mediaFormats . contains ( " 8 " ) ) {
port = media . getMediaPort ( ) ;
String protocol = media . getProtocol ( ) ;
// 区分TCP发流还是udp, 当前默认udp
if ( " TCP/RTP/AVP " . equals ( protocol ) ) {
String setup = mediaDescription . getAttribute ( " setup " ) ;
if ( setup ! = null ) {
mediaTransmissionTCP = true ;
if ( " active " . equals ( setup ) ) {
tcpActive = true ;
} else if ( " passive " . equals ( setup ) ) {
tcpActive = false ;
}
}
}
break ;
}
}
if ( port = = - 1 ) {
logger . info ( " 不支持的媒体格式, 返回415 " ) ;
// 回复不支持的格式
responseAck ( evt , Response . UNSUPPORTED_MEDIA_TYPE ) ; // 不支持的格式, 发415
return ;
}
String username = sdp . getOrigin ( ) . getUsername ( ) ;
String addressStr = sdp . getOrigin ( ) . getAddress ( ) ;
logger . info ( " 设备{}请求语音流,地址:{}:{}, ssrc: {} " , username , addressStr , port , ssrc ) ;
} else {
logger . warn ( " 来自无效设备/平台的请求 " ) ;
responseAck ( evt , Response . BAD_REQUEST ) ;
}
}
} catch ( SipException | InvalidArgumentException | ParseException e ) {
@@ -457,4 +403,74 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
e . printStackTrace ( ) ;
}
}
public void inviteFromDeviceHandle ( RequestEvent evt , String requesterId ) throws InvalidArgumentException , ParseException , SipException , SdpException {
// 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
Device device = redisCatchStorage . getDevice ( requesterId ) ;
Request request = evt . getRequest ( ) ;
if ( device ! = null ) {
logger . info ( " 收到设备 " + requesterId + " 的语音广播Invite请求 " ) ;
responseAck ( evt , Response . TRYING ) ;
String contentString = new String ( request . getRawContent ( ) ) ;
// jainSip不支持y=字段, 移除移除以解析。
String substring = contentString ;
String ssrc = " 0000000404 " ;
int ssrcIndex = contentString . indexOf ( " y= " ) ;
if ( ssrcIndex > 0 ) {
substring = contentString . substring ( 0 , ssrcIndex ) ;
ssrc = contentString . substring ( ssrcIndex + 2 , ssrcIndex + 12 ) ;
}
ssrcIndex = substring . indexOf ( " f= " ) ;
if ( ssrcIndex > 0 ) {
substring = contentString . substring ( 0 , ssrcIndex ) ;
}
SessionDescription sdp = SdpFactory . getInstance ( ) . createSessionDescription ( substring ) ;
// 获取支持的格式
Vector mediaDescriptions = sdp . getMediaDescriptions ( true ) ;
// 查看是否支持PS 负载96
int port = - 1 ;
//boolean recvonly = false;
boolean mediaTransmissionTCP = false ;
Boolean tcpActive = null ;
for ( int i = 0 ; i < mediaDescriptions . size ( ) ; i + + ) {
MediaDescription mediaDescription = ( MediaDescription ) mediaDescriptions . get ( i ) ;
Media media = mediaDescription . getMedia ( ) ;
Vector mediaFormats = media . getMediaFormats ( false ) ;
if ( mediaFormats . contains ( " 8 " ) ) {
port = media . getMediaPort ( ) ;
String protocol = media . getProtocol ( ) ;
// 区分TCP发流还是udp, 当前默认udp
if ( " TCP/RTP/AVP " . equals ( protocol ) ) {
String setup = mediaDescription . getAttribute ( " setup " ) ;
if ( setup ! = null ) {
mediaTransmissionTCP = true ;
if ( " active " . equals ( setup ) ) {
tcpActive = true ;
} else if ( " passive " . equals ( setup ) ) {
tcpActive = false ;
}
}
}
break ;
}
}
if ( port = = - 1 ) {
logger . info ( " 不支持的媒体格式, 返回415 " ) ;
// 回复不支持的格式
responseAck ( evt , Response . UNSUPPORTED_MEDIA_TYPE ) ; // 不支持的格式, 发415
return ;
}
String username = sdp . getOrigin ( ) . getUsername ( ) ;
String addressStr = sdp . getOrigin ( ) . getAddress ( ) ;
logger . info ( " 设备{}请求语音流,地址:{}:{}, ssrc: {} " , username , addressStr , port , ssrc ) ;
} else {
logger . warn ( " 来自无效设备/平台的请求 " ) ;
responseAck ( evt , Response . BAD_REQUEST ) ;
}
}
}