@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.service.impl ;
import com.alibaba.fastjson2.JSON ;
import com.alibaba.fastjson2.JSONArray ;
import com.alibaba.fastjson2.JSONObject ;
import com.genersoft.iot.vmp.common.InviteInfo ;
import com.genersoft.iot.vmp.common.InviteSessionStatus ;
@@ -21,16 +23,12 @@ import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils ;
import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory ;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe ;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory ;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange ;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem ;
import com.genersoft.iot.vmp.media.zlm.dto.* ;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam ;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam ;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam ;
import com.genersoft.iot.vmp.service.* ;
import com.genersoft.iot.vmp.service.bean.CloudRecordItem ;
import com.genersoft.iot.vmp.service.bean.ErrorCallback ;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode ;
import com.genersoft.iot.vmp.service.bean.SSRCInfo ;
import com.genersoft.iot.vmp.service.bean.* ;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage ;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage ;
import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper ;
@@ -77,7 +75,7 @@ public class PlayServiceImpl implements IPlayService {
private IInviteStreamService inviteStreamService ;
@Autowired
private DeferredResultHolder resultHolder ;
private ZlmHttpHookSubscribe subscribe ;
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils ;
@@ -85,9 +83,6 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private ZLMServerFactory zlmServerFactory ;
@Autowired
private AssistRESTfulUtils assistRESTfulUtils ;
@Autowired
private IMediaService mediaService ;
@@ -106,9 +101,6 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private DynamicTask dynamicTask ;
@Autowired
private ZlmHttpHookSubscribe subscribe ;
@Autowired
private CloudRecordServiceMapper cloudRecordServiceMapper ;
@@ -741,60 +733,147 @@ public class PlayServiceImpl implements IPlayService {
@Override
public StreamInfo getDownLoadInfo ( String deviceId , String channelId , String stream ) {
InviteInfo inviteInfo = inviteStreamService . getInviteInfo ( InviteSessionType . DOWNLOAD , deviceId , channelId , stream ) ;
if ( inviteInfo = = null | | inviteInfo . getStreamInfo ( ) = = null ) {
logger . warn ( " [获取下载进度] 未查询到录像下载的信息 " ) ;
return null ;
}
if ( inviteInfo ! = null & & inviteInfo . getStreamInfo ( ) ! = null ) {
if ( inviteInfo . getStreamInfo ( ) . getProgress ( ) = = 1 ) {
return inviteInfo . getStreamInfo ( ) ;
}
// 获取当前已下载时长
String mediaServerId = inviteInfo . getStreamInfo ( ) . getMediaServerId ( ) ;
MediaServerItem mediaServerItem = mediaServerService . getOne ( mediaServerId ) ;
if ( mediaServerItem = = null ) {
logger . warn ( " 查询录像信息时发现节点已离线 " ) ;
return null ;
}
if ( mediaServerItem . getRecordAssistPort ( ) = = 0 ) {
throw new ControllerException ( ErrorCode . ERROR100 . getCode ( ) , " 未配置Assist服务, 无法完成录像下载 " ) ;
}
SsrcTransaction ssrcTransaction = streamSession . getSsrcTransaction ( deviceId , channelId , null , stream ) ;
if ( ssrcTransaction = = null ) {
logger . warn ( " [获取下载进度],未找到下载事务信息 " ) ;
return null ;
}
// 为了支持多个数据库,这里不能使用求和函数来直接获取总数了
List < CloudRecordItem > cloudRecordItemList = cloudRecordServiceMapper . getList ( null , " rtp " , inviteInfo . getStream ( ) , null , null , ssrcTransaction . getCallId ( ) , null ) ;
if ( cloudRecordItemList . isEmpty ( ) ) {
logger . warn ( " [获取下载进度],未找到下载视频信息 " ) ;
return null ;
}
long duration = 0 ;
for ( CloudRecordItem cloudRecordItem : cloudRecordItemList ) {
duration + = cloudRecordItem . getTimeLen ( ) ;
}
if ( duration = = 0 ) {
inviteInfo . getStreamInfo ( ) . setProgress ( 0 ) ;
} else {
String startTime = inviteInfo . getStreamInfo ( ) . getStartTime ( ) ;
String endTime = inviteInfo . getStreamInfo ( ) . getEndTime ( ) ;
// 此时start和end单位是秒
long start = DateUtil . yyyy_MM_dd_HH_mm_ssToTimestamp ( startTime ) ;
long end = DateUtil . yyyy_MM_dd_HH_mm_ssToTimestamp ( endTime ) ;
BigDecimal currentCount = new BigDecimal ( duration ) ;
BigDecimal totalCount = new BigDecimal ( ( end - start ) * 1000 ) ;
BigDecimal divide = currentCount . divide ( totalCount , 2 , RoundingMode . HALF_UP ) ;
double process = divide . doubleValue ( ) ;
inviteInfo . getStreamInfo ( ) . setProgress ( process ) ;
}
inviteStreamService . updateInviteInfo ( inviteInfo ) ;
if ( inviteInfo . getStreamInfo( ) . getProgress ( ) = = 1 ) {
return inviteInfo . getStreamInfo ( ) ;
}
return null ;
// 获取当前已下载时长
String mediaServerId = inviteInfo . getStreamInfo ( ) . getMediaServerId ( ) ;
MediaServerItem mediaServerItem = mediaServerService . getOne ( mediaServerId ) ;
if ( mediaServerItem = = null ) {
logger . warn ( " [获取下载进度] 查询录像信息时发现节点不存在 " ) ;
return null ;
}
SsrcTransaction ssrcTransaction = streamSession . getSsrcTransaction ( deviceId , channelId , null , stream ) ;
if ( ssrcTransaction = = null ) {
logger . warn ( " [获取下载进度] 下载已结束 " ) ;
return null ;
}
JSONObject mediaListJson = zlmresTfulUtils . getMediaList ( mediaServerItem , " rtp " , stream ) ;
if ( mediaListJson = = null ) {
logger . warn ( " [获取下载进度] 从zlm查询进度失败 " ) ;
return null ;
}
if ( mediaListJson . getInteger ( " code " ) ! = 0 ) {
logger . warn ( " [获取下载进度] 从zlm查询进度出现错误: {} " , mediaListJson . getString ( " msg " ) ) ;
return null ;
}
JSONArray data = mediaListJson . getJSONArray ( " data " ) ;
if ( data = = null ) {
logger . warn ( " [获取下载进度] 从zlm查询进度时未返回数据 " ) ;
return null ;
}
JSONObject mediaJSON = data . getJSONObject ( 0 ) ;
JSONArray tracks = mediaJSON . getJSONArray ( " tracks " ) ;
if ( tracks . isEmpty ( ) ) {
logger . warn ( " [获取下载进度] 从zlm查询进度时未返回数据 " ) ;
return null ;
}
JSONObject jsonObject = tracks . getJSONObject ( 0 ) ;
long duration = jsonObject . getLongValue ( " duration " ) ;
if ( duration = = 0 ) {
inviteInfo . getStreamInfo ( ) . setProgress ( 0 ) ;
} else {
String startTime = inviteInfo . getStreamInfo ( ) . getStartTime ( ) ;
String endTime = inviteInfo . getStreamInfo ( ) . getEndTime ( ) ;
// 此时start和end单位是秒
long start = DateUtil . yyyy_MM_dd_HH_mm_ssToTimestamp ( startTime ) ;
long end = DateUtil . yyyy_MM_dd_HH_mm_ssToTimestamp ( endTime ) ;
BigDecimal currentCount = new BigDecimal ( duration ) ;
BigDecimal totalCount = new BigDecimal ( ( end - start ) * 1000 ) ;
BigDecimal divide = currentCount . divide ( totalCount , 2 , RoundingMode . HALF_UP ) ;
double process = divide . doubleValue ( ) ;
inviteInfo . getStreamInfo ( ) . setProgress ( process ) ;
}
inviteStreamService . updateInviteInfo ( inviteInfo ) ;
return inviteInfo . getStreamInfo ( ) ;
}
@Override
public void getFilePath ( String deviceId , String channelId , String stream , ErrorCallback < DownloadFileInfo > callback ) {
InviteInfo inviteInfo = inviteStreamService . getInviteInfo ( InviteSessionType . DOWNLOAD , deviceId , channelId , stream ) ;
if ( inviteInfo = = null | | inviteInfo . getStreamInfo ( ) = = null ) {
logger . warn ( " [获取录像下载文件地址] 未查询到录像下载的信息, {}/{}-{} " , deviceId , channelId , stream ) ;
callback . run ( ErrorCode . ERROR100 . getCode ( ) , " 未查询到录像下载的信息 " , null ) ;
return ;
}
if ( ! ObjectUtils . isEmpty ( inviteInfo . getStreamInfo ( ) . getDownLoadFilePath ( ) ) ) {
callback . run ( ErrorCode . SUCCESS . getCode ( ) , ErrorCode . SUCCESS . getMsg ( ) ,
inviteInfo . getStreamInfo ( ) . getDownLoadFilePath ( ) ) ;
return ;
}
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage . getStreamAuthorityInfo ( " rtp " , stream ) ;
if ( streamAuthorityInfo = = null ) {
logger . warn ( " [获取录像下载文件地址] 未查询到录像的视频信息, {}/{}-{} " , deviceId , channelId , stream ) ;
callback . run ( ErrorCode . ERROR100 . getCode ( ) , " 未查询到录像的视频信息 " , null ) ;
return ;
}
// 获取当前已下载时长
String mediaServerId = inviteInfo . getStreamInfo ( ) . getMediaServerId ( ) ;
MediaServerItem mediaServerItem = mediaServerService . getOne ( mediaServerId ) ;
if ( mediaServerItem = = null ) {
logger . warn ( " [获取录像下载文件地址] 查询录像信息时发现节点不存在, {}/{}-{} " , deviceId , channelId , stream ) ;
callback . run ( ErrorCode . ERROR100 . getCode ( ) , " 查询录像信息时发现节点不存在 " , null ) ;
return ;
}
List < CloudRecordItem > cloudRecordItemList = cloudRecordServiceMapper . getListByCallId ( streamAuthorityInfo . getCallId ( ) ) ;
if ( ! cloudRecordItemList . isEmpty ( ) ) {
String filePath = cloudRecordItemList . get ( 0 ) . getFilePath ( ) ;
DownloadFileInfo downloadFileInfo = getDownloadFilePath ( mediaServerItem , filePath ) ;
inviteInfo . getStreamInfo ( ) . setDownLoadFilePath ( downloadFileInfo ) ;
inviteStreamService . updateInviteInfo ( inviteInfo ) ;
callback . run ( ErrorCode . SUCCESS . getCode ( ) , ErrorCode . SUCCESS . getMsg ( ) , downloadFileInfo ) ;
} else {
// 可能尚未生成, 那就监听hook等着收到对应的录像通知
ZlmHttpHookSubscribe . Event hookEvent = ( mediaServerItemInuse , hookParam ) - > {
logger . info ( " [录像下载]收到订阅消息: , {}/{}-{} " , deviceId , channelId , stream ) ;
logger . info ( " [录像下载]收到订阅消息内容: " + hookParam ) ;
dynamicTask . stop ( streamAuthorityInfo . getCallId ( ) ) ;
OnRecordMp4HookParam recordMp4HookParam = ( OnRecordMp4HookParam ) hookParam ;
String filePath = recordMp4HookParam . getFile_path ( ) ;
DownloadFileInfo downloadFileInfo = getDownloadFilePath ( mediaServerItem , filePath ) ;
inviteInfo . getStreamInfo ( ) . setDownLoadFilePath ( downloadFileInfo ) ;
inviteStreamService . updateInviteInfo ( inviteInfo ) ;
callback . run ( ErrorCode . SUCCESS . getCode ( ) , ErrorCode . SUCCESS . getMsg ( ) , downloadFileInfo ) ;
} ;
HookSubscribeForRecordMp4 hookSubscribe = HookSubscribeFactory . on_record_mp4 ( mediaServerId , " rtp " , stream ) ;
subscribe . addSubscribe ( hookSubscribe , hookEvent ) ;
// 设置超时,超时结束监听
dynamicTask . startDelay ( streamAuthorityInfo . getCallId ( ) , ( ) - > {
logger . info ( " [录像下载] 接收hook超时, {}/{}-{} " , deviceId , channelId , stream ) ;
subscribe . removeSubscribe ( hookSubscribe ) ;
callback . run ( ErrorCode . ERROR100 . getCode ( ) , " 接收hook超时 " , null ) ;
} , 10000 ) ;
}
}
private DownloadFileInfo getDownloadFilePath ( MediaServerItem mediaServerItem , String filePath ) {
DownloadFileInfo downloadFileInfo = new DownloadFileInfo ( ) ;
String pathTemplate = " %s://%s:%s/index/api/downloadFile?file_path= " + filePath ;
downloadFileInfo . setHttpPath ( String . format ( pathTemplate , " http " , mediaServerItem . getStreamIp ( ) ,
mediaServerItem . getHttpPort ( ) ) ) ;
if ( mediaServerItem . getHttpSSlPort ( ) > 0 ) {
downloadFileInfo . setHttpsPath ( String . format ( pathTemplate , " https " , mediaServerItem . getStreamIp ( ) ,
mediaServerItem . getHttpSSlPort ( ) ) ) ;
}
return downloadFileInfo ;
}
private StreamInfo onPublishHandlerForDownload ( MediaServerItem mediaServerItemInuse , HookParam hookParam , String deviceId , String channelId , String startTime , String endTime ) {