临时提交

This commit is contained in:
648540858
2024-07-17 17:56:26 +08:00
parent 15f44b0d23
commit e46ad4e66a
11 changed files with 149 additions and 106 deletions

View File

@@ -63,13 +63,7 @@ public class StreamProxy extends CommonGBChannel {
private String streamKey;
@Schema(description = "拉流状态")
private Boolean status;
@Schema(description = "更新时间")
private String updateTime;
@Schema(description = "创建时间")
private String createTime;
private Boolean pulling;
public CommonGBChannel buildCommonGBChannel() {
if (ObjectUtils.isEmpty(this.getGbDeviceId())) {

View File

@@ -44,15 +44,18 @@ public class StreamProxyController {
@Parameter(name = "page", description = "当前页")
@Parameter(name = "count", description = "每页查询数量")
@Parameter(name = "query", description = "查询内容")
@Parameter(name = "online", description = "是否在线")
@Parameter(name = "query", description = "查询内容")
@Parameter(name = "pulling", description = "是否正在拉流")
@Parameter(name = "mediaServerId", description = "流媒体ID")
@GetMapping(value = "/list")
@ResponseBody
public PageInfo<StreamProxy> list(@RequestParam(required = false)Integer page,
@RequestParam(required = false)Integer count,
@RequestParam(required = false)String query,
@RequestParam(required = false)Boolean online ){
@RequestParam(required = false)Boolean pulling,
@RequestParam(required = false)String mediaServerId){
return streamProxyService.getAll(page, count);
return streamProxyService.getAll(page, count, query, pulling, mediaServerId);
}
@Operation(summary = "查询流代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
@@ -65,7 +68,7 @@ public class StreamProxyController {
return streamProxyService.getStreamProxyByAppAndStream(app, stream);
}
@Operation(summary = "保存代理", security = @SecurityRequirement(name = JwtUtils.HEADER), parameters = {
@Operation(summary = "保存代理(已存在会覆盖)", security = @SecurityRequirement(name = JwtUtils.HEADER), parameters = {
@Parameter(name = "param", description = "代理参数", required = true),
})
@PostMapping(value = "/save")
@@ -99,6 +102,40 @@ public class StreamProxyController {
}
@Operation(summary = "新增代理", security = @SecurityRequirement(name = JwtUtils.HEADER), parameters = {
@Parameter(name = "param", description = "代理参数", required = true),
})
@PostMapping(value = "/add")
@ResponseBody
public StreamContent add(@RequestBody StreamProxy param){
log.info("添加代理: " + JSONObject.toJSONString(param));
if (ObjectUtils.isEmpty(param.getMediaServerId())) {
param.setMediaServerId("auto");
}
if (ObjectUtils.isEmpty(param.getType())) {
param.setType("default");
}
if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbDeviceId(null);
}
StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyItem != null) {
streamProxyService.del(param.getApp(), param.getStream());
}
StreamInfo streamInfo = streamProxyService.add(param);
if (param.isEnable()) {
if (streamInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
}else {
return new StreamContent(streamInfo);
}
}else {
return null;
}
}
@GetMapping(value = "/ffmpeg_cmd/list")
@ResponseBody
@Operation(summary = "获取ffmpeg.cmd模板", security = @SecurityRequirement(name = JwtUtils.HEADER))

View File

@@ -11,81 +11,86 @@ import java.util.List;
@Repository
public interface StreamProxyMapper {
@Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, src_url, dst_url, " +
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, stream_key, enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" +
"(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " +
"#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " +
@Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, src_url, " +
"timeout, ffmpeg_cmd_key, rtsp_type, enable_audio, enable_mp4, enable, pulling, stream_key, " +
"enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" +
"(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{srcUrl}, " +
"#{timeout}, #{ffmpegCmdKey}, #{rtspType}, #{enableAudio}, #{enableMp4}, #{enable}, #{pulling}, #{streamKey}, " +
"#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} )")
@Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
int add(StreamProxy streamProxyDto);
@Update("UPDATE wvp_stream_proxy " +
"SET type=#{type}, " +
"app=#{app}," +
"stream=#{stream}," +
"name=#{name}," +
"app=#{app}," +
"stream=#{stream}," +
"url=#{url}, " +
"media_server_id=#{mediaServerId}, " +
"src_url=#{srcUrl}," +
"dst_url=#{dstUrl}, " +
"timeout_ms=#{timeoutMs}, " +
"timeout=#{timeout}, " +
"ffmpeg_cmd_key=#{ffmpegCmdKey}, " +
"rtp_type=#{rtpType}, " +
"rtsp_type=#{rtspType}, " +
"enable_audio=#{enableAudio}, " +
"enable=#{enable}, " +
"status=#{status}, " +
"pulling=#{pulling}, " +
"stream_key=#{streamKey}, " +
"enable_remove_none_reader=#{enableRemoveNoneReader}, " +
"enable_disable_none_reader=#{enableDisableNoneReader}, " +
"enable_mp4=#{enableMp4} " +
"WHERE app=#{app} AND stream=#{stream}")
"WHERE id=#{id}")
int update(StreamProxy streamProxyDto);
@Delete("DELETE FROM wvp_stream_proxy WHERE app=#{app} AND stream=#{stream}")
int del(String app, String stream);
int delByAppAndStream(String app, String stream);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream order by st.create_time desc")
List<StreamProxy> selectAll();
@Select("SELECT " +
" st.*, " +
" st.id as stream_proxy_id, " +
" wdc.*, " +
" wdc.id as gb_id" +
" FROM wvp_stream_proxy st " +
" LEFT join wvp_device_channel wdc " +
" on st.id = wdc.stream_proxy_id " +
" WHERE " +
" 1=1 " +
" <if test='query != null'> AND (st.app LIKE concat('%',#{query},'%') OR st.stream LIKE concat('%',#{query},'%') " +
" OR wdc.gb_device_id LIKE concat('%',#{query},'%') OR wdc.gb_name LIKE concat('%',#{query},'%'))</if> " +
" <if test='pulling == true' > AND st.pulling=1</if>" +
" <if test='pulling == false' > AND st.pulling=0 </if>" +
" <if test='mediaServerId != null' > AND st.media_server_id=#{mediaServerId} </if>" +
"order by st.create_time desc")
List<StreamProxy> selectAll(@Param("query") String query, @Param("pushing") Boolean pushing, @Param("mediaServerId") String mediaServerId);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude, 'proxy' as streamType FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=#{enable} order by st.create_time desc")
List<StreamProxy> selectForEnable(boolean enable);
@Select("SELECT " +
" st.*, " +
" st.id as stream_proxy_id, " +
" wdc.*, " +
" wdc.id as gb_id" +
" FROM wvp_stream_proxy st " +
" LEFT join wvp_device_channel wdc " +
" on st.id = wdc.stream_proxy_id " +
" WHERE st.app=#{app} AND st.stream=#{stream} order by st.create_time desc")
StreamProxy selectOneByAppAndStream(@Param("app") String app, @Param("stream") String stream);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.app=#{app} AND st.stream=#{stream} order by st.create_time desc")
StreamProxy selectOne(@Param("app") String app, @Param("stream") String stream);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st " +
"LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " +
@Select("SELECT " +
" st.*, " +
" st.id as stream_proxy_id, " +
" wdc.*, " +
" wdc.id as gb_id" +
" FROM wvp_stream_proxy st " +
" LEFT join wvp_device_channel wdc " +
" on st.id = wdc.stream_proxy_id " +
"WHERE st.enable=#{enable} and st.media_server_id= #{id} order by st.create_time desc")
List<StreamProxy> selectForEnableInMediaServer(@Param("id") String id, @Param("enable") boolean enable);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st " +
"LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " +
"WHERE st.media_server_id= #{id} order by st.create_time desc")
List<StreamProxy> selectInMediaServer(String id);
@Update("UPDATE wvp_stream_proxy " +
"SET status=#{status} " +
"WHERE media_server_id=#{mediaServerId}")
void updateStatusByMediaServerId(@Param("mediaServerId") String mediaServerId, @Param("status") boolean status);
@Update("UPDATE wvp_stream_proxy " +
"SET status=#{status} " +
"WHERE app=#{app} AND stream=#{stream}")
int updateStatus(@Param("app") String app, @Param("stream") String stream, @Param("status") boolean status);
@Delete("DELETE FROM wvp_stream_proxy WHERE enable_remove_none_reader=true AND media_server_id=#{mediaServerId}")
void deleteAutoRemoveItemByMediaServerId(String mediaServerId);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable_remove_none_reader=true AND st.media_server_id=#{mediaServerId} order by st.create_time desc")
List<StreamProxy> selectAutoRemoveItemByMediaServerId(String mediaServerId);
@Select("select count(1) as total, sum(status) as online from wvp_stream_proxy")
ResourceBaseInfo getOverview();
@Select("select count(1) from wvp_stream_proxy")
int getAllCount();
@Select("select count(1) from wvp_stream_proxy where status = true")
@Select("select count(1) from wvp_stream_proxy where pulling = true")
int getOnline();
@Delete("DELETE FROM wvp_stream_proxy WHERE id=#{id}")
@@ -101,12 +106,12 @@ public interface StreamProxyMapper {
void deleteByList(List<StreamProxy> streamProxiesForRemove);
@Update("UPDATE wvp_stream_proxy " +
"SET status=true " +
"SET pulling=true " +
"WHERE id=#{id}")
int online(@Param("id") int id);
@Update("UPDATE wvp_stream_proxy " +
"SET status=false " +
"SET pulling=false " +
"WHERE id=#{id}")
int offline(@Param("id") int id);
}

View File

@@ -22,7 +22,7 @@ public interface IStreamProxyService {
* @param count
* @return
*/
PageInfo<StreamProxy> getAll(Integer page, Integer count);
PageInfo<StreamProxy> getAll(Integer page, Integer count, String query, Boolean pulling,String mediaServerId);
/**
* 删除视频代理
@@ -96,4 +96,5 @@ public interface IStreamProxyService {
*/
ResourceBaseInfo getOverview();
StreamInfo add(StreamProxy streamProxy);
}

View File

@@ -144,7 +144,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
streamProxy.setMediaServerId(mediaServer.getId());
boolean saveResult;
// 更新
if (streamProxyMapper.selectOne(streamProxy.getApp(), streamProxy.getStream()) != null) {
if (streamProxyMapper.selectOneByAppAndStream(streamProxy.getApp(), streamProxy.getStream()) != null) {
saveResult = updateStreamProxy(streamProxy);
}else { // 新增
saveResult = addStreamProxy(streamProxy);
@@ -192,16 +192,16 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
@Override
public PageInfo<StreamProxy> getAll(Integer page, Integer count) {
public PageInfo<StreamProxy> getAll(Integer page, Integer count, String query, Boolean pulling,String mediaServerId) {
PageHelper.startPage(page, count);
List<StreamProxy> all = streamProxyMapper.selectAll();
List<StreamProxy> all = streamProxyMapper.selectAll(query, pulling, mediaServerId);
return new PageInfo<>(all);
}
@Override
@Transactional
public void del(String app, String stream) {
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream);
if (streamProxy == null) {
return;
}
@@ -219,7 +219,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public boolean start(String app, String stream) {
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
@@ -246,7 +246,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public void stop(String app, String stream) {
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
@@ -265,7 +265,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public StreamProxy getStreamProxyByAppAndStream(String app, String streamId) {
return streamProxyMapper.selectOne(app, streamId);
return streamProxyMapper.selectOneByAppAndStream(app, streamId);
}
@Override
@@ -391,11 +391,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Transactional
public int updateStatusByAppAndStream(String app, String stream, boolean status) {
// 状态变化时推送到国标上级
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream);
if (streamProxy == null) {
return 0;
}
streamProxy.setStatus(true);
streamProxy.setPulling(true);
streamProxyMapper.online(streamProxy.getId());
streamProxy.setGbStatus(status?"ON":"OFF");
if (streamProxy.getGbId() > 0) {
@@ -404,7 +404,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}else {
gbChannelService.offline(streamProxy.buildCommonGBChannel());
}
}
return 1;
}
@@ -418,8 +417,24 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
return new ResourceBaseInfo(total, online);
}
@Override
@Transactional
public StreamInfo add(StreamProxy streamProxy) {
StreamProxy streamProxyInDb = streamProxyMapper.selectOneByAppAndStream(streamProxy.getApp(), streamProxy.getStream());
if (streamProxyInDb != null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "APP+STREAM已经存在");
}
if (streamProxy.getGbDeviceId() != null) {
gbChannelService.add(streamProxy.buildCommonGBChannel());
}
streamProxyMapper.add(streamProxy);
if (streamProxy.isEnable()) {
// start()
}
return null;
}
// @Scheduled(cron = "* 0/10 * * * ?")
// @Scheduled(cron = "* 0/10 * * * ?")
// public void asyncCheckStreamProxyStatus() {
//
// List<MediaServer> all = mediaServerService.getAllOnline();