修复国标录像下载

This commit is contained in:
648540858
2024-09-10 16:50:54 +08:00
parent 7cc4c9d14a
commit 3140672e63
13 changed files with 163 additions and 112 deletions

View File

@@ -0,0 +1,12 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import lombok.Data;
@Data
public class OpenRTPServerResult {
private SSRCInfo ssrcInfo;
private HookData hookData;
}

View File

@@ -31,7 +31,6 @@ public interface IPlayService {
void zlmServerOffline(String mediaServerId);
void download(Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback);
void download(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback);
StreamInfo getDownLoadInfo(Device device, DeviceChannel channel, String stream);

View File

@@ -355,10 +355,11 @@ public class PlayServiceImpl implements IPlayService {
rtpServerParam.setTcpMode(tcpMode);
rtpServerParam.setOnlyAuto(false);
rtpServerParam.setDisableAudio(!channel.isHasAudio());
SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, hookData) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) {
// hook响应
StreamInfo streamInfo = onPublishHandlerForPlay(hookData.getMediaServer(), hookData.getMediaInfo(), device, channel);
StreamInfo streamInfo = onPublishHandlerForPlay(result.getHookData().getMediaServer(), result.getHookData().getMediaInfo(), device, channel);
if (streamInfo == null){
if (callback != null) {
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
@@ -379,7 +380,7 @@ public class PlayServiceImpl implements IPlayService {
log.info("[点播成功] deviceId: {}, channelId:{}, 码流类型:{}", device.getDeviceId(), channel.getDeviceId(),
channel.getStreamIdentification());
snapOnPlay(hookData.getMediaServer(), device.getDeviceId(), channel.getDeviceId(), streamId);
snapOnPlay(result.getHookData().getMediaServer(), device.getDeviceId(), channel.getDeviceId(), streamId);
}else {
if (callback != null) {
callback.run(code, msg, null);
@@ -401,7 +402,6 @@ public class PlayServiceImpl implements IPlayService {
if (ssrcInfo == null || ssrcInfo.getPort() <= 0) {
log.info("[点播端口/SSRC]获取失败deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channel.getDeviceId(), ssrcInfo);
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "获取端口或者ssrc失败", null);
sessionManager.removeByStream(streamId);
inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null,
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(),
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(),
@@ -743,10 +743,10 @@ public class PlayServiceImpl implements IPlayService {
rtpServerParam.setTcpMode(tcpMode);
rtpServerParam.setOnlyAuto(false);
rtpServerParam.setDisableAudio(!channel.isHasAudio());
SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, hookData) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) {
// hook响应
StreamInfo streamInfo = onPublishHandlerForPlayback(hookData.getMediaServer(), hookData.getMediaInfo(), device, channel, startTime, endTime);
StreamInfo streamInfo = onPublishHandlerForPlayback(result.getHookData().getMediaServer(), result.getHookData().getMediaInfo(), device, channel, startTime, endTime);
if (streamInfo == null) {
log.warn("设备回放API调用失败");
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
@@ -778,7 +778,6 @@ public class PlayServiceImpl implements IPlayService {
if (callback != null) {
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "获取端口或者ssrc失败", null);
}
sessionManager.removeByStream(stream);
inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null,
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(),
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(),
@@ -924,70 +923,92 @@ public class PlayServiceImpl implements IPlayService {
null);
return;
}
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
// 录像下载不使用固定流地址,固定流地址会导致如果开始时间与结束时间一致时文件错误的叠加在一起
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, null, device.isSsrcCheck(), true, 0, false,!channel.isHasAudio(), false, tcpMode);
download(newMediaServerItem, ssrcInfo, device, channel, startTime, endTime, downloadSpeed, callback);
download(newMediaServerItem, device, channel, startTime, endTime, downloadSpeed, callback);
}
@Override
public void download(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback) {
if (mediaServerItem == null || ssrcInfo == null) {
private void download(MediaServer mediaServerItem, Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback) {
if (mediaServerItem == null ) {
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
null);
return;
}
log.info("[录像下载] deviceId: {}, channelId: {}, 下载速度:{}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channel.getDeviceId(), downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
// 录像下载不使用固定流地址,固定流地址会导致如果开始时间与结束时间一致时文件错误的叠加在一起
RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServerItem(mediaServerItem);
rtpServerParam.setSsrcCheck(device.isSsrcCheck());
rtpServerParam.setPlayback(true);
rtpServerParam.setPort(0);
rtpServerParam.setTcpMode(tcpMode);
rtpServerParam.setOnlyAuto(false);
rtpServerParam.setDisableAudio(!channel.isHasAudio());
SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) {
// hook响应
StreamInfo streamInfo = onPublishHandlerForDownload(mediaServerItem, result.getHookData().getMediaInfo(), device, channel, startTime, endTime);
if (streamInfo == null) {
log.warn("[录像下载] 获取流地址信息失败");
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
return;
}
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
log.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channel, startTime, endTime);
}else {
if (callback != null) {
callback.run(code, msg, null);
}
inviteStreamService.call(InviteSessionType.DOWNLOAD, channel.getId(), null, code, msg, null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.DOWNLOAD, channel.getId());
if (result != null && result.getSsrcInfo() != null) {
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(result.getSsrcInfo().getStream());
if (ssrcTransaction != null) {
try {
cmder.streamByeCmd(device, channel.getDeviceId(), ssrcTransaction.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[录像下载] 发送BYE失败 {}", e.getMessage());
} finally {
sessionManager.removeByStream(ssrcTransaction.getStream());
}
}
}
}
});
if (ssrcInfo == null || ssrcInfo.getPort() <= 0) {
log.info("[录像下载端口/SSRC]获取失败deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channel.getDeviceId(), ssrcInfo);
if (callback != null) {
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "获取端口或者ssrc失败", null);
}
inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null,
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(),
InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getMsg(),
null);
return;
}
log.info("[录像下载] deviceId: {}, channelId: {}, 下载速度:{}, 收流端口:{}, 收流模式:{}, SSRC: {}({}), SSRC校验{}",
device.getDeviceId(), channel.getDeviceId(), downloadSpeed, ssrcInfo.getPort(), device.getStreamMode(),
ssrcInfo.getSsrc(), String.format("%08x", Long.parseLong(ssrcInfo.getSsrc())).toUpperCase(),
device.isSsrcCheck());
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo,
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD,
InviteSessionStatus.ready);
inviteStreamService.updateInviteInfo(inviteInfo);
String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> {
log.warn(String.format("录像下载请求超时deviceId%s channelId%s", device, channel));
inviteStreamService.removeInviteInfo(inviteInfo);
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try {
cmder.streamByeCmd(device, channel.getDeviceId(), ssrcInfo.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException e) {
log.error("[录像流]录像下载请求超时, 发送BYE失败 {}", e.getMessage());
} catch (SsrcTransactionNotFoundException e) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getStream());
}
}, userSetting.getPlayTimeout());
SipSubscribe.Event errorEvent = event -> {
dynamicTask.stop(downLoadTimeOutTaskKey);
callback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg), null);
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
sessionManager.removeByStream(ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
};
HookSubscribe.Event hookEvent = (hookData) -> {
log.info("[录像下载]收到订阅消息: " + hookData);
dynamicTask.stop(downLoadTimeOutTaskKey);
StreamInfo streamInfo = onPublishHandlerForDownload(hookData.getMediaServer(), hookData.getMediaInfo(), device, channel, startTime, endTime);
if (streamInfo == null) {
log.warn("[录像下载] 获取流地址信息失败");
callback.run(InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_PARSING_EXCEPTIONS.getMsg(), null);
return;
}
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
log.info("[录像下载] 调用成功 deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}", device.getDeviceId(), channel, startTime, endTime);
};
try {
cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channel, startTime, endTime, downloadSpeed,
errorEvent, eventResult ->{
eventResult -> {
// 对方返回错误
callback.run(InviteErrorCode.FAIL.getCode(), String.format("录像下载失败, 错误码: %s, %s", eventResult.statusCode, eventResult.msg), null);
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
}, eventResult ->{
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel,
callback, inviteInfo, InviteSessionType.DOWNLOAD);
@@ -1002,9 +1023,11 @@ public class PlayServiceImpl implements IPlayService {
DownloadFileInfo downloadFileInfo = CloudRecordUtils.getDownloadFilePath(mediaServerItem, filePath);
InviteInfo inviteInfoForNew = inviteStreamService.getInviteInfo(inviteInfo.getType()
, inviteInfo.getChannelId(), inviteInfo.getStream());
inviteInfoForNew.getStreamInfo().setDownLoadFilePath(downloadFileInfo);
// 不可以马上移除会导致后续接口拿不到下载地址
inviteStreamService.updateInviteInfo(inviteInfoForNew, 60*15L);
if (inviteInfoForNew != null && inviteInfoForNew.getStreamInfo() != null) {
inviteInfoForNew.getStreamInfo().setDownLoadFilePath(downloadFileInfo);
// 不可以马上移除会导致后续接口拿不到下载地址
inviteStreamService.updateInviteInfo(inviteInfoForNew, 60*15L);
}
};
Hook hook = Hook.getInstance(HookType.on_record_mp4, "rtp", ssrcInfo.getStream(), mediaServerItem.getId());
// 设置过期时间,下载失败时自动处理订阅数据
@@ -1013,12 +1036,10 @@ public class PlayServiceImpl implements IPlayService {
});
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 录像下载: {}", e.getMessage());
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
eventResult.statusCode = -1;
eventResult.msg = "命令发送失败";
errorEvent.response(eventResult);
callback.run(InviteErrorCode.FAIL.getCode(),e.getMessage(), null);
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
}
}

