临时提交
This commit is contained in:
@@ -3,7 +3,7 @@ package com.genersoft.iot.vmp.service;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
|
||||
import com.github.pagehelper.PageInfo;
|
||||
|
||||
import java.util.List;
|
||||
@@ -53,7 +53,7 @@ public interface IGbStreamService {
|
||||
* @param streamPushItemForUpdate
|
||||
* @return
|
||||
*/
|
||||
int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate);
|
||||
int updateGbIdOrName(List<StreamPush> streamPushItemForUpdate);
|
||||
|
||||
DeviceChannel getDeviceChannelListByStreamWithStatus(GbStream gbStream, String catalogId, ParentPlatform platform);
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ package com.genersoft.iot.vmp.service;
|
||||
import com.genersoft.iot.vmp.common.GeneralCallback;
|
||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||
import com.genersoft.iot.vmp.media.bean.MediaServer;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
||||
import com.github.pagehelper.PageInfo;
|
||||
@@ -16,7 +16,7 @@ public interface IStreamProxyService {
|
||||
* 保存视频代理
|
||||
* @param param
|
||||
*/
|
||||
void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback);
|
||||
void save(StreamProxy param, GeneralCallback<StreamInfo> callback);
|
||||
|
||||
/**
|
||||
* 添加视频代理到zlm
|
||||
@@ -24,7 +24,7 @@ public interface IStreamProxyService {
|
||||
* @param param
|
||||
* @return
|
||||
*/
|
||||
WVPResult<String> addStreamProxyToZlm(StreamProxyItem param);
|
||||
WVPResult<String> addStreamProxyToZlm(StreamProxy param);
|
||||
|
||||
/**
|
||||
* 从zlm移除视频代理
|
||||
@@ -32,7 +32,7 @@ public interface IStreamProxyService {
|
||||
* @param param
|
||||
* @return
|
||||
*/
|
||||
Boolean removeStreamProxyFromZlm(StreamProxyItem param);
|
||||
Boolean removeStreamProxyFromZlm(StreamProxy param);
|
||||
|
||||
/**
|
||||
* 分页查询
|
||||
@@ -40,7 +40,7 @@ public interface IStreamProxyService {
|
||||
* @param count
|
||||
* @return
|
||||
*/
|
||||
PageInfo<StreamProxyItem> getAll(Integer page, Integer count);
|
||||
PageInfo<StreamProxy> getAll(Integer page, Integer count);
|
||||
|
||||
/**
|
||||
* 删除视频代理
|
||||
@@ -86,7 +86,7 @@ public interface IStreamProxyService {
|
||||
* 根据app与stream获取streamProxy
|
||||
* @return
|
||||
*/
|
||||
StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId);
|
||||
StreamProxy getStreamProxyByAppAndStream(String app, String streamId);
|
||||
|
||||
|
||||
/**
|
||||
@@ -108,7 +108,7 @@ public interface IStreamProxyService {
|
||||
/**
|
||||
* 更新代理流
|
||||
*/
|
||||
boolean updateStreamProxy(StreamProxyItem streamProxyItem);
|
||||
boolean updateStreamProxy(StreamProxy streamProxyItem);
|
||||
|
||||
/**
|
||||
* 获取统计信息
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package com.genersoft.iot.vmp.service;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
||||
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
|
||||
@@ -32,13 +32,13 @@ public interface IStreamPushService {
|
||||
/**
|
||||
* 获取
|
||||
*/
|
||||
PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId);
|
||||
PageInfo<StreamPush> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId);
|
||||
|
||||
List<StreamPushItem> getPushList(String mediaSererId);
|
||||
List<StreamPush> getPushList(String mediaSererId);
|
||||
|
||||
StreamPushItem transform(OnStreamChangedHookParam item);
|
||||
StreamPush transform(OnStreamChangedHookParam item);
|
||||
|
||||
StreamPushItem getPush(String app, String streamId);
|
||||
StreamPush getPush(String app, String streamId);
|
||||
|
||||
/**
|
||||
* 停止一路推流
|
||||
@@ -68,7 +68,7 @@ public interface IStreamPushService {
|
||||
/**
|
||||
* 批量添加
|
||||
*/
|
||||
void batchAdd(List<StreamPushItem> streamPushExcelDtoList);
|
||||
void batchAdd(List<StreamPush> streamPushExcelDtoList);
|
||||
|
||||
/**
|
||||
* 中止多个推流
|
||||
@@ -78,7 +78,7 @@ public interface IStreamPushService {
|
||||
/**
|
||||
* 导入时批量增加
|
||||
*/
|
||||
void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll);
|
||||
void batchAddForUpload(List<StreamPush> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll);
|
||||
|
||||
/**
|
||||
* 全部离线
|
||||
@@ -98,7 +98,7 @@ public interface IStreamPushService {
|
||||
/**
|
||||
* 增加推流
|
||||
*/
|
||||
boolean add(StreamPushItem stream);
|
||||
boolean add(StreamPush stream);
|
||||
|
||||
/**
|
||||
* 获取全部的app+Streanm 用于判断推流列表是新增还是修改
|
||||
@@ -112,7 +112,7 @@ public interface IStreamPushService {
|
||||
*/
|
||||
ResourceBaseInfo getOverview();
|
||||
|
||||
Map<String, StreamPushItem> getAllAppAndStreamMap();
|
||||
Map<String, StreamPush> getAllAppAndStreamMap();
|
||||
|
||||
|
||||
void updatePush(OnStreamChangedHookParam param);
|
||||
|
||||
@@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.service.impl;
|
||||
|
||||
import com.baomidou.dynamic.datasource.annotation.DS;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
|
||||
import com.genersoft.iot.vmp.service.IDeviceAlarmService;
|
||||
import com.genersoft.iot.vmp.storager.dao.DeviceAlarmMapper;
|
||||
import com.github.pagehelper.PageHelper;
|
||||
|
||||
@@ -4,7 +4,7 @@ import com.baomidou.dynamic.datasource.annotation.DS;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
|
||||
import com.genersoft.iot.vmp.service.IGbStreamService;
|
||||
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
|
||||
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
|
||||
@@ -194,7 +194,7 @@ public class GbStreamServiceImpl implements IGbStreamService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate) {
|
||||
public int updateGbIdOrName(List<StreamPush> streamPushItemForUpdate) {
|
||||
return gbStreamMapper.updateGbIdOrName(streamPushItemForUpdate);
|
||||
}
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
|
||||
import com.genersoft.iot.vmp.media.bean.MediaServer;
|
||||
import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
|
||||
import com.genersoft.iot.vmp.service.*;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
||||
@@ -91,7 +91,7 @@ public class MediaServiceImpl implements IMediaService {
|
||||
public ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params) {
|
||||
// 推流鉴权的处理
|
||||
if (!"rtp".equals(app)) {
|
||||
StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream);
|
||||
StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream);
|
||||
if (streamProxyItem != null) {
|
||||
ResultForOnPublish result = new ResultForOnPublish();
|
||||
result.setEnable_audio(streamProxyItem.isEnableAudio());
|
||||
@@ -277,7 +277,7 @@ public class MediaServiceImpl implements IMediaService {
|
||||
} else {
|
||||
// 非国标流 推流/拉流代理
|
||||
// 拉流代理
|
||||
StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream);
|
||||
StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream);
|
||||
if (streamProxyItem != null) {
|
||||
if (streamProxyItem.isEnableRemoveNoneReader()) {
|
||||
// 无人观看自动移除
|
||||
|
||||
@@ -7,7 +7,6 @@ import com.genersoft.iot.vmp.common.StreamInfo;
|
||||
import com.genersoft.iot.vmp.conf.DynamicTask;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.conf.exception.ControllerException;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
|
||||
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
|
||||
import com.genersoft.iot.vmp.media.bean.MediaInfo;
|
||||
import com.genersoft.iot.vmp.media.bean.MediaServer;
|
||||
@@ -18,7 +17,7 @@ import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
|
||||
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
|
||||
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
|
||||
import com.genersoft.iot.vmp.media.service.IMediaServerService;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxy;
|
||||
import com.genersoft.iot.vmp.service.IGbStreamService;
|
||||
import com.genersoft.iot.vmp.service.IStreamProxyService;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
@@ -131,7 +130,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
return;
|
||||
}
|
||||
// 拉流代理
|
||||
StreamProxyItem streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream());
|
||||
StreamProxy streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream());
|
||||
if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) {
|
||||
start(event.getApp(), event.getStream());
|
||||
}
|
||||
@@ -139,7 +138,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
|
||||
|
||||
@Override
|
||||
public void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback) {
|
||||
public void save(StreamProxy param, GeneralCallback<StreamInfo> callback) {
|
||||
MediaServer mediaServer;
|
||||
if (ObjectUtils.isEmpty(param.getMediaServerId()) || "auto".equals(param.getMediaServerId())){
|
||||
mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
|
||||
@@ -266,7 +265,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
* @param streamProxyItem
|
||||
* @return
|
||||
*/
|
||||
private boolean addStreamProxy(StreamProxyItem streamProxyItem) {
|
||||
private boolean addStreamProxy(StreamProxy streamProxyItem) {
|
||||
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
|
||||
boolean result = false;
|
||||
streamProxyItem.setStreamType("proxy");
|
||||
@@ -304,7 +303,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public boolean updateStreamProxy(StreamProxyItem streamProxyItem) {
|
||||
public boolean updateStreamProxy(StreamProxy streamProxyItem) {
|
||||
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
|
||||
boolean result = false;
|
||||
streamProxyItem.setStreamType("proxy");
|
||||
@@ -333,7 +332,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public WVPResult<String> addStreamProxyToZlm(StreamProxyItem param) {
|
||||
public WVPResult<String> addStreamProxyToZlm(StreamProxy param) {
|
||||
WVPResult<String> result = null;
|
||||
MediaServer mediaServer = null;
|
||||
if (param.getMediaServerId() == null) {
|
||||
@@ -373,7 +372,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean removeStreamProxyFromZlm(StreamProxyItem param) {
|
||||
public Boolean removeStreamProxyFromZlm(StreamProxy param) {
|
||||
if (param ==null) {
|
||||
return null;
|
||||
}
|
||||
@@ -395,13 +394,13 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageInfo<StreamProxyItem> getAll(Integer page, Integer count) {
|
||||
public PageInfo<StreamProxy> getAll(Integer page, Integer count) {
|
||||
return videoManagerStorager.queryStreamProxyList(page, count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void del(String app, String stream) {
|
||||
StreamProxyItem streamProxyItem = videoManagerStorager.queryStreamProxy(app, stream);
|
||||
StreamProxy streamProxyItem = videoManagerStorager.queryStreamProxy(app, stream);
|
||||
if (streamProxyItem != null) {
|
||||
gbStreamService.sendCatalogMsg(streamProxyItem, CatalogEvent.DEL);
|
||||
|
||||
@@ -423,7 +422,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
@Override
|
||||
public boolean start(String app, String stream) {
|
||||
boolean result = false;
|
||||
StreamProxyItem streamProxy = videoManagerStorager.queryStreamProxy(app, stream);
|
||||
StreamProxy streamProxy = videoManagerStorager.queryStreamProxy(app, stream);
|
||||
if (streamProxy != null && !streamProxy.isEnable() ) {
|
||||
WVPResult<String> wvpResult = addStreamProxyToZlm(streamProxy);
|
||||
if (wvpResult == null) {
|
||||
@@ -446,7 +445,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
@Override
|
||||
public boolean stop(String app, String stream) {
|
||||
boolean result = false;
|
||||
StreamProxyItem streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream);
|
||||
StreamProxy streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream);
|
||||
if (streamProxyDto != null && streamProxyDto.isEnable()) {
|
||||
Boolean removed = removeStreamProxyFromZlm(streamProxyDto);
|
||||
if (removed != null && removed) {
|
||||
@@ -464,14 +463,14 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
|
||||
|
||||
@Override
|
||||
public StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId) {
|
||||
public StreamProxy getStreamProxyByAppAndStream(String app, String streamId) {
|
||||
return videoManagerStorager.getStreamProxyByAppAndStream(app, streamId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void zlmServerOnline(String mediaServerId) {
|
||||
// 移除开启了无人观看自动移除的流
|
||||
List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
|
||||
List<StreamProxy> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
|
||||
if (streamProxyItemList.size() > 0) {
|
||||
gbStreamMapper.batchDel(streamProxyItemList);
|
||||
}
|
||||
@@ -481,9 +480,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
syncPullStream(mediaServerId);
|
||||
|
||||
// 恢复流代理, 只查找这个这个流媒体
|
||||
List<StreamProxyItem> streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer(
|
||||
List<StreamProxy> streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer(
|
||||
mediaServerId, true);
|
||||
for (StreamProxyItem streamProxyDto : streamProxyListForEnable) {
|
||||
for (StreamProxy streamProxyDto : streamProxyListForEnable) {
|
||||
logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
|
||||
WVPResult<String> wvpResult = addStreamProxyToZlm(streamProxyDto);
|
||||
if (wvpResult == null) {
|
||||
@@ -499,7 +498,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
@Override
|
||||
public void zlmServerOffline(String mediaServerId) {
|
||||
// 移除开启了无人观看自动移除的流
|
||||
List<StreamProxyItem> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
|
||||
List<StreamProxy> streamProxyItemList = streamProxyMapper.selectAutoRemoveItemByMediaServerId(mediaServerId);
|
||||
if (streamProxyItemList.size() > 0) {
|
||||
gbStreamMapper.batchDel(streamProxyItemList);
|
||||
}
|
||||
@@ -533,7 +532,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
@Override
|
||||
public int updateStatus(boolean status, String app, String stream) {
|
||||
// 状态变化时推送到国标上级
|
||||
StreamProxyItem streamProxyItem = streamProxyMapper.selectOne(app, stream);
|
||||
StreamProxy streamProxyItem = streamProxyMapper.selectOne(app, stream);
|
||||
if (streamProxyItem == null) {
|
||||
return 0;
|
||||
}
|
||||
@@ -592,13 +591,13 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
|
||||
Map<String, MediaServer> serverItemMap = all.stream().collect(Collectors.toMap(MediaServer::getId, Function.identity(), (m1, m2) -> m1));
|
||||
|
||||
List<StreamProxyItem> list = videoManagerStorager.getStreamProxyListForEnable(true);
|
||||
List<StreamProxy> list = videoManagerStorager.getStreamProxyListForEnable(true);
|
||||
|
||||
if (CollectionUtils.isEmpty(list)){
|
||||
return;
|
||||
}
|
||||
|
||||
for (StreamProxyItem streamProxyItem : list) {
|
||||
for (StreamProxy streamProxyItem : list) {
|
||||
|
||||
MediaServer mediaServerItem = serverItemMap.get(streamProxyItem.getMediaServerId());
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@ import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
|
||||
import com.genersoft.iot.vmp.media.service.IMediaServerService;
|
||||
import com.genersoft.iot.vmp.media.bean.MediaServer;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
|
||||
import com.genersoft.iot.vmp.service.IGbStreamService;
|
||||
@@ -113,12 +113,12 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
streamAuthorityInfo.setOriginType(mediaInfo.getOriginType());
|
||||
}
|
||||
redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo);
|
||||
StreamPushItem transform = StreamPushItem.getInstance(event, userSetting.getServerId());
|
||||
StreamPush transform = StreamPush.getInstance(event, userSetting.getServerId());
|
||||
transform.setPushIng(true);
|
||||
transform.setUpdateTime(DateUtil.getNow());
|
||||
transform.setPushTime(DateUtil.getNow());
|
||||
transform.setSelf(true);
|
||||
StreamPushItem pushInDb = getPush(event.getApp(), event.getStream());
|
||||
StreamPush pushInDb = getPush(event.getApp(), event.getStream());
|
||||
if (pushInDb == null) {
|
||||
transform.setCreateTime(DateUtil.getNow());
|
||||
streamPushMapper.add(transform);
|
||||
@@ -188,18 +188,18 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
}
|
||||
|
||||
|
||||
private List<StreamPushItem> handleJSON(List<StreamInfo> streamInfoList) {
|
||||
private List<StreamPush> handleJSON(List<StreamInfo> streamInfoList) {
|
||||
if (streamInfoList == null || streamInfoList.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
Map<String, StreamPushItem> result = new HashMap<>();
|
||||
Map<String, StreamPush> result = new HashMap<>();
|
||||
for (StreamInfo streamInfo : streamInfoList) {
|
||||
// 不保存国标推理以及拉流代理的流
|
||||
if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|
||||
|| streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|
||||
|| streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
|
||||
String key = streamInfo.getApp() + "_" + streamInfo.getStream();
|
||||
StreamPushItem streamPushItem = result.get(key);
|
||||
StreamPush streamPushItem = result.get(key);
|
||||
if (streamPushItem == null) {
|
||||
streamPushItem = streamPushItem.getInstance(streamInfo);
|
||||
result.put(key, streamPushItem);
|
||||
@@ -210,35 +210,26 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamPushItem transform(OnStreamChangedHookParam item) {
|
||||
StreamPushItem streamPushItem = new StreamPushItem();
|
||||
public StreamPush transform(OnStreamChangedHookParam item) {
|
||||
StreamPush streamPushItem = new StreamPush();
|
||||
streamPushItem.setApp(item.getApp());
|
||||
streamPushItem.setMediaServerId(item.getMediaServerId());
|
||||
streamPushItem.setStream(item.getStream());
|
||||
streamPushItem.setAliveSecond(item.getAliveSecond());
|
||||
streamPushItem.setOriginSock(item.getOriginSock());
|
||||
streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
|
||||
streamPushItem.setOriginType(item.getOriginType());
|
||||
streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
|
||||
streamPushItem.setOriginUrl(item.getOriginUrl());
|
||||
streamPushItem.setCreateTime(DateUtil.getNow());
|
||||
streamPushItem.setAliveSecond(item.getAliveSecond());
|
||||
streamPushItem.setStatus(true);
|
||||
streamPushItem.setStreamType("push");
|
||||
streamPushItem.setVhost(item.getVhost());
|
||||
streamPushItem.setServerId(item.getSeverId());
|
||||
return streamPushItem;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
|
||||
public PageInfo<StreamPush> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
|
||||
PageHelper.startPage(page, count);
|
||||
List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
|
||||
List<StreamPush> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
|
||||
return new PageInfo<>(all);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<StreamPushItem> getPushList(String mediaServerId) {
|
||||
public List<StreamPush> getPushList(String mediaServerId) {
|
||||
return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
|
||||
}
|
||||
|
||||
@@ -269,14 +260,14 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
|
||||
|
||||
@Override
|
||||
public StreamPushItem getPush(String app, String streamId) {
|
||||
public StreamPush getPush(String app, String streamId) {
|
||||
return streamPushMapper.selectOne(app, streamId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean stop(String app, String stream) {
|
||||
logger.info("[推流 ] 停止流: {}/{}", app, stream);
|
||||
StreamPushItem streamPushItem = streamPushMapper.selectOne(app, stream);
|
||||
StreamPush streamPushItem = streamPushMapper.selectOne(app, stream);
|
||||
if (streamPushItem != null) {
|
||||
gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
|
||||
}
|
||||
@@ -299,13 +290,13 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
return;
|
||||
}
|
||||
// 数据库记录
|
||||
List<StreamPushItem> pushList = getPushList(mediaServerId);
|
||||
Map<String, StreamPushItem> pushItemMap = new HashMap<>();
|
||||
List<StreamPush> pushList = getPushList(mediaServerId);
|
||||
Map<String, StreamPush> pushItemMap = new HashMap<>();
|
||||
// redis记录
|
||||
List<MediaInfo> mediaInfoList = redisCatchStorage.getStreams(mediaServerId, "PUSH");
|
||||
Map<String, MediaInfo> streamInfoPushItemMap = new HashMap<>();
|
||||
if (pushList.size() > 0) {
|
||||
for (StreamPushItem streamPushItem : pushList) {
|
||||
for (StreamPush streamPushItem : pushList) {
|
||||
if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
|
||||
pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
|
||||
}
|
||||
@@ -326,15 +317,15 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
if (mediaList == null) {
|
||||
return;
|
||||
}
|
||||
List<StreamPushItem> streamPushItems = handleJSON(mediaList);
|
||||
List<StreamPush> streamPushItems = handleJSON(mediaList);
|
||||
if (streamPushItems != null) {
|
||||
for (StreamPushItem streamPushItem : streamPushItems) {
|
||||
for (StreamPush streamPushItem : streamPushItems) {
|
||||
pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
|
||||
streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
|
||||
streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
|
||||
}
|
||||
}
|
||||
List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
|
||||
List<StreamPush> offlinePushItems = new ArrayList<>(pushItemMap.values());
|
||||
if (offlinePushItems.size() > 0) {
|
||||
String type = "PUSH";
|
||||
int runLimit = 300;
|
||||
@@ -344,7 +335,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
if (i + runLimit > offlinePushItems.size()) {
|
||||
toIndex = offlinePushItems.size();
|
||||
}
|
||||
List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
|
||||
List<StreamPush> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
|
||||
streamPushMapper.delAll(streamPushItemsSub);
|
||||
}
|
||||
}else {
|
||||
@@ -381,7 +372,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
|
||||
@Override
|
||||
public void zlmServerOffline(String mediaServerId) {
|
||||
List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
|
||||
List<StreamPush> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
|
||||
// 移除没有GBId的推流
|
||||
streamPushMapper.deleteWithoutGBId(mediaServerId);
|
||||
gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
|
||||
@@ -417,9 +408,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
|
||||
@Override
|
||||
public boolean saveToRandomGB() {
|
||||
List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
|
||||
List<StreamPush> streamPushItems = streamPushMapper.selectAll();
|
||||
long gbId = 100001;
|
||||
for (StreamPushItem streamPushItem : streamPushItems) {
|
||||
for (StreamPush streamPushItem : streamPushItems) {
|
||||
streamPushItem.setStreamType("push");
|
||||
streamPushItem.setStatus(true);
|
||||
streamPushItem.setGbId("34020000004111" + gbId);
|
||||
@@ -443,17 +434,17 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void batchAdd(List<StreamPushItem> streamPushItems) {
|
||||
public void batchAdd(List<StreamPush> streamPushItems) {
|
||||
streamPushMapper.addAll(streamPushItems);
|
||||
gbStreamMapper.batchAdd(streamPushItems);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
|
||||
public void batchAddForUpload(List<StreamPush> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
|
||||
// 存储数据到stream_push表
|
||||
streamPushMapper.addAll(streamPushItems);
|
||||
List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream()
|
||||
List<StreamPush> streamPushItemForGbStream = streamPushItems.stream()
|
||||
.filter(streamPushItem-> streamPushItem.getGbId() != null)
|
||||
.collect(Collectors.toList());
|
||||
// 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里
|
||||
@@ -461,7 +452,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
gbStreamMapper.batchAdd(streamPushItemForGbStream);
|
||||
}
|
||||
// 去除没有ID也就是没有存储到数据库的数据
|
||||
List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
|
||||
List<StreamPush> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
|
||||
.filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
@@ -489,14 +480,14 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
}
|
||||
platformInfoMap.put(platform.getServerGBId(), catalogMap);
|
||||
}
|
||||
List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
|
||||
List<StreamPush> streamPushItemListFroPlatform = new ArrayList<>();
|
||||
Map<String, List<GbStream>> platformForEvent = new HashMap<>();
|
||||
// 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入
|
||||
for (StreamPushItem streamPushItem : streamPushItemsForPlatform) {
|
||||
for (StreamPush streamPushItem : streamPushItemsForPlatform) {
|
||||
List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
|
||||
if (platFormInfoList != null && platFormInfoList.size() > 0) {
|
||||
for (String[] platFormInfoArray : platFormInfoList) {
|
||||
StreamPushItem streamPushItemForPlatform = new StreamPushItem();
|
||||
StreamPush streamPushItemForPlatform = new StreamPush();
|
||||
streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
|
||||
if (platFormInfoArray.length > 0) {
|
||||
// 数组 platFormInfoArray 0 为平台ID。 1为目录ID
|
||||
@@ -589,7 +580,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean add(StreamPushItem stream) {
|
||||
public boolean add(StreamPush stream) {
|
||||
stream.setUpdateTime(DateUtil.getNow());
|
||||
stream.setCreateTime(DateUtil.getNow());
|
||||
stream.setServerId(userSetting.getServerId());
|
||||
@@ -630,14 +621,14 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, StreamPushItem> getAllAppAndStreamMap() {
|
||||
public Map<String, StreamPush> getAllAppAndStreamMap() {
|
||||
return streamPushMapper.getAllAppAndStreamMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updatePush(OnStreamChangedHookParam param) {
|
||||
StreamPushItem transform = transform(param);
|
||||
StreamPushItem pushInDb = getPush(param.getApp(), param.getStream());
|
||||
StreamPush transform = transform(param);
|
||||
StreamPush pushInDb = getPush(param.getApp(), param.getStream());
|
||||
transform.setPushIng(param.isRegist());
|
||||
transform.setUpdateTime(DateUtil.getNow());
|
||||
transform.setPushTime(DateUtil.getNow());
|
||||
|
||||
@@ -2,7 +2,7 @@ package com.genersoft.iot.vmp.service.impl;
|
||||
|
||||
import com.alibaba.excel.context.AnalysisContext;
|
||||
import com.alibaba.excel.event.AnalysisEventListener;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
|
||||
import com.genersoft.iot.vmp.service.IStreamPushService;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
|
||||
@@ -32,12 +32,12 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
|
||||
/**
|
||||
* 用于存储不加过滤的所有数据
|
||||
*/
|
||||
private final List<StreamPushItem> streamPushItems = new ArrayList<>();
|
||||
private final List<StreamPush> streamPushItems = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表
|
||||
*/
|
||||
private final Map<String,StreamPushItem> streamPushItemForSave = new HashMap<>();
|
||||
private final Map<String, StreamPush> streamPushItemForSave = new HashMap<>();
|
||||
|
||||
/**
|
||||
* 用于存储按照APP+Stream为KEY, 平台ID+目录Id 为value的数据,用于存储到gb_stream表后获取app+Stream对应的平台与目录信息,然后存入关联表
|
||||
@@ -126,7 +126,7 @@ public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPus
|
||||
streamPushStreamSet.add(streamPushExcelDto.getApp()+streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId());
|
||||
}
|
||||
|
||||
StreamPushItem streamPushItem = new StreamPushItem();
|
||||
StreamPush streamPushItem = new StreamPush();
|
||||
streamPushItem.setApp(streamPushExcelDto.getApp());
|
||||
streamPushItem.setStream(streamPushExcelDto.getStream());
|
||||
streamPushItem.setGbId(streamPushExcelDto.getGbId());
|
||||
|
||||
@@ -3,7 +3,7 @@ package com.genersoft.iot.vmp.service.redisMsg;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
|
||||
import com.genersoft.iot.vmp.service.IGbStreamService;
|
||||
import com.genersoft.iot.vmp.media.service.IMediaServerService;
|
||||
import com.genersoft.iot.vmp.service.IStreamPushService;
|
||||
@@ -57,17 +57,17 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
|
||||
while (!taskQueue.isEmpty()) {
|
||||
Message msg = taskQueue.poll();
|
||||
try {
|
||||
List<StreamPushItem> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class);
|
||||
List<StreamPush> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPush.class);
|
||||
//查询全部的app+stream 用于判断是添加还是修改
|
||||
Map<String, StreamPushItem> allAppAndStream = streamPushService.getAllAppAndStreamMap();
|
||||
Map<String, StreamPush> allAppAndStream = streamPushService.getAllAppAndStreamMap();
|
||||
Map<String, GbStream> allGBId = gbStreamService.getAllGBId();
|
||||
|
||||
/**
|
||||
* 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表
|
||||
*/
|
||||
List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
|
||||
List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
|
||||
for (StreamPushItem streamPushItem : streamPushItems) {
|
||||
List<StreamPush> streamPushItemForSave = new ArrayList<>();
|
||||
List<StreamPush> streamPushItemForUpdate = new ArrayList<>();
|
||||
for (StreamPush streamPushItem : streamPushItems) {
|
||||
String app = streamPushItem.getApp();
|
||||
String stream = streamPushItem.getStream();
|
||||
boolean contains = allAppAndStream.containsKey(app + stream);
|
||||
|
||||
Reference in New Issue
Block a user