Merge branch 'wvp-28181-2.0'

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java
#	web_src/src/components/Login.vue
This commit is contained in:
648540858
2022-08-22 16:53:16 +08:00
146 changed files with 2878 additions and 17348 deletions

View File

@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.github.pagehelper.PageInfo;
import java.util.List;
@@ -45,4 +46,11 @@ public interface IGbStreamService {
void sendCatalogMsg(GbStream gbStream, String type);
void sendCatalogMsgs(List<GbStream> gbStreams, String type);
/**
* 修改gbId或name
* @param streamPushItemForUpdate
* @return
*/
int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate);
}

View File

@@ -63,7 +63,7 @@ public interface IMediaServerService {
void clearMediaServerForOnline();
WVPResult<String> add(MediaServerItem mediaSerItem);
void add(MediaServerItem mediaSerItem);
int addToDatabase(MediaServerItem mediaSerItem);
@@ -71,7 +71,7 @@ public interface IMediaServerService {
void resetOnlineServerItem(MediaServerItem serverItem);
WVPResult<MediaServerItem> checkMediaServer(String ip, int port, String secret);
MediaServerItem checkMediaServer(String ip, int port, String secret);
boolean checkMediaRecordServer(String ip, int port);

View File

@@ -32,13 +32,13 @@ public interface IPlayService {
void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String toString);
DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
DeferredResult<ResponseEntity<String>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
DeferredResult<String> playBack(String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
DeferredResult<String> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
void zlmServerOffline(String mediaServerId);
DeferredResult<ResponseEntity<String>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
DeferredResult<ResponseEntity<String>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
DeferredResult<String> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
DeferredResult<String> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack);
StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream);

View File

@@ -2,12 +2,8 @@ package com.genersoft.iot.vmp.service;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
public interface IStreamProxyService {
@@ -16,7 +12,7 @@ public interface IStreamProxyService {
* 保存视频代理
* @param param
*/
WVPResult<StreamInfo> save(StreamProxyItem param);
StreamInfo save(StreamProxyItem param);
/**
* 添加视频代理到zlm

View File

@@ -100,4 +100,10 @@ public interface IStreamPushService {
* 增加推流
*/
boolean add(StreamPushItem stream);
/**
* 获取全部的app+Streanm 用于判断推流列表是新增还是修改
* @return
*/
List<String> getAllAppAndStream();
}

View File

@@ -4,14 +4,18 @@ import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import javax.sip.RequestEvent;
import java.util.EventObject;
/**
* @author lin
*/
public class PlayBackResult<T> {
private int code;
private T data;
private MediaServerItem mediaServerItem;
private JSONObject response;
private SipSubscribe.EventResult event;
private SipSubscribe.EventResult<EventObject> event;
public int getCode() {
return code;
@@ -45,11 +49,11 @@ public class PlayBackResult<T> {
this.response = response;
}
public SipSubscribe.EventResult getEvent() {
public SipSubscribe.EventResult<EventObject> getEvent() {
return event;
}
public void setEvent(SipSubscribe.EventResult event) {
public void setEvent(SipSubscribe.EventResult<EventObject> event) {
this.event = event;
}
}

View File

@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.support.incrementer.AbstractIdentityColumnMaxValueIncrementer;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.time.Instant;
@@ -105,6 +106,7 @@ public class DeviceServiceImpl implements IDeviceService {
redisCatchStorage.updateDevice(device);
commander.deviceInfoQuery(device);
sync(device);
// TODO 如果设备下的通道级联到了其他平台那么需要发送事件或者notify给上级平台
}else {
deviceMapper.update(device);
redisCatchStorage.updateDevice(device);
@@ -281,13 +283,13 @@ public class DeviceServiceImpl implements IDeviceService {
logger.warn("更新设备时未找到设备信息");
return;
}
if (!StringUtils.isEmpty(device.getName())) {
if (!ObjectUtils.isEmpty(device.getName())) {
deviceInStore.setName(device.getName());
}
if (!StringUtils.isEmpty(device.getCharset())) {
if (!ObjectUtils.isEmpty(device.getCharset())) {
deviceInStore.setCharset(device.getCharset());
}
if (!StringUtils.isEmpty(device.getMediaServerId())) {
if (!ObjectUtils.isEmpty(device.getMediaServerId())) {
deviceInStore.setMediaServerId(device.getMediaServerId());
}

View File

@@ -1,10 +1,9 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformCatalogMapper;
@@ -19,6 +18,7 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
@@ -169,7 +169,7 @@ public class GbStreamServiceImpl implements IGbStreamService {
public void sendCatalogMsgs(List<GbStream> gbStreams, String type) {
if (gbStreams.size() > 0) {
for (GbStream gs : gbStreams) {
if (StringUtils.isEmpty(gs.getGbId())){
if (ObjectUtils.isEmpty(gs.getGbId())){
continue;
}
List<ParentPlatform> parentPlatforms = platformGbStreamMapper.selectByAppAndStream(gs.getApp(), gs.getStream());
@@ -183,4 +183,9 @@ public class GbStreamServiceImpl implements IGbStreamService {
}
}
}
@Override
public int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate) {
return gbStreamMapper.updateGbIdOrName(streamPushItemForUpdate);
}
}

View File

@@ -11,6 +11,8 @@ import java.util.Set;
import com.genersoft.iot.vmp.media.zlm.ZLMRunner;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -19,6 +21,7 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import com.alibaba.fastjson.JSON;
@@ -58,9 +61,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired
private SipConfig sipConfig;
@Autowired
private ZLMRunner zlmRunner;
@Value("${server.ssl.enabled:false}")
private boolean sslEnabled;
@@ -88,8 +88,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired
private ZLMRTPServerFactory zlmrtpServerFactory;
@Autowired
private RedisUtil redisUtil;
@Autowired
private IVideoManagerStorage storager;
@@ -107,19 +105,19 @@ public class MediaServerServiceImpl implements IMediaServerService {
public void updateVmServer(List<MediaServerItem> mediaServerItemList) {
logger.info("[zlm] 缓存初始化 ");
for (MediaServerItem mediaServerItem : mediaServerItemList) {
if (StringUtils.isEmpty(mediaServerItem.getId())) {
if (ObjectUtils.isEmpty(mediaServerItem.getId())) {
continue;
}
// 更新
if (mediaServerItem.getSsrcConfig() == null) {
SsrcConfig ssrcConfig = new SsrcConfig(mediaServerItem.getId(), null, sipConfig.getDomain());
mediaServerItem.setSsrcConfig(ssrcConfig);
redisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(), mediaServerItem);
RedisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId(), mediaServerItem);
}
// 查询redis是否存在此mediaServer
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId();
if (!redisUtil.hasKey(key)) {
redisUtil.set(key, mediaServerItem);
if (!RedisUtil.hasKey(key)) {
RedisUtil.set(key, mediaServerItem);
}
}
@@ -161,7 +159,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
if (mediaServerItem.isRtpEnable()) {
rtpServerPort = zlmrtpServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck?Integer.parseInt(ssrc):0, port);
}
redisUtil.set(key, mediaServerItem);
RedisUtil.set(key, mediaServerItem);
return new SSRCInfo(rtpServerPort, ssrc, streamId);
}
}
@@ -194,7 +192,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
ssrcConfig.releaseSsrc(ssrc);
mediaServerItem.setSsrcConfig(ssrcConfig);
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItem.getId();
redisUtil.set(key, mediaServerItem);
RedisUtil.set(key, mediaServerItem);
}
/**
@@ -203,7 +201,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public void clearRTPServer(MediaServerItem mediaServerItem) {
mediaServerItem.setSsrcConfig(new SsrcConfig(mediaServerItem.getId(), null, sipConfig.getDomain()));
redisUtil.zAdd(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), mediaServerItem.getId(), 0);
RedisUtil.zAdd(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), mediaServerItem.getId(), 0);
}
@@ -225,19 +223,19 @@ public class MediaServerServiceImpl implements IMediaServerService {
);
}
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerItemInDataBase.getId();
redisUtil.set(key, mediaServerItemInDataBase);
RedisUtil.set(key, mediaServerItemInDataBase);
}
@Override
public List<MediaServerItem> getAll() {
List<MediaServerItem> result = new ArrayList<>();
List<Object> mediaServerKeys = redisUtil.scan(String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX+ userSetting.getServerId() + "_" ));
List<Object> mediaServerKeys = RedisUtil.scan(String.format("%S*", VideoManagerConstants.MEDIA_SERVER_PREFIX+ userSetting.getServerId() + "_" ));
String onlineKey = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
for (Object mediaServerKey : mediaServerKeys) {
String key = (String) mediaServerKey;
MediaServerItem mediaServerItem = (MediaServerItem) redisUtil.get(key);
MediaServerItem mediaServerItem = (MediaServerItem) RedisUtil.get(key);
// 检查状态
Double aDouble = redisUtil.zScore(onlineKey, mediaServerItem.getId());
Double aDouble = RedisUtil.zScore(onlineKey, mediaServerItem.getId());
if (aDouble != null) {
mediaServerItem.setStatus(true);
}
@@ -263,13 +261,13 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public List<MediaServerItem> getAllOnline() {
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
Set<String> mediaServerIdSet = redisUtil.zRevRange(key, 0, -1);
Set<String> mediaServerIdSet = RedisUtil.zRevRange(key, 0, -1);
List<MediaServerItem> result = new ArrayList<>();
if (mediaServerIdSet != null && mediaServerIdSet.size() > 0) {
for (String mediaServerId : mediaServerIdSet) {
String serverKey = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
result.add((MediaServerItem) redisUtil.get(serverKey));
result.add((MediaServerItem) RedisUtil.get(serverKey));
}
}
Collections.reverse(result);
@@ -287,7 +285,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
return null;
}
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
return (MediaServerItem)redisUtil.get(key);
return (MediaServerItem)RedisUtil.get(key);
}
@Override
@@ -299,12 +297,11 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public void clearMediaServerForOnline() {
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
redisUtil.del(key);
RedisUtil.del(key);
}
@Override
public WVPResult<String> add(MediaServerItem mediaServerItem) {
WVPResult<String> result = new WVPResult<>();
public void add(MediaServerItem mediaServerItem) {
mediaServerItem.setCreateTime(DateUtil.getNow());
mediaServerItem.setUpdateTime(DateUtil.getNow());
mediaServerItem.setHookAliveInterval(120);
@@ -314,26 +311,19 @@ public class MediaServerServiceImpl implements IMediaServerService {
if (data != null && data.size() > 0) {
ZLMServerConfig zlmServerConfig= JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class);
if (mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId()) != null) {
result.setCode(-1);
result.setMsg("保存失败媒体服务ID [ " + zlmServerConfig.getGeneralMediaServerId() + " ] 已存在,请修改媒体服务器配置");
return result;
throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败媒体服务ID [ " + zlmServerConfig.getGeneralMediaServerId() + " ] 已存在,请修改媒体服务器配置");
}
mediaServerItem.setId(zlmServerConfig.getGeneralMediaServerId());
zlmServerConfig.setIp(mediaServerItem.getIp());
mediaServerMapper.add(mediaServerItem);
zlmServerOnline(zlmServerConfig);
result.setCode(0);
result.setMsg("success");
}else {
result.setCode(-1);
result.setMsg("连接失败");
throw new ControllerException(ErrorCode.ERROR100.getCode(),"连接失败");
}
}else {
result.setCode(-1);
result.setMsg("连接失败");
throw new ControllerException(ErrorCode.ERROR100.getCode(),"连接失败");
}
return result;
}
@Override
@@ -401,20 +391,20 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
serverItem.setStatus(true);
if (StringUtils.isEmpty(serverItem.getId())) {
if (ObjectUtils.isEmpty(serverItem.getId())) {
logger.warn("[未注册的zlm] serverItem缺少ID 无法接入:{}{}", zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() );
return;
}
mediaServerMapper.update(serverItem);
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + zlmServerConfig.getGeneralMediaServerId();
if (redisUtil.get(key) == null) {
if (RedisUtil.get(key) == null) {
SsrcConfig ssrcConfig = new SsrcConfig(zlmServerConfig.getGeneralMediaServerId(), null, sipConfig.getDomain());
serverItem.setSsrcConfig(ssrcConfig);
}else {
MediaServerItem mediaServerItemInRedis = (MediaServerItem)redisUtil.get(key);
MediaServerItem mediaServerItemInRedis = (MediaServerItem)RedisUtil.get(key);
serverItem.setSsrcConfig(mediaServerItemInRedis.getSsrcConfig());
}
redisUtil.set(key, serverItem);
RedisUtil.set(key, serverItem);
resetOnlineServerItem(serverItem);
if (serverItem.isAutoConfig()) {
setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable()));
@@ -435,15 +425,15 @@ public class MediaServerServiceImpl implements IMediaServerService {
// 更新缓存
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
// 使用zset的分数作为当前并发量 默认值设置为0
if (redisUtil.zScore(key, serverItem.getId()) == null) { // 不存在则设置默认值 已存在则重置
redisUtil.zAdd(key, serverItem.getId(), 0L);
if (RedisUtil.zScore(key, serverItem.getId()) == null) { // 不存在则设置默认值 已存在则重置
RedisUtil.zAdd(key, serverItem.getId(), 0L);
// 查询服务流数量
zlmresTfulUtils.getMediaList(serverItem, null, null, "rtmp",(mediaList ->{
Integer code = mediaList.getInteger("code");
if (code == 0) {
JSONArray data = mediaList.getJSONArray("data");
if (data != null) {
redisUtil.zAdd(key, serverItem.getId(), data.size());
RedisUtil.zAdd(key, serverItem.getId(), data.size());
}
}
}));
@@ -460,14 +450,14 @@ public class MediaServerServiceImpl implements IMediaServerService {
return;
}
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
redisUtil.zIncrScore(key, mediaServerId, 1);
RedisUtil.zIncrScore(key, mediaServerId, 1);
}
@Override
public void removeCount(String mediaServerId) {
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
redisUtil.zIncrScore(key, mediaServerId, - 1);
RedisUtil.zIncrScore(key, mediaServerId, - 1);
}
/**
@@ -478,15 +468,15 @@ public class MediaServerServiceImpl implements IMediaServerService {
public MediaServerItem getMediaServerForMinimumLoad() {
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) {
if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) {
if (RedisUtil.zSize(key) == null || RedisUtil.zSize(key) == 0) {
if (RedisUtil.zSize(key) == null || RedisUtil.zSize(key) == 0) {
logger.info("获取负载最低的节点时无在线节点");
return null;
}
}
// 获取分数最低的,及并发最低的
Set<Object> objects = redisUtil.ZRange(key, 0, -1);
Set<Object> objects = RedisUtil.ZRange(key, 0, -1);
ArrayList<Object> mediaServerObjectS = new ArrayList<>(objects);
String mediaServerId = (String)mediaServerObjectS.get(0);
@@ -536,7 +526,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
// 最多等待未初始化的Track时间单位毫秒超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流,
// 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项
// param.put("general.wait_track_ready_ms", "3000" );
if (mediaServerItem.isRtpEnable() && !StringUtils.isEmpty(mediaServerItem.getRtpPortRange())) {
if (mediaServerItem.isRtpEnable() && !ObjectUtils.isEmpty(mediaServerItem.getRtpPortRange())) {
param.put("rtp_proxy.port_range", mediaServerItem.getRtpPortRange().replace(",", "-"));
}
@@ -563,12 +553,9 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public WVPResult<MediaServerItem> checkMediaServer(String ip, int port, String secret) {
WVPResult<MediaServerItem> result = new WVPResult<>();
public MediaServerItem checkMediaServer(String ip, int port, String secret) {
if (mediaServerMapper.queryOneByHostAndPort(ip, port) != null) {
result.setCode(-1);
result.setMsg("此连接已存在");
return result;
throw new ControllerException(ErrorCode.ERROR100.getCode(), "此连接已存在");
}
MediaServerItem mediaServerItem = new MediaServerItem();
mediaServerItem.setIp(ip);
@@ -576,21 +563,15 @@ public class MediaServerServiceImpl implements IMediaServerService {
mediaServerItem.setSecret(secret);
JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
if (responseJSON == null) {
result.setCode(-1);
result.setMsg("连接失败");
return result;
throw new ControllerException(ErrorCode.ERROR100.getCode(), "连接失败");
}
JSONArray data = responseJSON.getJSONArray("data");
ZLMServerConfig zlmServerConfig = JSON.parseObject(JSON.toJSONString(data.get(0)), ZLMServerConfig.class);
if (zlmServerConfig == null) {
result.setCode(-1);
result.setMsg("读取配置失败");
return result;
throw new ControllerException(ErrorCode.ERROR100.getCode(), "读取配置失败");
}
if (mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId()) != null) {
result.setCode(-1);
result.setMsg("媒体服务ID [" + zlmServerConfig.getGeneralMediaServerId() + " ] 已存在,请修改媒体服务器配置");
return result;
throw new ControllerException(ErrorCode.ERROR100.getCode(), "媒体服务ID [" + zlmServerConfig.getGeneralMediaServerId() + " ] 已存在,请修改媒体服务器配置");
}
mediaServerItem.setHttpSSlPort(zlmServerConfig.getHttpPort());
mediaServerItem.setRtmpPort(zlmServerConfig.getRtmpPort());
@@ -602,10 +583,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
mediaServerItem.setHookIp(sipConfig.getIp());
mediaServerItem.setSdpIp(ip);
mediaServerItem.setStreamNoneReaderDelayMS(zlmServerConfig.getGeneralStreamNoneReaderDelayMS());
result.setCode(0);
result.setMsg("成功");
result.setData(mediaServerItem);
return result;
return mediaServerItem;
}
@Override
@@ -629,9 +607,9 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public void delete(String id) {
redisUtil.zRemove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), id);
RedisUtil.zRemove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(), id);
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + id;
redisUtil.del(key);
RedisUtil.del(key);
}
@Override
public void deleteDb(String id){
@@ -650,7 +628,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
// zlm连接重试
logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息,尝试重连zlm");
reloadZlm();
// reloadZlm();
mediaServerItem = getOne(mediaServerId);
if (mediaServerItem == null) {
// zlm连接重试
@@ -660,7 +638,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2;
redisUtil.set(key, data, hookAliveInterval);
RedisUtil.set(key, data, hookAliveInterval);
}
private MediaServerItem getOneFromDatabase(String mediaServerId) {
@@ -682,13 +660,4 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
}
}
public void reloadZlm(){
try {
zlmRunner.run();
Thread.sleep(500);//延迟0.5秒缓冲时间
} catch (Exception e) {
logger.warn("尝试重连zlm失败",e);
}
}
}

View File

@@ -15,6 +15,7 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.service.IMediaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
@Service
@@ -94,7 +95,7 @@ public class MediaServiceImpl implements IMediaService {
}
streamInfoResult.setIp(addr);
streamInfoResult.setMediaServerId(mediaInfo.getId());
String callIdParam = StringUtils.isEmpty(callId)?"":"?callId=" + callId;
String callIdParam = ObjectUtils.isEmpty(callId)?"":"?callId=" + callId;
streamInfoResult.setRtmp(String.format("rtmp://%s:%s/%s/%s%s", addr, mediaInfo.getRtmpPort(), app, stream, callIdParam));
if (mediaInfo.getRtmpSSlPort() != 0) {
streamInfoResult.setRtmps(String.format("rtmps://%s:%s/%s/%s%s", addr, mediaInfo.getRtmpSSlPort(), app, stream, callIdParam));
@@ -121,7 +122,7 @@ public class MediaServiceImpl implements IMediaService {
streamInfoResult.setHttps_ts(String.format("https://%s:%s/%s/%s.live.ts%s", addr, mediaInfo.getHttpSSlPort(), app, stream, callIdParam));
streamInfoResult.setWss_ts(String.format("wss://%s:%s/%s/%s.live.ts%s", addr, mediaInfo.getHttpSSlPort(), app, stream, callIdParam));
streamInfoResult.setWss_ts(String.format("wss://%s:%s/%s/%s.live.ts%s", addr, mediaInfo.getHttpSSlPort(), app, stream, callIdParam));
streamInfoResult.setRtc(String.format("https://%s:%s/index/api/webrtc?app=%s&stream=%s&type=%s%s", mediaInfo.getStreamIp(), mediaInfo.getHttpSSlPort(), app, stream, isPlay?"play":"push", StringUtils.isEmpty(callId)?"":"&callId=" + callId));
streamInfoResult.setRtc(String.format("https://%s:%s/index/api/webrtc?app=%s&stream=%s&type=%s%s", mediaInfo.getStreamIp(), mediaInfo.getHttpSSlPort(), app, stream, isPlay?"play":"push", ObjectUtils.isEmpty(callId)?"":"&callId=" + callId));
}
streamInfoResult.setTracks(tracks);

View File

@@ -7,12 +7,12 @@ import java.util.*;
import javax.sip.ResponseEvent;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;
@@ -34,7 +34,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@@ -141,6 +140,9 @@ public class PlayServiceImpl implements IPlayService {
public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
Runnable timeoutCallback) {
if (mediaServerItem == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm");
}
PlayResult playResult = new PlayResult();
RequestMessage msg = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
@@ -148,18 +150,11 @@ public class PlayServiceImpl implements IPlayService {
String uuid = UUID.randomUUID().toString();
msg.setId(uuid);
playResult.setUuid(uuid);
DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
DeferredResult<WVPResult<String>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
playResult.setResult(result);
// 录像查询以channelId作为deviceId查询
resultHolder.put(key, uuid, result);
if (mediaServerItem == null) {
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
wvpResult.setMsg("未找到可用的zlm");
msg.setData(wvpResult);
resultHolder.invokeResult(msg);
return playResult;
}
Device device = redisCatchStorage.getDevice(deviceId);
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
playResult.setDevice(device);
@@ -170,17 +165,14 @@ public class PlayServiceImpl implements IPlayService {
// TODO 应该在上流时调用更好,结束也可能是错误结束
String path = "snap";
String fileName = deviceId + "_" + channelId + ".jpg";
ResponseEntity responseEntity = (ResponseEntity)result.getResult();
if (responseEntity != null && responseEntity.getStatusCode() == HttpStatus.OK) {
WVPResult wvpResult = (WVPResult)responseEntity.getBody();
if (Objects.requireNonNull(wvpResult).getCode() == 0) {
StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
String streamUrl = streamInfoForSuccess.getFmp4();
// 请求截图
logger.info("[请求截图]: " + fileName);
zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
}
WVPResult wvpResult = (WVPResult)result.getResult();
if (Objects.requireNonNull(wvpResult).getCode() == 0) {
StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
String streamUrl = streamInfoForSuccess.getFmp4();
// 请求截图
logger.info("[请求截图]: " + fileName);
zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
}
});
});
@@ -188,7 +180,7 @@ public class PlayServiceImpl implements IPlayService {
String streamId = streamInfo.getStream();
if (streamId == null) {
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("点播失败, redis缓存streamId等于null");
msg.setData(wvpResult);
resultHolder.invokeAllResult(msg);
@@ -202,8 +194,8 @@ public class PlayServiceImpl implements IPlayService {
if (rtpInfo.getBoolean("exist")) {
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(0);
wvpResult.setMsg("success");
wvpResult.setCode(ErrorCode.SUCCESS.getCode());
wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
wvpResult.setData(streamInfo);
msg.setData(wvpResult);
@@ -236,7 +228,7 @@ public class PlayServiceImpl implements IPlayService {
}, event -> {
// sip error错误
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
msg.setData(wvpResult);
resultHolder.invokeAllResult(msg);
@@ -246,7 +238,7 @@ public class PlayServiceImpl implements IPlayService {
}, (code, msgStr)->{
// invite点播超时
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(-1);
wvpResult.setCode(ErrorCode.ERROR100.getCode());
if (code == 0) {
wvpResult.setMsg("点播超时,请稍候重试");
}else if (code == 1) {
@@ -386,15 +378,15 @@ public class PlayServiceImpl implements IPlayService {
redisCatchStorage.startPlay(streamInfo);
WVPResult wvpResult = new WVPResult();
wvpResult.setCode(0);
wvpResult.setMsg("success");
wvpResult.setCode(ErrorCode.SUCCESS.getCode());
wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
wvpResult.setData(streamInfo);
msg.setData(wvpResult);
resultHolder.invokeAllResult(msg);
} else {
logger.warn("设备预览API调用失败");
msg.setData("设备预览API调用失败");
msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败"));
resultHolder.invokeAllResult(msg);
}
}
@@ -418,7 +410,7 @@ public class PlayServiceImpl implements IPlayService {
}
@Override
public DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime,
public DeferredResult<String> playBack(String deviceId, String channelId, String startTime,
String endTime,InviteStreamCallback inviteStreamCallback,
PlayBackCallback callback) {
Device device = storager.queryVideoDevice(deviceId);
@@ -432,7 +424,7 @@ public class PlayServiceImpl implements IPlayService {
}
@Override
public DeferredResult<ResponseEntity<String>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
public DeferredResult<String> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
String deviceId, String channelId, String startTime,
String endTime, InviteStreamCallback infoCallBack,
PlayBackCallback playBackCallback) {
@@ -441,24 +433,21 @@ public class PlayServiceImpl implements IPlayService {
}
String uuid = UUID.randomUUID().toString();
String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId;
DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
return result;
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在");
}
DeferredResult<String> result = new DeferredResult<>(30000L);
resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid, result);
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
RequestMessage requestMessage = new RequestMessage();
requestMessage.setId(uuid);
requestMessage.setKey(key);
PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>();
String playBackTimeOutTaskKey = UUID.randomUUID().toString();
String playBackTimeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(playBackTimeOutTaskKey, ()->{
logger.warn(String.format("设备回放超时deviceId%s channelId%s", deviceId, channelId));
playBackResult.setCode(-1);
playBackResult.setData(msg);
playBackCallback.call(playBackResult);
playBackResult.setData(requestMessage);
SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
if (dialog != null) {
@@ -481,24 +470,23 @@ public class PlayServiceImpl implements IPlayService {
StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
if (streamInfo == null) {
logger.warn("设备回放API调用失败");
msg.setData("设备回放API调用失败");
playBackResult.setCode(-1);
playBackResult.setData(msg);
playBackCallback.call(playBackResult);
return;
}
redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());
msg.setData(JSON.toJSONString(streamInfo));
WVPResult<StreamInfo> success = WVPResult.success(streamInfo);
requestMessage.setData(success);
playBackResult.setCode(0);
playBackResult.setData(msg);
playBackResult.setData(requestMessage);
playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
playBackResult.setResponse(inviteStreamInfo.getResponse());
playBackCallback.call(playBackResult);
}, event -> {
dynamicTask.stop(playBackTimeOutTaskKey);
msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
requestMessage.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)));
playBackResult.setCode(-1);
playBackResult.setData(msg);
playBackResult.setData(requestMessage);
playBackResult.setEvent(event);
playBackCallback.call(playBackResult);
streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
@@ -507,7 +495,7 @@ public class PlayServiceImpl implements IPlayService {
}
@Override
public DeferredResult<ResponseEntity<String>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
public DeferredResult<String> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
return null;
@@ -519,32 +507,31 @@ public class PlayServiceImpl implements IPlayService {
}
@Override
public DeferredResult<ResponseEntity<String>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
public DeferredResult<String> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
if (mediaServerItem == null || ssrcInfo == null) {
return null;
}
String uuid = UUID.randomUUID().toString();
String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId;
DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
DeferredResult<String> result = new DeferredResult<>(30000L);
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
return result;
throw new ControllerException(ErrorCode.ERROR400.getCode(), "设备:" + deviceId + "不存在");
}
resultHolder.put(key, uuid, result);
RequestMessage msg = new RequestMessage();
msg.setId(uuid);
msg.setKey(key);
RequestMessage requestMessage = new RequestMessage();
requestMessage.setId(uuid);
requestMessage.setKey(key);
WVPResult<StreamInfo> wvpResult = new WVPResult<>();
msg.setData(wvpResult);
requestMessage.setData(wvpResult);
PlayBackResult<RequestMessage> downloadResult = new PlayBackResult<>();
downloadResult.setData(msg);
downloadResult.setData(requestMessage);
String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(downLoadTimeOutTaskKey, ()->{
logger.warn(String.format("录像下载请求超时deviceId%s channelId%s", deviceId, channelId));
wvpResult.setCode(-1);
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("录像下载请求超时");
downloadResult.setCode(-1);
hookCallBack.call(downloadResult);
@@ -578,8 +565,8 @@ public class PlayServiceImpl implements IPlayService {
return ;
}
redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
wvpResult.setCode(0);
wvpResult.setMsg("success");
wvpResult.setCode(ErrorCode.SUCCESS.getCode());
wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
wvpResult.setData(streamInfo);
downloadResult.setCode(0);
downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
@@ -588,7 +575,7 @@ public class PlayServiceImpl implements IPlayService {
}, event -> {
dynamicTask.stop(downLoadTimeOutTaskKey);
downloadResult.setCode(-1);
wvpResult.setCode(-1);
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));
downloadResult.setEvent(event);
hookCallBack.call(downloadResult);
@@ -649,7 +636,7 @@ public class PlayServiceImpl implements IPlayService {
resultHolder.invokeResult(msg);
} else {
logger.warn("设备预览API调用失败");
msg.setData("设备预览API调用失败");
msg.setData(WVPResult.fail(ErrorCode.ERROR100.getCode(), "设备预览API调用失败"));
resultHolder.invokeResult(msg);
}
}

View File

@@ -66,8 +66,6 @@ public class RedisGbPlayMsgListener implements MessageListener {
@Autowired
private UserSetting userSetting;
@Autowired
private RedisUtil redis;
@Autowired
private ZLMMediaListManager zlmMediaListManager;
@@ -227,7 +225,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result);
JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
}
/**
@@ -246,7 +244,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
WvpRedisMsgCmd.GET_SEND_ITEM, serial, result);
JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
return;
}
// 确定流是否在线
@@ -269,7 +267,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result
);
JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
}, userSetting.getPlatformPlayTimeout());
// 添加订阅
@@ -308,7 +306,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result
);
JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
}
/**
@@ -345,7 +343,7 @@ public class RedisGbPlayMsgListener implements MessageListener {
wvpResult.setMsg("timeout");
errorCallback.handler(wvpResult);
}, userSetting.getPlatformPlayTimeout());
redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
}
/**
@@ -370,6 +368,6 @@ public class RedisGbPlayMsgListener implements MessageListener {
callbacksForStartSendRtpStream.remove(key);
callbacksForError.remove(key);
});
redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
RedisUtil.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
}
}

View File

@@ -0,0 +1,83 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
/**
* @Auther: JiangFeng
* @Date: 2022/8/16 11:32
* @Description: 接收redis发送的推流设备列表更新通知
*/
@Component
public class RedisPushStreamListMsgListener implements MessageListener {
private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamListMsgListener.class);
@Resource
private IMediaServerService mediaServerService;
@Resource
private IStreamPushService streamPushService;
@Resource
private IGbStreamService gbStreamService;
@Override
public void onMessage(Message message, byte[] bytes) {
//
logger.warn("[REDIS消息-推流设备列表更新] {}", new String(message.getBody()));
List<StreamPushItem> streamPushItems = JSON.parseArray(new String(message.getBody()), StreamPushItem.class);
//查询全部的app+stream 用于判断是添加还是修改
List<String> allAppAndStream = streamPushService.getAllAppAndStream();
/**
* 用于存储更具APP+Stream过滤后的数据可以直接存入stream_push表与gb_stream表
*/
List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
for (StreamPushItem streamPushItem : streamPushItems) {
String app = streamPushItem.getApp();
String stream = streamPushItem.getStream();
boolean contains = allAppAndStream.contains(app + stream);
//不存在就添加
if (!contains) {
streamPushItem.setStatus(false);
streamPushItem.setStreamType("push");
streamPushItem.setCreateTime(DateUtil.getNow());
streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
streamPushItem.setOriginType(2);
streamPushItem.setOriginTypeStr("rtsp_push");
streamPushItem.setTotalReaderCount("0");
streamPushItemForSave.add(streamPushItem);
} else {
//存在就只修改 name和gbId
streamPushItemForUpdate.add(streamPushItem);
}
}
if (streamPushItemForSave.size() > 0) {
logger.info("添加{}条",streamPushItemForSave.size());
logger.info(JSONObject.toJSONString(streamPushItemForSave));
streamPushService.batchAdd(streamPushItemForSave);
}
if(streamPushItemForUpdate.size()>0){
logger.info("修改{}条",streamPushItemForUpdate.size());
logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
}
}
}

View File

@@ -26,15 +26,6 @@ public class RedisStreamMsgListener implements MessageListener {
private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class);
@Autowired
private ISIPCommander commander;
@Autowired
private ISIPCommanderForPlatform commanderForPlatform;
@Autowired
private IVideoManagerStorage storage;
@Autowired
private UserSetting userSetting;

View File

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.TreeType;
@@ -24,6 +25,7 @@ import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
@@ -33,6 +35,7 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.net.InetAddress;
@@ -93,10 +96,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public WVPResult<StreamInfo> save(StreamProxyItem param) {
public StreamInfo save(StreamProxyItem param) {
MediaServerItem mediaInfo;
WVPResult<StreamInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(0);
if (param.getMediaServerId() == null || "auto".equals(param.getMediaServerId())){
mediaInfo = mediaServerService.getMediaServerForMinimumLoad();
}else {
@@ -104,14 +105,12 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
if (mediaInfo == null) {
logger.warn("保存代理未找到在线的ZLM...");
wvpResult.setMsg("保存失败");
return wvpResult;
throw new ControllerException(ErrorCode.ERROR100.getCode(), "保存代理未找到在线的ZLM");
}
String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
param.getStream() );
param.setDst_url(dstUrl);
StringBuffer result = new StringBuffer();
StringBuffer resultMsg = new StringBuffer();
boolean streamLive = false;
param.setMediaServerId(mediaInfo.getId());
boolean saveResult;
@@ -121,43 +120,47 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}else { // 新增
saveResult = addStreamProxy(param);
}
if (saveResult) {
result.append("保存成功");
if (param.isEnable()) {
JSONObject jsonObject = addStreamProxyToZlm(param);
if (jsonObject == null || jsonObject.getInteger("code") != 0) {
streamLive = false;
result.append(", 但是启用失败,请检查流地址是否可用");
param.setEnable(false);
// 直接移除
if (param.isEnable_remove_none_reader()) {
del(param.getApp(), param.getStream());
}else {
updateStreamProxy(param);
}
}else {
streamLive = true;
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
wvpResult.setData(streamInfo);
}
}
}else {
result.append("保存失败");
if (!saveResult) {
throw new ControllerException(ErrorCode.ERROR100.getCode(),"保存失败");
}
if ( !StringUtils.isEmpty(param.getPlatformGbId()) && streamLive) {
StreamInfo resultForStreamInfo = null;
resultMsg.append("保存成功");
if (param.isEnable()) {
JSONObject jsonObject = addStreamProxyToZlm(param);
if (jsonObject == null || jsonObject.getInteger("code") != 0) {
streamLive = false;
resultMsg.append(", 但是启用失败,请检查流地址是否可用");
param.setEnable(false);
// 直接移除
if (param.isEnable_remove_none_reader()) {
del(param.getApp(), param.getStream());
}else {
updateStreamProxy(param);
}
}else {
streamLive = true;
resultForStreamInfo = mediaService.getStreamInfoByAppAndStream(
mediaInfo, param.getApp(), param.getStream(), null, null);
}
}
if ( !ObjectUtils.isEmpty(param.getPlatformGbId()) && streamLive) {
List<GbStream> gbStreams = new ArrayList<>();
gbStreams.add(param);
if (gbStreamService.addPlatformInfo(gbStreams, param.getPlatformGbId(), param.getCatalogId())){
result.append(", 关联国标平台[ " + param.getPlatformGbId() + " ]成功");
return resultForStreamInfo;
}else {
result.append(", 关联国标平台[ " + param.getPlatformGbId() + " ]失败");
resultMsg.append(", 关联国标平台[ " + param.getPlatformGbId() + " ]失败");
throw new ControllerException(ErrorCode.ERROR100.getCode(), resultMsg.toString());
}
}else {
if (!streamLive) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), resultMsg.toString());
}
}
wvpResult.setMsg(result.toString());
return wvpResult;
return resultForStreamInfo;
}
/**
@@ -174,7 +177,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
streamProxyItem.setCreateTime(now);
try {
if (streamProxyMapper.add(streamProxyItem) > 0) {
if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
if (!ObjectUtils.isEmpty(streamProxyItem.getGbId())) {
if (gbStreamMapper.add(streamProxyItem) < 0) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
@@ -209,7 +212,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
streamProxyItem.setStreamType("proxy");
try {
if (streamProxyMapper.update(streamProxyItem) > 0) {
if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
if (!ObjectUtils.isEmpty(streamProxyItem.getGbId())) {
if (gbStreamMapper.updateByAppAndStream(streamProxyItem) == 0) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);

View File

@@ -27,6 +27,7 @@ import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.*;
@@ -208,7 +209,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>();
if (pushList.size() > 0) {
for (StreamPushItem streamPushItem : pushList) {
if (StringUtils.isEmpty(streamPushItem.getGbId())) {
if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
}
}
@@ -340,6 +341,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
gbStreamMapper.batchAdd(streamPushItems);
}
@Override
public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
// 存储数据到stream_push表
@@ -491,7 +493,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
try {
int addStreamResult = streamPushMapper.add(stream);
if (!StringUtils.isEmpty(stream.getGbId())) {
if (!ObjectUtils.isEmpty(stream.getGbId())) {
stream.setStreamType("push");
gbStreamMapper.add(stream);
}
@@ -503,4 +505,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
return result;
}
@Override
public List<String> getAllAppAndStream() {
return streamPushMapper.getAllAppAndStream();
}
}

View File

@@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.*;
@@ -82,9 +83,9 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
@Override
public void invoke(StreamPushExcelDto streamPushExcelDto, AnalysisContext analysisContext) {
if (StringUtils.isEmpty(streamPushExcelDto.getApp())
|| StringUtils.isEmpty(streamPushExcelDto.getStream())
|| StringUtils.isEmpty(streamPushExcelDto.getGbId())) {
if (ObjectUtils.isEmpty(streamPushExcelDto.getApp())
|| ObjectUtils.isEmpty(streamPushExcelDto.getStream())
|| ObjectUtils.isEmpty(streamPushExcelDto.getGbId())) {
return;
}
@@ -130,7 +131,7 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
streamPushItems.add(streamPushItem);
streamPushItemForSave.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
if (!StringUtils.isEmpty(streamPushExcelDto.getPlatformId())) {
if (!ObjectUtils.isEmpty(streamPushExcelDto.getPlatformId())) {
List<String[]> platformList = streamPushItemsForPlatform.get(streamPushItem.getApp() + streamPushItem.getStream());
if (platformList == null) {
platformList = new ArrayList<>();
@@ -138,7 +139,7 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
}
String platformId = streamPushExcelDto.getPlatformId();
String catalogId = streamPushExcelDto.getCatalogId();
if (StringUtils.isEmpty(streamPushExcelDto.getCatalogId())) {
if (ObjectUtils.isEmpty(streamPushExcelDto.getCatalogId())) {
catalogId = null;
}
String[] platFormInfoArray = new String[]{platformId, catalogId};

View File

@@ -7,6 +7,7 @@ import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.List;
@@ -60,7 +61,7 @@ public class UserServiceImpl implements IUserService {
@Override
public boolean checkPushAuthority(String callId, String sign) {
if (StringUtils.isEmpty(callId)) {
if (ObjectUtils.isEmpty(callId)) {
return userMapper.checkPushAuthorityByCallId(sign).size() > 0;
}else {
return userMapper.checkPushAuthorityByCallIdAndSign(callId, sign).size() > 0;