修复国标点播缓存启动未清理的BUG

This commit is contained in:
648540858
2024-10-25 17:41:50 +08:00
parent ef66bd8781
commit 059635c2cc
29 changed files with 256 additions and 350 deletions

View File

@@ -41,7 +41,7 @@ public class MediaServerConfig implements CommandLineRunner {
mediaServerService.update(mediaSerItemInConfig);
}else {
if (defaultMediaServer != null) {
mediaServerService.delete(defaultMediaServer.getId());
mediaServerService.delete(defaultMediaServer);
}
MediaServer mediaServerItem = mediaServerService.getOneFromDatabase(mediaSerItemInConfig.getId());
if (mediaServerItem == null) {

View File

@@ -1,24 +1,23 @@
package com.genersoft.iot.vmp.media.event.mediaServer;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;
public abstract class MediaServerEventAbstract extends ApplicationEvent {
private static final long serialVersionUID = 1L;
private String mediaServerId;
@Getter
@Setter
private MediaServer mediaServer;
public MediaServerEventAbstract(Object source) {
super(source);
}
public String getMediaServerId() {
return mediaServerId;
}
public void setMediaServerId(String mediaServerId) {
this.mediaServerId = mediaServerId;
}
}

View File

@@ -25,16 +25,16 @@ public class MediaServerStatusEventListener {
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaServerOnlineEvent event) {
log.info("[媒体节点] 上线 ID" + event.getMediaServerId());
playService.zlmServerOnline(event.getMediaServerId());
log.info("[媒体节点] 上线 ID" + event.getMediaServer().getId());
playService.zlmServerOnline(event.getMediaServer());
}
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaServerOfflineEvent event) {
log.info("[媒体节点] 离线ID" + event.getMediaServerId());
log.info("[媒体节点] 离线ID" + event.getMediaServer().getId());
// 处理ZLM离线
playService.zlmServerOffline(event.getMediaServerId());
playService.zlmServerOffline(event.getMediaServer());
}
}

View File

@@ -67,4 +67,7 @@ public interface IMediaNodeServerService {
StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy);
void stopProxy(MediaServer mediaServer, String streamKey);
List<String> listRtpServer(MediaServer mediaServer);
}

View File

