优化国标录像下载,添加进度条以及自动合并文件下载,需要结合新版assist服务使用。

This commit is contained in:
648540858
2022-03-25 16:05:14 +08:00
parent e9586687f7
commit 7d9cc96ef5
27 changed files with 761 additions and 591 deletions

View File

@@ -31,6 +31,9 @@ public class StreamInfo {
private String rtc;
private String mediaServerId;
private Object tracks;
private String startTime;
private String endTime;
private double progress;
public static class TransactionInfo{
public String callId;
@@ -264,4 +267,29 @@ public class StreamInfo {
public void setHttps_ts(String https_ts) {
this.https_ts = https_ts;
}
public String getStartTime() {
return startTime;
}
public void setStartTime(String startTime) {
this.startTime = startTime;
}
public String getEndTime() {
return endTime;
}
public void setEndTime(String endTime) {
this.endTime = endTime;
}
public double getProgress() {
return progress;
}
public void setProgress(double progress) {
this.progress = progress;
}
}

View File

@@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
public class SsrcTransaction {
private String deviceId;
@@ -10,6 +12,7 @@ public class SsrcTransaction {
private byte[] dialog;
private String mediaServerId;
private String ssrc;
private VideoStreamSessionManager.SessionType type;
public String getDeviceId() {
return deviceId;
@@ -74,4 +77,12 @@ public class SsrcTransaction {
public void setSsrc(String ssrc) {
this.ssrc = ssrc;
}
public VideoStreamSessionManager.SessionType getType() {
return type;
}
public void setType(VideoStreamSessionManager.SessionType type) {
this.type = type;
}
}

View File

@@ -156,8 +156,6 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
List<ParentPlatform> parentPlatforms = parentPlatformMap.get(gbId);
if (parentPlatforms != null && parentPlatforms.size() > 0) {
for (ParentPlatform platform : parentPlatforms) {
// String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId();
// SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
if (subscribeInfo == null) continue;
logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);

View File

@@ -30,6 +30,12 @@ public class VideoStreamSessionManager {
@Autowired
private UserSetup userSetup;
public enum SessionType {
play,
playback,
download
}
/**
* 添加一个点播/回放的事务信息
* 后续可以通过流Id/callID
@@ -40,7 +46,7 @@ public class VideoStreamSessionManager {
* @param mediaServerId 所使用的流媒体ID
* @param transaction 事务
*/
public void put(String deviceId, String channelId, String callId, String stream, String ssrc, String mediaServerId, ClientTransaction transaction){
public void put(String deviceId, String channelId, String callId, String stream, String ssrc, String mediaServerId, ClientTransaction transaction, SessionType type){
SsrcTransaction ssrcTransaction = new SsrcTransaction();
ssrcTransaction.setDeviceId(deviceId);
ssrcTransaction.setChannelId(channelId);
@@ -50,6 +56,7 @@ public class VideoStreamSessionManager {
ssrcTransaction.setCallId(callId);
ssrcTransaction.setSsrc(ssrc);
ssrcTransaction.setMediaServerId(mediaServerId);
ssrcTransaction.setType(type);
redisUtil.set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetup.getServerId()
+ "_" + deviceId + "_" + channelId + "_" + callId + "_" + stream, ssrcTransaction);

View File

@@ -115,7 +115,9 @@ public interface ISIPCommander {
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
* @param downloadSpeed 下载倍速参数
*/
void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, InviteStreamCallback event, SipSubscribe.Event errorEvent);
void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
String startTime, String endTime, int downloadSpeed, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
SipSubscribe.Event errorEvent);
/**
* 视频流停止

View File

@@ -428,7 +428,7 @@ public class SIPCommander implements ISIPCommander {
errorEvent.response(e);
}), e ->{
// 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
streamSession.put(device.getDeviceId(), channelId ,"play", streamId, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction());
streamSession.put(device.getDeviceId(), channelId ,"play", streamId, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play);
streamSession.put(device.getDeviceId(), channelId ,"play", e.dialog);
});
@@ -537,7 +537,7 @@ public class SIPCommander implements ISIPCommander {
transmitRequest(device, request, errorEvent, okEvent -> {
ResponseEvent responseEvent = (ResponseEvent) okEvent.event;
streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), responseEvent.getClientTransaction());
streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), responseEvent.getClientTransaction(), VideoStreamSessionManager.SessionType.playback);
streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), okEvent.dialog);
});
if (inviteStreamCallback != null) {
@@ -558,8 +558,9 @@ public class SIPCommander implements ISIPCommander {
* @param downloadSpeed 下载倍速参数
*/
@Override
public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, InviteStreamCallback event
, SipSubscribe.Event errorEvent) {
public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
String startTime, String endTime, int downloadSpeed, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
SipSubscribe.Event errorEvent) {
try {
logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
@@ -572,8 +573,6 @@ public class SIPCommander implements ISIPCommander {
content.append("t="+DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime)+" "
+DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime) +"\r\n");
String streamMode = device.getStreamMode().toUpperCase();
if (userSetup.isSeniorSdp()) {
@@ -639,15 +638,20 @@ public class SIPCommander implements ISIPCommander {
logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemInUse, JSONObject json)->{
event.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
});
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
if (inviteStreamCallback != null) {
inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
}
transmitRequest(device, request, errorEvent, okEvent->{
ResponseEvent responseEvent = (ResponseEvent) okEvent.event;
streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), responseEvent.getClientTransaction(), VideoStreamSessionManager.SessionType.download);
streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), okEvent.dialog);
});
ClientTransaction transaction = transmitRequest(device, request, errorEvent);
streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), transaction);
streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), transaction);
} catch ( SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();

View File

@@ -104,6 +104,7 @@ public class CatalogNotifyMessageHandler extends SIPRequestProcessorParent imple
DeviceChannel deviceChannel = storager.queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId());
deviceChannel.setParental(0);
deviceChannel.setParentId(channelReduce.getCatalogId());
deviceChannel.setCivilCode(parentPlatform.getDeviceGBId().substring(0, 6));
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
// 防止发送过快
Thread.sleep(100);

