优化发流逻辑

This commit is contained in:
648540858
2023-07-01 18:10:33 +08:00
parent c916433042
commit b64f320805
15 changed files with 222 additions and 169 deletions

View File

@@ -0,0 +1,55 @@
package com.genersoft.iot.vmp.media.zlm;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.zlm.dto.MediaSendRtpPortInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
@Component
public class SendRtpPortManager {
private final static Logger logger = LoggerFactory.getLogger(SendRtpPortManager.class);
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private final String KEY = "VM_MEDIA_SEND_RTP_PORT_RANGE_";
public void initServerPort(String mediaServerId, int startPort, int endPort){
String key = KEY + userSetting.getServerId() + "_" + mediaServerId;
MediaSendRtpPortInfo mediaSendRtpPortInfo = new MediaSendRtpPortInfo(startPort, endPort, mediaServerId);
redisTemplate.opsForValue().set(key, mediaSendRtpPortInfo);
}
public int getNextPort(String mediaServerId) {
String key = KEY + userSetting.getServerId() + "_" + mediaServerId;
MediaSendRtpPortInfo mediaSendRtpPortInfo = (MediaSendRtpPortInfo)redisTemplate.opsForValue().get(key);
if (mediaSendRtpPortInfo == null) {
logger.warn("[发送端口管理] 获取{}的发送端口时未找到端口信息", mediaSendRtpPortInfo);
return 0;
}
int port;
if (mediaSendRtpPortInfo.getCurrent() %2 != 0) {
port = mediaSendRtpPortInfo.getCurrent() + 1;
}else {
port = mediaSendRtpPortInfo.getCurrent() + 2;
}
if (port > mediaSendRtpPortInfo.getEnd()) {
if (mediaSendRtpPortInfo.getStart() %2 != 0) {
port = mediaSendRtpPortInfo.getStart() + 1;
}else {
port = mediaSendRtpPortInfo.getStart();
}
}
mediaSendRtpPortInfo.setCurrent(port);
redisTemplate.opsForValue().set(key, mediaSendRtpPortInfo);
return port;
}
}

View File

