优化按需拉流配置,拉流代理支持按需拉流

This commit is contained in:
648540858
2022-10-17 16:38:28 +08:00
parent 1e1364e51a
commit 7918f03734
14 changed files with 91 additions and 82 deletions

View File

@@ -69,9 +69,6 @@ public class MediaConfig{
@Value("${media.secret}")
private String secret;
@Value("${media.stream-none-reader-delay-ms:15000}")
private int streamNoneReaderDelayMS = 15000;
@Value("${media.rtp.enable}")
private boolean rtpEnable;
@@ -151,10 +148,6 @@ public class MediaConfig{
return secret;
}
public int getStreamNoneReaderDelayMS() {
return streamNoneReaderDelayMS;
}
public boolean isRtpEnable() {
return rtpEnable;
}
@@ -219,7 +212,6 @@ public class MediaConfig{
mediaServerItem.setRtspSSLPort(rtspSSLPort);
mediaServerItem.setAutoConfig(autoConfig);
mediaServerItem.setSecret(secret);
mediaServerItem.setStreamNoneReaderDelayMS(streamNoneReaderDelayMS);
mediaServerItem.setRtpEnable(rtpEnable);
mediaServerItem.setRtpPortRange(rtpPortRange);
mediaServerItem.setSendRtpPortRange(sendRtpPortRange);

View File

@@ -33,6 +33,8 @@ public class UserSetting {
private Boolean usePushingAsStatus = Boolean.TRUE;
private Boolean streamOnDemand = Boolean.TRUE;
private String serverId = "000000";
private String thirdPartyGBIdReg = "[\\s\\S]*";
@@ -146,4 +148,12 @@ public class UserSetting {
public void setUsePushingAsStatus(Boolean usePushingAsStatus) {
this.usePushingAsStatus = usePushingAsStatus;
}
public Boolean getStreamOnDemand() {
return streamOnDemand;
}
public void setStreamOnDemand(Boolean streamOnDemand) {
this.streamOnDemand = streamOnDemand;
}
}

View File

@@ -558,9 +558,12 @@ public class ZLMHttpHookListener {
String app = json.getString("app");
JSONObject ret = new JSONObject();
ret.put("code", 0);
// 录像下载
ret.put("close", userSetting.getStreamOnDemand());
if ("rtp".equals(app)){
ret.put("close", true);
// 国标流, 点播/录像回放/录像下载
StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(streamId);
// 点播
if (streamInfoForPlayCatch != null) {
// 收到无人观看说明流也没有在往上级推送
if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) {
@@ -590,40 +593,39 @@ public class ZLMHttpHookListener {
redisCatchStorage.stopPlay(streamInfoForPlayCatch);
storager.stopPlay(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId());
}else{
StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (streamInfoForPlayBackCatch != null ) {
if (streamInfoForPlayBackCatch.isPause()) {
ret.put("close", false);
}else {
Device device = deviceService.queryDevice(streamInfoForPlayBackCatch.getDeviceID());
if (device != null) {
try {
cmder.streamByeCmd(device,streamInfoForPlayBackCatch.getChannelId(),
streamInfoForPlayBackCatch.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
logger.error("[无人观看]回放, 发送BYE失败 {}", e.getMessage());
}
}
redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch.getDeviceID(),
streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream(), null);
}
return ret;
}
// 录像回放
StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, streamId, null);
if (streamInfoForPlayBackCatch != null ) {
if (streamInfoForPlayBackCatch.isPause()) {
ret.put("close", false);
}else {
StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null, streamId, null);
// 进行录像下载时无人观看不断流
if (streamInfoForDownload != null) {
ret.put("close", false);
Device device = deviceService.queryDevice(streamInfoForPlayBackCatch.getDeviceID());
if (device != null) {
try {
cmder.streamByeCmd(device,streamInfoForPlayBackCatch.getChannelId(),
streamInfoForPlayBackCatch.getStream(), null);
} catch (InvalidArgumentException | ParseException | SipException |
SsrcTransactionNotFoundException e) {
logger.error("[无人观看]回放, 发送BYE失败 {}", e.getMessage());
}
}
redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch.getDeviceID(),
streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream(), null);
}
return ret;
}
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null && mediaServerItem.getStreamNoneReaderDelayMS() == -1) {
// 录像下载
StreamInfo streamInfoForDownload = redisCatchStorage.queryDownload(null, null, streamId, null);
// 进行录像下载时无人观看不断流
if (streamInfoForDownload != null) {
ret.put("close", false);
return ret;
}
return ret;
}else {
// 非国标流 推流/拉流代理
// 拉流代理
StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
if (streamProxyItem != null ) {
if (streamProxyItem.isEnable_remove_none_reader()) {
@@ -635,12 +637,21 @@ public class ZLMHttpHookListener {
}else if (streamProxyItem.isEnable_disable_none_reader()) {
// 无人观看停用
ret.put("close", true);
// 修改数据
streamProxyService.stop(app, streamId);
}else {
ret.put("close", false);
}
return ret;
}
return ret;
// 推流具有主动性,暂时不做处理
// StreamPushItem streamPushItem = streamPushService.getPush(app, streamId);
// if (streamPushItem != null) {
// // TODO 发送停止
//
// }
}
return ret;
}
/**
@@ -655,19 +666,27 @@ public class ZLMHttpHookListener {
}
String mediaServerId = json.getString("mediaServerId");
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (userSetting.isAutoApplyPlay() && mediaInfo != null && mediaInfo.isRtpEnable()) {
if (userSetting.isAutoApplyPlay() && mediaInfo != null) {
String app = json.getString("app");
String streamId = json.getString("stream");
if ("rtp".equals(app)) {
String[] s = streamId.split("_");
if (s.length == 2) {
String deviceId = s[0];
String channelId = s[1];
Device device = redisCatchStorage.getDevice(deviceId);
if (device != null) {
playService.play(mediaInfo,deviceId, channelId, null, null, null);
if (mediaInfo.isRtpEnable()) {
String[] s = streamId.split("_");
if (s.length == 2) {
String deviceId = s[0];
String channelId = s[1];
Device device = redisCatchStorage.getDevice(deviceId);
if (device != null) {
playService.play(mediaInfo,deviceId, channelId, null, null, null);
}
}
}
}else {
// 拉流代理
StreamProxyItem streamProxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(app, streamId);
if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnable_disable_none_reader()) {
streamProxyService.start(app, streamId);
}
}
}

View File

@@ -54,9 +54,6 @@ public class MediaServerItem{
@Schema(description = "ZLM鉴权参数")
private String secret;
@Schema(description = "某个流无人观看时触发hook.on_stream_none_reader事件的最大等待时间单位毫秒")
private int streamNoneReaderDelayMS;
@Schema(description = "keepalive hook触发间隔,单位秒")
private int hookAliveInterval;
@@ -119,7 +116,6 @@ public class MediaServerItem{
rtspSSLPort = zlmServerConfig.getRtspSSlport();
autoConfig = true; // 默认值true;
secret = zlmServerConfig.getApiSecret();
streamNoneReaderDelayMS = zlmServerConfig.getGeneralStreamNoneReaderDelayMS();
hookAliveInterval = zlmServerConfig.getHookAliveInterval();
rtpEnable = false; // 默认使用单端口;直到用户自己设置开启多端口
rtpPortRange = zlmServerConfig.getPortRange().replace("_",","); // 默认使用30000,30500作为级联时发送流的端口号
@@ -240,14 +236,6 @@ public class MediaServerItem{
this.secret = secret;
}
public int getStreamNoneReaderDelayMS() {
return streamNoneReaderDelayMS;
}
public void setStreamNoneReaderDelayMS(int streamNoneReaderDelayMS) {
this.streamNoneReaderDelayMS = streamNoneReaderDelayMS;
}
public boolean isRtpEnable() {
return rtpEnable;
}

View File

@@ -38,7 +38,7 @@ public class StreamProxyItem extends GbStream {
@Schema(description = "是否 无人观看时删除")
private boolean enable_remove_none_reader;
@Schema(description = "是否 无人观看时不启")
@Schema(description = "是否 无人观看时自动停")
private boolean enable_disable_none_reader;
@Schema(description = "上级平台国标ID")
private String platformGbId;

View File

@@ -541,7 +541,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
param.put("hook.on_record_mp4","");
}
param.put("hook.timeoutSec","20");
param.put("general.streamNoneReaderDelayMS",mediaServerItem.getStreamNoneReaderDelayMS()==-1?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() );
// 推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。
// 置0关闭此特性(推流断开会导致立即断开播放器)
// 此参数不应大于播放器超时时间
@@ -606,7 +605,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
mediaServerItem.setStreamIp(ip);
mediaServerItem.setHookIp(sipConfig.getIp());
mediaServerItem.setSdpIp(ip);
mediaServerItem.setStreamNoneReaderDelayMS(zlmServerConfig.getGeneralStreamNoneReaderDelayMS());
return mediaServerItem;
}

View File

@@ -26,7 +26,6 @@ public interface MediaServerMapper {
"rtspSSLPort, " +
"autoConfig, " +
"secret, " +
"streamNoneReaderDelayMS, " +
"rtpEnable, " +
"rtpPortRange, " +
"sendRtpPortRange, " +
@@ -51,7 +50,6 @@ public interface MediaServerMapper {
"${rtspSSLPort}, " +
"${autoConfig}, " +
"'${secret}', " +
"${streamNoneReaderDelayMS}, " +
"${rtpEnable}, " +
"'${rtpPortRange}', " +
"'${sendRtpPortRange}', " +
@@ -77,7 +75,6 @@ public interface MediaServerMapper {
"<if test=\"rtspPort != null\">, rtspPort=${rtspPort}</if>" +
"<if test=\"rtspSSLPort != null\">, rtspSSLPort=${rtspSSLPort}</if>" +
"<if test=\"autoConfig != null\">, autoConfig=${autoConfig}</if>" +
"<if test=\"streamNoneReaderDelayMS != null\">, streamNoneReaderDelayMS=${streamNoneReaderDelayMS}</if>" +
"<if test=\"rtpEnable != null\">, rtpEnable=${rtpEnable}</if>" +
"<if test=\"rtpPortRange != null\">, rtpPortRange='${rtpPortRange}'</if>" +
"<if test=\"sendRtpPortRange != null\">, sendRtpPortRange='${sendRtpPortRange}'</if>" +
@@ -102,7 +99,6 @@ public interface MediaServerMapper {
"<if test=\"rtspPort != null\">, rtspPort=${rtspPort}</if>" +
"<if test=\"rtspSSLPort != null\">, rtspSSLPort=${rtspSSLPort}</if>" +
"<if test=\"autoConfig != null\">, autoConfig=${autoConfig}</if>" +
"<if test=\"streamNoneReaderDelayMS != null\">, streamNoneReaderDelayMS=${streamNoneReaderDelayMS}</if>" +
"<if test=\"rtpEnable != null\">, rtpEnable=${rtpEnable}</if>" +
"<if test=\"rtpPortRange != null\">, rtpPortRange='${rtpPortRange}'</if>" +
"<if test=\"sendRtpPortRange != null\">, sendRtpPortRange='${sendRtpPortRange}'</if>" +

View File

@@ -11,10 +11,10 @@ import java.util.List;
public interface StreamProxyMapper {
@Insert("INSERT INTO stream_proxy (type, name, app, stream,mediaServerId, url, src_url, dst_url, " +
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable, status, enable_remove_none_reader, createTime) VALUES" +
"timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable, status, enable_remove_none_reader, enable_disable_none_reader, createTime) VALUES" +
"('${type}','${name}', '${app}', '${stream}', '${mediaServerId}','${url}', '${src_url}', '${dst_url}', " +
"'${timeout_ms}', '${ffmpeg_cmd_key}', '${rtp_type}', ${enable_hls}, ${enable_mp4}, ${enable}, ${status}, " +
"${enable_remove_none_reader}, '${createTime}' )")
"${enable_remove_none_reader}, ${enable_disable_none_reader}, '${createTime}' )")
int add(StreamProxyItem streamProxyDto);
@Update("UPDATE stream_proxy " +
@@ -33,6 +33,7 @@ public interface StreamProxyMapper {
"enable=#{enable}, " +
"status=#{status}, " +
"enable_remove_none_reader=#{enable_remove_none_reader}, " +
"enable_disable_none_reader=#{enable_disable_none_reader}, " +
"enable_mp4=#{enable_mp4} " +
"WHERE app=#{app} AND stream=#{stream}")
int update(StreamProxyItem streamProxyDto);