View File

@@ -62,7 +62,12 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
if (NotifyType.equals("121")){
logger.info("媒体播放完毕,通知关流");
String channelId =getText(rootElement, "DeviceID");
redisCatchStorage.stopPlayback(device.getDeviceId(), channelId, null, callIdHeader.getCallId());
// redisCatchStorage.stopPlayback(device.getDeviceId(), channelId, null, callIdHeader.getCallId());
// redisCatchStorage.stopDownload(device.getDeviceId(), channelId, null, callIdHeader.getCallId());
StreamInfo streamInfo = redisCatchStorage.queryDownload(device.getDeviceId(), channelId, null, callIdHeader.getCallId());
// 设置进度100%
streamInfo.setProgress(1);
redisCatchStorage.startDownload(streamInfo, callIdHeader.getCallId());
cmder.streamByeCmd(device.getDeviceId(), channelId, null, callIdHeader.getCallId());
// TODO 如果级联播放,需要给上级发送此通知

View File

@@ -107,6 +107,7 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
DeviceChannel deviceChannel = storager.queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId());
deviceChannel.setParental(0);
deviceChannel.setParentId(channelReduce.getCatalogId());
deviceChannel.setCivilCode(parentPlatform.getDeviceGBId().substring(0, 6));
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
// 防止发送过快
Thread.sleep(100);

View File

