优化集群方案, 每个zlm一套ssrc;

优化集群下的docker接入逻辑;
更正sql脚本;
支持重启不设置设备离线。重启SIP事务不丢失
This commit is contained in:
64850858
2021-07-26 11:40:32 +08:00
parent 379830f7eb
commit 3469271ec2
57 changed files with 1318 additions and 1076 deletions

View File

@@ -3,8 +3,8 @@ package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.IMediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import java.util.List;
@@ -13,11 +13,13 @@ import java.util.List;
*/
public interface IMediaServerService {
List<IMediaServerItem> getAll();
List<MediaServerItem> getAll();
IMediaServerItem getOne(String generalMediaServerId);
List<MediaServerItem> getAllOnline();
IMediaServerItem getOneByHostAndPort(String host, int port);
MediaServerItem getOne(String generalMediaServerId);
MediaServerItem getOneByHostAndPort(String host, int port);
/**
* 新的节点加入
@@ -26,19 +28,27 @@ public interface IMediaServerService {
*/
void handLeZLMServerConfig(ZLMServerConfig zlmServerConfig);
void updateServerCatch(IMediaServerItem mediaServerItem, Integer count, Boolean b);
MediaServerItem getMediaServerForMinimumLoad();
IMediaServerItem getMediaServerForMinimumLoad();
void setZLMConfig(MediaServerItem mediaServerItem);
void setZLMConfig(IMediaServerItem mediaServerItem);
void init();
SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId);
void closeRTPServer(Device device, String channelId);
void update(MediaConfig mediaConfig);
void clearRTPServer(MediaServerItem mediaServerItem);
void update(MediaServerItem mediaSerItem);
void addCount(String mediaServerId);
void removeCount(String mediaServerId);
void releaseSsrc(MediaServerItem mediaServerItem, String ssrc);
void clearMediaServerForOnline();
void add(MediaServerItem mediaSerItem);
void resetOnlineServerItem(MediaServerItem serverItem);
}

View File

@@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.service;
import com.alibaba.fastjson.JSONArray;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.zlm.dto.IMediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
/**
@@ -33,7 +32,7 @@ public interface IMediaService {
* @param stream
* @return
*/
StreamInfo getStreamInfoByAppAndStream(IMediaServerItem mediaServerItem, String app, String stream, JSONArray tracks);
StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaServerItem, String app, String stream, JSONArray tracks);
/**
* 根据应用名和流ID获取播放地址, 只是地址拼接返回的ip使用远程访问ip适用与zlm与wvp在一台主机的情况
@@ -41,5 +40,5 @@ public interface IMediaService {
* @param stream
* @return
*/
StreamInfo getStreamInfoByAppAndStream(IMediaServerItem mediaInfo, String app, String stream, JSONArray tracks, String addr);
StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks, String addr);
}

View File

@@ -4,7 +4,6 @@ import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.IMediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
@@ -13,10 +12,10 @@ import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
*/
public interface IPlayService {
void onPublishHandlerForPlayBack(IMediaServerItem mediaServerItem,JSONObject resonse, String deviceId, String channelId, String uuid);
void onPublishHandlerForPlay(IMediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid);
void onPublishHandlerForPlayBack(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid);
void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid);
PlayResult play(IMediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent);
PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent);
IMediaServerItem getNewMediaServerItem(Device device);
MediaServerItem getNewMediaServerItem(Device device);
}

View File

@@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.service;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.IMediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.github.pagehelper.PageInfo;
@@ -63,5 +62,5 @@ public interface IStreamProxyService {
* 获取ffmpeg.cmd模板
* @return
*/
JSONObject getFFmpegCMDs(IMediaServerItem mediaServerItem);
JSONObject getFFmpegCMDs(MediaServerItem mediaServerItem);
}

View File

