优化国标级联中的通道全部导入

This commit is contained in:
648540858
2023-03-22 15:20:48 +08:00
parent de969b7760
commit f210952ed2
7 changed files with 100 additions and 28 deletions

View File

@@ -40,6 +40,9 @@ public class GbStreamServiceImpl implements IGbStreamService {
@Autowired
private PlatformGbStreamMapper platformGbStreamMapper;
@Autowired
private SubscribeHolder subscribeHolder;
@Autowired
private ParentPlatformMapper platformMapper;
@@ -73,16 +76,23 @@ public class GbStreamServiceImpl implements IGbStreamService {
}
try {
List<DeviceChannel> deviceChannelList = new ArrayList<>();
for (GbStream gbStream : gbStreams) {
for (int i = 0; i < gbStreams.size(); i++) {
GbStream gbStream = gbStreams.get(i);
gbStream.setCatalogId(catalogId);
gbStream.setPlatformId(platformId);
// TODO 修改为批量提交
platformGbStreamMapper.add(gbStream);
logger.info("[关联通道]直播流通道 平台:{}, 共需关联通道数:{}, 已关联:{}", platformId, gbStreams.size(), i + 1);
DeviceChannel deviceChannelListByStream = getDeviceChannelListByStreamWithStatus(gbStream, catalogId, parentPlatform);
deviceChannelList.add(deviceChannelListByStream);
}
dataSourceTransactionManager.commit(transactionStatus); //手动提交
eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.ADD);
if (subscribeHolder.getCatalogSubscribe(platformId) != null) {
eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.ADD);
}
result = true;
}catch (Exception e) {
logger.error("批量保存流与平台的关系时错误", e);

View File

@@ -1,9 +1,6 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.bean.TreeType;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.service.IPlatformChannelService;
@@ -15,7 +12,10 @@ import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
@@ -34,6 +34,16 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
@Autowired
private PlatformChannelMapper platformChannelMapper;
@Autowired
TransactionDefinition transactionDefinition;
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
private SubscribeHolder subscribeHolder;
@Autowired
private DeviceChannelMapper deviceChannelMapper;
@@ -69,17 +79,47 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
}
List<ChannelReduce> channelReducesToAdd = new ArrayList<>(deviceAndChannels.values());
// 对剩下的数据进行存储
int result = 0;
int allCount = 0;
boolean result = false;
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
int limitCount = 300;
if (channelReducesToAdd.size() > 0) {
result = platformChannelMapper.addChannels(platformId, channelReducesToAdd);
// TODO 后续给平台增加控制开关以控制是否响应目录订阅
List<DeviceChannel> deviceChannelList = getDeviceChannelListByChannelReduceList(channelReducesToAdd, catalogId, platform);
if (deviceChannelList != null) {
eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.ADD);
if (channelReducesToAdd.size() > limitCount) {
for (int i = 0; i < channelReducesToAdd.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > channelReducesToAdd.size()) {
toIndex = channelReducesToAdd.size();
}
int count = platformChannelMapper.addChannels(platformId, channelReducesToAdd.subList(i, toIndex));
result = result || count < 0;
allCount += count;
logger.info("[关联通道]国标通道 平台:{}, 共需关联通道数:{}, 已关联:{}", platformId, channelReducesToAdd.size(), toIndex);
}
}else {
allCount = platformChannelMapper.addChannels(platformId, channelReducesToAdd);
result = result || allCount < 0;
logger.info("[关联通道]国标通道 平台:{}, 关联通道数:{}", platformId, channelReducesToAdd.size());
}
}
return result;
if (result) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
allCount = 0;
}else {
logger.info("[关联通道]国标通道 平台:{}, 正在存入数据库", platformId);
dataSourceTransactionManager.commit(transactionStatus);
}
SubscribeInfo catalogSubscribe = subscribeHolder.getCatalogSubscribe(platformId);
if (catalogSubscribe != null) {
List<DeviceChannel> deviceChannelList = getDeviceChannelListByChannelReduceList(channelReducesToAdd, catalogId, platform);
if (deviceChannelList != null) {
eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.ADD);
}
}
logger.info("[关联通道]国标通道 平台:{}, 存入数据库成功", platformId);
}
return allCount;
}
private List<DeviceChannel> getDeviceChannelListByChannelReduceList(List<ChannelReduce> channelReduces, String catalogId, ParentPlatform platform) {