优化拉流代理

This commit is contained in:
648540858
2024-07-19 17:54:14 +08:00
parent 78088ba53f
commit 9f4e66a38b
23 changed files with 607 additions and 351 deletions

View File

@@ -35,7 +35,7 @@ public class StreamProxy extends CommonGBChannel {
@Schema(description = "拉流地址")
private String srcUrl;
@Schema(description = "超时时间")
@Schema(description = "超时时间:秒")
private int timeout;
@Schema(description = "ffmpeg模板KEY")

View File

@@ -0,0 +1,71 @@
package com.genersoft.iot.vmp.streamProxy.bean;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
/**
* @author lin
*/
@Data
@Schema(description = "拉流代理的信息")
public class StreamProxyParam {
@Schema(description = "类型取值default 流媒体直接拉流默认ffmpeg ffmpeg实现拉流")
private String type;
@Schema(description = "应用名")
private String app;
@Schema(description = "流ID")
private String stream;
@Schema(description = "流媒体服务ID")
private String mediaServerId;
@Schema(description = "拉流地址")
private String url;
@Schema(description = "超时时间:秒")
private int timeoutMs;
@Schema(description = "ffmpeg模板KEY")
private String ffmpegCmdKey;
@Schema(description = "rtsp拉流时拉流方式0tcp1udp2组播")
private String rtpType;
@Schema(description = "是否启用")
private boolean enable;
@Schema(description = "是否启用音频")
private boolean enableAudio;
@Schema(description = "是否启用MP4")
private boolean enableMp4;
@Schema(description = "是否 无人观看时删除")
private boolean enableRemoveNoneReader;
@Schema(description = "是否 无人观看时自动停用")
private boolean enableDisableNoneReader;
public StreamProxy buildStreamProxy() {
StreamProxy streamProxy = new StreamProxy();
streamProxy.setApp(app);
streamProxy.setStream(stream);
streamProxy.setMediaServerId(mediaServerId);
streamProxy.setSrcUrl(url);
streamProxy.setTimeout(timeoutMs/1000);
streamProxy.setRtspType(rtpType);
streamProxy.setEnable(enable);
streamProxy.setEnableAudio(enableAudio);
streamProxy.setEnableMp4(enableMp4);
streamProxy.setEnableRemoveNoneReader(enableRemoveNoneReader);
streamProxy.setEnableDisableNoneReader(enableDisableNoneReader);
streamProxy.setFfmpegCmdKey(ffmpegCmdKey);
return streamProxy;
}
}

View File

@@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
@@ -44,7 +45,6 @@ public class StreamProxyController {
@Parameter(name = "page", description = "当前页")
@Parameter(name = "count", description = "每页查询数量")
@Parameter(name = "query", description = "查询内容")
@Parameter(name = "query", description = "查询内容")
@Parameter(name = "pulling", description = "是否正在拉流")
@Parameter(name = "mediaServerId", description = "流媒体ID")
@GetMapping(value = "/list")
@@ -55,6 +55,12 @@ public class StreamProxyController {
@RequestParam(required = false)Boolean pulling,
@RequestParam(required = false)String mediaServerId){
if (ObjectUtils.isEmpty(mediaServerId)) {
mediaServerId = null;
}
if (ObjectUtils.isEmpty(query)) {
query = null;
}
return streamProxyService.getAll(page, count, query, pulling, mediaServerId);
}
@@ -73,7 +79,7 @@ public class StreamProxyController {
})
@PostMapping(value = "/save")
@ResponseBody
public StreamContent save(@RequestBody StreamProxy param){
public StreamContent save(@RequestBody StreamProxyParam param){
log.info("添加代理: " + JSONObject.toJSONString(param));
if (ObjectUtils.isEmpty(param.getMediaServerId())) {
param.setMediaServerId("auto");
@@ -81,13 +87,6 @@ public class StreamProxyController {
if (ObjectUtils.isEmpty(param.getType())) {
param.setType("default");
}
if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbDeviceId(null);
}
StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyItem != null) {
streamProxyService.del(param.getApp(), param.getStream());
}
StreamInfo streamInfo = streamProxyService.save(param);
if (param.isEnable()) {
@@ -107,10 +106,10 @@ public class StreamProxyController {
})
@PostMapping(value = "/add")
@ResponseBody
public StreamContent add(@RequestBody StreamProxy param){
public StreamProxy add(@RequestBody StreamProxy param){
log.info("添加代理: " + JSONObject.toJSONString(param));
if (ObjectUtils.isEmpty(param.getMediaServerId())) {
param.setMediaServerId("auto");
param.setMediaServerId(null);
}
if (ObjectUtils.isEmpty(param.getType())) {
param.setType("default");
@@ -118,22 +117,24 @@ public class StreamProxyController {
if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbDeviceId(null);
}
StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyItem != null) {
streamProxyService.del(param.getApp(), param.getStream());
}
streamProxyService.add(param);
return param;
}
StreamInfo streamInfo = streamProxyService.add(param);
if (param.isEnable()) {
if (streamInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
}else {
return new StreamContent(streamInfo);
}
}else {
return null;
@Operation(summary = "更新代理", security = @SecurityRequirement(name = JwtUtils.HEADER), parameters = {
@Parameter(name = "param", description = "代理参数", required = true),
})
@PostMapping(value = "/update")
@ResponseBody
public void update(@RequestBody StreamProxy param){
log.info("更新代理: " + JSONObject.toJSONString(param));
if (param.getId() == 0) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), "缺少代理信息的ID");
}
if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbDeviceId(null);
}
streamProxyService.update(param);
}
@GetMapping(value = "/ffmpeg_cmd/list")
@@ -160,18 +161,26 @@ public class StreamProxyController {
if (app == null || stream == null) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), app == null ?"app不能为null":"stream不能为null");
}else {
streamProxyService.del(app, stream);
streamProxyService.delteByAppAndStream(app, stream);
}
}
@DeleteMapping(value = "/delte")
@ResponseBody
@Operation(summary = "移除代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "id", description = "代理ID", required = true)
public void delte(int id){
log.info("移除代理: " + id );
streamProxyService.delte(id);
}
@GetMapping(value = "/start")
@ResponseBody
@Operation(summary = "启用代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "app", description = "应用名", required = true)
@Parameter(name = "stream", description = "流id", required = true)
public void start(String app, String stream){
log.info("启用代理: " + app + "/" + stream);
boolean result = streamProxyService.start(app, stream);
@Parameter(name = "id", description = "代理Id", required = true)
public void start(int id){
log.info("启用代理: " + id);
boolean result = streamProxyService.start(id);
if (!result) {
throw new ControllerException(ErrorCode.ERROR100);
}
@@ -184,6 +193,6 @@ public class StreamProxyController {
@Parameter(name = "stream", description = "流id", required = true)
public void stop(String app, String stream){
log.info("停用代理: " + app + "/" + stream);
streamProxyService.stop(app, stream);
streamProxyService.stopByAppAndStream(app, stream);
}
}

View File

@@ -1,7 +1,7 @@
package com.genersoft.iot.vmp.streamProxy.dao;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.streamProxy.dao.provider.StreamProxyProvider;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
@@ -11,10 +11,10 @@ import java.util.List;
@Repository
public interface StreamProxyMapper {
@Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, src_url, " +
@Insert("INSERT INTO wvp_stream_proxy (type, app, stream,media_server_id, src_url, " +
"timeout, ffmpeg_cmd_key, rtsp_type, enable_audio, enable_mp4, enable, pulling, stream_key, " +
"enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" +
"(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{srcUrl}, " +
"(#{type}, #{app}, #{stream}, #{mediaServerId}, #{srcUrl}, " +
"#{timeout}, #{ffmpegCmdKey}, #{rtspType}, #{enableAudio}, #{enableMp4}, #{enable}, #{pulling}, #{streamKey}, " +
"#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} )")
@Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
@@ -24,10 +24,8 @@ public interface StreamProxyMapper {
"SET type=#{type}, " +
"app=#{app}," +
"stream=#{stream}," +
"name=#{name}," +
"app=#{app}," +
"stream=#{stream}," +
"url=#{url}, " +
"media_server_id=#{mediaServerId}, " +
"src_url=#{srcUrl}," +
"timeout=#{timeout}, " +
@@ -46,45 +44,14 @@ public interface StreamProxyMapper {
@Delete("DELETE FROM wvp_stream_proxy WHERE app=#{app} AND stream=#{stream}")
int delByAppAndStream(String app, String stream);
@Select("SELECT " +
" st.*, " +
" st.id as stream_proxy_id, " +
" wdc.*, " +
" wdc.id as gb_id" +
" FROM wvp_stream_proxy st " +
" LEFT join wvp_device_channel wdc " +
" on st.id = wdc.stream_proxy_id " +
" WHERE " +
" 1=1 " +
" <if test='query != null'> AND (st.app LIKE concat('%',#{query},'%') OR st.stream LIKE concat('%',#{query},'%') " +
" OR wdc.gb_device_id LIKE concat('%',#{query},'%') OR wdc.gb_name LIKE concat('%',#{query},'%'))</if> " +
" <if test='pulling == true' > AND st.pulling=1</if>" +
" <if test='pulling == false' > AND st.pulling=0 </if>" +
" <if test='mediaServerId != null' > AND st.media_server_id=#{mediaServerId} </if>" +
"order by st.create_time desc")
List<StreamProxy> selectAll(@Param("query") String query, @Param("pushing") Boolean pushing, @Param("mediaServerId") String mediaServerId);
@SelectProvider(type = StreamProxyProvider.class, method = "selectAll")
List<StreamProxy> selectAll(@Param("query") String query, @Param("pulling") Boolean pulling, @Param("mediaServerId") String mediaServerId);
@Select("SELECT " +
" st.*, " +
" st.id as stream_proxy_id, " +
" wdc.*, " +
" wdc.id as gb_id" +
" FROM wvp_stream_proxy st " +
" LEFT join wvp_device_channel wdc " +
" on st.id = wdc.stream_proxy_id " +
" WHERE st.app=#{app} AND st.stream=#{stream} order by st.create_time desc")
@SelectProvider(type = StreamProxyProvider.class, method = "selectOneByAppAndStream")
StreamProxy selectOneByAppAndStream(@Param("app") String app, @Param("stream") String stream);
@Select("SELECT " +
" st.*, " +
" st.id as stream_proxy_id, " +
" wdc.*, " +
" wdc.id as gb_id" +
" FROM wvp_stream_proxy st " +
" LEFT join wvp_device_channel wdc " +
" on st.id = wdc.stream_proxy_id " +
"WHERE st.enable=#{enable} and st.media_server_id= #{id} order by st.create_time desc")
List<StreamProxy> selectForEnableInMediaServer(@Param("id") String id, @Param("enable") boolean enable);
@SelectProvider(type = StreamProxyProvider.class, method = "selectForEnableInMediaServer")
List<StreamProxy> selectForEnableInMediaServer(@Param("mediaServerId") String mediaServerId, @Param("enable") boolean enable);
@Select("select count(1) from wvp_stream_proxy")
@@ -114,4 +81,7 @@ public interface StreamProxyMapper {
"SET pulling=false " +
"WHERE id=#{id}")
int offline(@Param("id") int id);
@SelectProvider(type = StreamProxyProvider.class, method = "select")
StreamProxy select(@Param("id") int id);
}

View File

@@ -0,0 +1,63 @@
package com.genersoft.iot.vmp.streamProxy.dao.provider;
import java.util.Map;
public class StreamProxyProvider {
public String getBaseSelectSql(){
return "SELECT " +
" st.*, " +
" st.id as stream_proxy_id, " +
" wdc.*, " +
" wdc.id as gb_id" +
" FROM wvp_stream_proxy st " +
" LEFT join wvp_device_channel wdc " +
" on st.id = wdc.stream_proxy_id ";
}
public String select(Map<String, Object> params ){
return getBaseSelectSql() + " WHERE st.id = " + params.get("id");
}
public String selectForEnableInMediaServer(Map<String, Object> params ){
return getBaseSelectSql() + String.format(" WHERE st.enable=%s and st.media_server_id= %s order by st.create_time desc",
params.get("enable"), params.get("mediaServerId"));
}
public String selectOneByAppAndStream(Map<String, Object> params ){
return getBaseSelectSql() + String.format(" WHERE st.app=%s AND st.stream=%s order by st.create_time desc",
params.get("app"), params.get("stream"));
}
public String selectAll(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append(getBaseSelectSql());
sqlBuild.append(" WHERE 1=1 ");
if (params.get("query") != null) {
sqlBuild.append(" AND ")
.append(" (")
.append(" st.app LIKE ").append("'%").append(params.get("query")).append("%'")
.append(" OR")
.append(" st.stream LIKE ").append("'%").append(params.get("query")).append("%'")
.append(" OR")
.append(" wdc.gb_device_id LIKE ").append("'%").append(params.get("query")).append("%'")
.append(" OR")
.append(" wdc.gb_name LIKE ").append("'%").append(params.get("query")).append("%'")
.append(" )")
;
}
Object pulling = params.get("pulling");
if (pulling != null) {
if ((Boolean) pulling) {
sqlBuild.append(" AND st.pulling=1 ");
}else {
sqlBuild.append(" AND st.pulling=0 ");
}
}
if (params.get("mediaServerId") != null) {
sqlBuild.append(" AND st.media_server_id='").append(params.get("mediaServerId")).append("'");
}
sqlBuild.append(" order by st.create_time desc");
return sqlBuild.toString();
}
}

View File

@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.streamProxy.service;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.github.pagehelper.PageInfo;
@@ -14,7 +15,7 @@ public interface IStreamProxyService {
* 保存视频代理
* @param param
*/
StreamInfo save(StreamProxy param);
StreamInfo save(StreamProxyParam param);
/**
* 分页查询
@@ -29,7 +30,7 @@ public interface IStreamProxyService {
* @param app
* @param stream
*/
void del(String app, String stream);
void delteByAppAndStream(String app, String stream);
/**
* 启用视频代理
@@ -37,17 +38,7 @@ public interface IStreamProxyService {
* @param stream
* @return
*/
boolean start(String app, String stream);
/**
* 更新状态
* @param status 状态
* @param app
* @param stream
*/
int updateStatusByAppAndStream(String app, String stream, boolean status);
boolean startByAppAndStream(String app, String stream);
/**
* 停用用视频代理
@@ -55,7 +46,7 @@ public interface IStreamProxyService {
* @param stream
* @return
*/
void stop(String app, String stream);
void stopByAppAndStream(String app, String stream);
/**
* 获取ffmpeg.cmd模板
@@ -88,7 +79,7 @@ public interface IStreamProxyService {
/**
* 更新代理流
*/
boolean updateStreamProxy(StreamProxy streamProxyItem);
boolean update(StreamProxy streamProxyItem);
/**
* 获取统计信息
@@ -97,4 +88,10 @@ public interface IStreamProxyService {
ResourceBaseInfo getOverview();
StreamInfo add(StreamProxy streamProxy);
StreamProxy getStreamProxy(int id);
void delte(int id);
boolean start(int id);
}

View File

@@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam;
import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.utils.DateUtil;
@@ -72,10 +73,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
* 流到来的处理
*/
@Async("taskExecutor")
@Transactional
@org.springframework.context.event.EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
if ("rtsp".equals(event.getSchema())) {
updateStatusByAppAndStream(event.getApp(), event.getStream(), true);
streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), true);
}
}
@@ -84,9 +86,10 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
*/
@Async("taskExecutor")
@EventListener
@Transactional
public void onApplicationEvent(MediaDepartureEvent event) {
if ("rtsp".equals(event.getSchema())) {
updateStatusByAppAndStream(event.getApp(), event.getStream(), false);
streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), false);
}
}
@@ -102,7 +105,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
// 拉流代理
StreamProxy streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream());
if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) {
start(event.getApp(), event.getStream());
startByAppAndStream(event.getApp(), event.getStream());
}
}
@@ -129,58 +132,80 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
@Transactional
public StreamInfo save(StreamProxy streamProxy) {
MediaServer mediaServer;
if (ObjectUtils.isEmpty(streamProxy.getMediaServerId()) || "auto".equals(streamProxy.getMediaServerId())){
mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
public StreamInfo save(StreamProxyParam param) {
// 兼容旧接口
StreamProxy streamProxyInDb = getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyInDb != null && streamProxyInDb.getPulling()) {
stopProxy(streamProxyInDb);
}
if (streamProxyInDb == null){
return add(param.buildStreamProxy());
}else {
mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId());
}
if (mediaServer == null) {
log.warn("保存代理未找到在线的ZLM...");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到在线的ZLM");
stopProxy(streamProxyInDb);
streamProxyMapper.delete(streamProxyInDb.getId());
return add(param.buildStreamProxy());
}
}
streamProxy.setMediaServerId(mediaServer.getId());
boolean saveResult;
// 更新
if (streamProxyMapper.selectOneByAppAndStream(streamProxy.getApp(), streamProxy.getStream()) != null) {
saveResult = updateStreamProxy(streamProxy);
}else { // 新增
saveResult = addStreamProxy(streamProxy);
@Override
@Transactional
public StreamInfo add(StreamProxy streamProxy) {
StreamProxy streamProxyInDb = streamProxyMapper.selectOneByAppAndStream(streamProxy.getApp(), streamProxy.getStream());
if (streamProxyInDb != null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "APP+STREAM已经存在");
}
if (!saveResult) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存失败");
if (streamProxy.getGbDeviceId() != null) {
gbChannelService.add(streamProxy.buildCommonGBChannel());
}
streamProxy.setCreateTime(DateUtil.getNow());
streamProxy.setUpdateTime(DateUtil.getNow());
streamProxyMapper.add(streamProxy);
streamProxy.setStreamProxyId(streamProxy.getId());
if (streamProxy.isEnable()) {
return mediaServerService.startProxy(mediaServer, streamProxy);
return startProxy(streamProxy);
}
return null;
}
/**
* 新增代理流
*/
@Transactional
public boolean addStreamProxy(StreamProxy streamProxy) {
String now = DateUtil.getNow();
streamProxy.setCreateTime(now);
streamProxy.setUpdateTime(now);
if (streamProxyMapper.add(streamProxy) > 0 && !ObjectUtils.isEmpty(streamProxy.getGbDeviceId())) {
gbChannelService.add(streamProxy.buildCommonGBChannel());
@Override
public void delte(int id) {
StreamProxy streamProxy = getStreamProxy(id);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "代理不存在");
}
return true;
delte(streamProxy);
}
private void delte(StreamProxy streamProxy) {
if (streamProxy.getPulling()) {
stopProxy(streamProxy);
}
if(streamProxy.getGbId() > 0) {
gbChannelService.delete(streamProxy.getGbId());
}
streamProxyMapper.delete(streamProxy.getId());
}
@Override
@Transactional
public void delteByAppAndStream(String app, String stream) {
StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "代理不存在");
}
delte(streamProxy);
}
/**
* 更新代理流
*/
@Override
public boolean updateStreamProxy(StreamProxy streamProxy) {
public boolean update(StreamProxy streamProxy) {
streamProxy.setUpdateTime(DateUtil.getNow());
StreamProxy streamProxyInDb = streamProxyMapper.select(streamProxy.getId());
if (streamProxyInDb == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "代理不存在");
}
if (streamProxyMapper.update(streamProxy) > 0 && !ObjectUtils.isEmpty(streamProxy.getGbDeviceId())) {
if (streamProxy.getGbId() > 0) {
gbChannelService.update(streamProxy.buildCommonGBChannel());
@@ -188,6 +213,15 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
gbChannelService.add(streamProxy.buildCommonGBChannel());
}
}
// 判断是否需要重启代理
if (!streamProxyInDb.getApp().equals(streamProxy.getApp())
|| !streamProxyInDb.getStream().equals(streamProxy.getStream())
|| !streamProxyInDb.getMediaServerId().equals(streamProxy.getMediaServerId())
) {
// app/stream 变化则重启代理
stopProxy(streamProxyInDb);
startProxy(streamProxy);
}
return true;
}
@@ -198,63 +232,47 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
return new PageInfo<>(all);
}
@Override
@Transactional
public void del(String app, String stream) {
StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream);
if (streamProxy == null) {
return;
}
if (streamProxy.getStreamKey() != null) {
MediaServer mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId());
if (mediaServer != null) {
mediaServerService.stopProxy(mediaServer, streamProxy.getStreamKey());
}
}
if (streamProxy.getGbId() > 0) {
gbChannelService.delete(streamProxy.getGbId());
}
streamProxyMapper.delete(streamProxy.getId());
}
@Override
public boolean start(String app, String stream) {
public boolean startByAppAndStream(String app, String stream) {
StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
StreamInfo streamInfo = startProxy(streamProxy);
return streamInfo != null;
}
@Override
public void stopByAppAndStream(String app, String stream) {
StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
stopProxy(streamProxy);
}
private void stopProxy(StreamProxy streamProxy){
MediaServer mediaServer;
if (ObjectUtils.isEmpty(streamProxy.getMediaServerId()) || "auto".equals(streamProxy.getMediaServerId())){
String mediaServerId = streamProxy.getMediaServerId();
if (mediaServerId == null) {
mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
}else {
mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId());
mediaServer = mediaServerService.getOne(mediaServerId);
}
if (mediaServer == null) {
log.warn("[启用代理] 未找到可用的媒体节点");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点");
}
StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy);
if (streamInfo == null) {
log.warn("[启用代理] 失败");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "失败");
if (ObjectUtils.isEmpty(streamProxy.getStreamKey())) {
mediaServerService.closeStreams(mediaServer, streamProxy.getApp(), streamProxy.getStream());
}else {
mediaServerService.stopProxy(mediaServer, streamProxy.getStreamKey());
}
if (!streamProxy.isEnable()) {
updateStreamProxy(streamProxy);
}
return true;
}
@Override
public void stop(String app, String stream) {
StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
MediaServer mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId());
if (mediaServer == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到启用时使用的媒体节点");
}
mediaServerService.stopProxy(mediaServer, streamProxy.getStreamKey());
streamProxy.setMediaServerId(mediaServer.getId());
streamProxy.setStreamKey(null);
streamProxy.setPulling(false);
streamProxyMapper.update(streamProxy);
}
@Override
@@ -264,8 +282,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public StreamProxy getStreamProxyByAppAndStream(String app, String streamId) {
return streamProxyMapper.selectOneByAppAndStream(app, streamId);
public StreamProxy getStreamProxyByAppAndStream(String app, String stream) {
return streamProxyMapper.selectOneByAppAndStream(app, stream);
}
@Override
@@ -387,16 +405,19 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
}
@Override
@Transactional
public int updateStatusByAppAndStream(String app, String stream, boolean status) {
public void streamChangeHandler(String app, String stream, String mediaServerId, boolean status) {
// 状态变化时推送到国标上级
StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream);
if (streamProxy == null) {
return 0;
return;
}
streamProxy.setPulling(true);
streamProxyMapper.online(streamProxy.getId());
streamProxy.setPulling(status);
if (!mediaServerId.equals(streamProxy.getMediaServerId())) {
streamProxy.setMediaServerId(mediaServerId);
}
streamProxy.setUpdateTime(DateUtil.getNow());
streamProxyMapper.update(streamProxy);
streamProxy.setGbStatus(status?"ON":"OFF");
if (streamProxy.getGbId() > 0) {
if (status) {
@@ -405,7 +426,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
gbChannelService.offline(streamProxy.buildCommonGBChannel());
}
}
return 1;
}
@Override
@@ -417,21 +437,16 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
return new ResourceBaseInfo(total, online);
}
@Override
@Transactional
public StreamInfo add(StreamProxy streamProxy) {
StreamProxy streamProxyInDb = streamProxyMapper.selectOneByAppAndStream(streamProxy.getApp(), streamProxy.getStream());
if (streamProxyInDb != null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "APP+STREAM已经存在");
public boolean start(int id) {
StreamProxy streamProxy = streamProxyMapper.select(id);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
if (streamProxy.getGbDeviceId() != null) {
gbChannelService.add(streamProxy.buildCommonGBChannel());
}
streamProxyMapper.add(streamProxy);
if (streamProxy.isEnable()) {
return startProxy(streamProxy);
}
return null;
StreamInfo streamInfo = startProxy(streamProxy);
return streamInfo != null;
}
private StreamInfo startProxy(StreamProxy streamProxy){
@@ -440,7 +455,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
MediaServer mediaServer;
String mediaServerId = streamProxy.getMediaServerId();
if (mediaServerId == null || "auto".equals(mediaServerId)) {
if (mediaServerId == null) {
mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
}else {
mediaServer = mediaServerService.getOne(mediaServerId);
@@ -448,10 +463,22 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
if (mediaServer == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的媒体节点");
}
return mediaServerService.startProxy(mediaServer, streamProxy);
StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy);
if (mediaServerId == null) {
streamProxy.setMediaServerId(mediaServer.getId());
update(streamProxy);
}
return streamInfo;
}
@Override
public StreamProxy getStreamProxy(int id) {
return streamProxyMapper.select(id);
}
// @Scheduled(cron = "* 0/10 * * * ?")
// public void asyncCheckStreamProxyStatus() {
//