@@ -1,7 +1,7 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.IMediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.github.pagehelper.PageInfo;
@@ -9,7 +9,7 @@ import java.util.List;
public interface IStreamPushService {
List<StreamPushItem> handleJSON(String json, IMediaServerItem mediaServerItem);
List<StreamPushItem> handleJSON(String json, MediaServerItem mediaServerItem);
/**
* 将应用名和流ID加入国标关联

View File

@@ -0,0 +1,38 @@
package com.genersoft.iot.vmp.service.bean;
public class SSRCInfo {
private int port;
private String ssrc;
private String StreamId;
public SSRCInfo(int port, String ssrc, String streamId) {
this.port = port;
this.ssrc = ssrc;
StreamId = streamId;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getSsrc() {
return ssrc;
}
public void setSsrc(String ssrc) {
this.ssrc = ssrc;
}
public String getStreamId() {
return StreamId;
}
public void setStreamId(String streamId) {
StreamId = streamId;
}
}

View File

@@ -1,29 +1,32 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.ProxyServletConfig;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.session.SsrcConfig;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.IMediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.ZLMRunInfo;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
import org.mitre.dsmiley.httpproxy.ProxyServlet;
import com.genersoft.iot.vmp.utils.redis.JedisUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import java.sql.Array;
import java.text.SimpleDateFormat;
import java.util.*;
@@ -31,15 +34,13 @@ import java.util.*;
* 媒体服务器节点管理
*/
@Service
public class MediaServerServiceImpl implements IMediaServerService {
@Order(value=2)
public class MediaServerServiceImpl implements IMediaServerService, CommandLineRunner {
private final static Logger logger = LoggerFactory.getLogger(MediaServerServiceImpl.class);
private Map<String, IMediaServerItem> zlmServers = new HashMap<>(); // 所有数据库的zlm的缓存
private Map<String, Integer> zlmServerStatus = new LinkedHashMap<>(); // 所有上线的zlm的缓存以及负载
@Value("${sip.ip}")
private String sipIp;
@Autowired
private SipConfig sipConfig;
@Value("${server.ssl.enabled:false}")
private boolean sslEnabled;
@@ -56,7 +57,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired
private MediaServerMapper mediaServerMapper;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@@ -66,53 +66,134 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired
private ZLMRTPServerFactory zlmrtpServerFactory;
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private RedisUtil redisUtil;
@Autowired
JedisUtil jedisUtil;
private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* 初始化
*/
@Override
public void init() {
zlmServers.clear();
zlmServerStatus.clear();
public void run(String... args) throws Exception {
logger.info("Media Server 缓存初始化");
List<MediaServerItem> mediaServerItemList = mediaServerMapper.queryAll();
for (IMediaServerItem mediaServerItem : mediaServerItemList) {
zlmServers.put(mediaServerItem.getId(), mediaServerItem);
for (MediaServerItem mediaServerItem : mediaServerItemList) {
// 更新
if (mediaServerItem.getSsrcConfig() == null) {
SsrcConfig ssrcConfig = new SsrcConfig(mediaServerItem.getId(), null, sipConfig.getSipDomain());
mediaServerItem.setSsrcConfig(ssrcConfig);
redisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX + mediaServerItem.getId(), mediaServerItem);
}
// 查询redis是否存在此mediaServer
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + mediaServerItem.getId();
if (!redisUtil.hasKey(key)) {
redisUtil.set(key, mediaServerItem);
}
}
}
@Override
public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId) {
if (mediaServerItem == null || mediaServerItem.getId() == null) return null;
// 获取mediaServer可用的ssrc
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + mediaServerItem.getId();
SsrcConfig ssrcConfig = mediaServerItem.getSsrcConfig();
if (ssrcConfig == null) {
logger.info("media server [ {} ] ssrcConfig is null", mediaServerItem.getId());
return null;
}else {
String ssrc = ssrcConfig.getPlaySsrc();
if (streamId == null) streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase();
int rtpServerPort = mediaServerItem.getRtpProxyPort();
if (mediaServerItem.isRtpEnable()) {
rtpServerPort = zlmrtpServerFactory.createRTPServer(mediaServerItem, streamId);
}
redisUtil.set(key, mediaServerItem);
return new SSRCInfo(rtpServerPort, ssrc, streamId);
}
}
@Override
public void closeRTPServer(Device device, String channelId) {
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
IMediaServerItem mediaServerItem = null;
if (streamInfo != null) {
mediaServerItem = this.getOne (streamInfo.getMediaServerId());
String mediaServerId = streamSession.getMediaServerId(device.getDeviceId(), channelId);
MediaServerItem mediaServerItem = this.getOne(mediaServerId);
if (mediaServerItem != null) {
String streamId = String.format("gb_play_%s_%s", device.getDeviceId(), channelId);
zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
releaseSsrc(mediaServerItem, streamSession.getSSRC(device.getDeviceId(), channelId));
}
String streamId = String.format("gb_play_%s_%s", device.getDeviceId(), channelId);
zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
streamSession.remove(device.getDeviceId(), channelId);
}
@Override
public void update(MediaConfig mediaConfig) {
public void releaseSsrc(MediaServerItem mediaServerItem, String ssrc) {
if (mediaServerItem == null || ssrc == null) return;
SsrcConfig ssrcConfig = mediaServerItem.getSsrcConfig();
ssrcConfig.releaseSsrc(ssrc);
mediaServerItem.setSsrcConfig(ssrcConfig);
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + mediaServerItem.getId();
redisUtil.set(key, mediaServerItem);
}
/**
* zlm 重启后重置他的推流信息, TODO 给正在使用的设备发送停止命令
* @param mediaServerItem
*/
@Override
public void clearRTPServer(MediaServerItem mediaServerItem) {
mediaServerItem.setSsrcConfig(new SsrcConfig(mediaServerItem.getId(), null, sipConfig.getSipDomain()));
redisUtil.zAdd(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX, mediaServerItem.getId(), 0);
}
@Override
public void update(MediaServerItem mediaSerItem) {
mediaServerMapper.update(mediaSerItem);
MediaServerItem mediaServerItemInRedis = getOne(mediaSerItem.getId());
MediaServerItem mediaServerItemInDataBase = mediaServerMapper.queryOne(mediaSerItem.getId());
if (mediaServerItemInRedis != null && mediaServerItemInRedis.getSsrcConfig() != null) {
mediaServerItemInDataBase.setSsrcConfig(mediaServerItemInRedis.getSsrcConfig());
}else {
mediaServerItemInDataBase.setSsrcConfig(
new SsrcConfig(
mediaServerItemInDataBase.getId(),
null,
sipConfig.getSipDomain()
)
);
}
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + mediaServerItemInDataBase.getId();
redisUtil.set(key, mediaServerItemInDataBase);
}
@Override
public List<IMediaServerItem> getAll() {
if (zlmServers.size() == 0) {
init();
}
List<IMediaServerItem> result = new ArrayList<>();
for (String id : zlmServers.keySet()) {
IMediaServerItem mediaServerItem = zlmServers.get(id);
mediaServerItem.setCount(zlmServerStatus.get(id) == null ? 0 : zlmServerStatus.get(id));
result.add(mediaServerItem);
public List<MediaServerItem> getAll() {
List<MediaServerItem> result = new ArrayList<>();
List<Object> mediaServerKeys = redisUtil.scan(String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX));
for (int i = 0; i < mediaServerKeys.size(); i++) {
String key = (String) mediaServerKeys.get(i);
result.add((MediaServerItem)redisUtil.get(key));
}
return result;
}
// return mediaServerMapper.queryAll();
@Override
public List<MediaServerItem> getAllOnline() {
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX;
Set<String> mediaServerIdSet = redisUtil.zRevRange(key, 0, -1);
List<MediaServerItem> result = new ArrayList<>();
if (mediaServerIdSet != null && mediaServerIdSet.size() > 0) {
for (String mediaServerId : mediaServerIdSet) {
String serverKey = VideoManagerConstants.MEDIA_SERVER_PREFIX + mediaServerId;
result.add((MediaServerItem) redisUtil.get(serverKey));
}
}
return result;
}
/**
@@ -121,26 +202,28 @@ public class MediaServerServiceImpl implements IMediaServerService {
* @return MediaServerItem
*/
@Override
public IMediaServerItem getOne(String mediaServerId) {
if (mediaServerId ==null) return null;
IMediaServerItem mediaServerItem = zlmServers.get(mediaServerId);
if (mediaServerItem != null) {
mediaServerItem.setCount(zlmServerStatus.get(mediaServerId) == null ? 0 : zlmServerStatus.get(mediaServerId));
return mediaServerItem;
}else {
IMediaServerItem item = mediaServerMapper.queryOne(mediaServerId);
if (item != null) {
zlmServers.put(item.getId(), item);
}
return item;
}
public MediaServerItem getOne(String mediaServerId) {
if (mediaServerId == null) return null;
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + mediaServerId;
return (MediaServerItem)redisUtil.get(key);
}
@Override
public IMediaServerItem getOneByHostAndPort(String host, int port) {
public MediaServerItem getOneByHostAndPort(String host, int port) {
return mediaServerMapper.queryOneByHostAndPort(host, port);
}
@Override
public void clearMediaServerForOnline() {
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX;
redisUtil.del(key);
}
@Override
public void add(MediaServerItem mediaSerItem) {
mediaServerMapper.add(mediaSerItem);
}
/**
* 处理zlm上线
* @param zlmServerConfig zlm上线携带的参数
@@ -150,111 +233,100 @@ public class MediaServerServiceImpl implements IMediaServerService {
logger.info("[ {} ]-[ {}:{} ]已连接",
zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
IMediaServerItem serverItem = getOne(zlmServerConfig.getGeneralMediaServerId());
String now = this.format.format(new Date(System.currentTimeMillis()));
if (serverItem != null) {
serverItem.setSecret(zlmServerConfig.getApiSecret());
serverItem.setIp(zlmServerConfig.getIp());
MediaServerItem serverItem = mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId());
if (serverItem == null) {
serverItem = mediaServerMapper.queryOneByHostAndPort(zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
}
if (zlmServerConfig.getGeneralMediaServerId().equals(mediaConfig.getId())
|| (zlmServerConfig.getIp().equals(mediaConfig.getIp()) && zlmServerConfig.getHttpPort() == mediaConfig.getHttpPort())) {
// 配置文件的zlm
// 如果是配置文件中的zlm。 也就是默认zlm。 一切以配置文件内容为准
// docker部署不会使用zlm配置的端口号;
// 直接编译部署的使用配置文件的端口号如果zlm修改配改了配置wvp自动修改
if (serverItem.getId().equals(mediaConfig.getId())
|| (serverItem.getIp().equals(mediaConfig.getIp()) && serverItem.getHttpPort() == mediaConfig.getHttpPort())) {
// 配置文件的zlm
mediaConfig.setId(zlmServerConfig.getGeneralMediaServerId());
mediaConfig.setUpdateTime(now);
if (mediaConfig.getHttpPort() == 0) mediaConfig.setHttpPort(zlmServerConfig.getHttpPort());
if (mediaConfig.getHttpSSlPort() == 0) mediaConfig.setHttpSSlPort(zlmServerConfig.getHttpSSLport());
if (mediaConfig.getRtmpPort() == 0) mediaConfig.setRtmpPort(zlmServerConfig.getRtmpPort());
if (mediaConfig.getRtmpSSlPort() == 0) mediaConfig.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort());
if (mediaConfig.getRtspPort() == 0) mediaConfig.setRtspPort(zlmServerConfig.getRtspPort());
if (mediaConfig.getRtspSSLPort() == 0) mediaConfig.setRtspSSLPort(zlmServerConfig.getRtspSSlport());
if (mediaConfig.getRtpProxyPort() == 0) mediaConfig.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
mediaServerMapper.update(mediaConfig);
serverItem = mediaConfig.getMediaSerItem();
setZLMConfig(mediaConfig);
}else {
if (!serverItem.isDocker()) {
serverItem.setHttpPort(zlmServerConfig.getHttpPort());
serverItem.setHttpSSlPort(zlmServerConfig.getHttpSSLport());
serverItem.setRtmpPort(zlmServerConfig.getRtmpPort());
serverItem.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort());
serverItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
serverItem.setRtspPort(zlmServerConfig.getRtspPort());
// docker部署不会使用zlm配置的端口号不是默认的则不做更新, 配置修改需要自行修改server配置;
MediaServerItem serverItemFromConfig = mediaConfig.getMediaSerItem();
serverItemFromConfig.setId(zlmServerConfig.getGeneralMediaServerId());
if (mediaConfig.getHttpPort() == 0) serverItemFromConfig.setHttpPort(zlmServerConfig.getHttpPort());
if (mediaConfig.getHttpSSlPort() == 0) serverItemFromConfig.setHttpSSlPort(zlmServerConfig.getHttpSSLport());
if (mediaConfig.getRtmpPort() == 0) serverItemFromConfig.setRtmpPort(zlmServerConfig.getRtmpPort());
if (mediaConfig.getRtmpSSlPort() == 0) serverItemFromConfig.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort());
if (mediaConfig.getRtspPort() == 0) serverItemFromConfig.setRtspPort(zlmServerConfig.getRtspPort());
if (mediaConfig.getRtspSSLPort() == 0) serverItemFromConfig.setRtspSSLPort(zlmServerConfig.getRtspSSlport());
if (mediaConfig.getRtpProxyPort() == 0) serverItemFromConfig.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
if (serverItem != null){
// 可能是同一个zlm但id发生了变化
if (!serverItem.getId().equals(zlmServerConfig.getGeneralMediaServerId())) {
mediaServerMapper.delOne(serverItem.getId());
redisUtil.del(VideoManagerConstants.MEDIA_SERVER_PREFIX + serverItem.getId());
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + serverItemFromConfig.getId();
serverItemFromConfig.setSsrcConfig(new SsrcConfig(serverItemFromConfig.getId(), null, sipConfig.getSipDomain()));
redisUtil.set(key, serverItemFromConfig);
mediaServerMapper.add(serverItemFromConfig);
}else {
mediaServerMapper.update(serverItemFromConfig);
}
serverItem.setUpdateTime(now);
mediaServerMapper.update(serverItem);
}else {
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + serverItemFromConfig.getId();
serverItemFromConfig.setSsrcConfig(new SsrcConfig(serverItemFromConfig.getId(), null, sipConfig.getSipDomain()));
redisUtil.set(key, serverItemFromConfig);
mediaServerMapper.add(serverItemFromConfig);
}
resetOnlineServerItem(serverItemFromConfig);
setZLMConfig(serverItemFromConfig);
}else {
String now = this.format.format(new Date(System.currentTimeMillis()));
if (serverItem == null){
// 一个新的zlm接入wvp
serverItem = new MediaServerItem(zlmServerConfig, sipConfig.getSipIp());
serverItem.setCreateTime(now);
serverItem.setUpdateTime(now);
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + serverItem.getId();
serverItem.setSsrcConfig(new SsrcConfig(serverItem.getId(), null, sipConfig.getSipDomain()));
redisUtil.set(key, serverItem);
// 存入数据库
mediaServerMapper.add(serverItem);
setZLMConfig(serverItem);
}
}else {
if (zlmServerConfig.getGeneralMediaServerId().equals(mediaConfig.getId())
|| (zlmServerConfig.getIp().equals(mediaConfig.getIp()) && zlmServerConfig.getHttpPort() == mediaConfig.getHttpPort())) {
mediaConfig.setId(zlmServerConfig.getGeneralMediaServerId());
mediaConfig.setCreateTime(now);
mediaConfig.setUpdateTime(now);
serverItem = mediaConfig.getMediaSerItem();
mediaServerMapper.add(mediaConfig);
}else {
// 一个新的zlm接入wvp
serverItem = new MediaServerItem(zlmServerConfig, sipIp);
serverItem.setCreateTime(now);
serverItem.setUpdateTime(now);
mediaServerMapper.add(serverItem);
}
resetOnlineServerItem(serverItem);
}
// 更新缓存
if (zlmServerStatus.get(serverItem.getId()) == null) {
zlmServers.put(serverItem.getId(), serverItem);
zlmServerStatus.put(serverItem.getId(),0);
}
// 查询服务流数量
IMediaServerItem finalServerItem = serverItem;
zlmresTfulUtils.getMediaList(serverItem, null, null, "rtmp",(mediaList ->{
Integer code = mediaList.getInteger("code");
if (code == 0) {
JSONArray data = mediaList.getJSONArray("data");
if (data != null) {
zlmServerStatus.put(finalServerItem.getId(),data.size());
}else {
zlmServerStatus.put(finalServerItem.getId(),0);
}
}
}));
}
/**
* 更新缓存
* @param mediaServerItem zlm服务
* @param count 在线数
* @param online 在线状态
*/
@Override
public void updateServerCatch(IMediaServerItem mediaServerItem, Integer count, Boolean online) {
if (mediaServerItem != null) {
zlmServers.put(mediaServerItem.getId(), mediaServerItem);
Collection<Integer> values = zlmServerStatus.values();
if (online != null && count != null) {
zlmServerStatus.put(mediaServerItem.getId(), count);
}
public void resetOnlineServerItem(MediaServerItem serverItem) {
// 更新缓存
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX;
// 使用zset的分数作为当前并发量 默认值设置为0
if (redisUtil.zScore(key, serverItem.getId()) == null) { // 不存在则设置默认值 已存在则重置
redisUtil.zAdd(key, serverItem.getId(), 0L);
// 查询服务流数量
zlmresTfulUtils.getMediaList(serverItem, null, null, "rtmp",(mediaList ->{
Integer code = mediaList.getInteger("code");
if (code == 0) {
JSONArray data = mediaList.getJSONArray("data");
if (data != null) {
redisUtil.zAdd(key, serverItem.getId(), data.size());
}
}
}));
}else {
clearRTPServer(serverItem);
}
}
@Override
public void addCount(String mediaServerId) {
if (zlmServerStatus.get(mediaServerId) != null) {
zlmServerStatus.put(mediaServerId, zlmServerStatus.get(mediaServerId) + 1);
}
if (mediaServerId == null) return;
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX;
Double aDouble = redisUtil.zScore(key, mediaServerId);
redisUtil.zIncrScore(key, mediaServerId, 1);
}
@Override
public void removeCount(String mediaServerId) {
if (zlmServerStatus.get(mediaServerId) != null) {
zlmServerStatus.put(mediaServerId, zlmServerStatus.get(mediaServerId) - 1);
}
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX;
redisUtil.zIncrScore(key, mediaServerId, - 1);
}
/**
@@ -262,35 +334,18 @@ public class MediaServerServiceImpl implements IMediaServerService {
* @return MediaServerItem
*/
@Override
public IMediaServerItem getMediaServerForMinimumLoad() {
int mediaCount = -1;
String key = null;
System.out.println(JSON.toJSONString(zlmServerStatus));
if (zlmServerStatus.size() == 1) {
Map.Entry entry = zlmServerStatus.entrySet().iterator().next();
key= (String) entry.getKey();
}else {
for (String id : zlmServerStatus.keySet()) {
if (key == null) {
key = id;
mediaCount = zlmServerStatus.get(id);
}
if (zlmServerStatus.get(id) == 0) {
key = id;
break;
}else if (mediaCount >= zlmServerStatus.get(id)){
mediaCount = zlmServerStatus.get(id);
key = id;
}
}
public MediaServerItem getMediaServerForMinimumLoad() {
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX;
if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) {
logger.info("获取负载最低的节点时无在线节点");
}
if (key == null) {
logger.info("获取负载最低的节点时无在线节点");
return null;
}else{
return zlmServers.get(key);
}
// 获取分数最低的,及并发最低的
Set<Object> objects = redisUtil.ZRange(key, 0, -1);
ArrayList<Object> MediaServerObjectS = new ArrayList<>(objects);
String mediaServerId = (String)MediaServerObjectS.get(0);
return getOne(mediaServerId);
}
/**
@@ -298,7 +353,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
* @param mediaServerItem 服务ID
*/
@Override
public void setZLMConfig(IMediaServerItem mediaServerItem) {
public void setZLMConfig(MediaServerItem mediaServerItem) {
logger.info("[ {} ]-[ {}:{} ]设置zlm",
mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
String protocol = sslEnabled ? "https" : "http";
@@ -333,8 +388,10 @@ public class MediaServerServiceImpl implements IMediaServerService {
logger.info("[ {} ]-[ {}:{} ]设置zlm成功",
mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
}else {
logger.info("[ {} ]-[ {}:{} ]设置zlm失败" + responseJSON.getString("msg"),
logger.info("[ {} ]-[ {}:{} ]设置zlm失败",
mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
}
}
}

View File

@@ -4,9 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.IMediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -33,14 +31,14 @@ public class MediaServiceImpl implements IMediaService {
@Override
public StreamInfo getStreamInfoByAppAndStream(IMediaServerItem mediaInfo, String app, String stream, JSONArray tracks) {
public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks) {
return getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null);
}
@Override
public StreamInfo getStreamInfoByAppAndStreamWithCheck(String app, String stream, String mediaServerId, String addr) {
StreamInfo streamInfo = null;
IMediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo == null) {
return streamInfo;
}
@@ -63,7 +61,7 @@ public class MediaServiceImpl implements IMediaService {
}
@Override
public StreamInfo getStreamInfoByAppAndStream(IMediaServerItem mediaInfo, String app, String stream, JSONArray tracks, String addr) {
public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks, String addr) {
StreamInfo streamInfoResult = new StreamInfo();
streamInfoResult.setStreamId(stream);
streamInfoResult.setApp(app);

View File

@@ -14,11 +14,12 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.IMediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import com.genersoft.iot.vmp.service.IMediaService;
@@ -53,6 +54,9 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private RedisUtil redis;
@Autowired
private DeferredResultHolder resultHolder;
@@ -73,7 +77,7 @@ public class PlayServiceImpl implements IPlayService {
@Override
public PlayResult play(IMediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) {
public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) {
PlayResult playResult = new PlayResult();
if (mediaServerItem == null) {
RequestMessage msg = new RequestMessage();
@@ -97,14 +101,21 @@ public class PlayServiceImpl implements IPlayService {
// 超时处理
result.onTimeout(()->{
logger.warn(String.format("设备点播超时deviceId%s channelId%s", deviceId, channelId));
// 释放rtpserver
mediaServerService.closeRTPServer(playResult.getDevice(), channelId);
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + playResult.getUuid());
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
wvpResult.setMsg("Timeout");
SIPDialog dialog = streamSession.getDialog(deviceId, channelId);
if (dialog != null) {
wvpResult.setMsg("收流超时,请稍候重试");
}else {
wvpResult.setMsg("点播超时,请稍候重试");
}
msg.setData(wvpResult);
// 点播超时回复BYE
cmder.streamByeCmd(device.getDeviceId(), channelId);
// 释放rtpserver
mediaServerService.closeRTPServer(playResult.getDevice(), channelId);
resultHolder.invokeResult(msg);
});
result.onCompletion(()->{
@@ -131,7 +142,7 @@ public class PlayServiceImpl implements IPlayService {
WVPResult wvpResult = (WVPResult)responseEntity.getBody();
if (wvpResult.getCode() == 0) {
StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
IMediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
String streamUrl = streamInfoForSuccess.getFmp4();
// 请求截图
zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
@@ -142,14 +153,23 @@ public class PlayServiceImpl implements IPlayService {
}
});
if (streamInfo == null) {
SSRCInfo ssrcInfo;
String streamId = null;
if (mediaServerItem.isRtpEnable()) {
streamId = String.format("gb_play_%s_%s", device.getDeviceId(), channelId);
}
ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId);
// 发送点播消息
cmder.playStreamCmd(mediaServerItem, device, channelId, (IMediaServerItem mediaServerItemInUse, JSONObject response) -> {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, uuid.toString());
if (hookEvent != null) {
hookEvent.response(mediaServerItem, response);
}
}, (event) -> {
// 点播返回sip错误
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
Response response = event.getResponse();
@@ -162,6 +182,7 @@ public class PlayServiceImpl implements IPlayService {
if (errorEvent != null) {
errorEvent.response(event);
}
});
} else {
String streamId = streamInfo.getStreamId();
@@ -176,7 +197,7 @@ public class PlayServiceImpl implements IPlayService {
return playResult;
}
String mediaServerId = streamInfo.getMediaServerId();
IMediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
if (rtpInfo != null && rtpInfo.getBoolean("exist")) {
@@ -194,9 +215,17 @@ public class PlayServiceImpl implements IPlayService {
hookEvent.response(mediaServerItem, JSONObject.parseObject(JSON.toJSONString(streamInfo)));
}
} else {
// TODO 点播前是否重置状态
redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
cmder.playStreamCmd(mediaServerItem, device, channelId, (IMediaServerItem mediaServerItemInuse, JSONObject response) -> {
SSRCInfo ssrcInfo;
String streamId2 = null;
if (mediaServerItem.isRtpEnable()) {
streamId2 = String.format("gb_play_%s_%s", device.getDeviceId(), channelId);
}
ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId2);
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid.toString());
}, (event) -> {
@@ -218,7 +247,7 @@ public class PlayServiceImpl implements IPlayService {
}
@Override
public void onPublishHandlerForPlay(IMediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
StreamInfo streamInfo = onPublishHandler(mediaServerItem, resonse, deviceId, channelId, uuid);
@@ -228,14 +257,6 @@ public class PlayServiceImpl implements IPlayService {
deviceChannel.setStreamId(streamInfo.getStreamId());
storager.startPlay(deviceId, channelId, streamInfo.getStreamId());
}
ClientTransaction transaction = streamSession.getTransaction(deviceId, channelId);
SIPDialog dialog = (SIPDialog)transaction.getDialog();
StreamInfo.TransactionInfo transactionInfo = new StreamInfo.TransactionInfo();
transactionInfo.callId = dialog.getCallId().getCallId();
transactionInfo.localTag = dialog.getLocalTag();
transactionInfo.remoteTag = dialog.getRemoteTag();
transactionInfo.branch = dialog.getFirstTransactionInt().getBranchId();
streamInfo.setTransactionInfo(transactionInfo);
redisCatchStorage.startPlay(streamInfo);
msg.setData(JSON.toJSONString(streamInfo));
@@ -254,10 +275,10 @@ public class PlayServiceImpl implements IPlayService {
}
@Override
public IMediaServerItem getNewMediaServerItem(Device device) {
public MediaServerItem getNewMediaServerItem(Device device) {
if (device == null) return null;
String mediaServerId = device.getMediaServerId();
IMediaServerItem mediaServerItem = null;
MediaServerItem mediaServerItem = null;
if (mediaServerId == null) {
mediaServerItem = mediaServerService.getMediaServerForMinimumLoad();
}else {
@@ -270,7 +291,7 @@ public class PlayServiceImpl implements IPlayService {
}
@Override
public void onPublishHandlerForPlayBack(IMediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
public void onPublishHandlerForPlayBack(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
StreamInfo streamInfo = onPublishHandler(mediaServerItem, resonse, deviceId, channelId, uuid);
@@ -285,7 +306,7 @@ public class PlayServiceImpl implements IPlayService {
}
}
public StreamInfo onPublishHandler(IMediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) {
String streamId = resonse.getString("stream");
JSONArray tracks = resonse.getJSONArray("tracks");
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem,"rtp", streamId, tracks);

View File

@@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.IMediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
@@ -58,7 +57,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public String save(StreamProxyItem param) {
IMediaServerItem mediaInfo;
MediaServerItem mediaInfo;
if ("auto".equals(param.getMediaServerId())){
mediaInfo = mediaServerService.getMediaServerForMinimumLoad();
}else {
@@ -120,7 +119,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public JSONObject addStreamProxyToZlm(StreamProxyItem param) {
JSONObject result = null;
IMediaServerItem mediaServerItem = null;
MediaServerItem mediaServerItem = null;
if (param.getMediaServerId() == null) {
logger.warn("添加代理时MediaServerId 为null");
return null;
@@ -141,7 +140,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public JSONObject removeStreamProxyFromZlm(StreamProxyItem param) {
if (param ==null) return null;
IMediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
JSONObject result = zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
return result;
}
@@ -198,7 +197,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
@Override
public JSONObject getFFmpegCMDs(IMediaServerItem mediaServerItem) {
public JSONObject getFFmpegCMDs(MediaServerItem mediaServerItem) {
JSONObject result = new JSONObject();
JSONObject mediaServerConfigResuly = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
if (mediaServerConfigResuly != null && mediaServerConfigResuly.getInteger("code") == 0

View File

@@ -5,7 +5,6 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.IMediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
@@ -43,7 +42,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
private IMediaServerService mediaServerService;
@Override
public List<StreamPushItem> handleJSON(String jsonData, IMediaServerItem mediaServerItem) {
public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
if (jsonData == null) return null;
Map<String, StreamPushItem> result = new HashMap<>();
@@ -98,7 +97,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Override
public boolean removeFromGB(GbStream stream) {
int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
IMediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
if (mediaList == null) {
streamPushMapper.del(stream.getApp(), stream.getStream());