临时提交

This commit is contained in:
648540858
2024-06-27 18:41:58 +08:00
parent b2aa5c839d
commit 88c021948f
16 changed files with 42 additions and 42 deletions

View File

@@ -0,0 +1,64 @@
package com.genersoft.iot.vmp.streamProxy.bean;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* @author lin
*/
@Data
@Schema(description = "拉流代理的信息")
@EqualsAndHashCode(callSuper = true)
public class StreamProxy extends CommonGBChannel {
/**
* 数据库自增ID
*/
@Schema(description = "数据库自增ID")
private int id;
@Schema(description = "类型取值default 流媒体直接拉流默认ffmpeg ffmpeg实现拉流")
private String type;
@Schema(description = "应用名")
private String app;
@Schema(description = "流ID")
private String stream;
@Schema(description = "流媒体服务ID")
private String mediaServerId;
@Schema(description = "拉流地址")
private String srcUrl;
@Schema(description = "超时时间")
private int timeout;
@Schema(description = "ffmpeg模板KEY")
private String ffmpegCmdKey;
@Schema(description = "rtsp拉流时拉流方式0tcp1udp2组播")
private String rtspType;
@Schema(description = "是否启用")
private boolean enable;
@Schema(description = "是否启用音频")
private boolean enableAudio;
@Schema(description = "是否启用MP4")
private boolean enableMp4;
@Schema(description = "是否 无人观看时删除")
private boolean enableRemoveNoneReader;
@Schema(description = "是否 无人观看时自动停用")
private boolean enableDisableNoneReader;
@Schema(description = "拉流代理时zlm返回的key用于停止拉流代理")
private String streamKey;
}

View File

