优化国标级联目录订阅通知以及目录查询回复

This commit is contained in:
648540858
2022-07-17 23:17:36 +08:00
parent be5dbc9a21
commit 4451994959
50 changed files with 1159 additions and 819 deletions

View File

@@ -0,0 +1,35 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import java.util.List;
/**
* 国标通道业务类
* @author lin
*/
public interface IDeviceChannelService {
/**
* 更新gps信息
*/
DeviceChannel updateGps(DeviceChannel deviceChannel, Device device);
/**
* 添加设备通道
*
* @param deviceId 设备id
* @param channel 通道
*/
void updateChannel(String deviceId, DeviceChannel channel);
/**
* 批量添加设备通道
*
* @param deviceId 设备id
* @param channels 多个通道
*/
int updateChannels(String deviceId, List<DeviceChannel> channels);
}

View File

@@ -18,7 +18,7 @@ public interface IGbStreamService {
* @param count
* @return
*/
PageInfo<GbStream> getAll(Integer page, Integer count, String platFormId, String catalogId,String query,Boolean pushing,String mediaServerId);
PageInfo<GbStream> getAll(Integer page, Integer count, String platFormId, String catalogId,String query,String mediaServerId);
/**

View File

@@ -0,0 +1,22 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import java.util.List;
/**
* 平台关联通道管理
* @author lin
*/
public interface IPlatformChannelService {
/**
* 更新目录下的通道
* @param platformId 平台编号
* @param channelReduces 通道信息
* @param catalogId 目录编号
* @return
*/
int updateChannelForGB(String platformId, List<ChannelReduce> channelReduces, String catalogId);
}

View File

@@ -101,4 +101,9 @@ public interface IStreamProxyService {
void zlmServerOffline(String mediaServerId);
void clean();
/**
* 更新代理流
*/
boolean updateStreamProxy(StreamProxyItem streamProxyItem);
}

View File

@@ -17,7 +17,6 @@ public class DeviceAlarmServiceImpl implements IDeviceAlarmService {
@Autowired
private DeviceAlarmMapper deviceAlarmMapper;
@Override
public PageInfo<DeviceAlarm> getAllAlarm(int page, int count, String deviceId, String alarmPriority, String alarmMethod, String alarmType, String startTime, String endTime) {
PageHelper.startPage(page, count);

View File

@@ -0,0 +1,165 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* @author lin
*/
@Service
public class DeviceChannelServiceImpl implements IDeviceChannelService {
private final static Logger logger = LoggerFactory.getLogger(DeviceChannelServiceImpl.class);
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private DeviceChannelMapper channelMapper;
@Autowired
private DeviceMapper deviceMapper;
@Override
public DeviceChannel updateGps(DeviceChannel deviceChannel, Device device) {
if (deviceChannel.getLongitude()*deviceChannel.getLatitude() > 0) {
if (device == null) {
device = deviceMapper.getDeviceByDeviceId(deviceChannel.getDeviceId());
}
if ("WGS84".equals(device.getGeoCoordSys())) {
deviceChannel.setLongitudeWgs84(deviceChannel.getLongitude());
deviceChannel.setLatitudeWgs84(deviceChannel.getLatitude());
Double[] position = Coordtransform.WGS84ToGCJ02(deviceChannel.getLongitude(), deviceChannel.getLatitude());
deviceChannel.setLongitudeGcj02(position[0]);
deviceChannel.setLatitudeGcj02(position[1]);
}else if ("GCJ02".equals(device.getGeoCoordSys())) {
deviceChannel.setLongitudeGcj02(deviceChannel.getLongitude());
deviceChannel.setLatitudeGcj02(deviceChannel.getLatitude());
Double[] position = Coordtransform.GCJ02ToWGS84(deviceChannel.getLongitude(), deviceChannel.getLatitude());
deviceChannel.setLongitudeWgs84(position[0]);
deviceChannel.setLatitudeWgs84(position[1]);
}else {
deviceChannel.setLongitudeGcj02(0.00);
deviceChannel.setLatitudeGcj02(0.00);
deviceChannel.setLongitudeWgs84(0.00);
deviceChannel.setLatitudeWgs84(0.00);
}
}else {
deviceChannel.setLongitudeGcj02(deviceChannel.getLongitude());
deviceChannel.setLatitudeGcj02(deviceChannel.getLatitude());
deviceChannel.setLongitudeWgs84(deviceChannel.getLongitude());
deviceChannel.setLatitudeWgs84(deviceChannel.getLatitude());
}
return deviceChannel;
}
@Override
public void updateChannel(String deviceId, DeviceChannel channel) {
String channelId = channel.getChannelId();
channel.setDeviceId(deviceId);
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
if (streamInfo != null) {
channel.setStreamId(streamInfo.getStream());
}
String now = DateUtil.getNow();
channel.setUpdateTime(now);
DeviceChannel deviceChannel = channelMapper.queryChannel(deviceId, channelId);
channel = updateGps(channel, null);
if (deviceChannel == null) {
channel.setCreateTime(now);
channelMapper.add(channel);
}else {
channelMapper.update(channel);
}
channelMapper.updateChannelSubCount(deviceId,channel.getParentId());
}
@Override
public int updateChannels(String deviceId, List<DeviceChannel> channels) {
List<DeviceChannel> addChannels = new ArrayList<>();
List<DeviceChannel> updateChannels = new ArrayList<>();
HashMap<String, DeviceChannel> channelsInStore = new HashMap<>();
Device device = deviceMapper.getDeviceByDeviceId(deviceId);
if (channels != null && channels.size() > 0) {
List<DeviceChannel> channelList = channelMapper.queryChannels(deviceId, null, null, null, null);
if (channelList.size() == 0) {
for (DeviceChannel channel : channels) {
channel.setDeviceId(deviceId);
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId());
if (streamInfo != null) {
channel.setStreamId(streamInfo.getStream());
}
String now = DateUtil.getNow();
channel.setUpdateTime(now);
channel.setCreateTime(now);
channel = updateGps(channel, device);
addChannels.add(channel);
}
}else {
for (DeviceChannel deviceChannel : channelList) {
channelsInStore.put(deviceChannel.getChannelId(), deviceChannel);
}
for (DeviceChannel channel : channels) {
channel.setDeviceId(deviceId);
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId());
if (streamInfo != null) {
channel.setStreamId(streamInfo.getStream());
}
String now = DateUtil.getNow();
channel.setUpdateTime(now);
channel = updateGps(channel, device);
if (channelsInStore.get(channel.getChannelId()) != null) {
updateChannels.add(channel);
}else {
addChannels.add(channel);
channel.setCreateTime(now);
}
}
}
int limitCount = 300;
if (addChannels.size() > 0) {
if (addChannels.size() > limitCount) {
for (int i = 0; i < addChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > addChannels.size()) {
toIndex = addChannels.size();
}
channelMapper.batchAdd(addChannels.subList(i, toIndex));
}
}else {
channelMapper.batchAdd(addChannels);
}
}
if (updateChannels.size() > 0) {
if (updateChannels.size() > limitCount) {
for (int i = 0; i < updateChannels.size(); i += limitCount) {
int toIndex = i + limitCount;
if (i + limitCount > updateChannels.size()) {
toIndex = updateChannels.size();
}
channelMapper.batchUpdate(updateChannels.subList(i, toIndex));
}
}else {
channelMapper.batchUpdate(updateChannels);
}
}
}
return addChannels.size() + updateChannels.size();
}
}

