@@ -1,6 +1,8 @@
package com.genersoft.iot.vmp.service.impl ;
import com.alibaba.fastjson2.JSONObject ;
import com.genersoft.iot.vmp.common.InviteInfo ;
import com.genersoft.iot.vmp.common.InviteSessionStatus ;
import com.genersoft.iot.vmp.common.InviteSessionType ;
import com.baomidou.dynamic.datasource.annotation.DS ;
import com.genersoft.iot.vmp.conf.DynamicTask ;
@@ -11,6 +13,7 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory ;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager ;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform ;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils ;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe ;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory ;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange ;
@@ -22,20 +25,20 @@ import com.genersoft.iot.vmp.service.IInviteStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService ;
import com.genersoft.iot.vmp.service.IPlatformService ;
import com.genersoft.iot.vmp.service.IPlayService ;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo ;
import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback ;
import com.genersoft.iot.vmp.service.bean.SSRCInfo ;
import com.genersoft.iot.vmp.service.bean.* ;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage ;
import com.genersoft.iot.vmp.storager.dao.* ;
import com.genersoft.iot.vmp.utils.DateUtil ;
import com.github.pagehelper.PageHelper ;
import com.github.pagehelper.PageInfo ;
import gov.nist.javax.sip.message.SIPRequest ;
import gov.nist.javax.sip.message.SIPResponse ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Service ;
import javax.sdp.* ;
import javax.sip.InvalidArgumentException ;
import javax.sip.ResponseEvent ;
import javax.sip.PeerUnavailableException ;
@@ -109,6 +112,9 @@ public class PlatformServiceImpl implements IPlatformService {
@Autowired
private IInviteStreamService inviteStreamService ;
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils ;
@Override
public ParentPlatform queryPlatformByServerGBId ( String platformGbId ) {
@@ -466,21 +472,21 @@ public class PlatformServiceImpl implements IPlatformService {
logger . info ( " [国标级联] 语音喊话未找到可用的zlm. platform: {} " , platform . getServerGBId ( ) ) ;
return ;
}
InviteInfo inviteInfo = inviteStreamService . getInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , platform . getServerGBId ( ) , channelId ) ;
InviteInfo inviteInfoForOld = inviteStreamService . getInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , platform . getServerGBId ( ) , channelId ) ;
if ( inviteInfo ! = null & & inviteInfo . getStreamInfo ( ) ! = null ) {
if ( inviteInfoForOld ! = null & & inviteInfoForOld . getStreamInfo ( ) ! = null ) {
// 如果zlm不存在这个流, 则删除数据即可
MediaServerItem mediaServerItemForStreamInfo = mediaServerService . getOne ( inviteInfo . getStreamInfo ( ) . getMediaServerId ( ) ) ;
MediaServerItem mediaServerItemForStreamInfo = mediaServerService . getOne ( inviteInfoForOld . getStreamInfo ( ) . getMediaServerId ( ) ) ;
if ( mediaServerItemForStreamInfo ! = null ) {
Boolean ready = zlmServerFactory . isStreamReady ( mediaServerItemForStreamInfo , inviteInfo . getStreamInfo ( ) . getApp ( ) , inviteInfo . getStreamInfo ( ) . getStream ( ) ) ;
Boolean ready = zlmServerFactory . isStreamReady ( mediaServerItemForStreamInfo , inviteInfoForOld . getStreamInfo ( ) . getApp ( ) , inviteInfoForOld . getStreamInfo ( ) . getStream ( ) ) ;
if ( ! ready ) {
// 错误存在于redis中的数据
inviteStreamService . removeInviteInfo ( inviteInfo ) ;
inviteStreamService . removeInviteInfo ( inviteInfoForOld ) ;
} else {
// 流确实尚在推流,直接回调结果
OnStreamChangedHookParam hookParam = new OnStreamChangedHookParam ( ) ;
hookParam . setApp ( inviteInfo . getStreamInfo ( ) . getApp ( ) ) ;
hookParam . setStream ( inviteInfo . getStreamInfo ( ) . getStream ( ) ) ;
hookParam . setApp ( inviteInfoForOld . getStreamInfo ( ) . getApp ( ) ) ;
hookParam . setStream ( inviteInfoForOld . getStreamInfo ( ) . getStream ( ) ) ;
hookEvent . response ( mediaServerItemForStreamInfo , hookParam ) ;
return ;
@@ -515,6 +521,11 @@ public class PlatformServiceImpl implements IPlatformService {
logger . info ( " [国标级联] 语音喊话, 发起Invite消息 deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验: {} " ,
platform . getServerGBId ( ) , channelId , ssrcInfo . getPort ( ) , userSetting . getBroadcastForPlatform ( ) , ssrcInfo . getSsrc ( ) , ssrcCheck ) ;
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo . getInviteInfo ( platform . getServerGBId ( ) , channelId , ssrcInfo . getStream ( ) , ssrcInfo ,
mediaServerItem . getSdpIp ( ) , ssrcInfo . getPort ( ) , userSetting . getBroadcastForPlatform ( ) , InviteSessionType . BROADCAST ,
InviteSessionStatus . ready ) ;
inviteStreamService . updateInviteInfo ( inviteInfo ) ;
String timeOutTaskKey = UUID . randomUUID ( ) . toString ( ) ;
dynamicTask . startDelay ( timeOutTaskKey , ( ) - > {
// 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
@@ -545,44 +556,48 @@ public class PlatformServiceImpl implements IPlatformService {
hookEvent . response ( mediaServerItem , hookParam ) ;
}
} , event - > {
// 收到200OK 检测ssrc是否有变化, 防止上级自定义了ssrc
ResponseE vent responseEvent = ( ResponseEvent ) event . event ;
String contentString = new String ( responseEvent . getResponse ( ) . getRawContent ( ) ) ;
// 获取 ssrc
int ssrcIndex = contentString . indexOf ( " y= " ) ;
// 检查是否有y字段
if ( ssrcIndex > = 0 ) {
//ssrc规定长度为10字节, 不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
String ssrcInResponse = contentString . substring ( ssrcIndex + 2 , ssrcIndex + 12 ) ;
// 查询到ssrc不一致且开启了ssrc校验则需要针对处理
if ( ssrcInfo . getSsrc ( ) . equals ( ssrcInResponse ) | | ssrcCheck ) {
return ;
}
logger . info ( " [点播消息] 收到invite 200, 发现下级自定义了ssrc: {} " , ssrcInResponse ) ;
if ( ! mediaServerItem . isRtpEnable ( ) ) {
logger . info ( " [点播消息] SSRC修正 {}->{} " , ssrcInfo . getSsrc ( ) , ssrcInResponse ) ;
// 释放ssrc
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
// 单端口模式streamId也有变化, 需要重新设置监听
if ( ! mediaServerItem . isRtpEnable ( ) ) {
// 添加订阅
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory . on_stream_changed ( " rtp " , ssrcInfo . getStream ( ) , true , " rtsp " , mediaServerItem . getId ( ) ) ;
subscribe . removeSubscribe ( hookSubscribe ) ;
hookSubscribe . getContent ( ) . put ( " stream " , String . format ( " %08x " , Integer . parseInt ( ssrcInResponse ) ) . toUpperCase ( ) ) ;
subscribe . addSubscribe ( hookSubscribe , ( mediaServerItemInUse , hookParam ) - > {
logger . info ( " [ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam ) ;
dynamicTask . stop ( timeOutTaskKey ) ;
// hook响应
playService . onPublishHandlerForPlay ( mediaServerItemInUse, hookParam, platform . getServerGBId ( ) , channelId ) ;
hookEvent . response ( mediaServerItemInUse , hookParam) ;
} ) ;
}
// 关闭rtp server
mediaServerService . closeRTPServer ( mediaServerItem , ssrcInfo . getStream ( ) ) ;
// 重新开启ssrc server
mediaServerService . openRTPServer ( mediaServerItem , ssrcInfo . getStream ( ) , ssrcInResponse , false , false , ssrcInfo . getPort ( ) , true , false , tcpMode ) ;
}
}
inviteOKHandler ( e vent, ssrcInfo , tcpMode , ssrcCheck , mediaServerItem , platform , channelId , timeOutTaskKey ,
null , inviteInfo , InviteSessionType . BROADCAST ) ;
// // 收到200OK 检测ssrc是否有变化, 防止上级自定义了 ssrc
// ResponseEvent responseEvent = (ResponseEvent) event.event ;
// String contentString = new String(responseEvent.getResponse().getRawContent());
// // 获取ssrc
// int ssrcIndex = contentString.indexOf("y=");
// // 检查是否有y字段
// if (ssrcIndex >= 0) {
// //ssrc规定长度为10字节, 不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
// String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12) ;
// // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
// if (ssrcInfo.getSsrc().equals(ssrcInResponse) || ssrcCheck) {
// tcpActiveHandler(platform, )
// return ;
// }
// logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse) ;
// if (!mediaServerItem.isRtpEnable()) {
// logger.info("[点播消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
// // 释放ssrc
// mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()) ;
// // 单端口模式streamId也有变化, 需要重新设置监听
// if (!mediaServerItem.isRtpEnable()) {
// // 添加订阅
// HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()) ;
// subscribe.removeSubscribe(hookSubscribe) ;
// hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
// subscribe.addSubscribe(hookSubscribe, ( mediaServerItemInUse, hookParam) -> {
// logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + hookParam) ;
// dynamicTask.stop(timeOutTaskKey) ;
// // hook响应
// playService.onPublishHandlerForPlay(mediaServerItemInUse, hookParam, platform.getServerGBId(), channelId);
// hookEvent.response(mediaServerItemInUse, hookParam) ;
// });
// }
// // 关闭rtp server
// mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
// // 重新开启ssrc server
// mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, false, false, ssrcInfo.getPort(), true, false, tcpMode);
// }
// }
} , eventResult - > {
// 收到错误回复
if ( errorEvent ! = null ) {
@@ -591,6 +606,158 @@ public class PlatformServiceImpl implements IPlatformService {
} ) ;
}
private void inviteOKHandler ( SipSubscribe . EventResult eventResult , SSRCInfo ssrcInfo , int tcpMode , boolean ssrcCheck , MediaServerItem mediaServerItem ,
ParentPlatform platform , String channelId , String timeOutTaskKey , ErrorCallback < Object > callback ,
InviteInfo inviteInfo , InviteSessionType inviteSessionType ) {
inviteInfo . setStatus ( InviteSessionStatus . ok ) ;
ResponseEvent responseEvent = ( ResponseEvent ) eventResult . event ;
String contentString = new String ( responseEvent . getResponse ( ) . getRawContent ( ) ) ;
System . out . println ( 1111 ) ;
System . out . println ( contentString ) ;
String ssrcInResponse = SipUtils . getSsrcFromSdp ( contentString ) ;
// 兼容回复的消息中缺少ssrc(y字段)的情况
if ( ssrcInResponse = = null ) {
ssrcInResponse = ssrcInfo . getSsrc ( ) ;
}
if ( ssrcInfo . getSsrc ( ) . equals ( ssrcInResponse ) ) {
// ssrc 一致
if ( mediaServerItem . isRtpEnable ( ) ) {
// 多端口
if ( tcpMode = = 2 ) {
tcpActiveHandler ( platform , channelId , contentString , mediaServerItem , tcpMode , ssrcCheck ,
timeOutTaskKey , ssrcInfo , callback ) ;
}
} else {
// 单端口
if ( tcpMode = = 2 ) {
logger . warn ( " [Invite 200OK] 单端口收流模式不支持tcp主动模式收流 " ) ;
}
}
} else {
logger . info ( " [Invite 200OK] 收到invite 200, 发现下级自定义了ssrc: {} " , ssrcInResponse ) ;
// ssrc 不一致
if ( mediaServerItem . isRtpEnable ( ) ) {
// 多端口
if ( ssrcCheck ) {
// ssrc检验
// 更新ssrc
logger . info ( " [Invite 200OK] SSRC修正 {}->{} " , ssrcInfo . getSsrc ( ) , ssrcInResponse ) ;
// 释放ssrc
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
Boolean result = mediaServerService . updateRtpServerSSRC ( mediaServerItem , ssrcInfo . getStream ( ) , ssrcInResponse ) ;
if ( ! result ) {
try {
logger . warn ( " [Invite 200OK] 更新ssrc失败, 停止喊话 {}/{} " , platform . getServerGBId ( ) , channelId ) ;
commanderForPlatform . streamByeCmd ( platform , channelId , ssrcInfo . getStream ( ) , null , null ) ;
} catch ( InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e ) {
logger . error ( " [命令发送失败] 停止播放, 发送BYE: {} " , e . getMessage ( ) ) ;
}
dynamicTask . stop ( timeOutTaskKey ) ;
// 释放ssrc
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
streamSession . remove ( platform . getServerGBId ( ) , channelId , ssrcInfo . getStream ( ) ) ;
callback . run ( InviteErrorCode . ERROR_FOR_RESET_SSRC . getCode ( ) ,
" 下级自定义了ssrc,重新设置收流信息失败 " , null ) ;
inviteStreamService . call ( inviteSessionType , platform . getServerGBId ( ) , channelId , null ,
InviteErrorCode . ERROR_FOR_RESET_SSRC . getCode ( ) ,
" 下级自定义了ssrc,重新设置收流信息失败 " , null ) ;
} else {
ssrcInfo . setSsrc ( ssrcInResponse ) ;
inviteInfo . setSsrcInfo ( ssrcInfo ) ;
inviteInfo . setStream ( ssrcInfo . getStream ( ) ) ;
if ( tcpMode = = 2 ) {
if ( mediaServerItem . isRtpEnable ( ) ) {
tcpActiveHandler ( platform , channelId , contentString , mediaServerItem , tcpMode , ssrcCheck ,
timeOutTaskKey , ssrcInfo , callback ) ;
} else {
logger . warn ( " [Invite 200OK] 单端口收流模式不支持tcp主动模式收流 " ) ;
}
}
inviteStreamService . updateInviteInfo ( inviteInfo ) ;
}
} else {
ssrcInfo . setSsrc ( ssrcInResponse ) ;
inviteInfo . setSsrcInfo ( ssrcInfo ) ;
inviteInfo . setStream ( ssrcInfo . getStream ( ) ) ;
if ( tcpMode = = 2 ) {
if ( mediaServerItem . isRtpEnable ( ) ) {
tcpActiveHandler ( platform , channelId , contentString , mediaServerItem , tcpMode , ssrcCheck ,
timeOutTaskKey , ssrcInfo , callback ) ;
} else {
logger . warn ( " [Invite 200OK] 单端口收流模式不支持tcp主动模式收流 " ) ;
}
}
inviteStreamService . updateInviteInfo ( inviteInfo ) ;
}
} else {
if ( ssrcInResponse ! = null ) {
// 单端口
// 重新订阅流上线
SsrcTransaction ssrcTransaction = streamSession . getSsrcTransaction ( inviteInfo . getDeviceId ( ) ,
inviteInfo . getChannelId ( ) , null , inviteInfo . getStream ( ) ) ;
streamSession . remove ( inviteInfo . getDeviceId ( ) ,
inviteInfo . getChannelId ( ) , inviteInfo . getStream ( ) ) ;
inviteStreamService . updateInviteInfoForSSRC ( inviteInfo , ssrcInResponse ) ;
streamSession . put ( platform . getServerGBId ( ) , channelId , ssrcTransaction . getCallId ( ) ,
inviteInfo . getStream ( ) , ssrcInResponse , mediaServerItem . getId ( ) , ( SIPResponse ) responseEvent . getResponse ( ) , inviteSessionType ) ;
}
}
}
}
private void tcpActiveHandler ( ParentPlatform platform , String channelId , String contentString ,
MediaServerItem mediaServerItem , int tcpMode , boolean ssrcCheck ,
String timeOutTaskKey , SSRCInfo ssrcInfo , ErrorCallback < Object > callback ) {
if ( tcpMode ! = 2 ) {
return ;
}
String substring ;
if ( contentString . indexOf ( " y= " ) > 0 ) {
substring = contentString . substring ( 0 , contentString . indexOf ( " y= " ) ) ;
} else {
substring = contentString ;
}
try {
SessionDescription sdp = SdpFactory . getInstance ( ) . createSessionDescription ( substring ) ;
int port = - 1 ;
Vector mediaDescriptions = sdp . getMediaDescriptions ( true ) ;
for ( Object description : mediaDescriptions ) {
MediaDescription mediaDescription = ( MediaDescription ) description ;
Media media = mediaDescription . getMedia ( ) ;
Vector mediaFormats = media . getMediaFormats ( false ) ;
if ( mediaFormats . contains ( " 8 " ) | | mediaFormats . contains ( " 0 " ) ) {
port = media . getMediaPort ( ) ;
break ;
}
}
logger . info ( " [TCP主动连接对方] serverGbId: {}, channelId: {}, 连接对方的地址:{}:{}, SSRC: {}, SSRC校验: {} " ,
platform . getServerGBId ( ) , channelId , sdp . getConnection ( ) . getAddress ( ) , port , ssrcInfo . getSsrc ( ) , ssrcCheck ) ;
JSONObject jsonObject = zlmresTfulUtils . connectRtpServer ( mediaServerItem , sdp . getConnection ( ) . getAddress ( ) , port , ssrcInfo . getStream ( ) ) ;
logger . info ( " [TCP主动连接对方] 结果: {} " , jsonObject ) ;
} catch ( SdpException e ) {
logger . error ( " [TCP主动连接对方] serverGbId: {}, channelId: {}, 解析200OK的SDP信息失败 " , platform . getServerGBId ( ) , channelId , e ) ;
dynamicTask . stop ( timeOutTaskKey ) ;
mediaServerService . closeRTPServer ( mediaServerItem , ssrcInfo . getStream ( ) ) ;
// 释放ssrc
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
streamSession . remove ( platform . getServerGBId ( ) , channelId , ssrcInfo . getStream ( ) ) ;
callback . run ( InviteErrorCode . ERROR_FOR_SDP_PARSING_EXCEPTIONS . getCode ( ) ,
InviteErrorCode . ERROR_FOR_SDP_PARSING_EXCEPTIONS . getMsg ( ) , null ) ;
inviteStreamService . call ( InviteSessionType . PLAY , platform . getServerGBId ( ) , channelId , null ,
InviteErrorCode . ERROR_FOR_SDP_PARSING_EXCEPTIONS . getCode ( ) ,
InviteErrorCode . ERROR_FOR_SDP_PARSING_EXCEPTIONS . getMsg ( ) , null ) ;
}
}
@Override
public void stopBroadcast ( ParentPlatform platform , DeviceChannel channel , String stream , boolean sendBye , MediaServerItem mediaServerItem ) {