View File

@@ -456,7 +456,7 @@ public class SIPCommander implements ISIPCommander {
String startTime, String endTime, int downloadSpeed,
SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException {
log.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort());
log.info("[发送-请求历史媒体下载-命令] 流ID {},节点为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getSdpIp(), ssrcInfo.getPort());
String sdpIp;
if (!ObjectUtils.isEmpty(device.getSdpIp())) {
sdpIp = device.getSdpIp();
@@ -524,27 +524,8 @@ public class SIPCommander implements ISIPCommander {
content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc
log.debug("此时请求下载信令的ssrc===>{}",ssrcInfo.getSsrc());
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcInfo.getStream(), mediaServerItem.getId());
// 添加订阅
CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport());
String callId= newCallIdHeader.getCallId();
subscribe.addSubscribe(rtpHook, (hookData) -> {
log.debug("sipc 添加订阅===callId {}",callId);
subscribe.removeSubscribe(rtpHook);
// 添加流注销的订阅注销了后向设备发送bye
Hook departureHook = Hook.getInstance(HookType.on_media_departure, "rtp", ssrcInfo.getStream(), mediaServerItem.getId());
subscribe.addSubscribe(departureHook,
(departureHookData) -> {
log.info("[录像]下载结束, 发送BYE");
try {
streamByeCmd(device, channel.getDeviceId(), ssrcInfo.getStream(), callId);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
log.error("[录像]下载结束, 发送BYE失败 {}", e.getMessage());
}
});
});
Request request = headerProvider.createPlaybackInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc());
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
@@ -653,9 +634,6 @@ public class SIPCommander implements ISIPCommander {
}
log.info("[发送BYE] 设备: device: {}, channel: {}, callId: {}", device.getDeviceId(), channelId, ssrcTransaction.getCallId());
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
sessionManager.removeByCallId(ssrcTransaction.getCallId());
Request byteRequest = headerProvider.createByteRequest(device, channelId, ssrcTransaction.getSipTransactionInfo());
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), byteRequest, null, okEvent);

View File

@@ -85,7 +85,6 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
@Override
public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
log.info("[收到报警通知]设备:{}", device.getDeviceId());
boolean isEmpty = taskQueue.isEmpty();
taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
// 回复200 OK
@@ -96,7 +95,9 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
}
if (isEmpty) {
taskExecutor.execute(() -> {
log.info("[处理报警通知]待处理数量:{}", taskQueue.size() );
if (log.isDebugEnabled()) {
log.info("[处理报警通知]待处理数量:{}", taskQueue.size() );
}
while (!taskQueue.isEmpty()) {
try {
SipMsgInfo sipMsgInfo = taskQueue.poll();
@@ -161,7 +162,9 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType"));
}
}
log.info("[收到报警通知]内容:{}", JSON.toJSONString(deviceAlarm));
if (log.isDebugEnabled()) {
log.debug("[收到报警通知]设备:{} 内容:{}", device.getDeviceId(), JSON.toJSONString(deviceAlarm));
}
// 作者自用判断其他小伙伴需要此消息可以自行修改但是不要提在pr里
if (DeviceAlarmMethod.Other.getVal() == Integer.parseInt(deviceAlarm.getAlarmMethod())) {
// 发送给平台的报警信息。 发送redis通知