临时提交

This commit is contained in:
648540858
2024-06-25 16:45:31 +08:00
parent 306c42b4b7
commit 68fce9177f
14 changed files with 654 additions and 675 deletions

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
@@ -45,8 +46,8 @@ public interface IGbStreamService {
DeviceChannel getDeviceChannelListByStream(GbStream gbStream, String catalogId, ParentPlatform platform);
void sendCatalogMsg(GbStream gbStream, String type);
void sendCatalogMsgs(List<GbStream> gbStreams, String type);
void sendCatalogMsg(CommonGBChannel gbStream, String type);
void sendCatalogMsgs(List<CommonGBChannel> gbStreams, String type);
/**
* 修改gbId或name

View File

@@ -116,4 +116,7 @@ public interface IStreamPushService {
void updatePush(OnStreamChangedHookParam param);
Map<String, StreamPush> getAllGBId();
}

View File

@@ -157,16 +157,16 @@ public class GbStreamServiceImpl implements IGbStreamService {
}
@Override
public void sendCatalogMsg(GbStream gbStream, String type) {
if (gbStream == null || type == null) {
public void sendCatalogMsg(CommonGBChannel channel, String type) {
if (channel == null || type == null) {
logger.warn("[发送目录订阅]类型流信息或类型为NULL");
return;
}
List<GbStream> gbStreams = new ArrayList<>();
if (gbStream.getGbId() != null) {
gbStreams.add(gbStream);
List<CommonGBChannel> gbStreams = new ArrayList<>();
if (channel.getGbDeviceId() != null) {
gbStreams.add(channel);
}else {
GbStream gbStreamIndb = gbStreamMapper.selectOne(gbStream.getApp(), gbStream.getStream());
GbStream gbStreamIndb = gbStreamMapper.selectOne(channel.getApp(), channel.getStream());
if (gbStreamIndb != null && gbStreamIndb.getGbId() != null){
gbStreams.add(gbStreamIndb);
}

View File

@@ -48,9 +48,6 @@ public class StreamPushServiceImpl implements IStreamPushService {
private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
@Autowired
private GbStreamMapper gbStreamMapper;
@Autowired
private StreamPushMapper streamPushMapper;
@@ -66,9 +63,6 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Autowired
private PlatformGbStreamMapper platformGbStreamMapper;
@Autowired
private IGbStreamService gbStreamService;
@Autowired
private EventPublisher eventPublisher;
@@ -126,16 +120,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
streamPushMapper.update(transform);
gbStreamMapper.updateMediaServer(event.getApp(), event.getStream(), event.getMediaServer().getId());
}
// TODO 相关的事件自行管理不需要写入ZLMMediaListManager
// ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream());
// if ( channelOnlineEventLister != null) {
// try {
// channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());;
// } catch (ParseException e) {
// logger.error("addPush: ", e);
// }
// removedChannelOnlineEventLister(transform.getApp(), transform.getStream());
// }
// 冗余数据,自己系统中自用
redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event);
@@ -216,7 +201,6 @@ public class StreamPushServiceImpl implements IStreamPushService {
streamPushItem.setMediaServerId(item.getMediaServerId());
streamPushItem.setStream(item.getStream());
streamPushItem.setCreateTime(DateUtil.getNow());
streamPushItem.setVhost(item.getVhost());
streamPushItem.setServerId(item.getSeverId());
return streamPushItem;
}
@@ -625,6 +609,11 @@ public class StreamPushServiceImpl implements IStreamPushService {
return streamPushMapper.getAllAppAndStreamMap();
}
@Override
public Map<String, StreamPush> getAllGBId() {
return streamPushMapper.getAllGBId();
}
@Override
public void updatePush(OnStreamChangedHookParam param) {
StreamPush transform = transform(param);

View File

@@ -2,10 +2,9 @@ package com.genersoft.iot.vmp.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPush;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
@@ -60,43 +59,40 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener {
List<StreamPush> streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPush.class);
//查询全部的app+stream 用于判断是添加还是修改
Map<String, StreamPush> allAppAndStream = streamPushService.getAllAppAndStreamMap();
Map<String, GbStream> allGBId = gbStreamService.getAllGBId();
Map<String, StreamPush> allGBId = streamPushService.getAllGBId();
/**
* 用于存储更具APP+Stream过滤后的数据可以直接存入stream_push表与gb_stream表
*/
List<StreamPush> streamPushItemForSave = new ArrayList<>();
List<StreamPush> streamPushItemForUpdate = new ArrayList<>();
for (StreamPush streamPushItem : streamPushItems) {
String app = streamPushItem.getApp();
String stream = streamPushItem.getStream();
for (StreamPush streamPush : streamPushItems) {
String app = streamPush.getApp();
String stream = streamPush.getStream();
boolean contains = allAppAndStream.containsKey(app + stream);
//不存在就添加
if (!contains) {
if (allGBId.containsKey(streamPushItem.getGbId())) {
GbStream gbStream = allGBId.get(streamPushItem.getGbId());
if (allGBId.containsKey(streamPush.getGbDeviceId())) {
StreamPush streamPushInDb = allGBId.get(streamPush.getGbDeviceId());
logger.warn("[REDIS消息-推流设备列表更新-INSERT] 国标编号重复: {}, 已分配给{}/{}",
streamPushItem.getGbId(), gbStream.getApp(), gbStream.getStream());
streamPushInDb.getGbDeviceId(), streamPushInDb.getApp(), streamPushInDb.getStream());
continue;
}
streamPushItem.setStreamType("push");
streamPushItem.setCreateTime(DateUtil.getNow());
streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
streamPushItem.setOriginType(2);
streamPushItem.setOriginTypeStr("rtsp_push");
streamPushItem.setTotalReaderCount(0);
streamPushItemForSave.add(streamPushItem);
allGBId.put(streamPushItem.getGbId(), streamPushItem);
streamPush.setCreateTime(DateUtil.getNow());
streamPush.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
streamPushItemForSave.add(streamPush);
allGBId.put(streamPush.getGbDeviceId(), streamPush);
} else {
if (allGBId.containsKey(streamPushItem.getGbId())
&& (!allGBId.get(streamPushItem.getGbId()).getApp().equals(streamPushItem.getApp()) || !allGBId.get(streamPushItem.getGbId()).getStream().equals(streamPushItem.getStream()))) {
GbStream gbStream = allGBId.get(streamPushItem.getGbId());
if (allGBId.containsKey(streamPush.getGbDeviceId())
&& (!allGBId.get(streamPush.getGbDeviceId()).getApp().equals(streamPush.getApp())
|| !allGBId.get(streamPush.getGbDeviceId()).getStream().equals(streamPush.getStream()))) {
StreamPush streamPushInDb = allGBId.get(streamPush.getGbDeviceId());
logger.warn("[REDIS消息-推流设备列表更新-UPDATE] 国标编号重复: {}, 已分配给{}/{}",
streamPushItem.getGbId(), gbStream.getApp(), gbStream.getStream());
streamPush.getGbDeviceId(), streamPushInDb.getApp(), streamPushInDb.getStream());
continue;
}
//存在就只修改 name和gbId
streamPushItemForUpdate.add(streamPushItem);
streamPushItemForUpdate.add(streamPush);
}
}
if (!streamPushItemForSave.isEmpty()) {