@@ -0,0 +1,139 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import okhttp3.*;
import okhttp3.logging.HttpLoggingInterceptor;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@Component
public class AssistRESTfulUtils {
private final static Logger logger = LoggerFactory.getLogger(AssistRESTfulUtils.class);
public interface RequestCallback{
void run(JSONObject response);
}
private OkHttpClient getClient(){
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
if (logger.isDebugEnabled()) {
HttpLoggingInterceptor logging = new HttpLoggingInterceptor(message -> {
logger.debug("http请求参数" + message);
});
logging.setLevel(HttpLoggingInterceptor.Level.BASIC);
// OkHttp進行添加攔截器loggingInterceptor
httpClientBuilder.addInterceptor(logging);
}
return httpClientBuilder.build();
}
public JSONObject sendGet(MediaServerItem mediaServerItem, String api, Map<String, Object> param, RequestCallback callback) {
OkHttpClient client = getClient();
if (mediaServerItem == null) {
return null;
}
if (StringUtils.isEmpty(mediaServerItem.getRecordAssistPort())) {
logger.warn("未启用Assist服务");
return null;
}
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append(String.format("http://%s:%s/%s", mediaServerItem.getIp(), mediaServerItem.getRecordAssistPort(), api));
JSONObject responseJSON = null;
if (param != null && param.keySet().size() > 0) {
stringBuffer.append("?");
int index = 1;
for (String key : param.keySet()){
if (param.get(key) != null) {
stringBuffer.append(key + "=" + param.get(key));
if (index < param.size()) {
stringBuffer.append("&");
}
}
index++;
}
}
String url = stringBuffer.toString();
Request request = new Request.Builder()
.get()
.url(url)
.build();
if (callback == null) {
try {
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
ResponseBody responseBody = response.body();
if (responseBody != null) {
String responseStr = responseBody.string();
responseJSON = JSON.parseObject(responseStr);
}
}else {
response.close();
Objects.requireNonNull(response.body()).close();
}
} catch (ConnectException e) {
logger.error(String.format("连接Assist失败: %s, %s", e.getCause().getMessage(), e.getMessage()));
logger.info("请检查media配置并确认Assist已启动...");
}catch (IOException e) {
logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
}
}else {
client.newCall(request).enqueue(new Callback(){
@Override
public void onResponse(@NotNull Call call, @NotNull Response response){
if (response.isSuccessful()) {
try {
String responseStr = Objects.requireNonNull(response.body()).string();
callback.run(JSON.parseObject(responseStr));
} catch (IOException e) {
logger.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
}
}else {
response.close();
Objects.requireNonNull(response.body()).close();
}
}
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
logger.error(String.format("连接Assist失败: %s, %s", e.getCause().getMessage(), e.getMessage()));
logger.info("请检查media配置并确认Assist已启动...");
}
});
}
return responseJSON;
}
public JSONObject fileDuration(MediaServerItem mediaServerItem, String app, String stream, RequestCallback callback){
Map<String, Object> param = new HashMap<>();
param.put("app",app);
param.put("stream",stream);
param.put("recordIng",true);
return sendGet(mediaServerItem, "api/record/file/duration",param, callback);
}
}

View File

