重构INVITE消息处理中的上级点播请求

This commit is contained in:
648540858
2024-08-14 17:19:19 +08:00
parent d5621f9489
commit cce1d1de40
28 changed files with 489 additions and 406 deletions

View File

@@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
@@ -40,6 +41,9 @@ public class StreamProxyController {
@Autowired
private IStreamProxyService streamProxyService;
@Autowired
private IStreamProxyPlayService streamProxyPlayService;
@Operation(summary = "分页查询流代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "page", description = "当前页")
@@ -181,7 +185,7 @@ public class StreamProxyController {
@Parameter(name = "id", description = "代理Id", required = true)
public StreamContent start(int id){
log.info("播放代理: {}", id);
StreamInfo streamInfo = streamProxyService.start(id);
StreamInfo streamInfo = streamProxyPlayService.start(id);
if (streamInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
}else {
@@ -195,6 +199,6 @@ public class StreamProxyController {
@Parameter(name = "id", description = "代理Id", required = true)
public void stop(int id){
log.info("停用代理: {}", id);
streamProxyService.stop(id);
streamProxyPlayService.stop(id);
}
}

View File

@@ -0,0 +1,15 @@
package com.genersoft.iot.vmp.streamProxy.service;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
public interface IStreamProxyPlayService {
StreamInfo start(int id);
StreamInfo startProxy(StreamProxy streamProxy);
void stop(int id);
void stopProxy(StreamProxy streamProxy);
}

View File

@@ -92,8 +92,4 @@ public interface IStreamProxyService {
StreamProxy getStreamProxy(int id);
void delete(int id);
StreamInfo start(int id);
void stop(int id);
}

View File

@@ -0,0 +1,100 @@
package com.genersoft.iot.vmp.streamProxy.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
/**
* 视频代理业务
*/
@Slf4j
@Service
@DS("master")
public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
@Autowired
private StreamProxyMapper streamProxyMapper;
@Autowired
private IMediaServerService mediaServerService;
@Override
public StreamInfo start(int id) {
StreamProxy streamProxy = streamProxyMapper.select(id);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
return startProxy(streamProxy);
}
@Override
public StreamInfo startProxy(StreamProxy streamProxy){
if (!streamProxy.isEnable()) {
return null;
}
MediaServer mediaServer;
String mediaServerId = streamProxy.getMediaServerId();
if (mediaServerId == null) {
mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
}else {
mediaServer = mediaServerService.getOne(mediaServerId);
}
if (mediaServer == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点");
}
StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy);
if (mediaServerId == null) {
streamProxy.setMediaServerId(mediaServer.getId());
streamProxyMapper.update(streamProxy);
}
return streamInfo;
}
@Override
public void stop(int id) {
StreamProxy streamProxy = streamProxyMapper.select(id);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
stopProxy(streamProxy);
}
@Override
public void stopProxy(StreamProxy streamProxy){
MediaServer mediaServer;
String mediaServerId = streamProxy.getMediaServerId();
if (mediaServerId == null) {
mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
}else {
mediaServer = mediaServerService.getOne(mediaServerId);
}
if (mediaServer == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点");
}
if (ObjectUtils.isEmpty(streamProxy.getStreamKey())) {
mediaServerService.closeStreams(mediaServer, streamProxy.getApp(), streamProxy.getStream());
}else {
mediaServerService.stopProxy(mediaServer, streamProxy.getStreamKey());
}
streamProxy.setMediaServerId(mediaServer.getId());
streamProxy.setStreamKey(null);
streamProxy.setPulling(false);
streamProxyMapper.update(streamProxy);
}
}

View File

@@ -19,6 +19,7 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam;
import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@@ -59,11 +60,14 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
private UserSetting userSetting;
@Autowired
private IGbChannelService gbChannelService;
private IStreamProxyPlayService playService;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private IGbChannelService gbChannelService;
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@@ -137,12 +141,12 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
// 兼容旧接口
StreamProxy streamProxyInDb = getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyInDb != null && streamProxyInDb.getPulling()) {
stopProxy(streamProxyInDb);
playService.stopProxy(streamProxyInDb);
}
if (streamProxyInDb == null){
return add(param.buildStreamProxy());
}else {
stopProxy(streamProxyInDb);
playService.stopProxy(streamProxyInDb);
streamProxyMapper.delete(streamProxyInDb.getId());
return add(param.buildStreamProxy());
}
@@ -177,7 +181,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
private void delete(StreamProxy streamProxy) {
Assert.notNull(streamProxy, "代理不可为NULL");
if (streamProxy.getPulling() != null && streamProxy.getPulling()) {
stopProxy(streamProxy);
playService.stopProxy(streamProxy);
}
if(streamProxy.getGbId() > 0) {
gbChannelService.delete(streamProxy.getGbId());
@@ -219,8 +223,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|| (streamProxyInDb.getMediaServerId() == null && streamProxy.getMediaServerId() != null)
) {
// 变化则重启代理
stopProxy(streamProxyInDb);
startProxy(streamProxy);
playService.stopProxy(streamProxyInDb);
playService.startProxy(streamProxy);
}
return true;
}
@@ -239,7 +243,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
StreamInfo streamInfo = startProxy(streamProxy);
StreamInfo streamInfo = playService.startProxy(streamProxy);
return streamInfo != null;
}
@@ -249,31 +253,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
stopProxy(streamProxy);
playService.stopProxy(streamProxy);
}
private void stopProxy(StreamProxy streamProxy){
MediaServer mediaServer;
String mediaServerId = streamProxy.getMediaServerId();
if (mediaServerId == null) {
mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
}else {
mediaServer = mediaServerService.getOne(mediaServerId);
}
if (mediaServer == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点");
}
if (ObjectUtils.isEmpty(streamProxy.getStreamKey())) {
mediaServerService.closeStreams(mediaServer, streamProxy.getApp(), streamProxy.getStream());
}else {
mediaServerService.stopProxy(mediaServer, streamProxy.getStreamKey());
}
streamProxy.setMediaServerId(mediaServer.getId());
streamProxy.setStreamKey(null);
streamProxy.setPulling(false);
streamProxyMapper.update(streamProxy);
}
@Override
public Map<String, String> getFFmpegCMDs(MediaServer mediaServer) {
@@ -437,87 +420,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
return new ResourceBaseInfo(total, online);
}
@Override
public StreamInfo start(int id) {
StreamProxy streamProxy = streamProxyMapper.select(id);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
return startProxy(streamProxy);
}
private StreamInfo startProxy(StreamProxy streamProxy){
if (!streamProxy.isEnable()) {
return null;
}
MediaServer mediaServer;
String mediaServerId = streamProxy.getMediaServerId();
if (mediaServerId == null) {
mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
}else {
mediaServer = mediaServerService.getOne(mediaServerId);
}
if (mediaServer == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点");
}
StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy);
if (mediaServerId == null) {
streamProxy.setMediaServerId(mediaServer.getId());
update(streamProxy);
}
return streamInfo;
}
@Override
public StreamProxy getStreamProxy(int id) {
return streamProxyMapper.select(id);
}
@Override
public void stop(int id) {
StreamProxy streamProxy = streamProxyMapper.select(id);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
stopProxy(streamProxy);
}
// @Scheduled(cron = "* 0/10 * * * ?")
// public void asyncCheckStreamProxyStatus() {
//
// List<MediaServer> all = mediaServerService.getAllOnline();
//
// if (CollectionUtils.isEmpty(all)){
// return;
// }
//
// Map<String, MediaServer> serverItemMap = all.stream().collect(Collectors.toMap(MediaServer::getId, Function.identity(), (m1, m2) -> m1));
//
// List<StreamProxy> list = streamProxyMapper.selectForEnable(true);
//
// if (CollectionUtils.isEmpty(list)){
// return;
// }
//
// for (StreamProxy streamProxyItem : list) {
//
// MediaServer mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId());
//
// MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream());
//
// if (mediaInfo == null){
// streamProxyItem.setStatus(false);
// } else {
// if (mediaInfo.getOnline() != null && mediaInfo.getOnline()) {
// streamProxyItem.setStatus(true);
// } else {
// streamProxyItem.setStatus(false);
// }
// }
//
// updateStreamProxy(streamProxyItem);
// }
// }
}