@@ -1,22 +1,18 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRtpServerTimeout;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRtpServerTimeoutHookParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.HashMap;
import java.util.Map;
@Component
public class ZLMRTPServerFactory {
@@ -32,68 +28,9 @@ public class ZLMRTPServerFactory {
@Autowired
private ZlmHttpHookSubscribe hookSubscribe;
private int[] portRangeArray = new int[2];
@Autowired
private SendRtpPortManager sendRtpPortManager;
public int getFreePort(MediaServerItem mediaServerItem, int startPort, int endPort, List<Integer> usedFreelist) {
if (endPort <= startPort) {
return -1;
}
if (usedFreelist == null) {
usedFreelist = new ArrayList<>();
}
JSONObject listRtpServerJsonResult = zlmresTfulUtils.listRtpServer(mediaServerItem);
if (listRtpServerJsonResult != null) {
JSONArray data = listRtpServerJsonResult.getJSONArray("data");
if (data != null) {
for (int i = 0; i < data.size(); i++) {
JSONObject dataItem = data.getJSONObject(i);
usedFreelist.add(dataItem.getInteger("port"));
}
}
}
Map<String, Object> param = new HashMap<>();
int result = -1;
// 设置推流端口
if (startPort%2 == 1) {
startPort ++;
}
boolean checkPort = false;
for (int i = startPort; i < endPort + 1; i+=2) {
if (!usedFreelist.contains(i)){
checkPort = true;
startPort = i;
break;
}
}
if (!checkPort) {
logger.warn("未找到节点{}上范围[{}-{}]的空闲端口", mediaServerItem.getId(), startPort, endPort);
return -1;
}
param.put("port", startPort);
String stream = UUID.randomUUID().toString();
param.put("enable_tcp", 1);
param.put("stream_id", stream);
// param.put("port", 0);
JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
if (openRtpServerResultJson != null) {
if (openRtpServerResultJson.getInteger("code") == 0) {
result= openRtpServerResultJson.getInteger("port");
Map<String, Object> closeRtpServerParam = new HashMap<>();
closeRtpServerParam.put("stream_id", stream);
zlmresTfulUtils.closeRtpServer(mediaServerItem, closeRtpServerParam);
}else {
usedFreelist.add(startPort);
startPort +=2;
result = getFreePort(mediaServerItem, startPort, endPort,usedFreelist);
}
}else {
// 检查ZLM状态
logger.error("创建RTP Server 失败 {}: 请检查ZLM服务", param.get("port"));
}
return result;
}
/**
* 开启rtpServer
@@ -222,16 +159,14 @@ public class ZLMRTPServerFactory {
* @return SendRtpItem
*/
public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
String deviceId, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){
String deviceId, String channelId, boolean tcp, boolean rtcp){
// 默认为随机端口
int localPort = 0;
if (userSetting.getGbSendStreamStrict()) {
if (userSetting.getGbSendStreamStrict()) {
localPort = keepPort(serverItem, ssrc, localPort, callback);
if (localPort == 0) {
return null;
}
localPort = sendRtpPortManager.getNextPort(serverItem.getId());
if (localPort == 0) {
return null;
}
}
SendRtpItem sendRtpItem = new SendRtpItem();
@@ -261,11 +196,11 @@ public class ZLMRTPServerFactory {
* @return SendRtpItem
*/
public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId,
String app, String stream, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){
String app, String stream, String channelId, boolean tcp, boolean rtcp){
// 默认为随机端口
int localPort = 0;
if (userSetting.getGbSendStreamStrict()) {
localPort = keepPort(serverItem, ssrc, localPort, callback);
localPort = sendRtpPortManager.getNextPort(serverItem.getId());
if (localPort == 0) {
return null;
}
@@ -286,58 +221,6 @@ public class ZLMRTPServerFactory {
return sendRtpItem;
}
public interface KeepPortCallback{
Boolean keep(String ssrc);
}
/**
* 保持端口,直到需要需要发流时再释放
*/
public int keepPort(MediaServerItem serverItem, String ssrc, int localPort, KeepPortCallback keepPortCallback) {
Map<String, Object> param = new HashMap<>(3);
param.put("port", localPort);
param.put("enable_tcp", 1);
param.put("stream_id", ssrc);
JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param);
if (jsonObject.getInteger("code") == 0) {
localPort = jsonObject.getInteger("port");
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
Integer finalLocalPort = localPort;
hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
(MediaServerItem mediaServerItem, HookParam hookParam)->{
logger.info("[上级点播] {}->监听端口到期继续保持监听: {}", ssrc, finalLocalPort);
OnRtpServerTimeoutHookParam rtpServerTimeoutHookParam = (OnRtpServerTimeoutHookParam) hookParam;
if (ssrc.equals(rtpServerTimeoutHookParam.getSsrc())) {
if (keepPortCallback.keep(ssrc)) {
logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
keepPort(serverItem, ssrc, finalLocalPort, keepPortCallback);
}else {
logger.info("[上级点播] {}->发送取消,无需继续监听", ssrc);
releasePort(serverItem, ssrc);
}
}
});
logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);
return localPort;
}else {
logger.info("[上级点播] 监听端口失败: {}->{}", ssrc, localPort);
return 0;
}
}
/**
* 释放保持的端口
*/
public boolean releasePort(MediaServerItem serverItem, String ssrc) {
logger.info("[上级点播] {}->释放监听端口", ssrc);
boolean closeRTPServerResult = closeRtpServer(serverItem, ssrc);
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
return closeRTPServerResult;
}
/**
* 调用zlm RESTFUL API —— startSendRtp
*/

View File

@@ -0,0 +1,50 @@
package com.genersoft.iot.vmp.media.zlm.dto;
public class MediaSendRtpPortInfo {
private int start;
private int end;
private String mediaServerId;
private int current;
public MediaSendRtpPortInfo(int start, int end, String mediaServerId) {
this.start = start;
this.current = start;
this.end = end;
this.mediaServerId = mediaServerId;
}
public int getStart() {
return start;
}
public void setStart(int start) {
this.start = start;
}
public int getEnd() {
return end;
}
public void setEnd(int end) {
this.end = end;
}
public String getMediaServerId() {
return mediaServerId;
}
public void setMediaServerId(String mediaServerId) {
this.mediaServerId = mediaServerId;
}
public int getCurrent() {
return current;
}
public void setCurrent(int current) {
this.current = current;
}
}

View File

@@ -62,6 +62,9 @@ public class MediaServerItem{
@Schema(description = "多端口RTP收流端口范围")
private String rtpPortRange;
@Schema(description = "RTP发流端口范围")
private String sendRtpPortRange;
@Schema(description = "assist服务端口")
private int recordAssistPort;
@@ -297,4 +300,12 @@ public class MediaServerItem{
public void setHookAliveInterval(Float hookAliveInterval) {
this.hookAliveInterval = hookAliveInterval;
}
public String getSendRtpPortRange() {
return sendRtpPortRange;
}
public void setSendRtpPortRange(String sendRtpPortRange) {
this.sendRtpPortRange = sendRtpPortRange;
}
}