@@ -84,9 +84,6 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private IRedisCatchStorage redisCatchStorage ;
@Autowired
private ZLMServerFactory zlmServerFactory ;
@Autowired
private IInviteStreamService inviteStreamService ;
@@ -302,8 +299,7 @@ public class PlayServiceImpl implements IPlayService {
}
String mediaServerId = streamInfo . getMediaServerId ( ) ;
MediaServer mediaInfo = mediaServerService . getOne ( mediaServerId ) ;
Boolean ready = zlmServerFactory . isStreamReady ( mediaInfo , " rtp " , streamId ) ;
Boolean ready = mediaServerService . isStreamReady ( mediaInfo , " rtp " , streamId ) ;
if ( ready ! = null & & ready ) {
callback . run ( InviteErrorCode . SUCCESS . getCode ( ) , InviteErrorCode . SUCCESS . getMsg ( ) , streamInfo ) ;
inviteStreamService . call ( InviteSessionType . PLAY , device . getDeviceId ( ) , channelId , null ,
@@ -391,28 +387,15 @@ public class PlayServiceImpl implements IPlayService {
}
} , userSetting . getPlayTimeout ( ) ) ;
Map < String , Object > param = new HashMap < > ( 12 ) ;
param . put ( " vhost " , " __defaultVhost__ " ) ;
param . put ( " app " , sendRtpItem . getApp ( ) ) ;
param . put ( " stream " , sendRtpItem . getStream ( ) ) ;
param . put ( " ssrc " , sendRtpItem . getSsrc ( ) ) ;
param . put ( " src_port " , sendRtpItem . getLocalPort ( ) ) ;
param . put ( " pt " , sendRtpItem . getPt ( ) ) ;
param . put ( " use_ps " , sendRtpItem . isUsePs ( ) ? " 1 " : " 0 " ) ;
param . put ( " only_audio " , sendRtpItem . isOnlyAudio ( ) ? " 1 " : " 0 " ) ;
param . put ( " is_udp " , sendRtpItem . isTcp ( ) ? " 0 " : " 1 " ) ;
param . put ( " recv_stream_id " , sendRtpItem . getReceiveStream ( ) ) ;
param . put ( " close_delay_ms " , userSetting . getPlayTimeout ( ) * 1000 ) ;
zlmServerFactory . startSendRtpPassive ( mediaServerItem , param , jsonObject - > {
if ( jsonObject = = null | | jsonObject . getInteger ( " code " ) ! = 0 ) {
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , sendRtpItem . getSsrc ( ) ) ;
logger . info ( " [语音对讲]失败 deviceId: {}, channelId: {} " , device . getDeviceId ( ) , channelId ) ;
audioEvent . call ( " 失败, " + jsonObject . getString ( " msg " ) ) ;
// 查看是否已经建立了通道, 存在则发送bye
stopTalk ( device , channelId ) ;
}
} ) ;
try {
mediaServerService . startSendRtpPassive ( mediaServerItem , null , sendRtpItem , userSetting . getPlayTimeout ( ) * 1000 ) ;
} catch ( ControllerException e ) {
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , sendRtpItem . getSsrc ( ) ) ;
logger . info ( " [语音对讲]失败 deviceId: {}, channelId: {} " , device . getDeviceId ( ) , channelId ) ;
audioEvent . call ( " 失败, " + e . getMessage ( ) ) ;
// 查看是否已经建立了通道, 存在则发送bye
stopTalk ( device , channelId ) ;
}
// 查看设备是否已经在推流
@@ -1238,7 +1221,7 @@ public class PlayServiceImpl implements IPlayService {
SendRtpItem sendRtpItem = redisCatchStorage . querySendRTPServer ( device . getDeviceId ( ) , channelId , null , null ) ;
if ( sendRtpItem ! = null & & sendRtpItem . isOnlyAudio ( ) ) {
// 查询流是否存在,不存在则认为是异常状态
Boolean streamReady = zlm ServerFactory . isStreamReady ( mediaServerItem , sendRtpItem . getApp ( ) , sendRtpItem . getStream ( ) ) ;
Boolean streamReady = media ServerService . isStreamReady ( mediaServerItem , sendRtpItem . getApp ( ) , sendRtpItem . getStream ( ) ) ;
if ( streamReady ) {
logger . warn ( " 语音广播已经开启: {} " , channelId ) ;
event . call ( " 语音广播已经开启 " ) ;
@@ -1248,18 +1231,6 @@ public class PlayServiceImpl implements IPlayService {
}
}
}
// SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
// if (sendRtpItem != null) {
// MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
// Boolean streamReady = zlmServerFactory.isStreamReady(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream());
// if (streamReady) {
// logger.warn("[语音对讲] 进行中: {}", channelId);
// event.call("语音对讲进行中");
// return false;
// } else {
// stopTalk(device, channelId);
// }
// }
// 发送通知
cmder . audioBroadcastCmd ( device , channelId , eventResultForOk - > {
@@ -1291,7 +1262,7 @@ public class PlayServiceImpl implements IPlayService {
if ( sendRtpItem ! = null & & sendRtpItem . isOnlyAudio ( ) ) {
// 查询流是否存在,不存在则认为是异常状态
MediaServer mediaServerServiceOne = mediaServerService . getOne ( sendRtpItem . getMediaServerId ( ) ) ;
Boolean streamReady = zlm ServerFactory . isStreamReady ( mediaServerServiceOne , sendRtpItem . getApp ( ) , sendRtpItem . getStream ( ) ) ;
Boolean streamReady = media ServerService . isStreamReady ( mediaServerServiceOne , sendRtpItem . getApp ( ) , sendRtpItem . getStream ( ) ) ;
if ( streamReady ) {
logger . warn ( " 语音广播通道使用中: {} " , channelId ) ;
return true ;
@@ -1447,24 +1418,7 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void startPushStream ( SendRtpItem sendRtpItem , SIPResponse sipResponse , ParentPlatform platform , CallIdHeader callIdHeader ) {
// 开始发流
String is_Udp = sendRtpItem . isTcp ( ) ? " 0 " : " 1 " ;
MediaServer mediaInfo = mediaServerService . getOne ( sendRtpItem . getMediaServerId ( ) ) ;
logger . info ( " [开始推流] rtp/{}, 目标={}:{}, SSRC={}, RTCP={} " , sendRtpItem . getStream ( ) ,
sendRtpItem . getIp ( ) , sendRtpItem . getPort ( ) , sendRtpItem . getSsrc ( ) , sendRtpItem . isRtcp ( ) ) ;
Map < String , Object > param = new HashMap < > ( 12 ) ;
param . put ( " vhost " , " __defaultVhost__ " ) ;
param . put ( " app " , sendRtpItem . getApp ( ) ) ;
param . put ( " stream " , sendRtpItem . getStream ( ) ) ;
param . put ( " ssrc " , sendRtpItem . getSsrc ( ) ) ;
param . put ( " src_port " , sendRtpItem . getLocalPort ( ) ) ;
param . put ( " pt " , sendRtpItem . getPt ( ) ) ;
param . put ( " use_ps " , sendRtpItem . isUsePs ( ) ? " 1 " : " 0 " ) ;
param . put ( " only_audio " , sendRtpItem . isOnlyAudio ( ) ? " 1 " : " 0 " ) ;
param . put ( " is_udp " , is_Udp ) ;
if ( ! sendRtpItem . isTcp ( ) ) {
// udp模式下开启rtcp保活
param . put ( " udp_rtcp_timeout " , sendRtpItem . isRtcp ( ) ? " 1 " : " 0 " ) ;
}
if ( mediaInfo = = null ) {
RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg . getInstance (
@@ -1472,75 +1426,50 @@ public class PlayServiceImpl implements IPlayService {
sendRtpItem . getIp ( ) , sendRtpItem . getPort ( ) , sendRtpItem . getSsrc ( ) , sendRtpItem . isTcp ( ) ,
sendRtpItem . getLocalPort ( ) , sendRtpItem . getPt ( ) , sendRtpItem . isUsePs ( ) , sendRtpItem . isOnlyAudio ( ) ) ;
redisGbPlayMsgListener . sendMsgForStartSendRtpStream ( sendRtpItem . getServerId ( ) , requestPushStreamMsg , json - > {
startSendRtpStreamHand ( sendRtpItem , platform , json , param , callIdHeader ) ;
startSendRtpStreamFail Hand ( sendRtpItem , platform , callIdHeader ) ;
} ) ;
} else {
// 如果是严格模式,需要关闭端口占用
JSONObject startSendRtpStreamResult = null ;
if ( sendRtpItem . getLocalPort ( ) ! = 0 ) {
try {
if ( sendRtpItem . isTcpActive ( ) ) {
startSendRtpStreamResult = zlmServerFactory . startSendRtpPassive ( mediaInfo , param ) ;
mediaServerService . startSendRtpPassive ( mediaInfo , platform , sendRtpItem , null ) ;
} else {
param . put ( " dst_url " , sendRtpItem . getIp ( ) );
param . put ( " dst_port " , sendRtpItem . getPort ( ) ) ;
startSendRtpStreamResult = zlmServerFactory . startSendRtpStream ( mediaInfo , param ) ;
}
} else {
if ( sendRtpItem . isTcpActive ( ) ) {
startSendRtpStreamResult = zlmServerFactory . startSendRtpPassive ( mediaInfo , param ) ;
} else {
param . put ( " dst_url " , sendRtpItem . getIp ( ) ) ;
param . put ( " dst_port " , sendRtpItem . getPort ( ) ) ;
startSendRtpStreamResult = zlmServerFactory . startSendRtpStream ( mediaInfo , param ) ;
mediaServerService . startSendRtpStream ( mediaInfo , platform , sendRtpItem ) ;
}
} catch ( ControllerException e ) {
logger . error ( " RTP推流失败: {} " , e . getMessage ( ) ) ;
startSendRtpStreamFailHand ( sendRtpItem , platform , callIdHeader ) ;
return ;
}
if ( startSendRtpStreamResult ! = null ) {
startSendRtpStreamHand ( sendRtpItem , platform , startS endRtpStreamResult , param , callIdHeader ) ;
}
logger . info ( " RTP推流成功[ {}/{} ], {}, " , sendRtpItem . getApp ( ) , sendRtpItem . getStream ( ) ,
sendRtpItem . isTcpActive ( ) ? " 被动发流 " : sendRtpItem . getIp ( ) + " : " + sendRtpItem . getPort ( ) ) ;
}
}
@Override
public void startSendRtpStreamHand ( SendRtpItem sendRtpItem , Object correlationInfo ,
JSONObject jsonObject , Map < String , Object > param , CallIdHeader callIdHeader ) {
if ( jsonObject = = null ) {
logger . error ( " RTP推流失败: 请检查ZLM服务 " ) ;
} else if ( jsonObject . getInteger ( " code " ) = = 0 ) {
logger . info ( " 调用ZLM推流接口, 结果: {} " , jsonObject ) ;
logger . info ( " RTP推流成功[ {}/{} ], {}->{}, " , param . get ( " app " ) , param . get ( " stream " ) , jsonObject . getString ( " local_port " ) ,
sendRtpItem . isTcpActive ( ) ? " 被动发流 " : p aram . get ( " dst_url " ) + " : " + param . get ( " dst_port " ) ) ;
if ( sendRtpItem . getPlayType ( ) = = InviteStreamType . PUSH & & correlationInfo instanceof ParentPlatform ) {
ParentPlatform platform = ( ParentPlatform ) correlationInfo ;
MessageForPushChannel messageForPushChannel = MessageForPushChannel . getInstance ( 0 , sendRtpItem . getApp ( ) , sendRtpItem . getStream ( ) ,
sendRtpItem . getChannelId ( ) , platform . getServerGBId ( ) , platform . getName ( ) , userSetting . getServerId ( ) ,
sendRtpItem . getMediaServerId ( ) ) ;
messageForPushChannel . setPlatFormIndex ( platform . getId ( ) ) ;
redisCatchStorage . sendPlatformStartPlayMsg ( messageForPushChannel ) ;
public void startSendRtpStreamFail Hand ( SendRtpItem sendRtpItem , ParentPlatform platform , CallIdHeader callIdHeader ) {
if ( sendRtpItem . isOnlyAudio ( ) ) {
Device device = deviceService . getDevice ( sendRtpItem . getDeviceId ( ) ) ;
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager . get ( sendRtpItem . getDeviceId ( ) , sendRtpItem . getChannelId ( ) ) ;
if ( audioBroadcastCatch ! = null ) {
try {
cmder . streamByeCmd ( device , sendRtpItem . getChannelId ( ) , audioBroadcastCatch . getSipTransactionInfo ( ) , null ) ;
} catch ( SipException | P arseException | InvalidArgumentException |
SsrcTransactionNotFoundException exception ) {
logger . error ( " [命令发送失败] 停止语音对讲: {} " , exception . getMessage ( ) ) ;
}
}
} else {
logger . error ( " RTP推流失败: {}, 参数:{} " , jsonObject . getString ( " msg " ) , JSONObject . toJSONString ( param ) ) ;
if ( sendRtpItem . isOnlyAudio ( ) ) {
Device device = deviceService . getDevice ( sendRtpItem . getDeviceId ( ) ) ;
AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager . get ( sendRtpItem . getDeviceId ( ) , sendRtpItem . getChannelId ( ) ) ;
if ( audioBroadcastCatch ! = null ) {
try {
cmder . streamByeCmd ( device , sendRtpItem . getChannelId ( ) , audioBroadcastCatch . getSipTransactionInfo ( ) , null ) ;
} catch ( SipException | ParseException | InvalidArgumentException |
SsrcTransactionNotFoundException e ) {
logger . error ( " [命令发送失败] 停止语音对讲: {} " , e . getMessage ( ) ) ;
}
}
} else {
if ( platform ! = null ) {
// 向上级平台
if ( correlationInfo instanceof ParentPlatform ) {
try {
ParentPlatform parentPlatform = ( ParentPlatform ) correlationInfo ;
commanderForPlatform . streamByeCmd ( parentPlatform , callIdHeader . getCallId ( ) ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
logger . error ( " [命令发送失败] 国标级联 发送BYE: {} " , e . getMessage ( ) ) ;
}
try {
commanderForPlatform . streamByeCmd ( platform , callIdHeader . getCallId ( ) ) ;
} catch ( SipException | InvalidArgumentException | ParseException e ) {
logger . error ( " [命令发送失败] 国标级联 发送BYE: {} " , e . getMessage ( ) ) ;
}
}
}
}
@@ -1563,7 +1492,7 @@ public class PlayServiceImpl implements IPlayService {
if ( sendRtpItem ! = null & & sendRtpItem . isOnlyAudio ( ) ) {
// 查询流是否存在,不存在则认为是异常状态
MediaServer mediaServer = mediaServerService . getOne ( sendRtpItem . getMediaServerId ( ) ) ;
Boolean streamReady = zlm ServerFactory . isStreamReady ( mediaServer , sendRtpItem . getApp ( ) , sendRtpItem . getStream ( ) ) ;
Boolean streamReady = media ServerService . isStreamReady ( mediaServer , sendRtpItem . getApp ( ) , sendRtpItem . getStream ( ) ) ;
if ( streamReady ) {
logger . warn ( " [语音对讲] 正在语音广播,无法开启语音通话: {} " , channelId ) ;
event . call ( " 正在语音广播 " ) ;
@@ -1577,7 +1506,7 @@ public class PlayServiceImpl implements IPlayService {
SendRtpItem sendRtpItem = redisCatchStorage . querySendRTPServer ( device . getDeviceId ( ) , channelId , stream , null ) ;
if ( sendRtpItem ! = null ) {
MediaServer mediaServer = mediaServerService . getOne ( sendRtpItem . getMediaServerId ( ) ) ;
Boolean streamReady = zlm ServerFactory . isStreamReady ( mediaServer , " rtp " , sendRtpItem . getReceiveStream ( ) ) ;
Boolean streamReady = media ServerService . isStreamReady ( mediaServer , " rtp " , sendRtpItem . getReceiveStream ( ) ) ;
if ( streamReady ) {
logger . warn ( " [语音对讲] 进行中: {} " , channelId ) ;
event . call ( " 语音对讲进行中 " ) ;
@@ -1624,12 +1553,7 @@ public class PlayServiceImpl implements IPlayService {
MediaServer mediaServer = mediaServerService . getOne ( mediaServerId ) ;
if ( streamIsReady = = null | | streamIsReady ) {
Map < String , Object > param = new HashMap < > ( ) ;
param . put ( " vhost " , " __defaultVhost__ " ) ;
param . put ( " app " , sendRtpItem . getApp ( ) ) ;
param . put ( " stream " , sendRtpItem . getStream ( ) ) ;
param . put ( " ssrc " , sendRtpItem . getSsrc ( ) ) ;
zlmServerFactory . stopSendRtpStream ( mediaServer , param ) ;
mediaServerService . stopSendRtp ( mediaServer , sendRtpItem . getApp ( ) , sendRtpItem . getStream ( ) , sendRtpItem . getSsrc ( ) ) ;
}
ssrcFactory . releaseSsrc ( mediaServerId , sendRtpItem . getSsrc ( ) ) ;