@@ -215,7 +215,16 @@ public class ZLMHttpHookListener {
if (deviceChannel != null) {
ret.put("enable_audio", deviceChannel.isHasAudio());
}
// 如果是录像下载就设置视频间隔十秒
if (ssrcTransactionForAll.get(0).getType() == VideoStreamSessionManager.SessionType.download) {
ret.put("mp4_max_second", 10);
ret.put("enable_mp4", true);
ret.put("enable_audio", true);
}
}
return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
}
@@ -324,7 +333,6 @@ public class ZLMHttpHookListener {
if (mediaInfo != null) {
subscribe.response(mediaInfo, json);
}
}
// 流消失移除redis play
String app = item.getApp();
@@ -441,6 +449,7 @@ public class ZLMHttpHookListener {
if ("rtp".equals(app)){
ret.put("close", true);
StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(streamId);
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransaction(null, null, null, streamId);
if (streamInfoForPlayCatch != null) {
// 如果在给上级推流,也不停止。
if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) {

View File

@@ -31,6 +31,7 @@ public class ZLMHttpHookSubscribe {
on_server_keepalive
}
@FunctionalInterface
public interface Event{
void response(MediaServerItem mediaServerItem, JSONObject response);
}

View File

@@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo;
@@ -34,4 +35,9 @@ public interface IPlayService {
DeferredResult<ResponseEntity<String>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
void zlmServerOffline(String mediaServerId);
DeferredResult<ResponseEntity<String>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
DeferredResult<ResponseEntity<String>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream);
}

View File

@@ -12,6 +12,8 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.utils.DateUtil;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@@ -28,7 +30,6 @@ import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.service.IPlayService;
import gov.nist.javax.sip.stack.SIPDialog;
import jdk.nashorn.internal.ir.RuntimeNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -38,10 +39,8 @@ import org.springframework.stereotype.Service;
import org.springframework.util.ResourceUtils;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.header.CallIdHeader;
import javax.sip.header.Header;
import javax.sip.message.Request;
import java.io.FileNotFoundException;
import java.math.BigDecimal;
import java.util.*;
@SuppressWarnings(value = {"rawtypes", "unchecked"})
@@ -71,6 +70,9 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
@Autowired
private AssistRESTfulUtils assistRESTfulUtils;
@Autowired
private IMediaService mediaService;
@@ -344,7 +346,7 @@ public class PlayServiceImpl implements IPlayService {
return result;
}
resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId, uuid, result);
resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid, result);
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
@@ -405,6 +407,136 @@ public class PlayServiceImpl implements IPlayService {
return result;
}
@Override
public DeferredResult<ResponseEntity<String>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
Device device = storager.queryVideoDevice(deviceId);
if (device == null) return null;
MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true);
return download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed,infoCallBack, hookCallBack);
}
@Override
public DeferredResult<ResponseEntity<String>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
if (mediaServerItem == null || ssrcInfo == null) return null;
String uuid = UUID.randomUUID().toString();
String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId;
DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
return result;
}
resultHolder.put(key, uuid, result);
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
WVPResult<StreamInfo> wvpResult = new WVPResult<>();
msg.setData(wvpResult);
PlayBackResult<RequestMessage> downloadResult = new PlayBackResult<>();
downloadResult.setData(msg);
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
logger.warn(String.format("录像下载请求超时deviceId%s channelId%s", deviceId, channelId));
wvpResult.setCode(-1);
wvpResult.setMsg("录像下载请求超时");
downloadResult.setCode(-1);
hookCallBack.call(downloadResult);
SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
if (dialog != null) {
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
}else {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
}
cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
// 回复之前所有的点播请求
hookCallBack.call(downloadResult);
}
}, userSetup.getPlayTimeout());
cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, infoCallBack,
inviteStreamInfo -> {
logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
timer.cancel();
StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
streamInfo.setStartTime(startTime);
streamInfo.setEndTime(endTime);
if (streamInfo == null) {
logger.warn("录像下载API调用失败");
wvpResult.setCode(-1);
wvpResult.setMsg("录像下载API调用失败");
downloadResult.setCode(-1);
hookCallBack.call(downloadResult);
return ;
}
redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
wvpResult.setCode(0);
wvpResult.setMsg("success");
wvpResult.setData(streamInfo);
downloadResult.setCode(0);
downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
downloadResult.setResponse(inviteStreamInfo.getResponse());
hookCallBack.call(downloadResult);
}, event -> {
timer.cancel();
downloadResult.setCode(-1);
wvpResult.setCode(-1);
wvpResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));
downloadResult.setEvent(event);
hookCallBack.call(downloadResult);
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
});
return result;
}
@Override
public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {
StreamInfo streamInfo = redisCatchStorage.queryDownload(deviceId, channelId, stream, null);
if (streamInfo != null) {
if (streamInfo.getProgress() == 1) {
return streamInfo;
}
// 获取当前已下载时长
String mediaServerId = streamInfo.getMediaServerId();
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem == null) {
logger.warn("查询录像信息时发现节点已离线");
return null;
}
if (mediaServerItem.getRecordAssistPort() != 0) {
JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, streamInfo.getApp(), streamInfo.getStream(), null);
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
long duration = jsonObject.getLong("data");
if (duration == 0) {
streamInfo.setProgress(0);
}else {
String startTime = streamInfo.getStartTime();
String endTime = streamInfo.getEndTime();
long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
BigDecimal currentCount = new BigDecimal(duration/1000);
BigDecimal totalCount = new BigDecimal(end-start);
BigDecimal divide = currentCount.divide(totalCount,2, BigDecimal.ROUND_HALF_UP);
double process = divide.doubleValue();
streamInfo.setProgress(process);
}
}
}
}
return streamInfo;
}
@Override
public void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String uuid) {
RequestMessage msg = new RequestMessage();

View File

@@ -91,7 +91,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
MediaServerItem mediaInfo;
WVPResult<StreamInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(0);
if ("auto".equals(param.getMediaServerId())){
if (param.getMediaServerId() == null || "auto".equals(param.getMediaServerId())){
mediaInfo = mediaServerService.getMediaServerForMinimumLoad();
}else {
mediaInfo = mediaServerService.getOne(param.getMediaServerId());

View File

@@ -169,6 +169,8 @@ public interface IRedisCatchStorage {
StreamInfo queryDownload(String deviceId, String channelId, String stream, String callId);
boolean stopDownload(String deviceId, String channelId, String stream, String callId);
/**
* 查找第三方系统留下的国标预设值
* @param queryKey

View File

@@ -166,8 +166,42 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override
public boolean startDownload(StreamInfo stream, String callId) {
return redis.set(String.format("%S_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX,
userSetup.getServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId), stream);
boolean result;
if (stream.getProgress() == 1) {
result = redis.set(String.format("%S_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX,
userSetup.getServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId), stream);
}else {
result = redis.set(String.format("%S_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX,
userSetup.getServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId), stream, 60*60);
}
return result;
}
@Override
public boolean stopDownload(String deviceId, String channelId, String stream, String callId) {
DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(deviceId, channelId);
if (deviceChannel != null) {
deviceChannel.setStreamId(null);
deviceChannel.setDeviceId(deviceId);
deviceChannelMapper.update(deviceChannel);
}
if (deviceId == null) deviceId = "*";
if (channelId == null) channelId = "*";
if (stream == null) stream = "*";
if (callId == null) callId = "*";
String key = String.format("%S_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX,
userSetup.getServerId(),
deviceId,
channelId,
stream,
callId
);
List<Object> scan = redis.scan(key);
if (scan.size() > 0) {
for (Object keyObj : scan) {
redis.del((String) keyObj);
}
}
return true;
}
@Override

View File

@@ -1,150 +0,0 @@
package com.genersoft.iot.vmp.vmanager.gb28181.playback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.service.IPlayService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.message.Response;
import java.util.UUID;
@Api(tags = "历史媒体下载")
@CrossOrigin
@RestController
@RequestMapping("/api/download")
public class DownloadController {
private final static Logger logger = LoggerFactory.getLogger(DownloadController.class);
@Autowired
private SIPCommander cmder;
@Autowired
private IVideoManagerStorager storager;
@Autowired
private IRedisCatchStorage redisCatchStorage;
// @Autowired
// private ZLMRESTfulUtils zlmresTfulUtils;
@Autowired
private IPlayService playService;
@Autowired
private DeferredResultHolder resultHolder;
@Autowired
private IMediaServerService mediaServerService;
@ApiOperation("开始历史媒体下载")
@ApiImplicitParams({
@ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class),
@ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class),
@ApiImplicitParam(name = "startTime", value = "开始时间", dataTypeClass = String.class),
@ApiImplicitParam(name = "endTime", value = "结束时间", dataTypeClass = String.class),
@ApiImplicitParam(name = "downloadSpeed", value = "下载倍速", dataTypeClass = String.class),
})
@GetMapping("/start/{deviceId}/{channelId}")
public DeferredResult<ResponseEntity<String>> play(@PathVariable String deviceId, @PathVariable String channelId,
String startTime, String endTime, String downloadSpeed) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("历史媒体下载 API调用deviceId%schannelId%sdownloadSpeed%s", deviceId, channelId, downloadSpeed));
}
String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId;
String uuid = UUID.randomUUID().toString();
DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>(30000L);
// 超时处理
result.onTimeout(()->{
logger.warn(String.format("设备下载响应超时deviceId%s channelId%s", deviceId, channelId));
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
msg.setData("Timeout");
resultHolder.invokeAllResult(msg);
});
if(resultHolder.exist(key, null)) {
return result;
}
resultHolder.put(key, uuid, result);
Device device = storager.queryVideoDevice(deviceId);
MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
if (newMediaServerItem == null) {
logger.warn(String.format("设备下载响应超时deviceId%s channelId%s", deviceId, channelId));
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
msg.setData("Timeout");
resultHolder.invokeAllResult(msg);
return result;
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true);
cmder.downloadStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, (InviteStreamInfo inviteStreamInfo) -> {
logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
playService.onPublishHandlerForDownload(inviteStreamInfo, deviceId, channelId, uuid);
}, event -> {
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
resultHolder.invokeAllResult(msg);
});
return result;
}
@ApiOperation("停止历史媒体下载")
@ApiImplicitParams({
@ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class),
@ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class),
@ApiImplicitParam(name = "stream", value = "流ID", dataTypeClass = String.class),
})
@GetMapping("/stop/{deviceId}/{channelId}/{stream}")
public ResponseEntity<String> playStop(@PathVariable String deviceId, @PathVariable String channelId, @PathVariable String stream) {
cmder.streamByeCmd(deviceId, channelId, stream, null);
if (logger.isDebugEnabled()) {
logger.debug(String.format("设备历史媒体下载停止 API调用deviceId/channelId%s_%s", deviceId, channelId));
}
if (deviceId != null && channelId != null) {
JSONObject json = new JSONObject();
json.put("deviceId", deviceId);
json.put("channelId", channelId);
return new ResponseEntity<String>(json.toString(), HttpStatus.OK);
} else {
logger.warn("设备历史媒体下载停止API调用失败");
return new ResponseEntity<String>(HttpStatus.INTERNAL_SERVER_ERROR);
}
}
}

View File

@@ -1,6 +1,13 @@
package com.genersoft.iot.vmp.vmanager.gb28181.record;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
@@ -8,6 +15,7 @@ import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
@@ -41,6 +49,12 @@ public class GBRecordController {
@Autowired
private DeferredResultHolder resultHolder;
@Autowired
private IPlayService playService;
@Autowired
private IMediaServerService mediaServerService;
@ApiOperation("录像查询")
@ApiImplicitParams({
@ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class),
@@ -77,4 +91,111 @@ public class GBRecordController {
});
return result;
}
@ApiOperation("开始历史媒体下载")
@ApiImplicitParams({
@ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class),
@ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class),
@ApiImplicitParam(name = "startTime", value = "开始时间", dataTypeClass = String.class),
@ApiImplicitParam(name = "endTime", value = "结束时间", dataTypeClass = String.class),
@ApiImplicitParam(name = "downloadSpeed", value = "下载倍速", dataTypeClass = String.class),
})
@GetMapping("/download/start/{deviceId}/{channelId}")
public DeferredResult<ResponseEntity<String>> download(@PathVariable String deviceId, @PathVariable String channelId,
String startTime, String endTime, String downloadSpeed) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("历史媒体下载 API调用deviceId%schannelId%sdownloadSpeed%s", deviceId, channelId, downloadSpeed));
}
// String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId;
// String uuid = UUID.randomUUID().toString();
// DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>(30000L);
// // 超时处理
// result.onTimeout(()->{
// logger.warn(String.format("设备下载响应超时deviceId%s channelId%s", deviceId, channelId));
// RequestMessage msg = new RequestMessage();
// msg.setId(uuid);
// msg.setKey(key);
// msg.setData("Timeout");
// resultHolder.invokeAllResult(msg);
// });
// if(resultHolder.exist(key, null)) {
// return result;
// }
// resultHolder.put(key, uuid, result);
// Device device = storager.queryVideoDevice(deviceId);
//
// MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device);
// if (newMediaServerItem == null) {
// logger.warn(String.format("设备下载响应超时deviceId%s channelId%s", deviceId, channelId));
// RequestMessage msg = new RequestMessage();
// msg.setId(uuid);
// msg.setKey(key);
// msg.setData("Timeout");
// resultHolder.invokeAllResult(msg);
// return result;
// }
//
// SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true);
//
// cmder.downloadStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, (InviteStreamInfo inviteStreamInfo) -> {
// logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
// playService.onPublishHandlerForDownload(inviteStreamInfo, deviceId, channelId, uuid);
// }, event -> {
// RequestMessage msg = new RequestMessage();
// msg.setId(uuid);
// msg.setKey(key);
// msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
// resultHolder.invokeAllResult(msg);
// });
if (logger.isDebugEnabled()) {
logger.debug(String.format("设备回放 API调用deviceId%s channelId%s", deviceId, channelId));
}
DeferredResult<ResponseEntity<String>> result = playService.download(deviceId, channelId, startTime, endTime, Integer.parseInt(downloadSpeed), null, hookCallBack->{
resultHolder.invokeResult(hookCallBack.getData());
});
return result;
}
@ApiOperation("停止历史媒体下载")
@ApiImplicitParams({
@ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class),
@ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class),
@ApiImplicitParam(name = "stream", value = "流ID", dataTypeClass = String.class),
})
@GetMapping("/download/stop/{deviceId}/{channelId}/{stream}")
public ResponseEntity<String> playStop(@PathVariable String deviceId, @PathVariable String channelId, @PathVariable String stream) {
cmder.streamByeCmd(deviceId, channelId, stream, null);
if (logger.isDebugEnabled()) {
logger.debug(String.format("设备历史媒体下载停止 API调用deviceId/channelId%s_%s", deviceId, channelId));
}
if (deviceId != null && channelId != null) {
JSONObject json = new JSONObject();
json.put("deviceId", deviceId);
json.put("channelId", channelId);
return new ResponseEntity<String>(json.toString(), HttpStatus.OK);
} else {
logger.warn("设备历史媒体下载停止API调用失败");
return new ResponseEntity<String>(HttpStatus.INTERNAL_SERVER_ERROR);
}
}
@ApiOperation("获取历史媒体下载进度")
@ApiImplicitParams({
@ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class),
@ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class),
@ApiImplicitParam(name = "stream", value = "流ID", dataTypeClass = String.class),
})
@GetMapping("/download/progress/{deviceId}/{channelId}/{stream}")
public ResponseEntity<StreamInfo> getProgress(@PathVariable String deviceId, @PathVariable String channelId, @PathVariable String stream) {
StreamInfo streamInfo = playService.getDownLoadInfo(deviceId, channelId, stream);
return new ResponseEntity<>(streamInfo, HttpStatus.OK);
}
}