@@ -65,7 +65,7 @@ public interface IMediaServerService {
boolean checkMediaRecordServer(String ip, int port);
void delete(String id);
void delete(MediaServer mediaServer);
MediaServer getDefaultMediaServer();
@@ -158,4 +158,5 @@ public interface IMediaServerService {
int createRTPServer(MediaServer mediaServerItem, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode);
List<String> listRtpServer(MediaServer mediaServer);
}

View File

@@ -81,6 +81,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired
private MediaConfig mediaConfig;
/**
* 流到来的处理
*/
@@ -216,6 +217,16 @@ public class MediaServerServiceImpl implements IMediaServerService {
return rtpServerPort;
}
@Override
public List<String> listRtpServer(MediaServer mediaServer) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[openRTPServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return new ArrayList<>();
}
return mediaNodeServerService.listRtpServer(mediaServer);
}
@Override
public void closeRTPServer(MediaServer mediaServer, String streamId) {
if (mediaServer == null) {
@@ -561,14 +572,14 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public void delete(String id) {
mediaServerMapper.delOne(id);
redisTemplate.opsForZSet().remove(VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(), id);
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + id;
public void delete(MediaServer mediaServer) {
mediaServerMapper.delOne(mediaServer.getId());
redisTemplate.opsForZSet().remove(VideoManagerConstants.ONLINE_MEDIA_SERVERS_PREFIX + userSetting.getServerId(), mediaServer.getId());
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + ":" + mediaServer.getId();
redisTemplate.delete(key);
// 发送节点移除通知
MediaServerDeleteEvent event = new MediaServerDeleteEvent(this);
event.setMediaServerId(id);
event.setMediaServer(mediaServer);
applicationEventPublisher.publishEvent(event);
}
@@ -589,7 +600,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
for (MediaServer mediaServer : allInCatch) {
// 清除数据中不存在但redis缓存数据
if (!mediaServerMap.containsKey(mediaServer.getId())) {
delete(mediaServer.getId());
delete(mediaServer);
}
}
}

View File

@@ -1,133 +0,0 @@
package com.genersoft.iot.vmp.media.zlm;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.support.atomic.RedisAtomicInteger;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
public class SendRtpPortManager {
@Autowired
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private final String KEY = "VM_MEDIA_SEND_RTP_PORT_";
public synchronized int getNextPort(MediaServer mediaServer) {
if (mediaServer == null) {
log.warn("[发送端口管理] 参数错误mediaServer为NULL");
return -1;
}
String sendIndexKey = KEY + userSetting.getServerId() + "_" + mediaServer.getId();
String key = VideoManagerConstants.SEND_RTP_INFO
+ userSetting.getServerId() + "_*";
List<Object> queryResult = RedisUtil.scan(redisTemplate, key);
Map<Integer, SendRtpInfo> sendRtpItemMap = new HashMap<>();
for (Object o : queryResult) {
SendRtpInfo sendRtpItem = (SendRtpInfo) redisTemplate.opsForValue().get(o);
if (sendRtpItem != null) {
sendRtpItemMap.put(sendRtpItem.getLocalPort(), sendRtpItem);
}
}
String sendRtpPortRange = mediaServer.getSendRtpPortRange();
int startPort;
int endPort;
if (sendRtpPortRange != null) {
String[] portArray = sendRtpPortRange.split(",");
if (portArray.length != 2 || !NumberUtils.isParsable(portArray[0]) || !NumberUtils.isParsable(portArray[1])) {
log.warn("{}发送端口配置格式错误自动使用50000-60000作为端口范围", mediaServer.getId());
startPort = 50000;
endPort = 60000;
}else {
if ( Integer.parseInt(portArray[1]) - Integer.parseInt(portArray[0]) < 1) {
log.warn("{}发送端口配置错误,结束端口至少比开始端口大一自动使用50000-60000作为端口范围", mediaServer.getId());
startPort = 50000;
endPort = 60000;
}else {
startPort = Integer.parseInt(portArray[0]);
endPort = Integer.parseInt(portArray[1]);
}
}
}else {
log.warn("{}未设置发送端口默认值自动使用50000-60000作为端口范围", mediaServer.getId());
startPort = 50000;
endPort = 60000;
}
if (redisTemplate == null || redisTemplate.getConnectionFactory() == null) {
log.warn("{}获取redis连接信息失败", mediaServer.getId());
return -1;
}
// RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory());
// return redisAtomicInteger.getAndUpdate((current)->{
// return getPort(current, startPort, endPort, checkPort-> !sendRtpItemMap.containsKey(checkPort));
// });
return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
}
private synchronized int getSendPort(int startPort, int endPort, String sendIndexKey, Map<Integer, SendRtpInfo> sendRtpItemMap){
// TODO 这里改为只取偶数端口
RedisAtomicInteger redisAtomicInteger = new RedisAtomicInteger(sendIndexKey , redisTemplate.getConnectionFactory());
if (redisAtomicInteger.get() < startPort) {
redisAtomicInteger.set(startPort);
return startPort;
}else {
int port = redisAtomicInteger.getAndIncrement();
if (port > endPort) {
redisAtomicInteger.set(startPort);
if (sendRtpItemMap.containsKey(startPort)) {
return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
}else {
return startPort;
}
}
if (sendRtpItemMap.containsKey(port)) {
return getSendPort(startPort, endPort, sendIndexKey, sendRtpItemMap);
}else {
return port;
}
}
}
interface CheckPortCallback{
boolean check(int port);
}
private int getPort(int current, int start, int end, CheckPortCallback checkPortCallback) {
if (current <= 0) {
if (start%2 == 0) {
current = start;
}else {
current = start + 1;
}
}else {
current += 2;
if (current > end) {
if (start%2 == 0) {
current = start;
}else {
current = start + 1;
}
}
}
if (!checkPortCallback.check(current)) {
return getPort(current + 2, start, end, checkPortCallback);
}
return current;
}
}

View File

@@ -20,10 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
@Slf4j
@Service("zlm")
@@ -136,9 +133,14 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
param.put("ssrc", ssrc);
}
JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaInfo, param);
log.info("停止发流结果: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param));
return true;
System.out.println(jsonObject);
if (jsonObject.getInteger("code") != null && jsonObject.getInteger("code") == 0) {
log.info("[停止发流] 成功: 参数:{}", JSON.toJSONString(param));
return true;
}else {
log.info("停止发流结果: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param));
return false;
}
}
@Override
@@ -496,4 +498,22 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg"));
}
}
@Override
public List<String> listRtpServer(MediaServer mediaServer) {
JSONObject jsonObject = zlmresTfulUtils.listRtpServer(mediaServer);
List<String> result = new ArrayList<>();
if (jsonObject == null || jsonObject.getInteger("code") != 0) {
return result;
}
JSONArray data = jsonObject.getJSONArray("data");
if (data == null || data.isEmpty()) {
return result;
}
for (int i = 0; i < data.size(); i++) {
JSONObject dataJSONObject = data.getJSONObject(i);
result.add(dataJSONObject.getString("stream_id"));
}
return result;
}
}

View File

@@ -112,13 +112,13 @@ public class ZLMMediaServerStatusManager {
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaServerDeleteEvent event) {
if (event.getMediaServerId() == null) {
if (event.getMediaServer() == null) {
return;
}
log.info("[ZLM-节点被移除] ID" + event.getMediaServerId());
offlineZlmPrimaryMap.remove(event.getMediaServerId());
offlineZlmsecondaryMap.remove(event.getMediaServerId());
offlineZlmTimeMap.remove(event.getMediaServerId());
log.info("[ZLM-节点被移除] ID" + event.getMediaServer().getId());
offlineZlmPrimaryMap.remove(event.getMediaServer().getId());
offlineZlmsecondaryMap.remove(event.getMediaServer().getId());
offlineZlmTimeMap.remove(event.getMediaServer().getId());
}
@Scheduled(fixedDelay = 10*1000) //每隔10秒检查一次
@@ -188,7 +188,7 @@ public class ZLMMediaServerStatusManager {
mediaServerItem.setHookAliveInterval(10F);
mediaServerService.update(mediaServerItem);
// 发送上线通知
eventPublisher.mediaServerOnlineEventPublish(mediaServerItem.getId());
eventPublisher.mediaServerOnlineEventPublish(mediaServerItem);
if(mediaServerItem.isAutoConfig()) {
if (config == null) {
JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
@@ -213,7 +213,7 @@ public class ZLMMediaServerStatusManager {
offlineZlmPrimaryMap.put(mediaServerItem.getId(), mediaServerItem);
offlineZlmTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis());
// 发送离线通知
eventPublisher.mediaServerOfflineEventPublish(mediaServerItem.getId());
eventPublisher.mediaServerOfflineEventPublish(mediaServerItem);
mediaServerService.update(mediaServerItem);
}, (int)(mediaServerItem.getHookAliveInterval() * 2 * 1000));
}

View File

@@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import lombok.extern.slf4j.Slf4j;
@@ -18,12 +17,6 @@ public class ZLMServerFactory {
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
@Autowired
private UserSetting userSetting;
@Autowired
private SendRtpPortManager sendRtpPortManager;
/**
* 开启rtpServer