临时提交

This commit is contained in:
648540858
2024-06-28 18:06:09 +08:00
parent 019af90f6e
commit b630190e30
13 changed files with 249 additions and 396 deletions

View File

@@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.util.ObjectUtils;
/**
* @author lin
@@ -60,4 +61,21 @@ public class StreamProxy extends CommonGBChannel {
@Schema(description = "拉流代理时zlm返回的key用于停止拉流代理")
private String streamKey;
@Schema(description = "更新时间")
private String updateTime;
@Schema(description = "创建时间")
private String createTime;
public CommonGBChannel getCommonGBChannel() {
if (ObjectUtils.isEmpty(this.getGbDeviceId())) {
return null;
}
if (ObjectUtils.isEmpty(this.getGbName())) {
this.setGbName( app+ "-" +stream);
}
this.setStreamProxyId(this.getId());
return this;
}
}

View File

@@ -6,14 +6,12 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
@@ -25,10 +23,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Map;
import java.util.UUID;
@SuppressWarnings("rawtypes")
/**
@@ -85,7 +81,7 @@ public class StreamProxyController {
})
@PostMapping(value = "/save")
@ResponseBody
public DeferredResult<Object> save(@RequestBody StreamProxy param){
public StreamContent save(@RequestBody StreamProxy param){
logger.info("添加代理: " + JSONObject.toJSONString(param));
if (ObjectUtils.isEmpty(param.getMediaServerId())) {
param.setMediaServerId("auto");
@@ -93,43 +89,25 @@ public class StreamProxyController {
if (ObjectUtils.isEmpty(param.getType())) {
param.setType("default");
}
if (ObjectUtils.isEmpty(param.getRtpType())) {
param.setRtpType("1");
}
if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbId(null);
param.setGbDeviceId(null);
}
StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyItem != null) {
streamProxyService.del(param.getApp(), param.getStream());
}
RequestMessage requestMessage = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_PROXY + param.getApp() + param.getStream();
requestMessage.setKey(key);
String uuid = UUID.randomUUID().toString();
requestMessage.setId(uuid);
DeferredResult<Object> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
// 录像查询以channelId作为deviceId查询
resultHolder.put(key, uuid, result);
result.onTimeout(()->{
WVPResult<StreamInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("超时");
requestMessage.setData(wvpResult);
resultHolder.invokeAllResult(requestMessage);
});
streamProxyService.save(param, (code, msg, streamInfo) -> {
logger.info("[拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg);
if (code == ErrorCode.SUCCESS.getCode()) {
requestMessage.setData(new StreamContent(streamInfo));
StreamInfo streamInfo = streamProxyService.save(param);
if (param.isEnable()) {
if (streamInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
}else {
requestMessage.setData(WVPResult.fail(code, msg));
return new StreamContent(streamInfo);
}
resultHolder.invokeAllResult(requestMessage);
});
return result;
}else {
return null;
}
}
@GetMapping(value = "/ffmpeg_cmd/list")
@@ -180,10 +158,6 @@ public class StreamProxyController {
@Parameter(name = "stream", description = "流id", required = true)
public void stop(String app, String stream){
logger.info("停用代理: " + app + "/" + stream);
boolean result = streamProxyService.stop(app, stream);
if (!result) {
logger.info("停用代理失败: " + app + "/" + stream);
throw new ControllerException(ErrorCode.ERROR100);
}
streamProxyService.stop(app, stream);
}
}

View File

@@ -87,4 +87,9 @@ public interface StreamProxyMapper {
@Select("select count(1) from wvp_stream_proxy where status = true")
int getOnline();
/**
* 查询设置了自动移除并且没有国标编号的代理
*/
List<StreamProxy> selectWithAutoRemoveAndWithoutGbDeviceIdByMediaServerId(String mediaServerId);
}

View File

