From 55f36f660bf0ef80f8fb7837e2d0d62cc1f58e90 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Fri, 23 May 2025 17:26:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=9B=86=E7=BE=A4=E6=92=AD?= =?UTF-8?q?=E6=94=BEBUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/SubscribeHolder.java | 4 +- .../iot/vmp/gb28181/dao/PlatformMapper.java | 8 +-- .../iot/vmp/gb28181/event/EventPublisher.java | 22 +++---- .../subscribe/catalog/CatalogEventLister.java | 50 +++++++++------ .../MobilePositionEventLister.java | 6 +- .../vmp/gb28181/service/IPlatformService.java | 2 +- .../service/impl/DeviceServiceImpl.java | 7 ++- .../service/impl/GbChannelServiceImpl.java | 51 ++++++---------- .../service/impl/PlatformServiceImpl.java | 18 ++---- .../gb28181/service/impl/PlayServiceImpl.java | 2 +- .../request/impl/ByeRequestProcessor.java | 61 ++++++++++--------- .../redisMsg/IRedisRpcPlayService.java | 2 +- .../redisMsg/RedisAlarmMsgListener.java | 26 ++++---- .../RedisPushStreamStatusMsgListener.java | 4 +- .../service/RedisRpcPlayServiceImpl.java | 5 +- .../iot/vmp/storager/IRedisCatchStorage.java | 2 + .../storager/impl/RedisCatchStorageImpl.java | 7 +++ .../vmp/streamPush/dao/StreamPushMapper.java | 2 +- .../impl/StreamPushPlayServiceImpl.java | 2 +- .../service/impl/StreamPushServiceImpl.java | 15 ++++- web_src/src/main.js | 24 +++++--- 21 files changed, 176 insertions(+), 144 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index 58e0b81c5..17a5284b3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -98,7 +98,7 @@ public class SubscribeHolder { for (Platform platform : platformList) { String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platform.getServerGBId()); if (redisTemplate.hasKey(key)) { - result.add(platform.getServerId()); + result.add(platform.getServerGBId()); } } return result; @@ -112,7 +112,7 @@ public class SubscribeHolder { for (Platform platform : platformList) { String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platform.getServerGBId()); if (redisTemplate.hasKey(key)) { - result.add(platform.getServerId()); + result.add(platform.getServerGBId()); } } return result; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java index 7b6846ea2..bb3c0f0e2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java @@ -89,8 +89,8 @@ public interface PlatformMapper { @Select("SELECT * FROM wvp_platform WHERE id=#{id}") Platform query(int id); - @Update("UPDATE wvp_platform SET status=#{online} WHERE id=#{id}" ) - int updateStatus(@Param("id") int id, @Param("online") boolean online); + @Update("UPDATE wvp_platform SET status=#{online}, server_id = #{serverId} WHERE id=#{id}" ) + int updateStatus(@Param("id") int id, @Param("online") boolean online, @Param("serverId") String serverId); @Select("SELECT server_id FROM wvp_platform WHERE enable=true and server_id != #{serverId} group by server_id") List queryServerIdsWithEnableAndNotInServer(@Param("serverId") String serverId); @@ -104,7 +104,7 @@ public interface PlatformMapper { @Select("SELECT * FROM wvp_platform WHERE enable=true and server_id = #{serverId}") List queryServerIdsWithEnableAndServer(@Param("serverId") String serverId); - @Update("UPDATE wvp_platform SET status=false" ) - void offlineAll(); + @Update("UPDATE wvp_platform SET status=false where server_id = #{serverId}" ) + void offlineAll(@Param("serverId") String serverId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index 9aa75b712..018abc5c6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; @@ -21,11 +22,12 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -/** +/** * @description:Event事件通知推送器,支持推送在线事件、离线事件 * @author: swwheihei - * @date: 2020年5月6日 上午11:30:50 + * @date: 2020年5月6日 上午11:30:50 */ +@Slf4j @Component public class EventPublisher { @@ -72,12 +74,7 @@ public class EventPublisher { } public void catalogEventPublish(Platform platform, List deviceChannels, String type, boolean share) { if (platform != null && !userSetting.getServerId().equals(platform.getServerId())) { - // 指定了上级平台的推送,则发送到指定的设备,未指定的则全部发送, 接收后各自处理自己的 - CatalogEvent outEvent = new CatalogEvent(this); - outEvent.setChannels(deviceChannels); - outEvent.setType(type); - outEvent.setPlatform(platform); - redisRpcService.catalogEventPublish(platform.getServerId(), outEvent); + log.info("[国标级联] 目录状态推送, 此上级平台由其他服务处理,消息已经忽略"); return; } CatalogEvent outEvent = new CatalogEvent(this); @@ -96,12 +93,11 @@ public class EventPublisher { } outEvent.setChannels(channels); outEvent.setType(type); - outEvent.setPlatform(platform); - applicationEventPublisher.publishEvent(outEvent); - if (platform == null && share) { - // 如果没指定上级平台,则推送消息到所有在线的wvp处理自己含有的平台的目录更新 - redisRpcService.catalogEventPublish(null, outEvent); + if (platform != null) { + outEvent.setPlatform(platform); } + applicationEventPublisher.publishEvent(outEvent); + } public void mobilePositionEventPublish(MobilePosition mobilePosition) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 9226a1ee2..69b55445c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; @@ -39,25 +40,30 @@ public class CatalogEventLister implements ApplicationListener { @Autowired private SubscribeHolder subscribeHolder; + @Autowired + private UserSetting userSetting; + @Override public void onApplicationEvent(CatalogEvent event) { SubscribeInfo subscribe = null; Platform parentPlatform = null; - - Map> parentPlatformMap = new HashMap<>(); + log.info("[Catalog事件: {}] 通道数量: {}", event.getType(), event.getChannels().size()); + Map> platformMap = new HashMap<>(); Map channelMap = new HashMap<>(); if (event.getPlatform() != null) { parentPlatform = event.getPlatform(); if (parentPlatform.getServerGBId() == null) { + log.info("[Catalog事件: {}] 平台服务国标编码未找到", event.getType()); return; } subscribe = subscribeHolder.getCatalogSubscribe(parentPlatform.getServerGBId()); if (subscribe == null) { + log.info("[Catalog事件: {}] 未订阅目录事件", event.getType()); return; } }else { - List allPlatform = platformService.queryAll(); + List allPlatform = platformService.queryAll(userSetting.getServerId()); // 获取所用订阅 List platforms = subscribeHolder.getAllCatalogSubscribePlatform(allPlatform); if (event.getChannels() != null) { @@ -65,10 +71,14 @@ public class CatalogEventLister implements ApplicationListener { for (CommonGBChannel deviceChannel : event.getChannels()) { List parentPlatformsForGB = platformChannelService.queryPlatFormListByChannelDeviceId( deviceChannel.getGbId(), platforms); - parentPlatformMap.put(deviceChannel.getGbDeviceId(), parentPlatformsForGB); + platformMap.put(deviceChannel.getGbDeviceId(), parentPlatformsForGB); channelMap.put(deviceChannel.getGbDeviceId(), deviceChannel); } + }else { + log.info("[Catalog事件: {}] 未订阅目录事件", event.getType()); } + }else { + log.info("[Catalog事件: {}] 事件内通道数为0", event.getType()); } } switch (event.getType()) { @@ -77,32 +87,32 @@ public class CatalogEventLister implements ApplicationListener { case CatalogEvent.DEL: if (parentPlatform != null) { - List deviceChannelList = new ArrayList<>(); + List channels = new ArrayList<>(); if (event.getChannels() != null) { - deviceChannelList.addAll(event.getChannels()); + channels.addAll(event.getChannels()); } - if (!deviceChannelList.isEmpty()) { - log.info("[Catalog事件: {}]平台:{},影响通道{}个", event.getType(), parentPlatform.getServerGBId(), deviceChannelList.size()); + if (!channels.isEmpty()) { + log.info("[Catalog事件: {}]平台:{},影响通道{}个", event.getType(), parentPlatform.getServerGBId(), channels.size()); try { - sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), parentPlatform, deviceChannelList, subscribe, null); + sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), parentPlatform, channels, subscribe, null); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | IllegalAccessException e) { log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); } } - }else if (!parentPlatformMap.keySet().isEmpty()) { - for (String gbId : parentPlatformMap.keySet()) { - List parentPlatforms = parentPlatformMap.get(gbId); - if (parentPlatforms != null && !parentPlatforms.isEmpty()) { - for (Platform platform : parentPlatforms) { + }else if (!platformMap.keySet().isEmpty()) { + for (String serverGbId : platformMap.keySet()) { + List platformList = platformMap.get(serverGbId); + if (platformList != null && !platformList.isEmpty()) { + for (Platform platform : platformList) { SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId()); if (subscribeInfo == null) { continue; } - log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId); + log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), serverGbId); List deviceChannelList = new ArrayList<>(); CommonGBChannel deviceChannel = new CommonGBChannel(); - deviceChannel.setGbDeviceId(gbId); + deviceChannel.setGbDeviceId(serverGbId); deviceChannelList.add(deviceChannel); try { sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), platform, deviceChannelList, subscribeInfo, null); @@ -111,6 +121,8 @@ public class CatalogEventLister implements ApplicationListener { log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); } } + }else { + log.info("[Catalog事件: {}] 未找到上级平台: {}", event.getType(), serverGbId); } } } @@ -135,9 +147,9 @@ public class CatalogEventLister implements ApplicationListener { log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); } } - }else if (!parentPlatformMap.keySet().isEmpty()) { - for (String gbId : parentPlatformMap.keySet()) { - List parentPlatforms = parentPlatformMap.get(gbId); + }else if (!platformMap.keySet().isEmpty()) { + for (String gbId : platformMap.keySet()) { + List parentPlatforms = platformMap.get(gbId); if (parentPlatforms != null && !parentPlatforms.isEmpty()) { for (Platform platform : parentPlatforms) { SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java index 3ce1c14e0..7b06f07fc 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; @@ -37,12 +38,15 @@ public class MobilePositionEventLister implements ApplicationListener allPlatforms = platformService.queryAll(); + List allPlatforms = platformService.queryAll(userSetting.getServerId()); // 获取所用订阅 List platforms = subscribeHolder.getAllMobilePositionSubscribePlatform(allPlatforms); if (platforms.isEmpty()) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java index 7ae44996f..b0bea7d14 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java @@ -80,6 +80,6 @@ public interface IPlatformService { void delete(Integer platformId, CommonCallback callback); - List queryAll(); + List queryAll(String serverId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 38f4e004f..47dbfc931 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -192,6 +192,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } sync(device); }else { + device.setServerId(userSetting.getServerId()); if(!device.isOnLine()){ device.setOnLine(true); device.setCreateTime(now); @@ -307,15 +308,15 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { return; } for (Device device : deviceList) { - if (device == null || !device.isOnLine()) { + if (device == null || !device.isOnLine() || !device.getServerId().equals(userSetting.getServerId())) { continue; } if (device.getSubscribeCycleForCatalog() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) { - log.info("[订阅丢失] 目录订阅, 编号: {}, 重新发起订阅", device.getDeviceId()); + log.debug("[订阅丢失] 目录订阅, 编号: {}, 重新发起订阅", device.getDeviceId()); addCatalogSubscribe(device, null); } if (device.getSubscribeCycleForMobilePosition() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) { - log.info("[订阅丢失] 移动位置订阅, 编号: {}, 重新发起订阅", device.getDeviceId()); + log.debug("[订阅丢失] 移动位置订阅, 编号: {}, 重新发起订阅", device.getDeviceId()); addMobilePositionSubscribe(device, null); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index 2019a0bdc..faeecf4ba 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -160,30 +160,26 @@ public class GbChannelServiceImpl implements IGbChannelService { log.warn("[多个通道离线] 通道数量为0,更新失败"); return 0; } - List onlineChannelList = commonGBChannelMapper.queryInListByStatus(commonGBChannelList, "ON"); - if (onlineChannelList.isEmpty()) { - log.info("[多个通道离线] 更新失败, 参数内通道已经离线, 无需更新"); - return 0; - } + log.info("[通道离线] 共 {} 个", commonGBChannelList.size()); int limitCount = 1000; int result = 0; - if (onlineChannelList.size() > limitCount) { - for (int i = 0; i < onlineChannelList.size(); i += limitCount) { + if (commonGBChannelList.size() > limitCount) { + for (int i = 0; i < commonGBChannelList.size(); i += limitCount) { int toIndex = i + limitCount; - if (i + limitCount > onlineChannelList.size()) { - toIndex = onlineChannelList.size(); + if (i + limitCount > commonGBChannelList.size()) { + toIndex = commonGBChannelList.size(); } - result += commonGBChannelMapper.updateStatusForListById(onlineChannelList.subList(i, toIndex), "OFF"); + result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "OFF"); } } else { - result += commonGBChannelMapper.updateStatusForListById(onlineChannelList, "OFF"); + result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "OFF"); } if (result > 0) { try { // 发送catalog - eventPublisher.catalogEventPublish(null, onlineChannelList, CatalogEvent.OFF); + eventPublisher.catalogEventPublish(null, commonGBChannelList, CatalogEvent.OFF); } catch (Exception e) { - log.warn("[多个通道离线] 发送失败,数量:{}", onlineChannelList.size(), e); + log.warn("[多个通道离线] 发送失败,数量:{}", commonGBChannelList.size(), e); } } return result; @@ -214,32 +210,25 @@ public class GbChannelServiceImpl implements IGbChannelService { log.warn("[多个通道上线] 通道数量为0,更新失败"); return 0; } - List offlineChannelList = commonGBChannelMapper.queryInListByStatus(commonGBChannelList, "OFF"); - if (offlineChannelList.isEmpty()) { - log.warn("[多个通道上线] 更新失败, 参数内通道已经上线"); - return 0; - } // 批量更新 int limitCount = 1000; int result = 0; - if (offlineChannelList.size() > limitCount) { - for (int i = 0; i < offlineChannelList.size(); i += limitCount) { + if (commonGBChannelList.size() > limitCount) { + for (int i = 0; i < commonGBChannelList.size(); i += limitCount) { int toIndex = i + limitCount; - if (i + limitCount > offlineChannelList.size()) { - toIndex = offlineChannelList.size(); + if (i + limitCount > commonGBChannelList.size()) { + toIndex = commonGBChannelList.size(); } - result += commonGBChannelMapper.updateStatusForListById(offlineChannelList.subList(i, toIndex), "ON"); + result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList.subList(i, toIndex), "ON"); } } else { - result += commonGBChannelMapper.updateStatusForListById(offlineChannelList, "ON"); + result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "ON"); } - if (result > 0) { - try { - // 发送catalog - eventPublisher.catalogEventPublish(null, offlineChannelList, CatalogEvent.ON); - } catch (Exception e) { - log.warn("[多个通道上线] 发送失败,数量:{}", offlineChannelList.size(), e); - } + try { + // 发送catalog + eventPublisher.catalogEventPublish(null, commonGBChannelList, CatalogEvent.ON); + } catch (Exception e) { + log.warn("[多个通道上线] 发送失败,数量:{}", commonGBChannelList.size(), e); } return result; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index 2c58259b3..085b6421c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -63,11 +63,6 @@ import java.util.concurrent.TimeUnit; @Order(value=15) public class PlatformServiceImpl implements IPlatformService, CommandLineRunner { - private final static String REGISTER_KEY_PREFIX = "platform_register_"; - - private final static String REGISTER_FAIL_AGAIN_KEY_PREFIX = "platform_register_fail_again_"; - private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_"; - @Autowired private PlatformMapper platformMapper; @@ -133,7 +128,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner sendUnRegister(platform, taskInfo.getSipTransactionInfo()); } // 启动时所有平台默认离线 - platformMapper.offlineAll(); + platformMapper.offlineAll(userSetting.getServerId()); } @Scheduled(fixedDelay = 20, timeUnit = TimeUnit.SECONDS) //每3秒执行一次 public void statusLostCheck(){ @@ -199,6 +194,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner return; } log.info("[集群] 检测到 {} 已离线", serverId); + redisCatchStorage.removeOfflineWVPInfo(serverId); String chooseServerId = redisCatchStorage.chooseOneServer(serverId); if (!userSetting.getServerId().equals(chooseServerId)){ return; @@ -390,8 +386,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerGBId(), platform.getKeepTimeout() * 1000L, this::keepaliveExpire); statusTaskRunner.addKeepAliveTask(keepaliveTask); - - platformMapper.updateStatus(platform.getId(), true); + platformMapper.updateStatus(platform.getId(), true, userSetting.getServerId()); if (platform.getAutoPushChannel() != null && platform.getAutoPushChannel()) { if (subscribeHolder.getCatalogSubscribe(platform.getServerGBId()) == null) { @@ -481,7 +476,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner subscribeHolder.removeCatalogSubscribe(platform.getServerGBId()); subscribeHolder.removeMobilePositionSubscribe(platform.getServerGBId()); - platformMapper.updateStatus(platform.getId(), false); + platformMapper.updateStatus(platform.getId(), false, userSetting.getServerId()); // 停止所有推流 log.info("[平台离线] {}({}), 停止所有推流", platform.getName(), platform.getServerGBId()); @@ -521,7 +516,6 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner gpsMsgInfo = null; } - if (gpsMsgInfo == null && !userSetting.isSendPositionOnDemand()){ gpsMsgInfo = new GPSMsgInfo(); gpsMsgInfo.setId(channel.getGbDeviceId()); @@ -871,7 +865,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner } @Override - public List queryAll() { - return platformMapper.queryAll(); + public List queryAll(String serverId) { + return platformMapper.queryByServerId(serverId); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index 3bafa6a20..7a07ae6fc 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -339,7 +339,7 @@ public class PlayServiceImpl implements IPlayService { InviteInfo inviteInfoInCatch = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId()); if (inviteInfoInCatch != null ) { if (inviteInfoInCatch.getStreamInfo() == null) { - // 释放生成的ssrc,使用上一次申请的322 + // 释放生成的ssrc,使用上一次申请的 ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc); // 点播发起了但是尚未成功, 仅注册回调等待结果即可 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 26a886b78..bacf50456 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -143,39 +143,44 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); } } - MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - if (mediaServer != null) { - AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getChannelId()); - if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) { - // 来自上级平台的停止对讲 - log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId()); - audioBroadcastManager.del(sendRtpItem.getChannelId()); - } + if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { + MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (mediaServer != null) { + AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(sendRtpItem.getChannelId()); + if (audioBroadcastCatch != null && audioBroadcastCatch.getSipTransactionInfo().getCallId().equals(callIdHeader.getCallId())) { + // 来自上级平台的停止对讲 + log.info("[停止对讲] 来自上级,平台:{}, 通道:{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId()); + audioBroadcastManager.del(sendRtpItem.getChannelId()); + } - MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), streamId); + MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), streamId); - if (mediaInfo.getReaderCount() <= 0) { - log.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); - if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { - Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getTargetId()); - if (device == null) { - log.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); - return; - } - DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(sendRtpItem.getChannelId()); - if (deviceChannel == null) { - log.info("[收到bye] {} 通知设备停止推流时未找到通道信息", streamId); - return; - } - try { - log.info("[停止点播] {}/{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId()); - cmder.streamByeCmd(device, deviceChannel.getDeviceId(), sendRtpItem.getApp(), sendRtpItem.getStream(), null, null); - } catch (InvalidArgumentException | ParseException | SipException | - SsrcTransactionNotFoundException e) { - log.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); + if (mediaInfo != null && mediaInfo.getReaderCount() <= 0) { + log.info("[收到bye] {} 无其它观看者,通知设备停止推流", streamId); + if (sendRtpItem.getPlayType().equals(InviteStreamType.PLAY)) { + Device device = deviceService.getDeviceByDeviceId(sendRtpItem.getTargetId()); + if (device == null) { + log.info("[收到bye] {} 通知设备停止推流时未找到设备信息", streamId); + return; + } + DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(sendRtpItem.getChannelId()); + if (deviceChannel == null) { + log.info("[收到bye] {} 通知设备停止推流时未找到通道信息", streamId); + return; + } + try { + log.info("[停止点播] {}/{}", sendRtpItem.getTargetId(), sendRtpItem.getChannelId()); + cmder.streamByeCmd(device, deviceChannel.getDeviceId(), sendRtpItem.getApp(), sendRtpItem.getStream(), null, null); + } catch (InvalidArgumentException | ParseException | SipException | + SsrcTransactionNotFoundException e) { + log.error("[收到bye] {} 无其它观看者,通知设备停止推流, 发送BYE失败 {}",streamId, e.getMessage()); + } } } } + } else { + // TODO 流再其他wvp上时应该通知这个wvp停止推流和发送BYE + } } // 可能是设备发送的停止 diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java index 174fa6b67..db1035e3e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java @@ -26,7 +26,7 @@ public interface IRedisRpcPlayService { String frontEndCommand(String serverId, Integer channelId, int cmdCode, int parameter1, int parameter2, int combindCode2); - void playPush(Integer id, ErrorCallback callback); + void playPush(String serverId, Integer id, ErrorCallback callback); StreamInfo playProxy(String serverId, int id); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java index 894448d84..f524a9218 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java @@ -93,11 +93,11 @@ public class RedisAlarmMsgListener implements MessageListener { log.warn("[REDIS的ALARM通知]消息解析失败"); continue; } - String gbId = alarmChannelMessage.getGbId(); + String chanelId = alarmChannelMessage.getGbId(); DeviceAlarm deviceAlarm = new DeviceAlarm(); deviceAlarm.setCreateTime(DateUtil.getNow()); - deviceAlarm.setChannelId(gbId); + deviceAlarm.setChannelId(chanelId); deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription()); deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn()); deviceAlarm.setAlarmType("" + alarmChannelMessage.getAlarmType()); @@ -106,7 +106,7 @@ public class RedisAlarmMsgListener implements MessageListener { deviceAlarm.setLongitude(0); deviceAlarm.setLatitude(0); - if (ObjectUtils.isEmpty(gbId)) { + if (ObjectUtils.isEmpty(chanelId)) { if (userSetting.getSendToPlatformsWhenIdLost()) { // 发送给所有的上级 List parentPlatforms = platformService.queryEnablePlatformList(userSetting.getServerId()); @@ -148,24 +148,26 @@ public class RedisAlarmMsgListener implements MessageListener { } } else { // 获取该通道ID是属于设备还是对应的上级平台 - Device device = deviceService.getDeviceBySourceChannelDeviceId(gbId); - List platforms = platformChannelService.queryByPlatformBySharChannelId(gbId); - if (device != null && (platforms == null || platforms.isEmpty())) { + Device device = deviceService.getDeviceBySourceChannelDeviceId(chanelId); + List platforms = platformChannelService.queryByPlatformBySharChannelId(chanelId); + if (device != null && device.getServerId().equals(userSetting.getServerId()) && (platforms == null || platforms.isEmpty())) { try { commander.sendAlarmMessage(device, deviceAlarm); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 发送报警: {}", e.getMessage()); } - } else if (device == null && (platforms != null && !platforms.isEmpty())) { + } else if (device == null && (platforms != null && !platforms.isEmpty() )) { for (Platform platform : platforms) { - try { - commanderForPlatform.sendAlarmMessage(platform, deviceAlarm); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 发送报警: {}", e.getMessage()); + if (platform.getServerId().equals(userSetting.getServerId())) { + try { + commanderForPlatform.sendAlarmMessage(platform, deviceAlarm); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 发送报警: {}", e.getMessage()); + } } } } else { - log.warn("[REDIS的ALARM通知] 未查询到" + gbId + "所属的平台或设备"); + log.warn("[REDIS的ALARM通知] 未查询到" + chanelId + "所属的平台或设备"); } } } catch (Exception e) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java index d5d2b68c0..e3d4e9670 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java @@ -48,7 +48,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic @Override public void onMessage(Message message, byte[] bytes) { - log.info("[REDIS: 流设备状态变化]: {}", new String(message.getBody())); + log.info("[REDIS: 推流设备状态变化]: {}", new String(message.getBody())); taskQueue.offer(message); } @@ -84,11 +84,13 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic if (streamStatusMessage.getOfflineStreams() != null && !streamStatusMessage.getOfflineStreams().isEmpty()) { // 更新部分设备离线 + log.info("[REDIS: 推流设备状态变化] 更新部分设备离线: {}个", streamStatusMessage.getOfflineStreams().size()); streamPushService.offline(streamStatusMessage.getOfflineStreams()); } if (streamStatusMessage.getOnlineStreams() != null && !streamStatusMessage.getOnlineStreams().isEmpty()) { // 更新部分设备上线 + log.info("[REDIS: 推流设备状态变化] 更新部分设备上线: {}个", streamStatusMessage.getOnlineStreams().size()); streamPushService.online(streamStatusMessage.getOnlineStreams()); } } catch (Exception e) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java index 4404a7205..9bdc7fe61 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java @@ -193,8 +193,9 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService { } @Override - public void playPush(Integer id, ErrorCallback callback) { - RedisRpcRequest request = buildRequest("streamPush/play", id); + public void playPush(String serverId, Integer id, ErrorCallback callback) { + RedisRpcRequest request = buildRequest("streamPush/play", id + ""); + request.setToId(serverId); RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.SECONDS); if (response == null) { callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index d409acbfb..106cf3aea 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -27,6 +27,8 @@ public interface IRedisCatchStorage { */ void updateWVPInfo(ServerInfo serverInfo, int time); + void removeOfflineWVPInfo(String serverId); + /** * 发送推流生成与推流消失消息 * @param jsonObject 消息内容 diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 25dcf4b76..a5d53f55b 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -84,6 +84,13 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { redisTemplate.opsForZSet().add(setKey, userSetting.getServerId(), System.currentTimeMillis()); } + @Override + public void removeOfflineWVPInfo(String serverId) { + String setKey = VideoManagerConstants.WVP_SERVER_LIST; + // 首次设置就设置为0, 后续值越小说明越是最近启动的 + redisTemplate.opsForZSet().remove(setKey, serverId); + } + @Override public void sendStreamChangeMsg(String type, JSONObject jsonObject) { String key = VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + type; diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java index c77fa4194..2db9b4ef5 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/dao/StreamPushMapper.java @@ -91,7 +91,7 @@ public interface StreamPushMapper { "(#{item.app}, #{item.stream}) " + "" + ")") - List getListFromRedis(List offlineStreams); + List getListInList(List offlineStreams); @Select("SELECT CONCAT(app,stream) from wvp_stream_push") diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java index ec5e030dc..d8adfa3f0 100644 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushPlayServiceImpl.java @@ -58,7 +58,7 @@ public class StreamPushPlayServiceImpl implements IStreamPushPlayService { Assert.notNull(streamPush, "推流信息未找到"); if (streamPush.isPushing() && !userSetting.getServerId().equals(streamPush.getServerId())) { - redisRpcPlayService.playPush(id, callback); + redisRpcPlayService.playPush(streamPush.getServerId(), id, callback); return; } diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java index 4c8b8552b..1090d3c58 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java @@ -458,16 +458,27 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override public void offline(List offlineStreams) { // 更新部分设备离线 - List streamPushList = streamPushMapper.getListFromRedis(offlineStreams); + List streamPushList = streamPushMapper.getListInList(offlineStreams); + if (streamPushList.isEmpty()) { + log.info("[推流设备] 设备离线操作未发现可操作数据。"); + return; + } List commonGBChannelList = gbChannelService.queryListByStreamPushList(streamPushList); gbChannelService.offline(commonGBChannelList); } @Override public void online(List onlineStreams) { + if (onlineStreams.isEmpty()) { + log.info("[设备上线] 推流设备列表为空"); + return; + } // 更新部分设备上线streamPushService - List streamPushList = streamPushMapper.getListFromRedis(onlineStreams); + List streamPushList = streamPushMapper.getListInList(onlineStreams); if (streamPushList.isEmpty()) { + for (StreamPushItemFromRedis onlineStream : onlineStreams) { + log.info("[设备上线] 未查询到这些通道: {}/{}", onlineStream.getApp(), onlineStream.getStream()); + } return; } List commonGBChannelList = gbChannelService.queryListByStreamPushList(streamPushList); diff --git a/web_src/src/main.js b/web_src/src/main.js index 8ea7ece1c..7ce9b40f7 100755 --- a/web_src/src/main.js +++ b/web_src/src/main.js @@ -89,15 +89,21 @@ Vue.prototype.$channelTypeList = { new Vue({ beforeCreate: function () { // 获取本平台的服务ID - axios({ - method: 'get', - url: `/api/server/system/configInfo`, - }).then( (res)=> { - if (res.data.code === 0) { - Vue.prototype.$myServerId = res.data.data.addOn.serverId; - } - }).catch( (error)=> { - }); + console.log("获取本平台的服务ID") + if (!this.$myServerId) { + axios({ + method: 'get', + url: `/api/server/system/configInfo`, + }).then( (res)=> { + if (res.data.code === 0) { + console.log(res.data) + console.log("当前服务ID: " + res.data.data.addOn.serverId) + Vue.prototype.$myServerId = res.data.data.addOn.serverId; + } + }).catch( (error)=> { + }); + } + }, router: router, render: h => h(App),