@@ -15,21 +15,26 @@ import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.* ;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe ;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager ;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl. SIPCommander ;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.I SIPCommander ;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform ;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils ;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils ;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory ;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe ;
import com.genersoft.iot.vmp.media.zlm.dto.* ;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory ;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRecordMp4 ;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange ;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem ;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam ;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam ;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam ;
import com.genersoft.iot.vmp.service.* ;
import com.genersoft.iot.vmp.service.bean.* ;
import com.genersoft.iot.vmp.service.bean.DownloadFileInfo ;
import com.genersoft.iot.vmp.service.bean.ErrorCallback ;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode ;
import com.genersoft.iot.vmp.service.bean.SSRCInfo ;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage ;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage ;
import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper ;
import com.genersoft.iot.vmp.utils.CloudRecordUtils ;
import com.genersoft.iot.vmp.utils.DateUtil ;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode ;
@@ -63,7 +68,7 @@ public class PlayServiceImpl implements IPlayService {
private IVideoManagerStorage storager ;
@Autowired
private SIPCommander cmder ;
private I SIPCommander cmder ;
@Autowired
private SIPCommanderFroPlatform sipCommanderFroPlatform ;
@@ -95,15 +100,15 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private IDeviceService deviceService ;
@Autowired
private IDeviceChannelService channelService ;
@Autowired
private UserSetting userSetting ;
@Autowired
private DynamicTask dynamicTask ;
@Autowired
private CloudRecordServiceMapper cloudRecordServiceMapper ;
@Override
public SSRCInfo play ( MediaServerItem mediaServerItem , String deviceId , String channelId , String ssrc , ErrorCallback < Object > callback ) {
@@ -117,6 +122,11 @@ public class PlayServiceImpl implements IPlayService {
logger . warn ( " [点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{} " , deviceId , channelId ) ;
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " 单端口收流时不支持TCP主动方式收流 " ) ;
}
DeviceChannel channel = channelService . getOne ( deviceId , channelId ) ;
if ( channel = = null ) {
logger . warn ( " [点播] 未找到通道 deviceId: {},channelId:{} " , deviceId , channelId ) ;
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " 未找到通道 " ) ;
}
InviteInfo inviteInfo = inviteStreamService . getInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , deviceId , channelId ) ;
if ( inviteInfo ! = null ) {
if ( inviteInfo . getStreamInfo ( ) = = null ) {
@@ -165,13 +175,13 @@ public class PlayServiceImpl implements IPlayService {
null ) ;
return null ;
}
play ( mediaServerItem , ssrcInfo , device , channelId , callback ) ;
play ( mediaServerItem , ssrcInfo , device , channel , callback ) ;
return ssrcInfo ;
}
@Override
public void play ( MediaServerItem mediaServerItem , SSRCInfo ssrcInfo , Device device , String channelId ,
public void play ( MediaServerItem mediaServerItem , SSRCInfo ssrcInfo , Device device , DeviceChannel channel ,
ErrorCallback < Object > callback ) {
if ( mediaServerItem = = null | | ssrcInfo = = null ) {
@@ -180,110 +190,109 @@ public class PlayServiceImpl implements IPlayService {
null ) ;
return ;
}
logger . info ( " [点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, STREAM : {}, 收流模式:{}, SSRC: {}, SSRC校验: {} " ,
device . getDeviceId ( ) , channelId , device . isSwitchPrimarySubStream ( ) ? " 辅码流 " : " 主码流 " , ssrcInfo . getPort ( ) , ssrcInfo . getStream ( ) ,
logger . info ( " [点播开始] deviceId: {}, channelId: {},码流类型:{}, 收流端口: {}, 码流 : {}, 收流模式:{}, SSRC: {}, SSRC校验: {} " ,
device . getDeviceId ( ) , channel. getChannelId ( ) , channel . getStreamIdentification ( ) , ssrcInfo . getPort ( ) , ssrcInfo . getStream ( ) ,
device . getStreamMode ( ) , ssrcInfo . getSsrc ( ) , device . isSsrcCheck ( ) ) ;
//端口获取失败的ssrcInfo 没有必要发送点播指令
if ( ssrcInfo . getPort ( ) < = 0 ) {
logger . info ( " [点播端口分配异常], deviceId={},channelId={},ssrcInfo={} " , device . getDeviceId ( ) , channelId , ssrcInfo ) ;
logger . info ( " [点播端口分配异常], deviceId={},channelId={},ssrcInfo={} " , device . getDeviceId ( ) , channel. getChannelId ( ) , ssrcInfo ) ;
// 释放ssrc
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
streamSession . remove ( device . getDeviceId ( ) , channelId , ssrcInfo . getStream ( ) ) ;
streamSession . remove ( device . getDeviceId ( ) , channel. getChannelId ( ) , ssrcInfo . getStream ( ) ) ;
callback . run ( InviteErrorCode . ERROR_FOR_RESOURCE_EXHAUSTION . getCode ( ) , " 点播端口分配异常 " , null ) ;
inviteStreamService . call ( InviteSessionType . PLAY , device . getDeviceId ( ) , channelId , null ,
inviteStreamService . call ( InviteSessionType . PLAY , device . getDeviceId ( ) , channel. getChannelId ( ) , null ,
InviteErrorCode . ERROR_FOR_RESOURCE_EXHAUSTION . getCode ( ) , " 点播端口分配异常 " , null ) ;
return ;
}
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo . getInviteInfo ( device . getDeviceId ( ) , channelId , ssrcInfo . getStream ( ) , ssrcInfo ,
InviteInfo inviteInfo = InviteInfo . getInviteInfo ( device . getDeviceId ( ) , channel. getChannelId ( ) , ssrcInfo . getStream ( ) , ssrcInfo ,
mediaServerItem . getSdpIp ( ) , ssrcInfo . getPort ( ) , device . getStreamMode ( ) , InviteSessionType . PLAY ,
InviteSessionStatus . ready ) ;
inviteInfo . setSubStream ( device . isSwitchPrimarySubStream ( ) ) ;
inviteStreamService . updateInviteInfo ( inviteInfo ) ;
// 超时处理
String timeOutTaskKey = UUID . randomUUID ( ) . toString ( ) ;
dynamicTask . startDelay ( timeOutTaskKey , ( ) - > {
// 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况
InviteInfo inviteInfoForTimeOut = inviteStreamService . getInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , device . getDeviceId ( ) , channelId ) ;
InviteInfo inviteInfoForTimeOut = inviteStreamService . getInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , device . getDeviceId ( ) , channel. getChannelId ( ) ) ;
if ( inviteInfoForTimeOut = = null | | inviteInfoForTimeOut . getStreamInfo ( ) = = null ) {
logger . info ( " [点播超时] 收流超时 deviceId: {}, channelId: {},码流类型 : {},端口:{}, SSRC: {} " ,
device . getDeviceId ( ) , channelId , device . isSwitchPrimarySubStream ( ) ? " 辅码流 " : " 主码流 " ,
logger . info ( " [点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {} " ,
device . getDeviceId ( ) , channel. getChannelId ( ) , channel . getStreamIdentification ( ) ,
ssrcInfo . getPort ( ) , ssrcInfo . getSsrc ( ) ) ;
callback . run ( InviteErrorCode . ERROR_FOR_STREAM_TIMEOUT . getCode ( ) , InviteErrorCode . ERROR_FOR_STREAM_TIMEOUT . getMsg ( ) , null ) ;
inviteStreamService . call ( InviteSessionType . PLAY , device . getDeviceId ( ) , channelId , null ,
inviteStreamService . call ( InviteSessionType . PLAY , device . getDeviceId ( ) , channel. getChannelId ( ) , null ,
InviteErrorCode . ERROR_FOR_STREAM_TIMEOUT . getCode ( ) , InviteErrorCode . ERROR_FOR_STREAM_TIMEOUT . getMsg ( ) , null ) ;
inviteStreamService . removeInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , device . getDeviceId ( ) , channelId ) ;
inviteStreamService . removeInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , device . getDeviceId ( ) , channel. getChannelId ( ) ) ;
try {
cmder . streamByeCmd ( device , channelId , ssrcInfo . getStream ( ) , null ) ;
cmder . streamByeCmd ( device , channel. getChannelId ( ) , ssrcInfo . getStream ( ) , null ) ;
} catch ( InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e ) {
logger . error ( " [点播超时], 发送BYE失败 {} " , e . getMessage ( ) ) ;
} finally {
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
mediaServerService . closeRTPServer ( mediaServerItem , ssrcInfo . getStream ( ) ) ;
streamSession . remove ( device . getDeviceId ( ) , channelId , ssrcInfo . getStream ( ) ) ;
streamSession . remove ( device . getDeviceId ( ) , channel. getChannelId ( ) , ssrcInfo . getStream ( ) ) ;
mediaServerService . closeRTPServer ( mediaServerItem , ssrcInfo . getStream ( ) ) ;
// 取消订阅消息监听
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory . on_stream_changed ( " rtp " , ssrcInfo . getStream ( ) , true , " rtsp " , mediaServerItem . getId ( ) ) ;
subscribe . removeSubscribe ( hookSubscribe ) ;
}
} else {
logger . info ( " [点播超时] 收流超时 deviceId: {}, channelId: {},码流类型 : {},端口:{}, SSRC: {} " ,
device . getDeviceId ( ) , channelId , device . isSwitchPrimarySubStream ( ) ? " 辅码流 " : " 主码流 " ,
logger . info ( " [点播超时] 收流超时 deviceId: {}, channelId: {},码流:{},端口:{}, SSRC: {} " ,
device . getDeviceId ( ) , channel. getChannelId ( ) , channel . getStreamIdentification ( ) ,
ssrcInfo . getPort ( ) , ssrcInfo . getSsrc ( ) ) ;
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
mediaServerService . closeRTPServer ( mediaServerItem . getId ( ) , ssrcInfo . getStream ( ) ) ;
streamSession . remove ( device . getDeviceId ( ) , channelId , ssrcInfo . getStream ( ) ) ;
streamSession . remove ( device . getDeviceId ( ) , channel. getChannelId ( ) , ssrcInfo . getStream ( ) ) ;
}
} , userSetting . getPlayTimeout ( ) ) ;
try {
cmder . playStreamCmd ( mediaServerItem , ssrcInfo , device , channelId , ( mediaServerItemInuse , hookParam ) - > {
cmder . playStreamCmd ( mediaServerItem , ssrcInfo , device , channel , ( mediaServerItemInuse , hookParam ) - > {
logger . info ( " 收到订阅消息: " + hookParam ) ;
dynamicTask . stop ( timeOutTaskKey ) ;
// hook响应
StreamInfo streamInfo = onPublishHandlerForPlay ( mediaServerItemInuse , hookParam , device . getDeviceId ( ) , channelId ) ;
StreamInfo streamInfo = onPublishHandlerForPlay ( mediaServerItemInuse , hookParam , device . getDeviceId ( ) , channel. getChannelId ( ) ) ;
if ( streamInfo = = null ) {
callback . run ( InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getCode ( ) ,
InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getMsg ( ) , null ) ;
inviteStreamService . call ( InviteSessionType . PLAY , device . getDeviceId ( ) , channelId , null ,
inviteStreamService . call ( InviteSessionType . PLAY , device . getDeviceId ( ) , channel. getChannelId ( ) , null ,
InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getCode ( ) ,
InviteErrorCode . ERROR_FOR_STREAM_PARSING_EXCEPTIONS . getMsg ( ) , null ) ;
return ;
}
callback . run ( InviteErrorCode . SUCCESS . getCode ( ) , InviteErrorCode . SUCCESS . getMsg ( ) , streamInfo ) ;
inviteStreamService . call ( InviteSessionType . PLAY , device . getDeviceId ( ) , channelId , null ,
inviteStreamService . call ( InviteSessionType . PLAY , device . getDeviceId ( ) , channel. getChannelId ( ) , null ,
InviteErrorCode . SUCCESS . getCode ( ) ,
InviteErrorCode . SUCCESS . getMsg ( ) ,
streamInfo ) ;
logger . info ( " [点播成功] deviceId: {}, channelId:{}, 码流类型:{} " , device . getDeviceId ( ) , channelId ,
device . isSwitchPrimarySubStream ( ) ? " 辅码流 " : " 主码流 " ) ;
snapOnPlay ( mediaServerItemInuse , device . getDeviceId ( ) , channelId , ssrcInfo . getStream ( ) ) ;
logger . info ( " [点播成功] deviceId: {}, channelId:{}, 码流类型:{} " , device . getDeviceId ( ) , channel. getChannelId ( ) ,
channel . getStreamIdentification ( ) ) ;
snapOnPlay ( mediaServerItemInuse , device . getDeviceId ( ) , channel. getChannelId ( ) , ssrcInfo . getStream ( ) ) ;
} , ( eventResult ) - > {
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
InviteOKHandler ( eventResult , ssrcInfo , mediaServerItem , device , channelId ,
InviteOKHandler ( eventResult , ssrcInfo , mediaServerItem , device , channel. getChannelId ( ) ,
timeOutTaskKey , callback , inviteInfo , InviteSessionType . PLAY ) ;
} , ( event ) - > {
logger . info ( " [点播失败] deviceId: {}, channelId:{}, {}: {} " , device . getDeviceId ( ) , channelId , event . statusCode , event . msg ) ;
logger . info ( " [点播失败] deviceId: {}, channelId:{}, {}: {} " , device . getDeviceId ( ) , channel. getChannelId ( ) , event . statusCode , event . msg ) ;
dynamicTask . stop ( timeOutTaskKey ) ;
mediaServerService . closeRTPServer ( mediaServerItem , ssrcInfo . getStream ( ) ) ;
// 释放ssrc
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
streamSession . remove ( device . getDeviceId ( ) , channelId , ssrcInfo . getStream ( ) ) ;
streamSession . remove ( device . getDeviceId ( ) , channel. getChannelId ( ) , ssrcInfo . getStream ( ) ) ;
callback . run ( InviteErrorCode . ERROR_FOR_SIGNALLING_ERROR . getCode ( ) ,
String . format ( " 点播失败, 错误码: %s, %s " , event . statusCode , event . msg ) , null ) ;
inviteStreamService . call ( InviteSessionType . PLAY , device . getDeviceId ( ) , channelId , null ,
inviteStreamService . call ( InviteSessionType . PLAY , device . getDeviceId ( ) , channel. getChannelId ( ) , null ,
InviteErrorCode . ERROR_FOR_RESET_SSRC . getCode ( ) ,
String . format ( " 点播失败, 错误码: %s, %s " , event . statusCode , event . msg ) , null ) ;
inviteStreamService . removeInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , device . getDeviceId ( ) , channelId ) ;
inviteStreamService . removeInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , device . getDeviceId ( ) , channel. getChannelId ( ) ) ;
} ) ;
} catch ( InvalidArgumentException | SipException | ParseException e ) {
@@ -293,15 +302,15 @@ public class PlayServiceImpl implements IPlayService {
// 释放ssrc
mediaServerService . releaseSsrc ( mediaServerItem . getId ( ) , ssrcInfo . getSsrc ( ) ) ;
streamSession . remove ( device . getDeviceId ( ) , channelId , ssrcInfo . getStream ( ) ) ;
streamSession . remove ( device . getDeviceId ( ) , channel. getChannelId ( ) , ssrcInfo . getStream ( ) ) ;
callback . run ( InviteErrorCode . ERROR_FOR_SIP_SENDING_FAILED . getCode ( ) ,
InviteErrorCode . ERROR_FOR_SIP_SENDING_FAILED . getMsg ( ) , null ) ;
inviteStreamService . call ( InviteSessionType . PLAY , device . getDeviceId ( ) , channelId , null ,
inviteStreamService . call ( InviteSessionType . PLAY , device . getDeviceId ( ) , channel. getChannelId ( ) , null ,
InviteErrorCode . ERROR_FOR_SIP_SENDING_FAILED . getCode ( ) ,
InviteErrorCode . ERROR_FOR_SIP_SENDING_FAILED . getMsg ( ) , null ) ;
inviteStreamService . removeInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , device . getDeviceId ( ) , channelId ) ;
inviteStreamService . removeInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , device . getDeviceId ( ) , channel. getChannelId ( ) ) ;
}
}
@@ -438,7 +447,7 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void playBack ( String deviceId , String channelId , String startTime ,
String endTime , ErrorCallback < Object > callback ) {
String endTime , ErrorCallback < Object > callback ) {
Device device = storager . queryVideoDevice ( deviceId ) ;
if ( device = = null ) {
logger . warn ( " [录像回放] 未找到设备 deviceId: {},channelId:{} " , deviceId , channelId ) ;
@@ -463,8 +472,8 @@ public class PlayServiceImpl implements IPlayService {
@Override
public void playBack ( MediaServerItem mediaServerItem , SSRCInfo ssrcInfo ,
String deviceId , String channelId , String startTime ,
String endTime , ErrorCallback < Object > callback ) {
String deviceId , String channelId , String startTime ,
String endTime , ErrorCallback < Object > callback ) {
if ( mediaServerItem = = null | | ssrcInfo = = null ) {
callback . run ( InviteErrorCode . ERROR_FOR_PARAMETER_ERROR . getCode ( ) ,
InviteErrorCode . ERROR_FOR_PARAMETER_ERROR . getMsg ( ) ,
@@ -881,7 +890,7 @@ public class PlayServiceImpl implements IPlayService {
cmder . streamByeCmd ( device , ssrcTransaction . getChannelId ( ) ,
ssrcTransaction . getStream ( ) , null ) ;
} catch ( InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e ) {
SsrcTransactionNotFoundException e ) {
logger . error ( " [zlm离线]为正在使用此zlm的设备, 发送BYE失败 {} " , e . getMessage ( ) ) ;
}
}
@@ -1031,16 +1040,16 @@ public class PlayServiceImpl implements IPlayService {
MediaServerItem newMediaServerItem = getNewMediaServerItem ( device ) ;
play ( newMediaServerItem , deviceId , channelId , null , ( code , msg , data ) - > {
if ( code = = InviteErrorCode . SUCCESS . getCode ( ) ) {
InviteInfo inviteInfoForPlay = inviteStreamService . getInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , deviceId , channelId ) ;
if ( inviteInfoForPlay ! = null & & inviteInfoForPlay . getStreamInfo ( ) ! = null ) {
getSnap ( deviceId , channelId , fileName , errorCallback ) ;
} else {
errorCallback . run ( InviteErrorCode . FAIL . getCode ( ) , InviteErrorCode . FAIL . getMsg ( ) , null ) ;
}
} else {
errorCallback . run ( InviteErrorCode . FAIL . getCode ( ) , InviteErrorCode . FAIL . getMsg ( ) , null ) ;
}
if ( code = = InviteErrorCode . SUCCESS . getCode ( ) ) {
InviteInfo inviteInfoForPlay = inviteStreamService . getInviteInfoByDeviceAndChannel ( InviteSessionType . PLAY , deviceId , channelId ) ;
if ( inviteInfoForPlay ! = null & & inviteInfoForPlay . getStreamInfo ( ) ! = null ) {
getSnap ( deviceId , channelId , fileName , errorCallback ) ;
} else {
errorCallback . run ( InviteErrorCode . FAIL . getCode ( ) , InviteErrorCode . FAIL . getMsg ( ) , null ) ;
}
} else {
errorCallback . run ( InviteErrorCode . FAIL . getCode ( ) , InviteErrorCode . FAIL . getMsg ( ) , null ) ;
}
} ) ;
}