From 033db1925c3ffe317b0f9dd5265ce67c1e5b76f9 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Wed, 19 Nov 2025 17:03:45 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=9A=E9=81=93=E7=BC=96=E8=BE=91=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E5=88=86=E7=BB=84=E8=B7=AF=E5=BE=84=E6=98=BE=E7=A4=BA?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=EF=BC=8C=E6=94=AF=E6=8C=81=E8=A1=8C=E6=94=BF?= =?UTF-8?q?=E5=8C=BA=E5=88=92=E8=B7=AF=E5=BE=84=E6=98=BE=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/CommonGBChannel.java | 52 +++++++++++ .../iot/vmp/gb28181/bean/SubscribeHolder.java | 18 ++-- .../iot/vmp/gb28181/bean/SubscribeInfo.java | 5 ++ .../vmp/gb28181/dao/DeviceChannelMapper.java | 12 +-- .../iot/vmp/gb28181/event/EventPublisher.java | 3 + .../gb28181/service/IGbChannelService.java | 6 +- .../service/impl/GbChannelServiceImpl.java | 89 ++++++++++--------- .../service/impl/GroupServiceImpl.java | 7 +- .../service/impl/RegionServiceImpl.java | 13 ++- .../impl/MobilePositionServiceImpl.java | 3 +- .../redisMsg/RedisGroupMsgListener.java | 71 ++++++++------- .../RedisPushStreamListMsgListener.java | 6 +- .../RedisPushStreamStatusMsgListener.java | 8 +- .../storager/impl/RedisCatchStorageImpl.java | 3 +- .../service/impl/StreamProxyServiceImpl.java | 6 +- .../vmp/streamPush/dao/StreamPushMapper.java | 4 +- .../service/IStreamPushService.java | 8 +- .../service/impl/StreamPushServiceImpl.java | 28 ++++-- .../iot/vmp/web/custom/bean/SYMember.java | 1 + .../custom/service/CameraChannelService.java | 72 +++++++++++---- web/src/views/common/CommonChannelEdit.vue | 22 ++++- 21 files changed, 279 insertions(+), 158 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java index a9d19363a..d3a606159 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CommonGBChannel.java @@ -396,4 +396,56 @@ public class CommonGBChannel { return commonGBChannel; } + @Override + public String toString() { + return "CommonGBChannel{" + + "gbId=" + gbId + + ", gbDeviceId='" + gbDeviceId + '\'' + + ", gbName='" + gbName + '\'' + + ", gbManufacturer='" + gbManufacturer + '\'' + + ", gbModel='" + gbModel + '\'' + + ", gbOwner='" + gbOwner + '\'' + + ", gbCivilCode='" + gbCivilCode + '\'' + + ", gbBlock='" + gbBlock + '\'' + + ", gbAddress='" + gbAddress + '\'' + + ", gbParental=" + gbParental + + ", gbParentId='" + gbParentId + '\'' + + ", gbSafetyWay=" + gbSafetyWay + + ", gbRegisterWay=" + gbRegisterWay + + ", gbCertNum='" + gbCertNum + '\'' + + ", gbCertifiable=" + gbCertifiable + + ", gbErrCode=" + gbErrCode + + ", gbEndTime='" + gbEndTime + '\'' + + ", gbSecrecy=" + gbSecrecy + + ", gbIpAddress='" + gbIpAddress + '\'' + + ", gbPort=" + gbPort + + ", gbPassword='" + gbPassword + '\'' + + ", gbStatus='" + gbStatus + '\'' + + ", gbLongitude=" + gbLongitude + + ", gbLatitude=" + gbLatitude + + ", gpsAltitude=" + gpsAltitude + + ", gpsSpeed=" + gpsSpeed + + ", gpsDirection=" + gpsDirection + + ", gpsTime='" + gpsTime + '\'' + + ", gbBusinessGroupId='" + gbBusinessGroupId + '\'' + + ", gbPtzType=" + gbPtzType + + ", gbPositionType=" + gbPositionType + + ", gbRoomType=" + gbRoomType + + ", gbUseType=" + gbUseType + + ", gbSupplyLightType=" + gbSupplyLightType + + ", gbDirectionType=" + gbDirectionType + + ", gbResolution='" + gbResolution + '\'' + + ", gbDownloadSpeed='" + gbDownloadSpeed + '\'' + + ", gbSvcSpaceSupportMod=" + gbSvcSpaceSupportMod + + ", gbSvcTimeSupportMode=" + gbSvcTimeSupportMode + + ", recordPLan=" + recordPLan + + ", dataType=" + dataType + + ", dataDeviceId=" + dataDeviceId + + ", createTime='" + createTime + '\'' + + ", updateTime='" + updateTime + '\'' + + ", streamId='" + streamId + '\'' + + ", enableBroadcast=" + enableBroadcast + + ", mapLevel=" + mapLevel + + '}'; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index 17a5284b3..0bcb9a695 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -32,7 +32,8 @@ public class SubscribeHolder { public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) { log.info("[国标级联] 添加目录订阅,平台: {}, 有效期: {}", platformId, subscribeInfo.getExpires()); - String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId); + subscribeInfo.setServerId(userSetting.getServerId()); + String key = String.format("%s:%s:%s", prefix, "catalog", platformId); if (subscribeInfo.getExpires() > 0) { Duration duration = Duration.ofSeconds(subscribeInfo.getExpires()); redisTemplate.opsForValue().set(key, subscribeInfo, duration); @@ -42,18 +43,19 @@ public class SubscribeHolder { } public SubscribeInfo getCatalogSubscribe(String platformId) { - String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId); + String key = String.format("%s:%s:%s", prefix, "catalog", platformId); return (SubscribeInfo)redisTemplate.opsForValue().get(key); } public void removeCatalogSubscribe(String platformId) { - String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId); + String key = String.format("%s:%s:%s", prefix, "catalog", platformId); redisTemplate.delete(key); } public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo, Runnable gpsTask) { log.info("[国标级联] 添加移动位置订阅,平台: {}, 有效期: {}s", platformId, subscribeInfo.getExpires()); - String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId); + subscribeInfo.setServerId(userSetting.getServerId()); + String key = String.format("%s:%s:%s", prefix, "mobilePosition", platformId); if (subscribeInfo.getExpires() > 0) { Duration duration = Duration.ofSeconds(subscribeInfo.getExpires()); redisTemplate.opsForValue().set(key, subscribeInfo, duration); @@ -81,12 +83,12 @@ public class SubscribeHolder { } public SubscribeInfo getMobilePositionSubscribe(String platformId) { - String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId); + String key = String.format("%s:%s:%s", prefix, "mobilePosition", platformId); return (SubscribeInfo)redisTemplate.opsForValue().get(key); } public void removeMobilePositionSubscribe(String platformId) { - String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId); + String key = String.format("%s:%s:%s", prefix, "mobilePosition", platformId); redisTemplate.delete(key); } @@ -96,7 +98,7 @@ public class SubscribeHolder { } List result = new ArrayList<>(); for (Platform platform : platformList) { - String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platform.getServerGBId()); + String key = String.format("%s:%s:%s", prefix, "catalog", platform.getServerGBId()); if (redisTemplate.hasKey(key)) { result.add(platform.getServerGBId()); } @@ -110,7 +112,7 @@ public class SubscribeHolder { } List result = new ArrayList<>(); for (Platform platform : platformList) { - String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platform.getServerGBId()); + String key = String.format("%s:%s:%s", prefix, "mobilePosition", platform.getServerGBId()); if (redisTemplate.hasKey(key)) { result.add(platform.getServerGBId()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java index 5820cb67c..d33fa27f3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java @@ -37,6 +37,11 @@ public class SubscribeInfo { */ private String simulatedCallId; + /** + * 来源serverId + */ + private String serverId; + public static SubscribeInfo getInstance(SIPResponse response, String id, int expires, EventHeader eventHeader){ SubscribeInfo subscribeInfo = new SubscribeInfo(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java index b561d8d02..77e9e5ecb 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java @@ -22,13 +22,13 @@ public interface DeviceChannelMapper { "insert into wvp_device_channel " + "(device_id, data_type, data_device_id, name, manufacturer, model, owner, civil_code, block, " + "address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, end_time, secrecy, " + - "ip_address, port, password, status, longitude, latitude, gb_longitude, gb_latitude, ptz_type, position_type, room_type, use_type, " + + "ip_address, port, password, status, longitude, latitude, ptz_type, position_type, room_type, use_type, " + "supply_light_type, direction_type, resolution, business_group_id, download_speed, svc_space_support_mod, " + "svc_time_support_mode, create_time, update_time, sub_count, stream_id, has_audio, gps_time, stream_identification, channel_type) " + "values " + "(#{deviceId}, #{dataType}, #{dataDeviceId}, #{name}, #{manufacturer}, #{model}, #{owner}, #{civilCode}, #{block}, " + "#{address}, #{parental}, #{parentId}, #{safetyWay}, #{registerWay}, #{certNum}, #{certifiable}, #{errCode}, #{endTime}, #{secrecy}, " + - "#{ipAddress}, #{port}, #{password}, #{status}, #{longitude}, #{latitude}, #{gbLongitude}, #{gbLatitude}, #{ptzType}, #{positionType}, #{roomType}, #{useType}, " + + "#{ipAddress}, #{port}, #{password}, #{status}, #{longitude}, #{latitude}, #{ptzType}, #{positionType}, #{roomType}, #{useType}, " + "#{supplyLightType}, #{directionType}, #{resolution}, #{businessGroupId}, #{downloadSpeed}, #{svcSpaceSupportMod}," + " #{svcTimeSupportMode}, #{createTime}, #{updateTime}, #{subCount}, #{streamId}, #{hasAudio}, #{gpsTime}, #{streamIdentification}, #{channelType}) " + "") @@ -63,8 +63,6 @@ public interface DeviceChannelMapper { ", status=#{status}" + ", longitude=#{longitude}" + ", latitude=#{latitude}" + - ", gb_longitude=#{gbLongitude}" + - ", gb_latitude=#{gbLatitude}" + ", ptz_type=#{ptzType}" + ", position_type=#{positionType}" + ", room_type=#{roomType}" + @@ -200,14 +198,14 @@ public interface DeviceChannelMapper { "insert into wvp_device_channel " + "(device_id, data_type, data_device_id, name, manufacturer, model, owner, civil_code, block, " + "address, parental, parent_id, safety_way, register_way, cert_num, certifiable, err_code, end_time, secrecy, " + - "ip_address, port, password, status, longitude, latitude, gb_longitude, gb_latitude, ptz_type, position_type, room_type, use_type, " + + "ip_address, port, password, status, longitude, latitude, ptz_type, position_type, room_type, use_type, " + "supply_light_type, direction_type, resolution, business_group_id, download_speed, svc_space_support_mod, " + "svc_time_support_mode, create_time, update_time, sub_count, stream_id, has_audio, gps_time, stream_identification, channel_type) " + "values " + " " + "(#{item.deviceId}, #{item.dataType}, #{item.dataDeviceId}, #{item.name}, #{item.manufacturer}, #{item.model}, #{item.owner}, #{item.civilCode}, #{item.block}, " + "#{item.address}, #{item.parental}, #{item.parentId}, #{item.safetyWay}, #{item.registerWay}, #{item.certNum}, #{item.certifiable}, #{item.errCode}, #{item.endTime}, #{item.secrecy}, " + - "#{item.ipAddress}, #{item.port}, #{item.password}, #{item.status}, #{item.longitude}, #{item.latitude}, #{item.gbLongitude}, #{item.gbLatitude}, #{item.ptzType}, #{item.positionType}, #{item.roomType}, #{item.useType}, " + + "#{item.ipAddress}, #{item.port}, #{item.password}, #{item.status}, #{item.longitude}, #{item.latitude}, #{item.ptzType}, #{item.positionType}, #{item.roomType}, #{item.useType}, " + "#{item.supplyLightType}, #{item.directionType}, #{item.resolution}, #{item.businessGroupId}, #{item.downloadSpeed}, #{item.svcSpaceSupportMod}," + " #{item.svcTimeSupportMode}, #{item.createTime}, #{item.updateTime}, #{item.subCount}, #{item.streamId}, #{item.hasAudio}, #{item.gpsTime}, #{item.streamIdentification}, #{item.channelType}) " + " " + @@ -534,8 +532,6 @@ public interface DeviceChannelMapper { ", status=#{status}" + ", longitude=#{longitude}" + ", latitude=#{latitude}" + - ", gb_longitude=#{gbLongitude}" + - ", gb_latitude=#{gbLatitude}" + ", ptz_type=#{ptzType}" + ", position_type=#{positionType}" + ", room_type=#{roomType}" + diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index df216585b..2e05e2587 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -65,16 +65,19 @@ public class EventPublisher { } public void channelEventPublishForUpdate(CommonGBChannel commonGBChannel, CommonGBChannel deviceChannelForOld) { + log.info("[通道改变内部分发-更新] {}", commonGBChannel.getGbDeviceId()); ChannelEvent channelEvent = ChannelEvent.getInstanceForUpdate(this, Collections.singletonList(commonGBChannel), Collections.singletonList(deviceChannelForOld)); applicationEventPublisher.publishEvent(channelEvent); } public void channelEventPublishForUpdate(List channelList, List channelListForOld) { + log.info("[通道改变内部分发-更新] 数量: {}", channelList.size()); ChannelEvent channelEvent = ChannelEvent.getInstanceForUpdate(this, channelList, channelListForOld); applicationEventPublisher.publishEvent(channelEvent); } public void channelEventPublish(List channelList, ChannelEvent.ChannelEventMessageType type) { + log.info("[通道改变内部分发-{}] 数量: {}", type, channelList.size()); ChannelEvent channelEvent = ChannelEvent.getInstance(this, type, channelList); applicationEventPublisher.publishEvent(channelEvent); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java index 6f5b45a07..1ebcd4a07 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IGbChannelService.java @@ -24,11 +24,11 @@ public interface IGbChannelService { int offline(CommonGBChannel commonGBChannel); - int offline(List commonGBChannelList); + int offline(List commonGBChannelList, boolean permission); int online(CommonGBChannel commonGBChannel); - int online(List commonGBChannelList); + int online(List commonGBChannelList, boolean permission); void batchAdd(List commonGBChannels); @@ -78,7 +78,7 @@ public interface IGbChannelService { void deleteChannelToGroupByGbDevice(List deviceIds); - void batchUpdate(List commonGBChannels); + void batchUpdateForStreamPushRedisMsg(List commonGBChannels, boolean permission); CommonGBChannel queryOneWithPlatform(Integer platformId, String channelDeviceId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index 984e293f3..7071d914a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -187,7 +187,7 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne @Override public int update(CommonGBChannel commonGBChannel) { - log.info("[更新通道] 通道ID: {}, ", commonGBChannel.getGbId()); + log.info("[更新通道] 通道ID: {}, ", commonGBChannel.toString()); if (commonGBChannel.getGbId() <= 0) { log.warn("[更新通道] 未找到数据库ID,更新失败, {}({})", commonGBChannel.getGbName(), commonGBChannel.getGbDeviceId()); return 0; @@ -248,32 +248,33 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne @Override @Transactional - public int offline(List commonGBChannelList) { + public int offline(List commonGBChannelList, boolean permission) { if (commonGBChannelList.isEmpty()) { log.warn("[多个通道离线] 通道数量为0,更新失败"); return 0; } log.info("[通道离线] 共 {} 个", commonGBChannelList.size()); - int limitCount = 1000; int result = 0; - if (commonGBChannelList.size() > limitCount) { - for (int i = 0; i < commonGBChannelList.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > commonGBChannelList.size()) { - toIndex = commonGBChannelList.size(); + if (permission) { + int limitCount = 1000; + if (commonGBChannelList.size() > limitCount) { + for (int i = 0; i < commonGBChannelList.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > commonGBChannelList.size()) { + toIndex = commonGBChannelList.size(); + } + result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "OFF"); } - result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "OFF"); + } else { + result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "OFF"); } - } else { - result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "OFF"); + log.info("[通道离线] 保存入库 共 {} 个改变", result); } - if (result > 0) { - try { - // 发送catalog - eventPublisher.channelEventPublish(commonGBChannelList, ChannelEvent.ChannelEventMessageType.OFF); - } catch (Exception e) { - log.warn("[多个通道离线] 发送失败,数量:{}", commonGBChannelList.size(), e); - } + try { + // 发送catalog + eventPublisher.channelEventPublish(commonGBChannelList, ChannelEvent.ChannelEventMessageType.OFF); + } catch (Exception e) { + log.warn("[多个通道离线] 发送失败,数量:{}", commonGBChannelList.size(), e); } return result; } @@ -298,24 +299,26 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne @Override @Transactional - public int online(List commonGBChannelList) { + public int online(List commonGBChannelList, boolean permission) { if (commonGBChannelList.isEmpty()) { log.warn("[多个通道上线] 通道数量为0,更新失败"); return 0; } - // 批量更新 - int limitCount = 1000; int result = 0; - if (commonGBChannelList.size() > limitCount) { - for (int i = 0; i < commonGBChannelList.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > commonGBChannelList.size()) { - toIndex = commonGBChannelList.size(); + if (permission) { + // 批量更新 + int limitCount = 1000; + if (commonGBChannelList.size() > limitCount) { + for (int i = 0; i < commonGBChannelList.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > commonGBChannelList.size()) { + toIndex = commonGBChannelList.size(); + } + result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "ON"); } - result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "ON"); + } else { + result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "ON"); } - } else { - result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "ON"); } try { // 发送catalog @@ -358,27 +361,29 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne } @Override - public void batchUpdate(List commonGBChannels) { + public void batchUpdateForStreamPushRedisMsg(List commonGBChannels, boolean permission) { if (commonGBChannels.isEmpty()) { log.warn("[更新多个通道] 通道数量为0,更新失败"); return; } List oldCommonGBChannelList = commonGBChannelMapper.queryOldChanelListByChannels(commonGBChannels); - // 批量保存 - int limitCount = 1000; - int result = 0; - if (commonGBChannels.size() > limitCount) { - for (int i = 0; i < commonGBChannels.size(); i += limitCount) { - int toIndex = i + limitCount; - if (i + limitCount > commonGBChannels.size()) { - toIndex = commonGBChannels.size(); + if (permission) { + // 批量保存 + int limitCount = 1000; + int result = 0; + if (commonGBChannels.size() > limitCount) { + for (int i = 0; i < commonGBChannels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > commonGBChannels.size()) { + toIndex = commonGBChannels.size(); + } + result += commonGBChannelMapper.batchUpdate(commonGBChannels.subList(i, toIndex)); } - result += commonGBChannelMapper.batchUpdate(commonGBChannels.subList(i, toIndex)); + } else { + result += commonGBChannelMapper.batchUpdate(commonGBChannels); } - } else { - result += commonGBChannelMapper.batchUpdate(commonGBChannels); + log.info("[更新多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result); } - log.info("[更新多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result); // 发送通过更新通知 try { // 发送通知 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java index 9a9f0bc2d..f7342a7ff 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java @@ -269,14 +269,12 @@ public class GroupServiceImpl implements IGroupService { if (businessGroupInDb == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "业务分组不存在"); } - List groupList = new LinkedList<>(); - groupList.add(businessGroupInDb); Group group = groupManager.queryOneByDeviceId(deviceId, businessGroup); if (group == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "虚拟组织不存在"); } List allParent = getAllParent(group); - groupList.addAll(allParent); + List groupList = new LinkedList<>(allParent); groupList.add(group); return groupList; } @@ -286,10 +284,9 @@ public class GroupServiceImpl implements IGroupService { return new ArrayList<>(); } - List groupList = new ArrayList<>(); Group parent = groupManager.queryOneByDeviceId(group.getParentDeviceId(), group.getBusinessGroup()); if (parent == null) { - return groupList; + return new ArrayList<>(); } List allParent = getAllParent(parent); allParent.add(parent); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/RegionServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/RegionServiceImpl.java index f2091e9cd..903753fcc 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/RegionServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/RegionServiceImpl.java @@ -236,8 +236,9 @@ public class RegionServiceImpl implements IRegionService { throw new ControllerException(ErrorCode.ERROR100.getCode(), "行政区划不存在"); } List allParent = getAllParent(region); - allParent.add(region); - return allParent; + List regionList = new LinkedList<>(allParent); + regionList.add(region); + return regionList; } @@ -246,15 +247,13 @@ public class RegionServiceImpl implements IRegionService { return new ArrayList<>(); } - List regionList = new LinkedList<>(); Region parent = regionMapper.queryByDeviceId(region.getParentDeviceId()); if (parent == null) { - return regionList; + return new ArrayList<>(); } - regionList.add(parent); List allParent = getAllParent(parent); - regionList.addAll(allParent); - return regionList; + allParent.add(parent); + return allParent; } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java index 1b145f0cd..cfb5b0df6 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java @@ -86,7 +86,6 @@ public class MobilePositionServiceImpl implements IMobilePositionService { /** * 查询最新移动位置 - * @param deviceId */ @Override public MobilePosition queryLatestPosition(String deviceId) { @@ -104,7 +103,7 @@ public class MobilePositionServiceImpl implements IMobilePositionService { if (userSetting.getSavePositionHistory()) { mobilePositionMapper.batchadd(mobilePositions); } - log.debug("[移动位置订阅]更新通道位置: {}", mobilePositions.size()); + log.info("[移动位置订阅]更新通道位置: {}", mobilePositions.size()); Map> updateChannelMap = new HashMap<>(); for (MobilePosition mobilePosition : mobilePositions) { DeviceChannel deviceChannel = new DeviceChannel(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupMsgListener.java index 667d82da6..e5fa4da9e 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGroupMsgListener.java @@ -109,50 +109,49 @@ public class RedisGroupMsgListener implements MessageListener { continue; } - Group topGroup = aliasGroupMap.get(groupMessage.getTopGroupGAlias()); - if (topGroup == null) { - topGroup = aliasGroupToSave.get(groupMessage.getTopGroupGAlias()); - } - if (topGroup == null) { - log.info("[REDIS消息-业务分组同步回复] 业务分组信息未发送或者未首先发送, {}", groupMessage.toString()); - continue; - } - group.setBusinessGroup(topGroup.getDeviceId()); - if (groupMessage.getParentGAlias() != null) { - Group parentGroup = aliasGroupMap.get(groupMessage.getParentGAlias()); - if (parentGroup == null) { - parentGroup = aliasGroupToSave.get(groupMessage.getParentGAlias()); + Group topGroup = aliasGroupMap.get(groupMessage.getTopGroupGAlias()); + if (topGroup == null) { + topGroup = aliasGroupToSave.get(groupMessage.getTopGroupGAlias()); } - if (parentGroup == null) { - log.info("[REDIS消息-业务分组同步回复] 虚拟组织父节点未发送或者未首先发送, {}", groupMessage.toString()); + if (topGroup == null) { + log.info("[REDIS消息-业务分组同步回复] 业务分组信息未发送或者未首先发送, {}", groupMessage.toString()); continue; } - group.setParentId(null); - group.setParentDeviceId(parentGroup.getDeviceId()); + group.setBusinessGroup(topGroup.getDeviceId()); + if (groupMessage.getParentGAlias() != null) { + Group parentGroup = aliasGroupMap.get(groupMessage.getParentGAlias()); + if (parentGroup == null) { + parentGroup = aliasGroupToSave.get(groupMessage.getParentGAlias()); + } + if (parentGroup == null) { + log.info("[REDIS消息-业务分组同步回复] 虚拟组织父节点未发送或者未首先发送, {}", groupMessage.toString()); + continue; + } + group.setParentId(null); + group.setParentDeviceId(parentGroup.getDeviceId()); + } else { + group.setParentId(null); + group.setParentDeviceId(topGroup.getDeviceId()); + } } else { group.setParentId(null); - group.setParentDeviceId(topGroup.getDeviceId()); + group.setBusinessGroup(group.getDeviceId()); + group.setParentDeviceId(null); } - } else { - group.setParentId(null); - group.setBusinessGroup(group.getDeviceId()); - group.setParentDeviceId(null); + group.setUpdateTime(DateUtil.getNow()); + aliasGroupToSave.put(group.getAlias(), group); } - group.setUpdateTime(DateUtil.getNow()); - aliasGroupToSave.put(group.getAlias(), group); + log.info("[业务分组同步回复-存储分组数据] {}", JSONObject.toJSONString(aliasGroupToSave.values())); + // 存储分组数据 + groupService.saveByAlias(aliasGroupToSave.values()); + + } catch (ControllerException e) { + log.warn("[REDIS消息-业务分组同步回复] 失败, \r\n{}", e.getMsg()); + } catch (Exception e) { + log.warn("[REDIS消息-业务分组同步回复] 发现未处理的异常, \r\n{}", new String(msg.getBody())); + log.error("[REDIS消息-业务分组同步回复] 异常内容: ", e); } - log.info("[业务分组同步回复-存储分组数据] {}", JSONObject.toJSONString(aliasGroupToSave.values())); - // 存储分组数据 - groupService.saveByAlias(aliasGroupToSave.values()); - - } catch (ControllerException e) { - log.warn("[REDIS消息-业务分组同步回复] 失败, \r\n{}", e.getMsg()); - } catch (Exception e) { - log.warn("[REDIS消息-业务分组同步回复] 发现未处理的异常, \r\n{}", new String(msg.getBody())); - log.error("[REDIS消息-业务分组同步回复] 异常内容: ", e); } - } - } @@ -175,7 +174,7 @@ public class RedisGroupMsgListener implements MessageListener { codeType = "215"; } return String.format(deviceTemplate, codeType, RandomStringUtils.insecure().next(6, false, true)); - }catch (Exception e) { + } catch (Exception e) { log.error("[REDIS消息-业务分组同步回复] 构建新的分组编号失败", e); return null; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamListMsgListener.java index 31cb95c9a..735bd9939 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamListMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamListMsgListener.java @@ -48,10 +48,6 @@ public class RedisPushStreamListMsgListener implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { - String serverId = redisCatchStorage.chooseOneServer(null); - if (!userSetting.getServerId().equals(serverId)) { - return; - } log.info("[REDIS: 推流设备列表更新]: {}", new String(message.getBody())); taskQueue.offer(message); } @@ -130,7 +126,7 @@ public class RedisPushStreamListMsgListener implements MessageListener { if (!streamPushItemForUpdate.isEmpty()) { log.info("修改{}条", streamPushItemForUpdate.size()); log.info(JSONObject.toJSONString(streamPushItemForUpdate)); - streamPushService.batchUpdate(streamPushItemForUpdate); + streamPushService.batchUpdateForRedisMsg(streamPushItemForUpdate); } } catch (Exception e) { log.warn("[REDIS消息-推流设备列表更新] 发现未处理的异常, \r\n{}", new String(msg.getBody())); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java index e3d4e9670..942788d39 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java @@ -79,19 +79,19 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); if (streamStatusMessage.isSetAllOffline()) { // 所有设备离线 - streamPushService.allOffline(); + streamPushService.allOfflineForRedisMsg(); } if (streamStatusMessage.getOfflineStreams() != null && !streamStatusMessage.getOfflineStreams().isEmpty()) { // 更新部分设备离线 log.info("[REDIS: 推流设备状态变化] 更新部分设备离线: {}个", streamStatusMessage.getOfflineStreams().size()); - streamPushService.offline(streamStatusMessage.getOfflineStreams()); + streamPushService.offlineforRedisMsg(streamStatusMessage.getOfflineStreams()); } if (streamStatusMessage.getOnlineStreams() != null && !streamStatusMessage.getOnlineStreams().isEmpty()) { // 更新部分设备上线 log.info("[REDIS: 推流设备状态变化] 更新部分设备上线: {}个", streamStatusMessage.getOnlineStreams().size()); - streamPushService.online(streamStatusMessage.getOnlineStreams()); + streamPushService.onlineForRedisMsg(streamStatusMessage.getOnlineStreams()); } } catch (Exception e) { log.warn("[REDIS消息-推流设备状态变化] 发现未处理的异常, \r\n{}", JSON.parseObject(msg.getBody())); @@ -115,7 +115,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, () -> { log.info("[REDIS消息]未收到redis回复推流设备状态,执行推流设备离线"); // 五秒收不到请求就设置通道离线,然后通知上级离线 - streamPushService.allOffline(); + streamPushService.allOfflineForRedisMsg(); }, 5000); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index e6acf543b..40c741acf 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -524,7 +524,8 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { if (serverId != null) { redisTemplate.opsForZSet().remove(key, serverId); } - Set range = redisTemplate.opsForZSet().range(key, 0, 0); + // 获取得分最高的,也是最后更新时间到redis的wvp,这样可以避免读取到离线的wvp,同时时间最新也一定程度代表最健康的 + Set range = redisTemplate.opsForZSet().reverseRange(key, 0, 0); if (range == null || range.isEmpty()) { return null; } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java index ca5141249..854a43a5c 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java @@ -289,7 +289,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } if (!channelListForOnline.isEmpty()) { - gbChannelService.online(channelListForOnline); + gbChannelService.online(channelListForOnline, true); } List channelListForOffline = new ArrayList<>(); List streamProxiesForRemove = new ArrayList<>(); @@ -302,7 +302,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } } if (!channelListForOffline.isEmpty()) { - gbChannelService.offline(channelListForOffline); + gbChannelService.offline(channelListForOffline, true); } if (!streamProxiesForRemove.isEmpty()) { streamProxyMapper.deleteByList(streamProxiesForRemove); @@ -338,7 +338,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } if (!channelListForOffline.isEmpty()) { // 修改国标关联的国标通道的状态 - gbChannelService.offline(channelListForOffline); + gbChannelService.offline(channelListForOffline, true); } if (!streamProxiesForSendMessage.isEmpty()) { for (StreamProxy streamProxy : streamProxiesForSendMessage) { diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java index 57fbfa005..0077d7b65 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java @@ -160,6 +160,6 @@ public interface StreamPushMapper { @Delete(" DELETE FROM wvp_stream_push st " + " LEFT join wvp_device_channel wdc on wdc.data_type = 2 and st.id = wdc.data_device_id " + - " where wdc.id is null and st.server_id = #{id}") - void deleteWithoutGBId(String id); + " where wdc.id is null and st.server_id = #{serverId}") + void deleteWithoutGBId(@Param("serverId") String serverId); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java index 9621d8c9e..49b865bae 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/IStreamPushService.java @@ -52,17 +52,17 @@ public interface IStreamPushService { /** * 全部离线 */ - void allOffline(); + void allOfflineForRedisMsg(); /** * 推流离线 */ - void offline(List offlineStreams); + void offlineforRedisMsg(List offlineStreams); /** * 推流上线 */ - void online(List onlineStreams); + void onlineForRedisMsg(List onlineStreams); /** * 增加推流 @@ -91,7 +91,7 @@ public interface IStreamPushService { void updatePushStatus(StreamPush streamPush); - void batchUpdate(List streamPushItemForUpdate); + void batchUpdateForRedisMsg(List streamPushItemForUpdate); int delete(int id); diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java index f0a547ce4..60c5555f8 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java @@ -446,7 +446,9 @@ public class StreamPushServiceImpl implements IStreamPushService { } @Override - public void allOffline() { + public void allOfflineForRedisMsg() { + String serverId = redisCatchStorage.chooseOneServer(null); + boolean permission = userSetting.getServerId().equals(serverId); List streamPushList = streamPushMapper.selectAll(null, null, null); if (streamPushList.isEmpty()) { return; @@ -458,11 +460,13 @@ public class StreamPushServiceImpl implements IStreamPushService { commonGBChannelList.add(streamPush.buildCommonGBChannel()); } } - gbChannelService.offline(commonGBChannelList); + gbChannelService.offline(commonGBChannelList, permission); } @Override - public void offline(List offlineStreams) { + public void offlineforRedisMsg(List offlineStreams) { + String serverId = redisCatchStorage.chooseOneServer(null); + boolean permission = userSetting.getServerId().equals(serverId); // 更新部分设备离线 List streamPushList = streamPushMapper.getListInList(offlineStreams); if (streamPushList.isEmpty()) { @@ -470,15 +474,17 @@ public class StreamPushServiceImpl implements IStreamPushService { return; } List commonGBChannelList = gbChannelService.queryListByStreamPushList(streamPushList); - gbChannelService.offline(commonGBChannelList); + gbChannelService.offline(commonGBChannelList, permission); } @Override - public void online(List onlineStreams) { + public void onlineForRedisMsg(List onlineStreams) { if (onlineStreams.isEmpty()) { log.info("[设备上线] 推流设备列表为空"); return; } + String serverId = redisCatchStorage.chooseOneServer(null); + boolean permission = userSetting.getServerId().equals(serverId); // 更新部分设备上线streamPushService List streamPushList = streamPushMapper.getListInList(onlineStreams); if (streamPushList.isEmpty()) { @@ -488,7 +494,7 @@ public class StreamPushServiceImpl implements IStreamPushService { return; } List commonGBChannelList = gbChannelService.queryListByStreamPushList(streamPushList); - gbChannelService.online(commonGBChannelList); + gbChannelService.online(commonGBChannelList, permission); } @Override @@ -556,15 +562,19 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override @Transactional - public void batchUpdate(List streamPushItemForUpdate) { - streamPushMapper.batchUpdate(streamPushItemForUpdate); + public void batchUpdateForRedisMsg(List streamPushItemForUpdate) { + String serverId = redisCatchStorage.chooseOneServer(null); + boolean permission = userSetting.getServerId().equals(serverId); + if (permission) { + streamPushMapper.batchUpdate(streamPushItemForUpdate); + } List commonGBChannels = new ArrayList<>(); for (StreamPush streamPush : streamPushItemForUpdate) { if (!ObjectUtils.isEmpty(streamPush.getGbDeviceId())) { commonGBChannels.add(streamPush.buildCommonGBChannel()); } } - gbChannelService.batchUpdate(commonGBChannels); + gbChannelService.batchUpdateForStreamPushRedisMsg(commonGBChannels, permission); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/web/custom/bean/SYMember.java b/src/main/java/com/genersoft/iot/vmp/web/custom/bean/SYMember.java index 78a61ea6d..333da68b6 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/custom/bean/SYMember.java +++ b/src/main/java/com/genersoft/iot/vmp/web/custom/bean/SYMember.java @@ -12,5 +12,6 @@ public class SYMember { private Long blockId; private String unitNo; private String terminalMemberStatus; + private String channelDeviceId; } diff --git a/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java b/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java index 5e2f7138f..1a7ef5668 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java @@ -42,6 +42,7 @@ public class CameraChannelService implements CommandLineRunner { private final String REDIS_CHANNEL_MESSAGE = "VM_MSG_MOBILE_CHANNEL"; private final String REDIS_MEMBER_STATUS_MESSAGE = "VM_MSG_MEMBER_STATUS_CHANNEL"; private final String MOBILE_CHANNEL_PREFIX = "nationalStandardMobileTerminal_"; + private final String DELAY_TASK_KEY = "DELAY_TASK_KEY_"; @Autowired private CommonGBChannelMapper channelMapper; @@ -126,9 +127,9 @@ public class CameraChannelService implements CommandLineRunner { List resultListForOnline = new ArrayList<>(); List resultListForOffline = new ArrayList<>(); - List memberList = new ArrayList<>(); - List addMemberList = new ArrayList<>(); + Map delayChannelMap = new HashMap<>(); + List memberList = new ArrayList<>(); switch (event.getMessageType()) { @@ -157,16 +158,29 @@ public class CameraChannelService implements CommandLineRunner { if (oldChannel != null) { if (oldChannel.getGbPtzType() != null && oldChannel.getGbPtzType() == 99) { resultListForUpdate.add(channel); + // 如果状态变化发送消息 + if (!Objects.equals(oldChannel.getGbStatus(), channel.getGbStatus())) { + SYMember member = getMember(channel.getGbDeviceId()); + if (member != null) { + if ("ON".equals(channel.getGbStatus())) { + member.setTerminalMemberStatus("ONLINE"); + }else { + member.setTerminalMemberStatus("OFFLINE"); + } + memberList.add(member); + } + } + }else { resultListForAdd.add(channel); if ("ON".equals(channel.getGbStatus())) { - addMemberList.add(channel); + delayChannelMap.put(channel.getGbDeviceId(), channel); } } }else { resultListForAdd.add(channel); if ("ON".equals(channel.getGbStatus())) { - addMemberList.add(channel); + delayChannelMap.put(channel.getGbDeviceId(), channel); } } }else { @@ -175,6 +189,11 @@ public class CameraChannelService implements CommandLineRunner { CameraChannel cameraChannel = new CameraChannel(); cameraChannel.setGbDeviceId(channel.getGbDeviceId()); resultListForDelete.add(cameraChannel); + SYMember member = getMember(cameraChannel.getGbDeviceId()); + if (member != null) { + member.setTerminalMemberStatus("OFFLINE"); + memberList.add(member); + } } } } @@ -186,8 +205,14 @@ public class CameraChannelService implements CommandLineRunner { CameraChannel cameraChannel = new CameraChannel(); cameraChannel.setGbDeviceId(channel.getGbDeviceId()); resultListForDelete.add(cameraChannel); + SYMember member = getMember(cameraChannel.getGbDeviceId()); + if (member != null) { + member.setTerminalMemberStatus("OFFLINE"); + memberList.add(member); + } } } + break; case ON: case OFF: @@ -221,7 +246,7 @@ public class CameraChannelService implements CommandLineRunner { if (channel.getGbPtzType() != null && channel.getGbPtzType() == 99) { resultListForAdd.add(channel); if ("ON".equals(channel.getGbStatus())) { - addMemberList.add(channel); + delayChannelMap.put(channel.getGbDeviceId(), channel); } } } @@ -249,27 +274,34 @@ public class CameraChannelService implements CommandLineRunner { if (!memberList.isEmpty()) { sendMemberStatusMessage(memberList); } - if (!addMemberList.isEmpty()) { + if (!delayChannelMap.isEmpty()) { // 对于在线的终端进行延迟检查和发送 - String key = UUID.randomUUID().toString(); - dynamicTask.startDelay(key, () -> { - List members = new ArrayList<>(); - for (CommonGBChannel commonGBChannel : addMemberList) { + for (CommonGBChannel commonGBChannel : delayChannelMap.values()) { + String key = DELAY_TASK_KEY + commonGBChannel.getGbDeviceId(); + dynamicTask.startDelay(key, () -> { + dynamicTask.stop(key); SYMember member = getMember(commonGBChannel.getGbDeviceId()); if (member == null) { - continue; + return; } member.setTerminalMemberStatus("ONLINE"); - members.add(member); - } - if (!members.isEmpty()) { - sendMemberStatusMessage(members); - } - }, 5000); + sendMemberStatusMessage(List.of(member)); + }, 3000); + } } } + private void sendMemberStatusMessage(List memberList) { + // 取消延时发送 + for (SYMember syMember : memberList) { + String key = DELAY_TASK_KEY + syMember.getChannelDeviceId(); + if (dynamicTask.contains(key)) { + log.info("[SY-redis发送通知] 取消延时新增任务: {}", key); + dynamicTask.stop(key); + } + } + String jsonString = JSONObject.toJSONString(memberList); log.info("[SY-redis发送通知] 发送 状态变化 {}: {}", REDIS_MEMBER_STATUS_MESSAGE, jsonString); redisTemplateForString.convertAndSend(REDIS_MEMBER_STATUS_MESSAGE, jsonString); @@ -277,6 +309,7 @@ public class CameraChannelService implements CommandLineRunner { private void sendChannelMessage(List channelList, ChannelEvent.ChannelEventMessageType type) { if (channelList.isEmpty()) { + log.warn("[SY-redis发送通知-{}] 发送失败,数据为空, 通道信息变化 {}", type, REDIS_CHANNEL_MESSAGE); return; } List cameraChannelList = channelMapper.queryCameraChannelByIds(channelList); @@ -311,6 +344,7 @@ public class CameraChannelService implements CommandLineRunner { jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); jsonObject.put("blockId", member.getBlockId()); + jsonObject.put("gbDeviceId", mobilePosition.getChannelDeviceId()); log.info("[SY-redis发送通知-移动设备位置信息] 发送 {}: {}", REDIS_GPS_MESSAGE, jsonObject.toString()); redisTemplateForString.convertAndSend(REDIS_GPS_MESSAGE, jsonObject.toString()); } @@ -322,7 +356,9 @@ public class CameraChannelService implements CommandLineRunner { if (jsonObject == null) { return null; } - return JSONObject.parseObject(jsonObject.toString(), SYMember.class); + SYMember syMember = JSONObject.parseObject(jsonObject.toString(), SYMember.class); + syMember.setChannelDeviceId(deviceId); + return syMember; } diff --git a/web/src/views/common/CommonChannelEdit.vue b/web/src/views/common/CommonChannelEdit.vue index 79710a796..febb1444f 100644 --- a/web/src/views/common/CommonChannelEdit.vue +++ b/web/src/views/common/CommonChannelEdit.vue @@ -30,11 +30,14 @@ - + + + {{ key }} + @@ -274,6 +277,7 @@ export default { loading: false, modelList: [], parentPath: [], + regionPath: [], form: {} } }, @@ -387,6 +391,7 @@ export default { this.form = data this.$set(this.form, 'enableBroadcastForBool', this.form.enableBroadcast === 1) this.getPaths() + this.getRegionPaths() }) .finally(() => { this.loading = false @@ -400,6 +405,7 @@ export default { chooseCivilCode: function() { this.$refs.chooseCivilCode.openDialog(code => { this.form.gbCivilCode = code + this.getRegionPaths() }) }, chooseGroup: function() { @@ -431,6 +437,20 @@ export default { this.parentPath = path }) } + }, + getRegionPaths: function() { + this.regionPath = [] + if (this.form.gbCivilCode) { + this.$store.dispatch('region/queryPath', this.form.gbCivilCode) + .then(data => { + console.log(data) + const path = [] + for (let i = 0; i < data.length; i++) { + path.push(data[i].name) + } + this.regionPath = path + }) + } } } }