@@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.streamProxy.service;
import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
@@ -16,7 +15,7 @@ public interface IStreamProxyService {
* 保存视频代理
* @param param
*/
void save(StreamProxy param, GeneralCallback<StreamInfo> callback);
StreamInfo save(StreamProxy param);
/**
* 添加视频代理到zlm
@@ -73,7 +72,7 @@ public interface IStreamProxyService {
* @param stream
* @return
*/
boolean stop(String app, String stream);
void stop(String app, String stream);
/**
* 获取ffmpeg.cmd模板

View File

@@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
@@ -16,6 +17,8 @@ import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.service.IGbStreamService;
@@ -42,6 +45,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
@@ -73,7 +77,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
private UserSetting userSetting;
@Autowired
private GbStreamMapper gbStreamMapper;
private IGbChannelService gbChannelService;
@Autowired
private PlatformGbStreamMapper platformGbStreamMapper;
@@ -119,7 +123,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
/**
* 流离开的处理
* 流未找到的处理
*/
@Async("taskExecutor")
@EventListener
@@ -134,9 +138,29 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
}
/**
* 流媒体节点上线
*/
@Async("taskExecutor")
@EventListener
@Transactional
public void onApplicationEvent(MediaServerOnlineEvent event) {
zlmServerOnline(event.getMediaServerId());
}
/**
* 流媒体节点离线
*/
@Async("taskExecutor")
@EventListener
@Transactional
public void onApplicationEvent(MediaServerOfflineEvent event) {
zlmServerOffline(event.getMediaServerId());
}
@Override
public void save(StreamProxy streamProxy, GeneralCallback<StreamInfo> callback) {
public StreamInfo save(StreamProxy streamProxy) {
MediaServer mediaServer;
if (ObjectUtils.isEmpty(streamProxy.getMediaServerId()) || "auto".equals(streamProxy.getMediaServerId())){
mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
@@ -157,136 +181,47 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
saveResult = addStreamProxy(streamProxy);
}
if (!saveResult) {
callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null);
return;
throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存失败");
}
if (streamProxy.isEnable()) {
StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy);
if (streamInfo != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null);
}
//
//
// Hook hook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId());
// hookSubscribe.addSubscribe(hook, (hookData) -> {
// StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(
// mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null);
// callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
// });
// String talkKey = UUID.randomUUID().toString();
// String delayTalkKey = UUID.randomUUID().toString();
// dynamicTask.startDelay(delayTalkKey, ()->{
// StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId(), false);
// if (streamInfo != null) {
// callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
// }else {
// dynamicTask.stop(talkKey);
// callback.run(ErrorCode.ERROR100.getCode(), "超时", null);
// }
// }, 7000);
// WVPResult<String> result = addStreamProxyToZlm(streamProxy);
// if (result != null && result.getCode() == 0) {
// hookSubscribe.removeSubscribe(hook);
// dynamicTask.stop(talkKey);
// StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(
// mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null);
// callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
// }else {
// streamProxy.setEnable(false);
// // 直接移除
// if (streamProxy.isEnableRemoveNoneReader()) {
// del(streamProxy.getApp(), streamProxy.getStream());
// }else {
// updateStreamProxy(streamProxy);
// }
// if (result == null){
// callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null);
// }else {
// callback.run(ErrorCode.ERROR100.getCode(), result.getMsg(), null);
// }
// }
}else{
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(
mediaServer, streamProxy.getApp(), streamProxy.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
return mediaServerService.startProxy(mediaServer, streamProxy);
}
return null;
}
/**
* 新增代理流
* @param streamProxyItem
* @return
*/
private boolean addStreamProxy(StreamProxy streamProxyItem) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
boolean result = false;
streamProxyItem.setStreamType("proxy");
streamProxyItem.setStatus(true);
@Transactional
public boolean addStreamProxy(StreamProxy streamProxy) {
String now = DateUtil.getNow();
streamProxyItem.setCreateTime(now);
try {
if (streamProxyMapper.add(streamProxyItem) > 0) {
if (!ObjectUtils.isEmpty(streamProxyItem.getGbId())) {
if (gbStreamMapper.add(streamProxyItem) < 0) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
}
}else {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
result = true;
dataSourceTransactionManager.commit(transactionStatus); //手动提交
}catch (Exception e) {
log.error("向数据库添加流代理失败:", e);
dataSourceTransactionManager.rollback(transactionStatus);
streamProxy.setCreateTime(now);
streamProxy.setUpdateTime(now);
if (streamProxyMapper.add(streamProxy) > 0 && !ObjectUtils.isEmpty(streamProxy.getGbDeviceId())) {
gbChannelService.add(streamProxy.getCommonGBChannel());
}
return result;
return true;
}
/**
* 更新代理流
* @param streamProxyItem
* @return
*/
@Override
public boolean updateStreamProxy(StreamProxy streamProxyItem) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
boolean result = false;
streamProxyItem.setStreamType("proxy");
try {
if (streamProxyMapper.update(streamProxyItem) > 0) {
if (!ObjectUtils.isEmpty(streamProxyItem.getGbId())) {
if (gbStreamMapper.updateByAppAndStream(streamProxyItem) == 0) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
}
} else {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
public boolean updateStreamProxy(StreamProxy streamProxy) {
streamProxy.setUpdateTime(DateUtil.getNow());
dataSourceTransactionManager.commit(transactionStatus); //手动提交
result = true;
}catch (Exception e) {
log.error("未处理的异常 ", e);
dataSourceTransactionManager.rollback(transactionStatus);
if (streamProxyMapper.update(streamProxy) > 0 && !ObjectUtils.isEmpty(streamProxy.getGbDeviceId())) {
if (streamProxy.getGbId() > 0) {
gbChannelService.update(streamProxy.getCommonGBChannel());
}else {
gbChannelService.add(streamProxy.getCommonGBChannel());
}
}
return result;
return true;
}
@Override
@@ -314,8 +249,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
param.getTimeoutMs(), param.isEnableAudio(), param.isEnableMp4(),
param.getFfmpegCmdKey());
}else {
result = mediaServerService.addStreamProxy(mediaServer, param.getApp(), param.getStream(), param.getUrl().trim(),
param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
result = mediaServerService.addStreamProxy(mediaServer, param.getApp(), param.getStream(), param.getSrcUrl().trim(),
param.isEnableAudio(), param.isEnableMp4(), param.getRtspType(), param.getTimeout());
}
if (result != null && result.getCode() == 0) {
String key = result.getData();
@@ -381,39 +316,42 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public boolean start(String app, String stream) {
boolean result = false;
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
if (streamProxy != null && !streamProxy.isEnable() ) {
WVPResult<String> wvpResult = addStreamProxyToZlm(streamProxy);
if (wvpResult == null) {
return false;
}
if (wvpResult.getCode() == 0) {
result = true;
streamProxy.setEnable(true);
updateStreamProxy(streamProxy);
}else {
log.info("启用代理失败: {}/{}->{}({})", app, stream, wvpResult.getMsg(),
streamProxy.getSrcUrl() == null? streamProxy.getUrl():streamProxy.getSrcUrl());
}
} else if (streamProxy != null && streamProxy.isEnable()) {
return true ;
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
return result;
MediaServer mediaServer;
if (ObjectUtils.isEmpty(streamProxy.getMediaServerId()) || "auto".equals(streamProxy.getMediaServerId())){
mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
}else {
mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId());
}
if (mediaServer == null) {
log.warn("[启用代理] 未找到可用的媒体节点");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点");
}
StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy);
if (streamInfo == null) {
log.warn("[启用代理] 失败");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "失败");
}
if (!streamProxy.isEnable()) {
updateStreamProxy(streamProxy);
}
return true;
}
@Override
public boolean stop(String app, String stream) {
boolean result = false;
StreamProxy streamProxyDto = streamProxyMapper.selectOne(app, stream);
if (streamProxyDto != null && streamProxyDto.isEnable()) {
Boolean removed = removeStreamProxyFromZlm(streamProxyDto);
if (removed != null && removed) {
streamProxyDto.setEnable(false);
result = updateStreamProxy(streamProxyDto);
}
public void stop(String app, String stream) {
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
return result;
MediaServer mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId());
if (mediaServer == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到启用时使用的媒体节点");
}
mediaServerService.stopProxy(mediaServer, streamProxy.getStreamKey());
}
@Override
@@ -430,10 +368,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public void zlmServerOnline(String mediaServerId) {
// 移除开启了无人观看自动移除的流
List<StreamProxy> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
if (streamProxyItemList.size() > 0) {
gbStreamMapper.batchDel(streamProxyItemList);
}
List<StreamProxy> streamProxyItemList = streamProxyMapper.selectWithAutoRemoveAndWithoutGbDeviceIdByMediaServerId(mediaServerId);
streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
// 移除拉流代理生成的流信息
@@ -540,40 +475,40 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
@Scheduled(cron = "* 0/10 * * * ?")
public void asyncCheckStreamProxyStatus() {
List<MediaServer> all = mediaServerService.getAllOnline();
if (CollectionUtils.isEmpty(all)){
return;
}
Map<String, MediaServer> serverItemMap = all.stream().collect(Collectors.toMap(MediaServer::getId, Function.identity(), (m1, m2) -> m1));
List<StreamProxy> list = streamProxyMapper.selectForEnable(true);
if (CollectionUtils.isEmpty(list)){
return;
}
for (StreamProxy streamProxyItem : list) {
MediaServer mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId());
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream());
if (mediaInfo == null){
streamProxyItem.setStatus(false);
} else {
if (mediaInfo.getOnline() != null && mediaInfo.getOnline()) {
streamProxyItem.setStatus(true);
} else {
streamProxyItem.setStatus(false);
}
}
updateStreamProxy(streamProxyItem);
}
}
// @Scheduled(cron = "* 0/10 * * * ?")
// public void asyncCheckStreamProxyStatus() {
//
// List<MediaServer> all = mediaServerService.getAllOnline();
//
// if (CollectionUtils.isEmpty(all)){
// return;
// }
//
// Map<String, MediaServer> serverItemMap = all.stream().collect(Collectors.toMap(MediaServer::getId, Function.identity(), (m1, m2) -> m1));
//
// List<StreamProxy> list = streamProxyMapper.selectForEnable(true);
//
// if (CollectionUtils.isEmpty(list)){
// return;
// }
//
// for (StreamProxy streamProxyItem : list) {
//
// MediaServer mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId());
//
// MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream());
//
// if (mediaInfo == null){
// streamProxyItem.setStatus(false);
// } else {
// if (mediaInfo.getOnline() != null && mediaInfo.getOnline()) {
// streamProxyItem.setStatus(true);
// } else {
// streamProxyItem.setStatus(false);
// }
// }
//
// updateStreamProxy(streamProxyItem);
// }
// }
}