@@ -28,14 +28,10 @@ import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent ;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent ;
import com.genersoft.iot.vmp.media.service.IMediaServerService ;
import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager ;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService ;
import com.genersoft.iot.vmp.service.ISendRtpServerService ;
import com.genersoft.iot.vmp.service.bean.DownloadFileInfo ;
import com.genersoft.iot.vmp.service.bean.ErrorCallback ;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode ;
import com.genersoft.iot.vmp.service.bean.SSRCInfo ;
import com.genersoft.iot.vmp.service.bean.* ;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage ;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage ;
import com.genersoft.iot.vmp.utils.CloudRecordUtils ;
import com.genersoft.iot.vmp.utils.DateUtil ;
import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult ;
@@ -71,11 +67,6 @@ import java.util.Vector;
@DS ( " master " )
public class PlayServiceImpl implements IPlayService {
@Autowired
private IVideoManagerStorage storager ;
@Autowired
private ISIPCommander cmder ;
@@ -97,9 +88,6 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private HookSubscribe subscribe ;
@Autowired
private SendRtpPortManager sendRtpPortManager ;
@Autowired
private IMediaServerService mediaServerService ;
@@ -130,6 +118,9 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private ISendRtpServerService sendRtpServerService ;
@Autowired
private IReceiveRtpServerService receiveRtpServerService ;
/**
* 流到来的处理
*/
@@ -279,10 +270,7 @@ public class PlayServiceImpl implements IPlayService {
startTime , endTime
) ;
int tcpMode = d evice . getStreamMode ( ) . equals ( " TCP-ACTIVE " ) ? 2 : ( device. getStreamMode ( ) . equals ( " TCP-PASSIVE " ) ? 1 : 0 ) ;
SSRCInfo ssrcInfo = mediaServerService . openRTPServer ( event . getMediaServer ( ) , event . getStream ( ) , null ,
device . isSsrcCheck ( ) , true , 0 , false , ! deviceChannel . isHasAudio ( ) , false , tcpMode ) ;
playBack ( event . getMediaServer ( ) , ssrcInfo , device , deviceChannel , startTime , endTime , null ) ;
playBack ( event . getMediaServer ( ) , device , deviceChannel , startTime , endTime , null ) ;
}
}
@@ -341,21 +329,130 @@ public class PlayServiceImpl implements IPlayService {
}
}
}
String streamId = String . format ( " %s_%s " , device . getDeviceId ( ) , channelId ) ;
return play ( mediaServerItem , device , channel , ssrc , callback ) ;
}
private SSRCInfo play ( MediaServer mediaServerItem , Device device , DeviceChannel channel , String ssrc ,
ErrorCallback < StreamInfo > callback ) {
if ( mediaServerItem = = null ) {
if ( callback ! = null ) {
callback . run ( InviteErrorCode . ERROR_FOR_PARAMETER_ERROR . getCode ( ) ,
InviteErrorCode . ERROR_FOR_PARAMETER_ERROR . getMsg ( ) ,
null ) ;
}
return null ;
}
String streamId = String . format ( " %s_%s " , device . getDeviceId ( ) , channel . getDeviceId ( ) ) ;
int tcpMode = device . getStreamMode ( ) . equals ( " TCP-ACTIVE " ) ? 2 : ( device . getStreamMode ( ) . equals ( " TCP-PASSIVE " ) ? 1 : 0 ) ;
SSRCInfo ssrcInfo = mediaServerService . openRTPServer ( mediaServerItem , streamId , ssrc , device . isSsrcCheck ( ) , false , 0 , false , ! channel . isHasAudio ( ) , false , tcpMode ) ;
if ( ssrcInfo = = null ) {
callback . run ( InviteErrorCode . ERROR_FOR_RESOURCE_EXHAUSTION . g etCode ( ) , InviteErrorCode . ERROR_FOR_RESOURCE_EXHAUSTION . getMsg ( ) , null ) ;
RTPServerParam rtpServerParam = new RTPServerParam ( ) ;
rtpServerParam . setMediaServerItem ( mediaServerItem ) ;
rtpServerParam . s etStreamId ( streamId ) ;
rtpServerParam . setPresetSsrc ( ssrc ) ;
rtpServerParam . setSsrcCheck ( device . isSsrcCheck ( ) ) ;
rtpServerParam . setPlayback ( false ) ;
rtpServerParam . setPort ( 0 ) ;
rtpServerParam . setTcpMode ( tcpMode ) ;
rtpServerParam . setOnlyAuto ( false ) ;
rtpServerParam . setDisableAudio ( ! channel . isHasAudio ( ) ) ;
SSRCInfo ssrcInfo = receiveRtpServerService . openRTPServer ( rtpServerParam , ( code , msg , hookData ) - > {
if ( code = = InviteErrorCode . SUCCESS . getCode ( ) ) {
// hook响应
StreamInfo streamInfo = onPublishHandlerForPlay ( hookData . getMediaServer ( ) , hookData . getMediaInfo ( ) , device , channel ) ;
if ( streamInfo = = null ) {
if ( callback ! = null ) {
callback . run ( InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getCode ( ) ,
InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getMsg ( ) , null ) ;
}
inviteStreamService . call ( InviteSessionType . PLAY , channel . getId ( ) , null ,
InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getCode ( ) ,
InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getMsg ( ) , null ) ;
return ;
}
if ( callback ! = null ) {
callback . run ( InviteErrorCode . SUCCESS . getCode ( ) , InviteErrorCode . SUCCESS . getMsg ( ) , streamInfo ) ;
}
inviteStreamService . call ( InviteSessionType . PLAY , channel . getId ( ) , null ,
InviteErrorCode . SUCCESS . getCode ( ) ,
InviteErrorCode . SUCCESS . getMsg ( ) ,
streamInfo ) ;
log . info ( " [点播成功] deviceId: {}, channelId:{}, 码流类型:{} " , device . getDeviceId ( ) , channel . getDeviceId ( ) ,
channel . getStreamIdentification ( ) ) ;
snapOnPlay ( hookData . getMediaServer ( ) , device . getDeviceId ( ) , channel . getDeviceId ( ) , streamId ) ;
} else {
if ( callback ! = null ) {
callback . run ( code , msg , null ) ;
}
inviteStreamService . call ( InviteSessionType . PLAY , channel . getId ( ) , null , code , msg , null ) ;
inviteStreamService . removeInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , channel . getId ( ) ) ;
SsrcTransaction ssrcTransaction = sessionManager . getSsrcTransactionByStream ( streamId ) ;
if ( ssrcTransaction ! = null ) {
try {
cmder . streamByeCmd ( device , channel . getDeviceId ( ) , streamId , null ) ;
} catch ( InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e ) {
log . error ( " [点播超时], 发送BYE失败 {} " , e . getMessage ( ) ) ;
} finally {
sessionManager . removeByStream ( streamId ) ;
}
}
}
} ) ;
if ( ssrcInfo = = null | | ssrcInfo . getPort ( ) < = 0 ) {
log . info ( " [点播端口/SSRC]获取失败, deviceId={},channelId={},ssrcInfo={} " , device . getDeviceId ( ) , channel . getDeviceId ( ) , ssrcInfo ) ;
callback . run ( InviteErrorCode . ERROR_FOR_RESOURCE_EXHAUSTION . getCode ( ) , " 获取端口或者ssrc失败 " , null ) ;
sessionManager . removeByStream ( streamId ) ;
inviteStreamService . call ( InviteSessionType . PLAY , channel . getId ( ) , null ,
InviteErrorCode . ERROR_FOR_RESOURCE_EXHAUSTION . getCode ( ) ,
InviteErrorCode . ERROR_FOR_RESOURCE_EXHAUSTION . getMsg ( ) ,
null ) ;
return null ;
}
play ( mediaServerItem , ssrcInfo , device , channel , callback ) ;
log . info ( " [点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 码流:{}, 收流模式:{}, SSRC: {}, SSRC校验: {} " ,
device . getDeviceId ( ) , channel . getDeviceId ( ) , channel . getStreamIdentification ( ) , ssrcInfo . getPort ( ) , ssrcInfo . getStream ( ) ,
device . getStreamMode ( ) , ssrcInfo . getSsrc ( ) , device . isSsrcCheck ( ) ) ;
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo . getInviteInfo ( device . getDeviceId ( ) , channel . getId ( ) , ssrcInfo . getStream ( ) , ssrcInfo ,
mediaServerItem . getSdpIp ( ) , ssrcInfo . getPort ( ) , device . getStreamMode ( ) , InviteSessionType . PLAY ,
InviteSessionStatus . ready ) ;
inviteStreamService . updateInviteInfo ( inviteInfo ) ;
try {
cmder . playStreamCmd ( mediaServerItem , ssrcInfo , device , channel , ( eventResult ) - > {
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
InviteOKHandler ( eventResult , ssrcInfo , mediaServerItem , device , channel , callback , inviteInfo , InviteSessionType . PLAY ) ;
} , ( event ) - > {
log . info ( " [点播失败] deviceId: {}, channelId:{}, {}: {} " , device . getDeviceId ( ) , channel . getDeviceId ( ) , event . statusCode , event . msg ) ;
receiveRtpServerService . closeRTPServer ( mediaServerItem , ssrcInfo ) ;
sessionManager . removeByStream ( ssrcInfo . getStream ( ) ) ;
if ( callback ! = null ) {
callback . run ( event . statusCode , event . msg , null ) ;
}
inviteStreamService . call ( InviteSessionType . PLAY , channel . getId ( ) , null ,
event . statusCode , event . msg , null ) ;
inviteStreamService . removeInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , channel . getId ( ) ) ;
} ) ;
} catch ( InvalidArgumentException | SipException | ParseException e ) {
log . error ( " [命令发送失败] 点播消息: {} " , e . getMessage ( ) ) ;
receiveRtpServerService . closeRTPServer ( mediaServerItem , ssrcInfo ) ;
sessionManager . removeByStream ( ssrcInfo . getStream ( ) ) ;
if ( callback ! = null ) {
callback . run ( InviteErrorCode . ERROR_FOR_SIP_SENDING_FAILED . getCode ( ) ,
InviteErrorCode . ERROR_FOR_SIP_SENDING_FAILED . getMsg ( ) , null ) ;
}
inviteStreamService . call ( InviteSessionType . PLAY , channel . getId ( ) , null ,
InviteErrorCode . ERROR_FOR_SIP_SENDING_FAILED . getCode ( ) ,
InviteErrorCode . ERROR_FOR_SIP_SENDING_FAILED . getMsg ( ) , null ) ;
inviteStreamService . removeInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , channel . getId ( ) ) ;
}
return ssrcInfo ;
}
private void talk ( MediaServer mediaServerItem , Device device , DeviceChannel channel , String stream ,
HookSubscribe . Event hookEvent , SipSubscribe . Event errorEvent ,
Runnable timeoutCallback , AudioBroadcastEvent audioEvent ) {
@@ -475,154 +572,8 @@ public class PlayServiceImpl implements IPlayService {
}
@Override
public void play ( MediaServer mediaServerItem , SSRCInfo ssrcInfo , Device device , DeviceChannel channel ,
ErrorCallback < StreamInfo > callback ) {
if ( mediaServerItem = = null | | ssrcInfo = = null ) {
if ( callback ! = null ) {
callback . run ( InviteErrorCode . ERROR_FOR_PARAMETER_ERROR . getCode ( ) ,
InviteErrorCode . ERROR_FOR_PARAMETER_ERROR . getMsg ( ) ,
null ) ;
}
return ;
}
log . info ( " [点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 码流:{}, 收流模式:{}, SSRC: {}, SSRC校验: {} " ,
device . getDeviceId ( ) , channel . getDeviceId ( ) , channel . getStreamIdentification ( ) , ssrcInfo . getPort ( ) , ssrcInfo . getStream ( ) ,
device . getStreamMode ( ) , ssrcInfo . getSsrc ( ) , device . isSsrcCheck ( ) ) ;
//端口获取失败的ssrcInfo 没有必要发送点播指令
if ( ssrcInfo . getPort ( ) < = 0 ) {
log . info ( " [点播端口分配异常], deviceId={},channelId={},ssrcInfo={} " , device . getDeviceId ( ) , channel . getDeviceId ( ) , ssrcInfo ) ;
// 释放ssrc
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
sessionManager . removeByStream ( ssrcInfo . getStream ( ) ) ;
if ( callback ! = null ) {
callback . run ( InviteErrorCode . ERROR_FOR_RESOURCE_EXHAUSTION . getCode ( ) , " 点播端口分配异常 " , null ) ;
}
inviteStreamService . call ( InviteSessionType . PLAY , channel . getId ( ) , null ,
InviteErrorCode . ERROR_FOR_RESOURCE_EXHAUSTION . getCode ( ) , " 点播端口分配异常 " , null ) ;
return ;
}
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo . getInviteInfo ( device . getDeviceId ( ) , channel . getId ( ) , ssrcInfo . getStream ( ) , ssrcInfo ,
mediaServerItem . getSdpIp ( ) , ssrcInfo . getPort ( ) , device . getStreamMode ( ) , InviteSessionType . PLAY ,
InviteSessionStatus . ready ) ;
inviteStreamService . updateInviteInfo ( inviteInfo ) ;
// 超时处理
String timeOutTaskKey = UUID . randomUUID ( ) . toString ( ) ;
dynamicTask . startDelay ( timeOutTaskKey , ( ) - > {
// 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
InviteInfo inviteInfoForTimeOut = inviteStreamService . getInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , channel . getId ( ) ) ;
if ( inviteInfoForTimeOut = = null | | inviteInfoForTimeOut . getStreamInfo ( ) = = null ) {
log . info ( " [点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {} " ,
device . getDeviceId ( ) , channel . getDeviceId ( ) , channel . getStreamIdentification ( ) ,
ssrcInfo . getPort ( ) , ssrcInfo . getSsrc ( ) ) ;
if ( callback ! = null ) {
callback . run ( InviteErrorCode . ERROR_FOR_STREAM_TIMEOUT . getCode ( ) , InviteErrorCode . ERROR_FOR_STREAM_TIMEOUT . getMsg ( ) , null ) ;
}
inviteStreamService . call ( InviteSessionType . PLAY , channel . getId ( ) , null ,
InviteErrorCode . ERROR_FOR_STREAM_TIMEOUT . getCode ( ) , InviteErrorCode . ERROR_FOR_STREAM_TIMEOUT . getMsg ( ) , null ) ;
inviteStreamService . removeInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , channel . getId ( ) ) ;
try {
cmder . streamByeCmd ( device , channel . getDeviceId ( ) , ssrcInfo . getStream ( ) , null ) ;
} catch ( InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e ) {
log . error ( " [点播超时], 发送BYE失败 {} " , e . getMessage ( ) ) ;
} finally {
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
mediaServerService . closeRTPServer ( mediaServerItem , ssrcInfo . getStream ( ) ) ;
sessionManager . removeByStream ( ssrcInfo . getStream ( ) ) ;
mediaServerService . closeRTPServer ( mediaServerItem , ssrcInfo . getStream ( ) ) ;
// 取消订阅消息监听
subscribe . removeSubscribe ( Hook . getInstance ( HookType . on_media_arrival , " rtp " , ssrcInfo . getStream ( ) ) ) ;
}
} else {
log . info ( " [点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {} " ,
device . getDeviceId ( ) , channel . getDeviceId ( ) , channel . getStreamIdentification ( ) ,
ssrcInfo . getPort ( ) , ssrcInfo . getSsrc ( ) ) ;
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
mediaServerService . closeRTPServer ( mediaServerItem . getId ( ) , ssrcInfo . getStream ( ) ) ;
sessionManager . removeByStream ( ssrcInfo . getStream ( ) ) ;
}
} , userSetting . getPlayTimeout ( ) ) ;
try {
cmder . playStreamCmd ( mediaServerItem , ssrcInfo , device , channel , ( hookData ) - > {
log . info ( " 收到订阅消息: " + hookData ) ;
dynamicTask . stop ( timeOutTaskKey ) ;
// hook响应
StreamInfo streamInfo = onPublishHandlerForPlay ( hookData . getMediaServer ( ) , hookData . getMediaInfo ( ) , device , channel ) ;
if ( streamInfo = = null ) {
if ( callback ! = null ) {
callback . run ( InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getCode ( ) ,
InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getMsg ( ) , null ) ;
}
inviteStreamService . call ( InviteSessionType . PLAY , channel . getId ( ) , null ,
InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getCode ( ) ,
InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getMsg ( ) , null ) ;
return ;
}
if ( callback ! = null ) {
callback . run ( InviteErrorCode . SUCCESS . getCode ( ) , InviteErrorCode . SUCCESS . getMsg ( ) , streamInfo ) ;
}
inviteStreamService . call ( InviteSessionType . PLAY , channel . getId ( ) , null ,
InviteErrorCode . SUCCESS . getCode ( ) ,
InviteErrorCode . SUCCESS . getMsg ( ) ,
streamInfo ) ;
log . info ( " [点播成功] deviceId: {}, channelId:{}, 码流类型:{} " , device . getDeviceId ( ) , channel . getDeviceId ( ) ,
channel . getStreamIdentification ( ) ) ;
snapOnPlay ( hookData . getMediaServer ( ) , device . getDeviceId ( ) , channel . getDeviceId ( ) , ssrcInfo . getStream ( ) ) ;
} , ( eventResult ) - > {
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
InviteOKHandler ( eventResult , ssrcInfo , mediaServerItem , device , channel ,
timeOutTaskKey , callback , inviteInfo , InviteSessionType . PLAY ) ;
} , ( event ) - > {
log . info ( " [点播失败] deviceId: {}, channelId:{}, {}: {} " , device . getDeviceId ( ) , channel . getDeviceId ( ) , event . statusCode , event . msg ) ;
dynamicTask . stop ( timeOutTaskKey ) ;
mediaServerService . closeRTPServer ( mediaServerItem , ssrcInfo . getStream ( ) ) ;
// 释放ssrc
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
sessionManager . removeByStream ( ssrcInfo . getStream ( ) ) ;
if ( callback ! = null ) {
callback . run ( event . statusCode , event . msg , null ) ;
}
inviteStreamService . call ( InviteSessionType . PLAY , channel . getId ( ) , null ,
InviteErrorCode . ERROR_FOR_RESET_SSRC . getCode ( ) ,
String . format ( " 点播失败, 错误码: %s, %s " , event . statusCode , event . msg ) , null ) ;
inviteStreamService . removeInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , channel . getId ( ) ) ;
} ) ;
} catch ( InvalidArgumentException | SipException | ParseException e ) {
log . error ( " [命令发送失败] 点播消息: {} " , e . getMessage ( ) ) ;
dynamicTask . stop ( timeOutTaskKey ) ;
mediaServerService . closeRTPServer ( mediaServerItem , ssrcInfo . getStream ( ) ) ;
// 释放ssrc
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
sessionManager . removeByStream ( ssrcInfo . getStream ( ) ) ;
if ( callback ! = null ) {
callback . run ( InviteErrorCode . ERROR_FOR_SIP_SENDING_FAILED . getCode ( ) ,
InviteErrorCode . ERROR_FOR_SIP_SENDING_FAILED . getMsg ( ) , null ) ;
}
inviteStreamService . call ( InviteSessionType . PLAY , channel . getId ( ) , null ,
InviteErrorCode . ERROR_FOR_SIP_SENDING_FAILED . getCode ( ) ,
InviteErrorCode . ERROR_FOR_SIP_SENDING_FAILED . getMsg ( ) , null ) ;
inviteStreamService . removeInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , channel . getId ( ) ) ;
}
}
private void tcpActiveHandler ( Device device , DeviceChannel channel , String contentString ,
MediaServer mediaServerItem , String timeOutTaskKey ,
SSRCInfo ssrcInfo , ErrorCallback < StreamInfo > callback ) {
MediaServer mediaServerItem , SSRCInfo ssrcInfo , ErrorCallback < StreamInfo > callback ) {
if ( ! device . getStreamMode ( ) . equalsIgnoreCase ( " TCP-ACTIVE " ) ) {
return ;
}
@@ -652,13 +603,8 @@ public class PlayServiceImpl implements IPlayService {
log . info ( " [TCP主动连接对方] 结果: {} " , result ) ;
if ( ! result ) {
// 主动连接失败,结束流程, 清理数据
dynamicTask . stop ( timeOutTaskKey ) ;
mediaServerService . closeRTPServer ( mediaServerItem , ssrcInfo . getStream ( ) ) ;
// 释放ssrc
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
receiveRtpServerService . closeRTPServer ( mediaServerItem , ssrcInfo ) ;
sessionManager . removeByStream ( ssrcInfo . getStream ( ) ) ;
callback . run ( InviteErrorCode . ERROR_FOR_SDP_PARSING_EXCEPTIONS . getCode ( ) ,
InviteErrorCode . ERROR_FOR_SDP_PARSING_EXCEPTIONS . getMsg ( ) , null ) ;
inviteStreamService . call ( InviteSessionType . BROADCAST , channel . getId ( ) , null ,
@@ -667,10 +613,7 @@ public class PlayServiceImpl implements IPlayService {
}
} catch ( SdpException e ) {
log . error ( " [TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败 " , device . getDeviceId ( ) , channel . getDeviceId ( ) , e ) ;
dynamicTask . stop ( timeOutTaskKey ) ;
mediaServerService . closeRTPServer ( mediaServerItem , ssrcInfo . getStream ( ) ) ;
// 释放ssrc
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
receiveRtpServerService . closeRTPServer ( mediaServerItem , ssrcInfo ) ;
sessionManager . removeByStream ( ssrcInfo . getStream ( ) ) ;
@@ -759,34 +702,86 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void playBack ( Device device , DeviceChannel channel , String startTime ,
String endTime , ErrorCallback < StreamInfo > callback ) {
if ( device = = null ) {
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " 设备不存在 " ) ;
}
if ( channel = = null ) {
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " 通道不存在 " ) ;
}
MediaServer newMediaServerItem = getNewMediaServerItem ( device ) ;
if ( newMediaServerItem = = null ) {
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " 未找到可用的节点 " ) ;
}
if ( device . getStreamMode ( ) . equalsIgnoreCase ( " TCP-ACTIVE " ) & & ! newMediaServerItem . isRtpEnable ( ) ) {
log . warn ( " [录像回放] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{} " , device . getDeviceId ( ) , channel . getDeviceId ( ) ) ;
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " 单端口收流时不支持TCP主动方式收流 " ) ;
}
playBack ( newMediaServerItem , device , channel , startTime , endTime , callback ) ;
}
private void playBack ( MediaServer mediaServerItem ,
Device device , DeviceChannel channel , String startTime ,
String endTime , ErrorCallback < StreamInfo > callback ) {
String startTimeStr = startTime . replace ( " - " , " " )
. replace ( " : " , " " )
. replace ( " " , " " ) ;
String endTimeTimeStr = endTime . replace ( " - " , " " )
. replace ( " : " , " " )
. replace ( " " , " " ) ;
String stream = device . getDeviceId ( ) + " _ " + channel . getDeviceId ( ) + " _ " + startTimeStr + " _ " + endTimeTimeStr ;
int tcpMode = device . getStreamMode ( ) . equals ( " TCP-ACTIVE " ) ? 2 : ( device . getStreamMode ( ) . equals ( " TCP-PASSIVE " ) ? 1 : 0 ) ;
SSRCInfo ssrcInfo = mediaServerService . openRTPServer ( newMediaServerItem , stream , null , device . isSsrcCheck ( ) ,
true , 0 , false , ! channel . isHasAudio ( ) , false , tcpMode ) ;
playBack ( newMediaServerItem , ssrcInfo , device , channel , startTime , endTime , callback ) ;
}
public void playBack ( MediaServer mediaServerItem , SSRCInfo ssrcInfo ,
Device device , DeviceChannel channel , String startTime ,
String endTime , ErrorCallback < StreamInfo > callback ) {
if ( device = = null ) {
throw new ControllerException ( ErrorCode . ERROR100 . g etCode ( ) , " 设备不存在 " ) ;
}
if ( mediaServerItem = = null | | ssrcInfo = = null ) {
callback . run ( InviteErrorCode . ERROR_FOR_PARAMETER_ERROR . getCode ( ) ,
InviteErrorCode . ERROR_FOR_PARAMETER_ERROR . getMsg ( ) ,
RTPServerParam rtpServerParam = new RTPServerParam ( ) ;
rtpServerParam . setMediaServerItem ( mediaServerItem ) ;
rtpServerParam . setStreamId ( stream ) ;
rtpServerParam . setSsrcCheck ( device . isSsrcCheck ( ) ) ;
rtpServerParam . s etPlayback ( true ) ;
rtpServerParam . setPort ( 0 ) ;
rtpServerParam . setTcpMode ( tcpMode ) ;
rtpServerParam . setOnlyAuto ( false ) ;
rtpServerParam . setDisableAudio ( ! channel . isHasAudio ( ) ) ;
SSRCInfo ssrcInfo = receiveRtpServerService . openRTPServer ( rtpServerParam , ( code , msg , hookData ) - > {
if ( code = = InviteErrorCode . SUCCESS . getCode ( ) ) {
// hook响应
StreamInfo streamInfo = onPublishHandlerForPlayback ( hookData . getMediaServer ( ) , hookData . getMediaInfo ( ) , device , channel , startTime , endTime ) ;
if ( streamInfo = = null ) {
log . warn ( " 设备回放API调用失败! " ) ;
callback . run ( InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getCode ( ) ,
InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getMsg ( ) , null ) ;
return ;
}
callback . run ( InviteErrorCode . SUCCESS . getCode ( ) , InviteErrorCode . SUCCESS . getMsg ( ) , streamInfo ) ;
log . info ( " [录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {} " , device . getDeviceId ( ) , channel . getGbDeviceId ( ) , startTime , endTime ) ;
} else {
if ( callback ! = null ) {
callback . run ( code , msg , null ) ;
}
inviteStreamService . call ( InviteSessionType . PLAYBACK , channel . getId ( ) , null , code , msg , null ) ;
inviteStreamService . removeInviteInfoByDeviceAndChannel ( InviteSessionType . PLAYBACK , channel . getId ( ) ) ;
SsrcTransaction ssrcTransaction = sessionManager . getSsrcTransactionByStream ( stream ) ;
if ( ssrcTransaction ! = null ) {
try {
cmder . streamByeCmd ( device , channel . getDeviceId ( ) , stream , null ) ;
} catch ( InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e ) {
log . error ( " [录像回放] 发送BYE失败 {} " , e . getMessage ( ) ) ;
} finally {
sessionManager . removeByStream ( stream ) ;
}
}
}
} ) ;
if ( ssrcInfo = = null | | ssrcInfo . getPort ( ) < = 0 ) {
log . info ( " [回放端口/SSRC]获取失败, deviceId={},channelId={},ssrcInfo={} " , device . getDeviceId ( ) , channel . getDeviceId ( ) , ssrcInfo ) ;
if ( callback ! = null ) {
callback . run ( InviteErrorCode . ERROR_FOR_RESOURCE_EXHAUSTION . getCode ( ) , " 获取端口或者ssrc失败 " , null ) ;
}
sessionManager . removeByStream ( stream ) ;
inviteStreamService . call ( InviteSessionType . PLAY , channel . getId ( ) , null ,
InviteErrorCode . ERROR_FOR_RESOURCE_EXHAUSTION . getCode ( ) ,
InviteErrorCode . ERROR_FOR_RESOURCE_EXHAUSTION . getMsg ( ) ,
null ) ;
return ;
}
@@ -799,70 +794,37 @@ public class PlayServiceImpl implements IPlayService {
mediaServerItem . getSdpIp ( ) , ssrcInfo . getPort ( ) , device . getStreamMode ( ) , InviteSessionType . PLAYBACK ,
InviteSessionStatus . ready ) ;
inviteStreamService . updateInviteInfo ( inviteInfo ) ;
String playBackTimeOutTaskKey = UUID . randomUUID ( ) . toString ( ) ;
dynamicTask . startDelay ( playBackTimeOutTaskKey , ( ) - > {
log . warn ( " [录像回放] 超时, deviceId: {} , channelId: {} " , device . getDeviceId ( ) , channel . getGbDeviceId ( ) ) ;
inviteStreamService . removeInviteInfo ( inviteInfo ) ;
callback . run ( InviteErrorCode . ERROR_FOR_SIGNALLING_TIMEOUT . getCode ( ) , InviteErrorCode . ERROR_FOR_SIGNALLING_TIMEOUT . getMsg ( ) , null ) ;
try {
cmder . streamByeCmd ( device , channel . getGbDeviceId ( ) , ssrcInfo . getStream ( ) , null ) ;
} catch ( InvalidArgumentException | ParseException | SipException e ) {
log . error ( " [录像回放] 超时 发送BYE失败 {} " , e . getMessage ( ) ) ;
} catch ( SsrcTransactionNotFoundException e ) {
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
mediaServerService . closeRTPServer ( mediaServerItem , ssrcInfo . getStream ( ) ) ;
sessionManager . removeByStream ( ssrcInfo . getStream ( ) ) ;
}
} , userSetting . getPlayTimeout ( ) ) ;
SipSubscribe . Event errorEvent = event - > {
log . info ( " [录像回放] 失败,{} {} " , event . statusCode , event . msg ) ;
dynamicTask . stop ( playBackTimeOutTaskKey ) ;
callback . run ( InviteErrorCode . ERROR_FOR_SIGNALLING_ERROR . getCode ( ) ,
String . format ( " 回放失败, 错误码: %s, %s " , event . statusCode , event . msg ) , null ) ;
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
mediaServerService . closeRTPServer ( mediaServerItem , ssrcInfo . getStream ( ) ) ;
sessionManager . removeByStream ( ssrcInfo . getStream ( ) ) ;
inviteStreamService . removeInviteInfo ( inviteInfo ) ;
} ;
HookSubscribe . Event hookEvent = ( hookData ) - > {
log . info ( " 收到回放订阅消息: " + hookData ) ;
dynamicTask . stop ( playBackTimeOutTaskKey ) ;
StreamInfo streamInfo = onPublishHandlerForPlayback ( hookData . getMediaServer ( ) , hookData . getMediaInfo ( ) , device , channel , startTime , endTime ) ;
if ( streamInfo = = null ) {
log . warn ( " 设备回放API调用失败! " ) ;
callback . run ( InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getCode ( ) ,
InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getMsg ( ) , null ) ;
return ;
}
callback . run ( InviteErrorCode . SUCCESS . getCode ( ) , InviteErrorCode . SUCCESS . getMsg ( ) , streamInfo ) ;
log . info ( " [录像回放] 成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {} " , device . getDeviceId ( ) , channel . getGbDeviceId ( ) , startTime , endTime ) ;
} ;
try {
cmder . playbackStreamCmd ( mediaServerItem , ssrcInfo , device , channel , startTime , endTime ,
hookEvent , eventResult - > {
eventResult - > {
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
InviteOKHandler ( eventResult , ssrcInfo , mediaServerItem , device , channel ,
playBackTimeOutTaskKey , callback, inviteInfo , InviteSessionType . PLAYBACK ) ;
} , errorEvent ) ;
callback , inviteInfo , InviteSessionType . PLAYBACK ) ;
} , eventResult - > {
log . info ( " [录像回放] 失败,{} {} " , eventResult . statusCode , eventResult . msg ) ;
if ( callback ! = null ) {
callback . run ( eventResult . statusCode , eventResult . msg , null ) ;
}
receiveRtpServerService . closeRTPServer ( mediaServerItem , ssrcInfo ) ;
sessionManager . removeByStream ( ssrcInfo . getStream ( ) ) ;
inviteStreamService . removeInviteInfo ( inviteInfo ) ;
} ) ;
} catch ( InvalidArgumentException | SipException | ParseException e ) {
log . error ( " [命令发送失败] 录像回放: {} " , e . getMessage ( ) ) ;
SipSubscribe . EventResult eventResult = new SipSubscribe . EventResult ( ) ;
eventResult . type = SipSubscribe . EventResultType . cmdSendFailEvent ;
eventResult . statusCode = - 1 ;
eventResult . msg = " 命令发送失败 " ;
errorEvent . response ( eventResult ) ;
if ( callback ! = null ) {
callback . run ( InviteErrorCode . FAIL . getCode ( ) , e . getMessage ( ) , null ) ;
}
receiveRtpServerService . closeRTPServer ( mediaServerItem , ssrcInfo ) ;
sessionManager . removeByStream ( ssrcInfo . getStream ( ) ) ;
inviteStreamService . removeInviteInfo ( inviteInfo ) ;
}
}
private void InviteOKHandler ( SipSubscribe . EventResult eventResult , SSRCInfo ssrcInfo , MediaServer mediaServerItem ,
Device device , DeviceChannel channel , String timeOutTaskKey , ErrorCallback < StreamInfo > callback ,
Device device , DeviceChannel channel , ErrorCallback < StreamInfo > callback ,
InviteInfo inviteInfo , InviteSessionType inviteSessionType ) {
inviteInfo . setStatus ( InviteSessionStatus . ok ) ;
ResponseEvent responseEvent = ( ResponseEvent ) eventResult . event ;
@@ -877,7 +839,7 @@ public class PlayServiceImpl implements IPlayService {
if ( mediaServerItem . isRtpEnable ( ) ) {
// 多端口
if ( device . getStreamMode ( ) . equalsIgnoreCase ( " TCP-ACTIVE " ) ) {
tcpActiveHandler ( device , channel , contentString , mediaServerItem , timeOutTaskKey , ssrcInfo , callback ) ;
tcpActiveHandler ( device , channel , contentString , mediaServerItem , ssrcInfo , callback ) ;
}
} else {
// 单端口
@@ -906,7 +868,6 @@ public class PlayServiceImpl implements IPlayService {
log . error ( " [命令发送失败] 停止播放, 发送BYE: {} " , e . getMessage ( ) ) ;
}
dynamicTask . stop ( timeOutTaskKey ) ;
// 释放ssrc
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
@@ -924,7 +885,7 @@ public class PlayServiceImpl implements IPlayService {
inviteInfo . setStream ( ssrcInfo . getStream ( ) ) ;
if ( device . getStreamMode ( ) . equalsIgnoreCase ( " TCP-ACTIVE " ) ) {
if ( mediaServerItem . isRtpEnable ( ) ) {
tcpActiveHandler ( device , channel , contentString , mediaServerItem , timeOutTaskKey , ssrcInfo , callback ) ;
tcpActiveHandler ( device , channel , contentString , mediaServerItem , ssrcInfo , callback ) ;
} else {
log . warn ( " [Invite 200OK] 单端口收流模式不支持tcp主动模式收流 " ) ;
}
@@ -1026,10 +987,10 @@ public class PlayServiceImpl implements IPlayService {
} ;
try {
cmder . downloadStreamCmd ( mediaServerItem , ssrcInfo , device , channel , startTime , endTime , downloadSpeed ,
hookEvent , errorEvent, eventResult - > {
errorEvent , eventResult - > {
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
InviteOKHandler ( eventResult , ssrcInfo , mediaServerItem , device , channel ,
downLoadTimeOutTaskKey , callback , inviteInfo , InviteSessionType . DOWNLOAD ) ;
callback , inviteInfo , InviteSessionType . DOWNLOAD ) ;
// 注册录像回调事件,录像下载结束后写入下载地址
HookSubscribe . Event hookEventForRecord = ( hookData ) - > {