优化国标级联发流并发能力

This commit is contained in:
648540858
2022-11-16 09:39:27 +08:00
parent 2466a24860
commit 694076dc8c
26 changed files with 277 additions and 166 deletions

View File

@@ -626,6 +626,32 @@ public class ZLMHttpHookListener {
return ret;
}
/**
* rtpServer收流超时
*/
@ResponseBody
@PostMapping(value = "/on_rtp_server_timeout", produces = "application/json;charset=UTF-8")
public JSONObject onRtpServerTimeout(HttpServletRequest request, @RequestBody OnRtpServerTimeoutHookParam param){
System.out.println(param);
logger.info("[ZLM HOOK] rtpServer收流超时{}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc());
JSONObject ret = new JSONObject();
ret.put("code", 0);
ret.put("msg", "success");
taskExecutor.execute(()->{
JSONObject json = (JSONObject) JSON.toJSON(param);
List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_rtp_server_timeout);
if (subscribes != null && subscribes.size() > 0) {
for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, json);
}
}
});
return ret;
}
private Map<String, String> urlParamToMap(String params) {
HashMap<String, String> map = new HashMap<>();
if (ObjectUtils.isEmpty(params)) {

View File

@@ -1,15 +1,15 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.util.*;
@@ -24,6 +24,9 @@ public class ZLMRTPServerFactory {
@Autowired
private UserSetting userSetting;
@Autowired
private ZlmHttpHookSubscribe hookSubscribe;
private int[] portRangeArray = new int[2];
public int getFreePort(MediaServerItem mediaServerItem, int startPort, int endPort, List<Integer> usedFreelist) {
@@ -141,7 +144,7 @@ public class ZLMRTPServerFactory {
return result;
}
public boolean closeRTPServer(MediaServerItem serverItem, String streamId) {
public boolean closeRtpServer(MediaServerItem serverItem, String streamId) {
boolean result = false;
if (serverItem !=null){
Map<String, Object> param = new HashMap<>();
@@ -161,32 +164,6 @@ public class ZLMRTPServerFactory {
return result;
}
// private int getPortFromportRange(MediaServerItem mediaServerItem) {
// int currentPort = mediaServerItem.getCurrentPort();
// if (currentPort == 0) {
// String[] portRangeStrArray = mediaServerItem.getSendRtpPortRange().split(",");
// if (portRangeStrArray.length != 2) {
// portRangeArray[0] = 30000;
// portRangeArray[1] = 30500;
// }else {
// portRangeArray[0] = Integer.parseInt(portRangeStrArray[0]);
// portRangeArray[1] = Integer.parseInt(portRangeStrArray[1]);
// }
// }
//
// if (currentPort == 0 || currentPort++ > portRangeArray[1]) {
// currentPort = portRangeArray[0];
// mediaServerItem.setCurrentPort(currentPort);
// return portRangeArray[0];
// } else {
// if (currentPort % 2 == 1) {
// currentPort++;
// }
// currentPort++;
// mediaServerItem.setCurrentPort(currentPort);
// return currentPort;
// }
// }
/**
* 创建一个国标推流
@@ -200,21 +177,15 @@ public class ZLMRTPServerFactory {
*/
public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp){
// 使用RTPServer 功能找一个可用的端口
String sendRtpPortRange = serverItem.getSendRtpPortRange();
if (ObjectUtils.isEmpty(sendRtpPortRange)) {
return null;
}
String[] portRangeStrArray = serverItem.getSendRtpPortRange().split(",");
int localPort = -1;
if (portRangeStrArray.length != 2) {
localPort = getFreePort(serverItem, 30000, 30500, null);
}else {
localPort = getFreePort(serverItem, Integer.parseInt(portRangeStrArray[0]), Integer.parseInt(portRangeStrArray[1]), null);
}
if (localPort == -1) {
logger.error("没有可用的端口");
return null;
// 默认为随机端口
int localPort = 0;
if (userSetting.getGbSendStreamStrict()) {
if (userSetting.getGbSendStreamStrict()) {
localPort = keepPort(serverItem, ssrc);
if (localPort == 0) {
return null;
}
}
}
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setIp(ip);
@@ -242,21 +213,13 @@ public class ZLMRTPServerFactory {
* @return SendRtpItem
*/
public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp){
// 使用RTPServer 功能找一个可用的端口
String sendRtpPortRange = serverItem.getSendRtpPortRange();
if (ObjectUtils.isEmpty(sendRtpPortRange)) {
return null;
}
String[] portRangeStrArray = serverItem.getSendRtpPortRange().split(",");
int localPort = -1;
if (portRangeStrArray.length != 2) {
localPort = getFreePort(serverItem, 30000, 30500, null);
}else {
localPort = getFreePort(serverItem, Integer.parseInt(portRangeStrArray[0]), Integer.parseInt(portRangeStrArray[1]), null);
}
if (localPort == -1) {
logger.error("没有可用的端口");
return null;
// 默认为随机端口
int localPort = 0;
if (userSetting.getGbSendStreamStrict()) {
localPort = keepPort(serverItem, ssrc);
if (localPort == 0) {
return null;
}
}
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setIp(ip);
@@ -273,6 +236,42 @@ public class ZLMRTPServerFactory {
return sendRtpItem;
}
/**
* 保持端口,直到需要需要发流时再释放
*/
public int keepPort(MediaServerItem serverItem, String ssrc) {
int localPort = 0;
Map<String, Object> param = new HashMap<>(3);
param.put("port", 0);
param.put("enable_tcp", 1);
param.put("stream_id", ssrc);
JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param);
if (jsonObject.getInteger("code") == 0) {
localPort = jsonObject.getInteger("port");
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
(MediaServerItem mediaServerItem, JSONObject response)->{
logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
keepPort(serverItem, ssrc);
});
}
logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);
return localPort;
}
/**
* 释放保持的端口
*/
public boolean releasePort(MediaServerItem serverItem, String ssrc) {
logger.info("[上级点播] {}->释放监听端口,等待推流", ssrc);
boolean closeRTPServerResult = closeRtpServer(serverItem, ssrc);
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
return closeRTPServerResult;
}
/**
* 调用zlm RESTFUL API —— startSendRtp
*/
@@ -333,7 +332,7 @@ public class ZLMRTPServerFactory {
result= true;
logger.info("[停止RTP推流] 成功");
} else {
logger.error("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"),jsonObject.toJSONString(param));
logger.error("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"), JSON.toJSON(param), jsonObject);
}
return result;
}

View File

@@ -18,7 +18,11 @@ import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Component
@Order(value=1)
@@ -73,8 +77,6 @@ public class ZLMRunner implements CommandLineRunner {
}
});
// 获取zlm信息
logger.info("[zlm] 等待默认zlm中...");
@@ -87,7 +89,7 @@ public class ZLMRunner implements CommandLineRunner {
}
for (MediaServerItem mediaServerItem : all) {
if (startGetMedia == null) {
startGetMedia = new HashMap<>();
startGetMedia = new ConcurrentHashMap<>();
}
startGetMedia.put(mediaServerItem.getId(), true);
connectZlmServer(mediaServerItem);
@@ -95,7 +97,7 @@ public class ZLMRunner implements CommandLineRunner {
}
String taskKey = "zlm-connect-timeout";
dynamicTask.startDelay(taskKey, ()->{
if (startGetMedia != null) {
if (startGetMedia != null && startGetMedia.size() > 0) {
Set<String> allZlmId = startGetMedia.keySet();
for (String id : allZlmId) {
logger.error("[ {} ]]主动连接失败,不再尝试连接", id);

View File

@@ -24,6 +24,17 @@ public class HookSubscribeFactory {
return hookSubscribe;
}
public static HookSubscribeForRtpServerTimeout on_rtp_server_timeout(String stream, String ssrc, String mediaServerId) {
HookSubscribeForRtpServerTimeout hookSubscribe = new HookSubscribeForRtpServerTimeout();
JSONObject subscribeKey = new com.alibaba.fastjson2.JSONObject();
subscribeKey.put("stream_id", stream);
subscribeKey.put("ssrc", ssrc);
subscribeKey.put("mediaServerId", mediaServerId);
hookSubscribe.setContent(subscribeKey);
return hookSubscribe;
}
public static HookSubscribeForServerStarted on_server_started() {
HookSubscribeForServerStarted hookSubscribe = new HookSubscribeForServerStarted();
hookSubscribe.setContent(new JSONObject());

View File

@@ -0,0 +1,44 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.annotation.JSONField;
import java.time.Instant;
/**
* hook订阅-收流超时
* @author lin
*/
public class HookSubscribeForRtpServerTimeout implements IHookSubscribe{
private HookType hookType = HookType.on_rtp_server_timeout;
private JSONObject content;
@JSONField(format="yyyy-MM-dd HH:mm:ss")
private Instant expires;
@Override
public HookType getHookType() {
return hookType;
}
@Override
public JSONObject getContent() {
return content;
}
public void setContent(JSONObject content) {
this.content = content;
}
@Override
public Instant getExpires() {
return expires;
}
@Override
public void setExpires(Instant expires) {
this.expires = expires;
}
}

View File

@@ -15,6 +15,7 @@ public class HookSubscribeForStreamChange implements IHookSubscribe{
private JSONObject content;
@JSONField(format="yyyy-MM-dd HH:mm:ss")
private Instant expires;
@Override

View File

@@ -19,5 +19,7 @@ public enum HookType {
on_stream_none_reader,
on_stream_not_found,
on_server_started,
on_rtp_server_timeout,
on_server_keepalive
}

View File

@@ -65,9 +65,6 @@ public class MediaServerItem{
@Schema(description = "多端口RTP收流端口范围")
private String rtpPortRange;
@Schema(description = "RTP发流端口范围")
private String sendRtpPortRange;
@Schema(description = "assist服务端口")
private int recordAssistPort;
@@ -118,7 +115,6 @@ public class MediaServerItem{
hookAliveInterval = zlmServerConfig.getHookAliveInterval();
rtpEnable = false; // 默认使用单端口;直到用户自己设置开启多端口
rtpPortRange = zlmServerConfig.getPortRange().replace("_",","); // 默认使用30000,30500作为级联时发送流的端口号
sendRtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号
recordAssistPort = 0; // 默认关闭
}
@@ -323,14 +319,6 @@ public class MediaServerItem{
this.lastKeepaliveTime = lastKeepaliveTime;
}
public String getSendRtpPortRange() {
return sendRtpPortRange;
}
public void setSendRtpPortRange(String sendRtpPortRange) {
this.sendRtpPortRange = sendRtpPortRange;
}
public Float getHookAliveInterval() {
return hookAliveInterval;
}

View File

@@ -0,0 +1,53 @@
package com.genersoft.iot.vmp.media.zlm.dto.hook;
/**
* zlm hook事件中的on_rtp_server_timeout事件的参数
* @author lin
*/
public class OnRtpServerTimeoutHookParam extends HookParam{
private int local_port;
private String stream_id;
private int tcpMode;
private boolean re_use_port;
private String ssrc;
public int getLocal_port() {
return local_port;
}
public void setLocal_port(int local_port) {
this.local_port = local_port;
}
public String getStream_id() {
return stream_id;
}
public void setStream_id(String stream_id) {
this.stream_id = stream_id;
}
public int getTcpMode() {
return tcpMode;
}
public void setTcpMode(int tcpMode) {
this.tcpMode = tcpMode;
}
public boolean isRe_use_port() {
return re_use_port;
}
public void setRe_use_port(boolean re_use_port) {
this.re_use_port = re_use_port;
}
public String getSsrc() {
return ssrc;
}
public void setSsrc(String ssrc) {
this.ssrc = ssrc;
}
}