@@ -0,0 +1,189 @@
package com.genersoft.iot.vmp.streamProxy.controller;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Map;
import java.util.UUID;
@SuppressWarnings("rawtypes")
/**
* 拉流代理接口
*/
@Tag(name = "拉流代理", description = "")
@Controller
@RequestMapping(value = "/api/proxy")
public class StreamProxyController {
private final static Logger logger = LoggerFactory.getLogger(StreamProxyController.class);
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private IStreamProxyService streamProxyService;
@Autowired
private DeferredResultHolder resultHolder;
@Autowired
private UserSetting userSetting;
@Operation(summary = "分页查询流代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "page", description = "当前页")
@Parameter(name = "count", description = "每页查询数量")
@Parameter(name = "query", description = "查询内容")
@Parameter(name = "online", description = "是否在线")
@GetMapping(value = "/list")
@ResponseBody
public PageInfo<StreamProxy> list(@RequestParam(required = false)Integer page,
@RequestParam(required = false)Integer count,
@RequestParam(required = false)String query,
@RequestParam(required = false)Boolean online ){
return streamProxyService.getAll(page, count);
}
@Operation(summary = "查询流代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "app", description = "应用名")
@Parameter(name = "stream", description = "流Id")
@GetMapping(value = "/one")
@ResponseBody
public StreamProxy one(String app, String stream){
return streamProxyService.getStreamProxyByAppAndStream(app, stream);
}
@Operation(summary = "保存代理", security = @SecurityRequirement(name = JwtUtils.HEADER), parameters = {
@Parameter(name = "param", description = "代理参数", required = true),
})
@PostMapping(value = "/save")
@ResponseBody
public DeferredResult<Object> save(@RequestBody StreamProxy param){
logger.info("添加代理: " + JSONObject.toJSONString(param));
if (ObjectUtils.isEmpty(param.getMediaServerId())) {
param.setMediaServerId("auto");
}
if (ObjectUtils.isEmpty(param.getType())) {
param.setType("default");
}
if (ObjectUtils.isEmpty(param.getRtpType())) {
param.setRtpType("1");
}
if (ObjectUtils.isEmpty(param.getGbId())) {
param.setGbId(null);
}
StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyItem != null) {
streamProxyService.del(param.getApp(), param.getStream());
}
RequestMessage requestMessage = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_PROXY + param.getApp() + param.getStream();
requestMessage.setKey(key);
String uuid = UUID.randomUUID().toString();
requestMessage.setId(uuid);
DeferredResult<Object> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
// 录像查询以channelId作为deviceId查询
resultHolder.put(key, uuid, result);
result.onTimeout(()->{
WVPResult<StreamInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("超时");
requestMessage.setData(wvpResult);
resultHolder.invokeAllResult(requestMessage);
});
streamProxyService.save(param, (code, msg, streamInfo) -> {
logger.info("[拉流代理] {}", code == ErrorCode.SUCCESS.getCode()? "成功":"失败: " + msg);
if (code == ErrorCode.SUCCESS.getCode()) {
requestMessage.setData(new StreamContent(streamInfo));
}else {
requestMessage.setData(WVPResult.fail(code, msg));
}
resultHolder.invokeAllResult(requestMessage);
});
return result;
}
@GetMapping(value = "/ffmpeg_cmd/list")
@ResponseBody
@Operation(summary = "获取ffmpeg.cmd模板", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "mediaServerId", description = "流媒体ID", required = true)
public Map<String, String> getFFmpegCMDs(@RequestParam String mediaServerId){
logger.debug("获取节点[ {} ]ffmpeg.cmd模板", mediaServerId );
MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "流媒体: " + mediaServerId + "未找到");
}
return streamProxyService.getFFmpegCMDs(mediaServerItem);
}
@DeleteMapping(value = "/del")
@ResponseBody
@Operation(summary = "移除代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "app", description = "应用名", required = true)
@Parameter(name = "stream", description = "流id", required = true)
public void del(@RequestParam String app, @RequestParam String stream){
logger.info("移除代理: " + app + "/" + stream);
if (app == null || stream == null) {
throw new ControllerException(ErrorCode.ERROR400.getCode(), app == null ?"app不能为null":"stream不能为null");
}else {
streamProxyService.del(app, stream);
}
}
@GetMapping(value = "/start")
@ResponseBody
@Operation(summary = "启用代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "app", description = "应用名", required = true)
@Parameter(name = "stream", description = "流id", required = true)
public void start(String app, String stream){
logger.info("启用代理: " + app + "/" + stream);
boolean result = streamProxyService.start(app, stream);
if (!result) {
throw new ControllerException(ErrorCode.ERROR100);
}
}
@GetMapping(value = "/stop")
@ResponseBody
@Operation(summary = "停用代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "app", description = "应用名", required = true)
@Parameter(name = "stream", description = "流id", required = true)
public void stop(String app, String stream){
logger.info("停用代理: " + app + "/" + stream);
boolean result = streamProxyService.stop(app, stream);
if (!result) {
logger.info("停用代理失败: " + app + "/" + stream);
throw new ControllerException(ErrorCode.ERROR100);
}
}
}

View File

@@ -0,0 +1,90 @@
package com.genersoft.iot.vmp.streamProxy.dao;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
import java.util.List;
@Mapper
@Repository
public interface StreamProxyMapper {
@Insert("INSERT INTO wvp_stream_proxy (type, name, app, stream,media_server_id, url, src_url, dst_url, " +
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_audio, enable_mp4, enable, status, stream_key, enable_remove_none_reader, enable_disable_none_reader, create_time) VALUES" +
"(#{type}, #{name}, #{app}, #{stream}, #{mediaServerId}, #{url}, #{srcUrl}, #{dstUrl}, " +
"#{timeoutMs}, #{ffmpegCmdKey}, #{rtpType}, #{enableAudio}, #{enableMp4}, #{enable}, #{status}, #{streamKey}, " +
"#{enableRemoveNoneReader}, #{enableDisableNoneReader}, #{createTime} )")
int add(StreamProxy streamProxyDto);
@Update("UPDATE wvp_stream_proxy " +
"SET type=#{type}, " +
"name=#{name}," +
"app=#{app}," +
"stream=#{stream}," +
"url=#{url}, " +
"media_server_id=#{mediaServerId}, " +
"src_url=#{srcUrl}," +
"dst_url=#{dstUrl}, " +
"timeout_ms=#{timeoutMs}, " +
"ffmpeg_cmd_key=#{ffmpegCmdKey}, " +
"rtp_type=#{rtpType}, " +
"enable_audio=#{enableAudio}, " +
"enable=#{enable}, " +
"status=#{status}, " +
"stream_key=#{streamKey}, " +
"enable_remove_none_reader=#{enableRemoveNoneReader}, " +
"enable_disable_none_reader=#{enableDisableNoneReader}, " +
"enable_mp4=#{enableMp4} " +
"WHERE app=#{app} AND stream=#{stream}")
int update(StreamProxy streamProxyDto);
@Delete("DELETE FROM wvp_stream_proxy WHERE app=#{app} AND stream=#{stream}")
int del(String app, String stream);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream order by st.create_time desc")
List<StreamProxy> selectAll();
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude, 'proxy' as streamType FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable=#{enable} order by st.create_time desc")
List<StreamProxy> selectForEnable(boolean enable);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.app=#{app} AND st.stream=#{stream} order by st.create_time desc")
StreamProxy selectOne(@Param("app") String app, @Param("stream") String stream);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st " +
"LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " +
"WHERE st.enable=#{enable} and st.media_server_id= #{id} order by st.create_time desc")
List<StreamProxy> selectForEnableInMediaServer(@Param("id") String id, @Param("enable") boolean enable);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st " +
"LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " +
"WHERE st.media_server_id= #{id} order by st.create_time desc")
List<StreamProxy> selectInMediaServer(String id);
@Update("UPDATE wvp_stream_proxy " +
"SET status=#{status} " +
"WHERE media_server_id=#{mediaServerId}")
void updateStatusByMediaServerId(@Param("mediaServerId") String mediaServerId, @Param("status") boolean status);
@Update("UPDATE wvp_stream_proxy " +
"SET status=#{status} " +
"WHERE app=#{app} AND stream=#{stream}")
int updateStatus(@Param("app") String app, @Param("stream") String stream, @Param("status") boolean status);
@Delete("DELETE FROM wvp_stream_proxy WHERE enable_remove_none_reader=true AND media_server_id=#{mediaServerId}")
void deleteAutoRemoveItemByMediaServerId(String mediaServerId);
@Select("SELECT st.*, pgs.gb_id, pgs.name, pgs.longitude, pgs.latitude FROM wvp_stream_proxy st LEFT join wvp_gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream WHERE st.enable_remove_none_reader=true AND st.media_server_id=#{mediaServerId} order by st.create_time desc")
List<StreamProxy> selectAutoRemoveItemByMediaServerId(String mediaServerId);
@Select("select count(1) as total, sum(status) as online from wvp_stream_proxy")
ResourceBaseInfo getOverview();
@Select("select count(1) from wvp_stream_proxy")
int getAllCount();
@Select("select count(1) from wvp_stream_proxy where status = true")
int getOnline();
}