View File

@@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
import com.genersoft.iot.vmp.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
@@ -55,6 +56,9 @@ public class DeviceServiceImpl implements IDeviceService {
@Autowired
private DeviceMapper deviceMapper;
@Autowired
private IDeviceChannelService deviceChannelService;
@Autowired
private DeviceChannelMapper deviceChannelMapper;
@@ -324,23 +328,12 @@ public class DeviceServiceImpl implements IDeviceService {
private void updateDeviceChannelGeoCoordSys(Device device) {
List<DeviceChannel> deviceChannels = deviceChannelMapper.getAllChannelWithCoordinate(device.getDeviceId());
if (deviceChannels.size() > 0) {
List<DeviceChannel> deviceChannelsForStore = new ArrayList<>();
for (DeviceChannel deviceChannel : deviceChannels) {
if ("WGS84".equals(device.getGeoCoordSys())) {
deviceChannel.setLongitudeWgs84(deviceChannel.getLongitude());
deviceChannel.setLatitudeWgs84(deviceChannel.getLatitude());
Double[] position = Coordtransform.WGS84ToGCJ02(deviceChannel.getLongitude(), deviceChannel.getLatitude());
deviceChannel.setLongitudeGcj02(position[0]);
deviceChannel.setLatitudeGcj02(position[1]);
}else if ("GCJ02".equals(device.getGeoCoordSys())) {
deviceChannel.setLongitudeGcj02(deviceChannel.getLongitude());
deviceChannel.setLatitudeGcj02(deviceChannel.getLatitude());
Double[] position = Coordtransform.GCJ02ToWGS84(deviceChannel.getLongitude(), deviceChannel.getLatitude());
deviceChannel.setLongitudeWgs84(position[0]);
deviceChannel.setLatitudeWgs84(position[1]);
}
deviceChannelsForStore.add(deviceChannelService.updateGps(deviceChannel, device));
}
deviceChannelService.updateChannels(device.getDeviceId(), deviceChannelsForStore);
}
storage.updateChannels(device.getDeviceId(), deviceChannels);
}
@@ -352,11 +345,11 @@ public class DeviceServiceImpl implements IDeviceService {
}
if (parentId == null || parentId.equals(deviceId)) {
// 字根节点开始查询
List<DeviceChannel> rootNodes = getRootNodes(deviceId, "CivilCode".equals(device.getTreeType()), true, !onlyCatalog);
List<DeviceChannel> rootNodes = getRootNodes(deviceId, TreeType.CIVIL_CODE.equals(device.getTreeType()), true, !onlyCatalog);
return transportChannelsToTree(rootNodes, "");
}
if ("CivilCode".equals(device.getTreeType())) {
if (TreeType.CIVIL_CODE.equals(device.getTreeType())) {
if (parentId.length()%2 != 0) {
return null;
}
@@ -386,7 +379,7 @@ public class DeviceServiceImpl implements IDeviceService {
}
// 使用业务分组展示树
if ("BusinessGroup".equals(device.getTreeType())) {
if (TreeType.BUSINESS_GROUP.equals(device.getTreeType())) {
if (parentId.length() < 14 ) {
return null;
}
@@ -406,11 +399,11 @@ public class DeviceServiceImpl implements IDeviceService {
}
if (parentId == null || parentId.equals(deviceId)) {
// 字根节点开始查询
List<DeviceChannel> rootNodes = getRootNodes(deviceId, "CivilCode".equals(device.getTreeType()), false, true);
List<DeviceChannel> rootNodes = getRootNodes(deviceId, TreeType.CIVIL_CODE.equals(device.getTreeType()), false, true);
return rootNodes;
}
if ("CivilCode".equals(device.getTreeType())) {
if (TreeType.CIVIL_CODE.equals(device.getTreeType())) {
if (parentId.length()%2 != 0) {
return null;
}
@@ -431,7 +424,7 @@ public class DeviceServiceImpl implements IDeviceService {
}
// 使用业务分组展示树
if ("BusinessGroup".equals(device.getTreeType())) {
if (TreeType.BUSINESS_GROUP.equals(device.getTreeType())) {
if (parentId.length() < 14 ) {
return null;
}

View File

@@ -1,14 +1,13 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformCatalogMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.github.pagehelper.PageHelper;
@@ -46,15 +45,15 @@ public class GbStreamServiceImpl implements IGbStreamService {
private ParentPlatformMapper platformMapper;
@Autowired
private SipConfig sipConfig;
private PlatformCatalogMapper catalogMapper;
@Autowired
private EventPublisher eventPublisher;
@Override
public PageInfo<GbStream> getAll(Integer page, Integer count, String platFormId, String catalogId, String query, Boolean pushing, String mediaServerId) {
public PageInfo<GbStream> getAll(Integer page, Integer count, String platFormId, String catalogId, String query, String mediaServerId) {
PageHelper.startPage(page, count);
List<GbStream> all = gbStreamMapper.selectAll(platFormId, catalogId, query, pushing, mediaServerId);
List<GbStream> all = gbStreamMapper.selectAll(platFormId, catalogId, query, mediaServerId);
return new PageInfo<>(all);
}
@@ -102,16 +101,25 @@ public class GbStreamServiceImpl implements IGbStreamService {
deviceChannel.setLatitude(gbStream.getLatitude());
deviceChannel.setDeviceId(platform.getDeviceGBId());
deviceChannel.setManufacture("wvp-pro");
// deviceChannel.setStatus(gbStream.isStatus()?1:0);
deviceChannel.setStatus(1);
deviceChannel.setParentId(catalogId ==null?gbStream.getCatalogId():catalogId);
deviceChannel.setStatus(gbStream.isStatus()?1:0);
deviceChannel.setRegisterWay(1);
if (catalogId.length() > 0 && catalogId.length() <= 10) {
// 父节点是行政区划,则设置CivilCode使用此行政区划
deviceChannel.setCivilCode(platform.getAdministrativeDivision());
if (platform.getTreeType().equals(TreeType.CIVIL_CODE)){
deviceChannel.setCivilCode(catalogId);
}else {
deviceChannel.setCivilCode(platform.getAdministrativeDivision());
}else if (platform.getTreeType().equals(TreeType.BUSINESS_GROUP)){
PlatformCatalog catalog = catalogMapper.select(catalogId);
if (catalog == null) {
deviceChannel.setParentId(platform.getDeviceGBId());
deviceChannel.setBusinessGroupId(null);
}else {
deviceChannel.setParentId(catalog.getId());
deviceChannel.setBusinessGroupId(catalog.getBusinessGroupId());
}
}
deviceChannel.setModel("live");
deviceChannel.setOwner("wvp-pro");
deviceChannel.setParental(0);

View File

@@ -0,0 +1,110 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.bean.TreeType;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.service.IPlatformChannelService;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformCatalogMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.PlatformController;
import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
import com.sun.org.apache.xml.internal.resolver.CatalogManager;
import javafx.application.Platform;
import org.apache.ibatis.annotations.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author lin
*/
@Service
public class PlatformChannelServiceImpl implements IPlatformChannelService {
private final static Logger logger = LoggerFactory.getLogger(PlatformChannelServiceImpl.class);
@Autowired
private PlatformChannelMapper platformChannelMapper;
@Autowired
private DeviceChannelMapper deviceChannelMapper;
@Autowired
private PlatformCatalogMapper catalogManager;
@Autowired
private ParentPlatformMapper platformMapper;
@Autowired
EventPublisher eventPublisher;
@Override
public int updateChannelForGB(String platformId, List<ChannelReduce> channelReduces, String catalogId) {
ParentPlatform platform = platformMapper.getParentPlatByServerGBId(platformId);
if (platform == null) {
logger.warn("更新级联通道信息时未找到平台{}的信息", platformId);
return 0;
}
Map<Integer, ChannelReduce> deviceAndChannels = new HashMap<>();
for (ChannelReduce channelReduce : channelReduces) {
channelReduce.setCatalogId(catalogId);
deviceAndChannels.put(channelReduce.getId(), channelReduce);
}
List<Integer> deviceAndChannelList = new ArrayList<>(deviceAndChannels.keySet());
// 查询当前已经存在的
List<Integer> channelIds = platformChannelMapper.findChannelRelatedPlatform(platformId, channelReduces);
if (deviceAndChannelList != null) {
deviceAndChannelList.removeAll(channelIds);
}
for (Integer channelId : channelIds) {
deviceAndChannels.remove(channelId);
}
List<ChannelReduce> channelReducesToAdd = new ArrayList<>(deviceAndChannels.values());
// 对剩下的数据进行存储
int result = 0;
if (channelReducesToAdd.size() > 0) {
result = platformChannelMapper.addChannels(platformId, channelReducesToAdd);
// TODO 后续给平台增加控制开关以控制是否响应目录订阅
List<DeviceChannel> deviceChannelList = getDeviceChannelListByChannelReduceList(channelReducesToAdd, catalogId, platform);
eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.ADD);
}
return result;
}
private List<DeviceChannel> getDeviceChannelListByChannelReduceList(List<ChannelReduce> channelReduces, String catalogId, ParentPlatform platform) {
List<DeviceChannel> deviceChannelList = new ArrayList<>();
if (channelReduces.size() > 0){
PlatformCatalog catalog = catalogManager.select(catalogId);
if (catalog == null && !catalogId.equals(platform.getServerGBId())) {
logger.warn("未查询到目录{}的信息", catalogId);
return null;
}
for (ChannelReduce channelReduce : channelReduces) {
DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId());
deviceChannel.setParental(0);
deviceChannelList.add(deviceChannel);
if (platform.getTreeType().equals(TreeType.CIVIL_CODE)){
deviceChannel.setCivilCode(catalogId);
}else if (platform.getTreeType().equals(TreeType.BUSINESS_GROUP)){
deviceChannel.setParentId(catalogId);
if (catalog != null) {
deviceChannel.setBusinessGroupId(catalog.getBusinessGroupId());
}
}
}
}
return deviceChannelList;
}
}

View File

@@ -15,6 +15,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto;
import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
@@ -26,6 +27,7 @@ import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@@ -55,7 +57,6 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
@Override
public void onMessage(Message message, byte[] bytes) {
PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(message.getBody(), PushStreamStatusChangeFromRedisDto.class);
if (statusChangeFromPushStream == null) {
logger.warn("[REDIS 消息]推流设备状态变化消息解析失败");
@@ -65,11 +66,13 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
// 所有设备离线
streamPushService.allStreamOffline();
}
if (statusChangeFromPushStream.getOfflineStreams().size() > 0) {
if (statusChangeFromPushStream.getOfflineStreams() != null
&& statusChangeFromPushStream.getOfflineStreams().size() > 0) {
// 更新部分设备离线
streamPushService.offline(statusChangeFromPushStream.getOfflineStreams());
}
if (statusChangeFromPushStream.getOnlineStreams().size() > 0) {
if (statusChangeFromPushStream.getOnlineStreams() != null &&
statusChangeFromPushStream.getOnlineStreams().size() > 0) {
// 更新部分设备上线
streamPushService.online(statusChangeFromPushStream.getOnlineStreams());
}

View File

@@ -3,10 +3,10 @@ package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.TreeType;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
@@ -23,14 +23,19 @@ import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.util.StringUtils;
import java.net.InetAddress;
import java.util.*;
/**
@@ -48,7 +53,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
private IMediaService mediaService;
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;;
private ZLMRESTfulUtils zlmresTfulUtils;
@Autowired
private StreamProxyMapper streamProxyMapper;
@@ -62,9 +67,6 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Autowired
private UserSetting userSetting;
@Autowired
private SipConfig sipConfig;
@Autowired
private GbStreamMapper gbStreamMapper;
@@ -83,6 +85,12 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@Autowired
TransactionDefinition transactionDefinition;
@Override
public WVPResult<StreamInfo> save(StreamProxyItem param) {
@@ -99,6 +107,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
wvpResult.setMsg("保存失败");
return wvpResult;
}
String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
param.getStream() );
param.setDst_url(dstUrl);
@@ -108,9 +117,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
boolean saveResult;
// 更新
if (videoManagerStorager.queryStreamProxy(param.getApp(), param.getStream()) != null) {
saveResult = videoManagerStorager.updateStreamProxy(param);
saveResult = updateStreamProxy(param);
}else { // 新增
saveResult = videoManagerStorager.addStreamProxy(param);
saveResult = addStreamProxy(param);
}
if (saveResult) {
result.append("保存成功");
@@ -124,7 +133,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
if (param.isEnable_remove_none_reader()) {
del(param.getApp(), param.getStream());
}else {
videoManagerStorager.updateStreamProxy(param);
updateStreamProxy(param);
}
}else {
@@ -154,6 +163,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
for (ParentPlatform parentPlatform : parentPlatforms) {
param.setPlatformId(parentPlatform.getServerGBId());
param.setCatalogId(parentPlatform.getCatalogId());
String stream = param.getStream();
StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(param.getApp(), stream, parentPlatform.getServerGBId());
if (streamProxyItems == null) {
@@ -168,6 +178,77 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
return wvpResult;
}
/**
* 新增代理流
* @param streamProxyItem
* @return
*/
private boolean addStreamProxy(StreamProxyItem streamProxyItem) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
boolean result = false;
streamProxyItem.setStreamType("proxy");
streamProxyItem.setStatus(true);
String now = DateUtil.getNow();
streamProxyItem.setCreateTime(now);
try {
if (streamProxyMapper.add(streamProxyItem) > 0) {
if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
if (gbStreamMapper.add(streamProxyItem) < 0) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
}
}else {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
result = true;
dataSourceTransactionManager.commit(transactionStatus); //手动提交
}catch (Exception e) {
logger.error("向数据库添加流代理失败:", e);
dataSourceTransactionManager.rollback(transactionStatus);
}
return result;
}
/**
* 更新代理流
* @param streamProxyItem
* @return
*/
@Override
public boolean updateStreamProxy(StreamProxyItem streamProxyItem) {
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
boolean result = false;
streamProxyItem.setStreamType("proxy");
try {
if (streamProxyMapper.update(streamProxyItem) > 0) {
if (!StringUtils.isEmpty(streamProxyItem.getGbId())) {
if (gbStreamMapper.updateByAppAndStream(streamProxyItem) == 0) {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
}
} else {
//事务回滚
dataSourceTransactionManager.rollback(transactionStatus);
return false;
}
dataSourceTransactionManager.commit(transactionStatus); //手动提交
result = true;
}catch (Exception e) {
e.printStackTrace();
dataSourceTransactionManager.rollback(transactionStatus);
}
return result;
}
@Override
public JSONObject addStreamProxyToZlm(StreamProxyItem param) {
JSONObject result = null;
@@ -239,7 +320,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
if (jsonObject.getInteger("code") == 0) {
result = true;
streamProxy.setEnable(true);
videoManagerStorager.updateStreamProxy(streamProxy);
updateStreamProxy(streamProxy);
}
}
return result;
@@ -253,7 +334,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto);
if (jsonObject != null && jsonObject.getInteger("code") == 0) {
streamProxyDto.setEnable(false);
result = videoManagerStorager.updateStreamProxy(streamProxyDto);
result = updateStreamProxy(streamProxyDto);
}
}
return result;

View File

@@ -488,7 +488,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
if (onlinePushers.size() == 0) {
return;
}
streamPushMapper.allStreamOffline();
streamPushMapper.setAllStreamOffline();
// 发送通知
eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);

View File

@@ -14,38 +14,60 @@ import java.util.*;
public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPushExcelDto> {
// 错误数据的回调,用于将错误数据发送给页面
/**
* 错误数据的回调,用于将错误数据发送给页面
*/
private ErrorDataHandler errorDataHandler;
// 推流的业务类用于存储数据
/**
* 推流的业务类用于存储数据
*/
private IStreamPushService pushService;
// 默认流媒体节点ID
/**
* 默认流媒体节点ID
*/
private String defaultMediaServerId;
// 用于存储不加过滤的所有数据
/**
* 用于存储不加过滤的所有数据
*/
private List<StreamPushItem> streamPushItems = new ArrayList<>();
// 用于存储更具APP+Stream过滤后的数据可以直接存入stream_push表与gb_stream表
/**
* 用于存储更具APP+Stream过滤后的数据可以直接存入stream_push表与gb_stream表
*/
private Map<String,StreamPushItem> streamPushItemForSave = new HashMap<>();
// 用于存储按照APP+Stream为KEY 平台ID+目录Id 为value的数据用于存储到gb_stream表后获取app+Stream对应的平台与目录信息然后存入关联表
/**
* 用于存储按照APP+Stream为KEY 平台ID+目录Id 为value的数据用于存储到gb_stream表后获取app+Stream对应的平台与目录信息然后存入关联表
*/
private Map<String, List<String[]>> streamPushItemsForPlatform = new HashMap<>();
// 用于判断文件是否存在重复的app+Stream+平台ID
/**
* 用于判断文件是否存在重复的app+Stream+平台ID
*/
private Set<String> streamPushStreamSet = new HashSet<>();
// 用于存储APP+Stream->国标ID 的数据结构, 数据一一对应全局判断APP+Stream->国标ID是否存在不对应
/**
* 用于存储APP+Stream->国标ID 的数据结构, 数据一一对应全局判断APP+Stream->国标ID是否存在不对应
*/
private BiMap<String,String> gBMap = HashBiMap.create();
// 记录错误的APP+Stream
/**
* 记录错误的APP+Stream
*/
private List<String> errorStreamList = new ArrayList<>();
// 记录错误的国标ID
/**
* 记录错误的国标ID
*/
private List<String> errorGBList = new ArrayList<>();
// 读取数量计数器
/**
* 读取数量计数器
*/
private int loadedSize = 0;
public StreamPushUploadFileHandler(IStreamPushService pushService, String defaultMediaServerId, ErrorDataHandler errorDataHandler) {