View File

@@ -0,0 +1,119 @@
package com.genersoft.iot.vmp.streamProxy.service;
import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import java.util.Map;
public interface IStreamProxyService {
/**
* 保存视频代理
* @param param
*/
void save(StreamProxy param, GeneralCallback<StreamInfo> callback);
/**
* 添加视频代理到zlm
*
* @param param
* @return
*/
WVPResult<String> addStreamProxyToZlm(StreamProxy param);
/**
* 从zlm移除视频代理
*
* @param param
* @return
*/
Boolean removeStreamProxyFromZlm(StreamProxy param);
/**
* 分页查询
* @param page
* @param count
* @return
*/
PageInfo<StreamProxy> getAll(Integer page, Integer count);
/**
* 删除视频代理
* @param app
* @param stream
*/
void del(String app, String stream);
/**
* 启用视频代理
* @param app
* @param stream
* @return
*/
boolean start(String app, String stream);
/**
* 更新状态
* @param status 状态
* @param app
* @param stream
*/
int updateStatusByAppAndStream(String app, String stream, boolean status);
/**
* 停用用视频代理
* @param app
* @param stream
* @return
*/
boolean stop(String app, String stream);
/**
* 获取ffmpeg.cmd模板
*
* @return
*/
Map<String, String> getFFmpegCMDs(MediaServer mediaServerItem);
/**
* 根据app与stream获取streamProxy
* @return
*/
StreamProxy getStreamProxyByAppAndStream(String app, String streamId);
/**
* 新的节点加入
* @param mediaServerId
* @return
*/
void zlmServerOnline(String mediaServerId);
/**
* 节点离线
* @param mediaServerId
* @return
*/
void zlmServerOffline(String mediaServerId);
void clean();
/**
* 更新代理流
*/
boolean updateStreamProxy(StreamProxy streamProxyItem);
/**
* 获取统计信息
* @return
*/
ResourceBaseInfo getOverview();
}

View File

@@ -0,0 +1,619 @@
package com.genersoft.iot.vmp.streamProxy.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.streamProxy.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 视频代理业务
*/
@Service
@DS("master")
public class StreamProxyServiceImpl implements IStreamProxyService {
private final static Logger logger = LoggerFactory.getLogger(StreamProxyServiceImpl.class);
@Autowired
private StreamProxyMapper streamProxyMapper;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IVideoManagerStorage storager;
@Autowired
private UserSetting userSetting;
@Autowired
private GbStreamMapper gbStreamMapper;
@Autowired
private PlatformGbStreamMapper platformGbStreamMapper;
@Autowired
private IGbStreamService gbStreamService;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private HookSubscribe hookSubscribe;
@Autowired
private DynamicTask dynamicTask;
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
TransactionDefinition transactionDefinition;
/**
* 流到来的处理
*/
@Async("taskExecutor")
@org.springframework.context.event.EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
if ("rtsp".equals(event.getSchema())) {
updateStatusByAppAndStream(event.getApp(), event.getStream(), true);
}
}
/**
* 流离开的处理
*/
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
if ("rtsp".equals(event.getSchema())) {
updateStatusByAppAndStream(event.getApp(), event.getStream(), false);
}
}
/**
* 流离开的处理
*/
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaNotFoundEvent event) {
if ("rtp".equals(event.getApp())) {
return;
}
// 拉流代理
StreamProxy streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream());
if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) {
start(event.getApp(), event.getStream());
}
}
@Override
public void save(StreamProxy param, GeneralCallback<StreamInfo> callback) {
MediaServer mediaServer;
if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){
mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
}else {
mediaServer = mediaServerService.getOne(param.getMediaServerId());
}
if (mediaServer == null) {
logger.warn("保存代理未找到在线的ZLM...");
throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到在线的ZLM");
}
String dstUrl;
if ("ffmpeg".equalsIgnoreCase(param.getType())) {
String ffmpegCmd = mediaServerService.getFfmpegCmd(mediaServer, param.getFfmpegCmdKey());
if (ffmpegCmd == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法获取ffmpeg cmd");
}
String schema = getSchemaFromFFmpegCmd(ffmpegCmd);
if (schema == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "ffmpeg拉流代理无法从ffmpeg cmd中获取到输出格式");
}
int port;
String schemaForUri;
if (schema.equalsIgnoreCase("rtsp")) {
port = mediaServer.getRtspPort();
schemaForUri = schema;
}else if (schema.equalsIgnoreCase("flv")) {
port = mediaServer.getRtmpPort();
schemaForUri = schema;
}else {
port = mediaServer.getRtmpPort();
schemaForUri = schema;
}
dstUrl = String.format("%s://%s:%s/%s/%s", schemaForUri, "127.0.0.1", port, param.getApp(),
param.getStream());
}else {
dstUrl = String.format("rtsp://%s:%s/%s/%s", "127.0.0.1", mediaServer.getRtspPort(), param.getApp(),
param.getStream());
}
param.setDstUrl(dstUrl);
logger.info("[拉流代理] 输出地址为:{}", dstUrl);
param.setMediaServerId(mediaServer.getId());
boolean saveResult;
// 更新
if (streamProxyMapper.selectOne(param.getApp(), param.getStream()) != null) {
saveResult = updateStreamProxy(param);
}else { // 新增
saveResult = addStreamProxy(param);
}
if (!saveResult) {
callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null);
return;
}
Hook hook = Hook.getInstance(HookType.on_media_arrival, param.getApp(), param.getStream(), mediaServer.getId());
hookSubscribe.addSubscribe(hook, (hookData) -> {
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(
mediaServer, param.getApp(), param.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
});
if (param.isEnable()) {
String talkKey = UUID.randomUUID().toString();
String delayTalkKey = UUID.randomUUID().toString();
dynamicTask.startDelay(delayTalkKey, ()->{
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaServer.getId(), false);
if (streamInfo != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
dynamicTask.stop(talkKey);
callback.run(ErrorCode.ERROR100.getCode(), "超时", null);
}
}, 7000);
WVPResult<String> result = addStreamProxyToZlm(param);
if (result != null && result.getCode() == 0) {
hookSubscribe.removeSubscribe(hook);
dynamicTask.stop(talkKey);
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(
mediaServer, param.getApp(), param.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
param.setEnable(false);
// 直接移除
if (param.isEnableRemoveNoneReader()) {
del(param.getApp(), param.getStream());
}else {
updateStreamProxy(param);
}
if (result == null){
callback.run(ErrorCode.ERROR100.getCode(), "记录已保存,启用失败", null);
}else {
callback.run(ErrorCode.ERROR100.getCode(), result.getMsg(), null);
}
}
}
else{
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(
mediaServer, param.getApp(), param.getStream(), null, null);
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
}
}
private String getSchemaFromFFmpegCmd(String ffmpegCmd) {
ffmpegCmd = ffmpegCmd.replaceAll(" + ", " ");
String[] paramArray = ffmpegCmd.split(" ");
if (paramArray.length == 0) {
return null;
}
for (int i = 0; i < paramArray.length; i++) {
if (paramArray[i].equalsIgnoreCase("-f")) {
if (i + 1 < paramArray.length - 1) {
return paramArray[i+1];
}else {
return null;
}
}
}
return null;
}
/**
* 新增代理流
* @param streamProxyItem
* @return
*/
private boolean addStreamProxy(StreamProxy streamProxyItem) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
boolean result = false;
streamProxyItem.setStreamType("proxy");
streamProxyItem.setStatus(true);
String now = DateUtil.getNow();
streamProxyItem.setCreateTime(now);
try {
if (streamProxyMapper.add(streamProxyItem) > 0) {
if (!ObjectUtils.isEmpty(streamProxyItem.getGbId())) {
if (gbStreamMapper.add(streamProxyItem) < 0) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
}
}else {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
result = true;
dataSourceTransactionManager.commit(transactionStatus); //手动提交
}catch (Exception e) {
logger.error("向数据库添加流代理失败:", e);
dataSourceTransactionManager.rollback(transactionStatus);
}
return result;
}
/**
* 更新代理流
* @param streamProxyItem
* @return
*/
@Override
public boolean updateStreamProxy(StreamProxy streamProxyItem) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
boolean result = false;
streamProxyItem.setStreamType("proxy");
try {
if (streamProxyMapper.update(streamProxyItem) > 0) {
if (!ObjectUtils.isEmpty(streamProxyItem.getGbId())) {
if (gbStreamMapper.updateByAppAndStream(streamProxyItem) == 0) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
}
} else {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
dataSourceTransactionManager.commit(transactionStatus); //手动提交
result = true;
}catch (Exception e) {
logger.error("未处理的异常 ", e);
dataSourceTransactionManager.rollback(transactionStatus);
}
return result;
}
@Override
public WVPResult<String> addStreamProxyToZlm(StreamProxy param) {
WVPResult<String> result = null;
MediaServer mediaServer = null;
if (param.getMediaServerId() == null) {
logger.warn("添加代理时MediaServerId 为null");
return null;
}else {
mediaServer = mediaServerService.getOne(param.getMediaServerId());
}
if (mediaServer == null) {
return null;
}
if (mediaServerService.isStreamReady(mediaServer, param.getApp(), param.getStream())) {
mediaServerService.closeStreams(mediaServer, param.getApp(), param.getStream());
}
String msgResult;
if ("ffmpeg".equalsIgnoreCase(param.getType())){
if (param.getTimeoutMs() == 0) {
param.setTimeoutMs(15);
}
result = mediaServerService.addFFmpegSource(mediaServer, param.getSrcUrl().trim(), param.getDstUrl(),
param.getTimeoutMs(), param.isEnableAudio(), param.isEnableMp4(),
param.getFfmpegCmdKey());
}else {
result = mediaServerService.addStreamProxy(mediaServer, param.getApp(), param.getStream(), param.getUrl().trim(),
param.isEnableAudio(), param.isEnableMp4(), param.getRtpType());
}
if (result != null && result.getCode() == 0) {
String key = result.getData();
if (key == null) {
logger.warn("[获取拉流代理的结果数据Data] 失败: {}", result );
return result;
}
param.setStreamKey(key);
streamProxyMapper.update(param);
}
return result;
}
@Override
public Boolean removeStreamProxyFromZlm(StreamProxy param) {
if (param ==null) {
return null;
}
MediaServer mediaServer = mediaServerService.getOne(param.getMediaServerId());
if (mediaServer == null) {
return null;
}
List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServer, param.getApp(), param.getStream(), null);
if (mediaList == null || mediaList.isEmpty()) {
return true;
}
Boolean result = false;
if ("ffmpeg".equalsIgnoreCase(param.getType())){
result = mediaServerService.delFFmpegSource(mediaServer, param.getStreamKey());
}else {
result = mediaServerService.delStreamProxy(mediaServer, param.getStreamKey());
}
return result;
}
@Override
public PageInfo<StreamProxy> getAll(Integer page, Integer count) {
PageHelper.startPage(page, count);
List<StreamProxy> all = streamProxyMapper.selectAll();
return new PageInfo<>(all);
}
@Override
public void del(String app, String stream) {
StreamProxy streamProxyItem = streamProxyMapper.selectOne(app, stream);
if (streamProxyItem != null) {
gbStreamService.sendCatalogMsg(streamProxyItem, CatalogEvent.DEL);
// 如果关联了国标那么移除关联
platformGbStreamMapper.delByAppAndStream(app, stream);
gbStreamMapper.del(app, stream);
streamProxyMapper.del(app, stream);
redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream);
redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PUSH", app, stream);
Boolean result = removeStreamProxyFromZlm(streamProxyItem);
if (result != null && result) {
logger.info("[移除代理] 代理: {}/{}, 从zlm移除成功", app, stream);
}else {
logger.info("[移除代理] 代理: {}/{}, 从zlm移除失败", app, stream);
}
}
}
@Override
public boolean start(String app, String stream) {
boolean result = false;
StreamProxy streamProxy = streamProxyMapper.selectOne(app, stream);
if (streamProxy != null && !streamProxy.isEnable() ) {
WVPResult<String> wvpResult = addStreamProxyToZlm(streamProxy);
if (wvpResult == null) {
return false;
}
if (wvpResult.getCode() == 0) {
result = true;
streamProxy.setEnable(true);
updateStreamProxy(streamProxy);
}else {
logger.info("启用代理失败: {}/{}->{}({})", app, stream, wvpResult.getMsg(),
streamProxy.getSrcUrl() == null? streamProxy.getUrl():streamProxy.getSrcUrl());
}
} else if (streamProxy != null && streamProxy.isEnable()) {
return true ;
}
return result;
}
@Override
public boolean stop(String app, String stream) {
boolean result = false;
StreamProxy streamProxyDto = streamProxyMapper.selectOne(app, stream);
if (streamProxyDto != null && streamProxyDto.isEnable()) {
Boolean removed = removeStreamProxyFromZlm(streamProxyDto);
if (removed != null && removed) {
streamProxyDto.setEnable(false);
result = updateStreamProxy(streamProxyDto);
}
}
return result;
}
@Override
public Map<String, String> getFFmpegCMDs(MediaServer mediaServer) {
return mediaServerService.getFFmpegCMDs(mediaServer);
}
@Override
public StreamProxy getStreamProxyByAppAndStream(String app, String streamId) {
return streamProxyMapper.selectOne(app, streamId);
}
@Override
public void zlmServerOnline(String mediaServerId) {
// 移除开启了无人观看自动移除的流
List<StreamProxy> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
if (streamProxyItemList.size() > 0) {
gbStreamMapper.batchDel(streamProxyItemList);
}
streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
// 移除拉流代理生成的流信息
syncPullStream(mediaServerId);
// 恢复流代理, 只查找这个这个流媒体
List<StreamProxy> streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer(
mediaServerId, true);
for (StreamProxy streamProxyDto : streamProxyListForEnable) {
logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
WVPResult<String> wvpResult = addStreamProxyToZlm(streamProxyDto);
if (wvpResult == null) {
// 设置为离线
logger.info("恢复流代理失败" + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
updateStatusByAppAndStream(streamProxyDto.getApp(), streamProxyDto.getStream(), false);
}else {
updateStatusByAppAndStream(streamProxyDto.getApp(), streamProxyDto.getStream(), true);
}
}
}
@Override
public void zlmServerOffline(String mediaServerId) {
// 移除开启了无人观看自动移除的流
List<StreamProxy> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
if (streamProxyItemList.size() > 0) {
gbStreamMapper.batchDel(streamProxyItemList);
}
streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
// 其他的流设置离线
streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
String type = "PULL";
// 发送redis消息
List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, type);
if (mediaInfoList.size() > 0) {
for (MediaInfo mediaInfo : mediaInfoList) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetting.getServerId());
jsonObject.put("app", mediaInfo.getApp());
jsonObject.put("stream", mediaInfo.getStream());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerId, type, mediaInfo.getApp(), mediaInfo.getStream());
}
}
}
@Override
public void clean() {
}
@Override
public int updateStatusByAppAndStream(String app, String stream, boolean status) {
// 状态变化时推送到国标上级
StreamProxy streamProxyItem = streamProxyMapper.selectOne(app, stream);
if (streamProxyItem == null) {
return 0;
}
int result = streamProxyMapper.updateStatus(app, stream, status);
if (!ObjectUtils.isEmpty(streamProxyItem.getGbId())) {
gbStreamService.sendCatalogMsg(streamProxyItem, status?CatalogEvent.ON:CatalogEvent.OFF);
}
return result;
}
private void syncPullStream(String mediaServerId){
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
if (mediaServer != null) {
List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PULL");
if (!mediaInfoList.isEmpty()) {
List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServer, null, null, null);
Map<String, StreamInfo> stringStreamInfoMap = new HashMap<>();
if (mediaList != null && !mediaList.isEmpty()) {
for (StreamInfo streamInfo : mediaList) {
stringStreamInfoMap.put(streamInfo.getApp() + streamInfo.getStream(), streamInfo);
}
}
if (stringStreamInfoMap.isEmpty()) {
redisCatchStorage.removeStream(mediaServerId, "PULL");
}else {
for (String key : stringStreamInfoMap.keySet()) {
StreamInfo streamInfo = stringStreamInfoMap.get(key);
if (stringStreamInfoMap.get(streamInfo.getApp() + streamInfo.getStream()) == null) {
redisCatchStorage.removeStream(mediaServerId, "PULL", streamInfo.getApp(),
streamInfo.getStream());
}
}
}
}
}
}
@Override
public ResourceBaseInfo getOverview() {
int total = streamProxyMapper.getAllCount();
int online = streamProxyMapper.getOnline();
return new ResourceBaseInfo(total, online);
}
@Scheduled(cron = "* 0/10 * * * ?")
public void asyncCheckStreamProxyStatus() {
List<MediaServer> all = mediaServerService.getAllOnline();
if (CollectionUtils.isEmpty(all)){
return;
}
Map<String, MediaServer> serverItemMap = all.stream().collect(Collectors.toMap(MediaServer::getId, Function.identity(), (m1, m2) -> m1));
List<StreamProxy> list = streamProxyMapper.selectForEnable(true);
if (CollectionUtils.isEmpty(list)){
return;
}
for (StreamProxy streamProxyItem : list) {
MediaServer mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId());
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServerItem, streamProxyItem.getApp(), streamProxyItem.getStream());
if (mediaInfo == null){
streamProxyItem.setStatus(false);
} else {
if (mediaInfo.getOnline() != null && mediaInfo.getOnline()) {
streamProxyItem.setStatus(true);
} else {
streamProxyItem.setStatus(false);
}
}
updateStreamProxy(streamProxyItem);
}
}
}