From c10c982954a12dd92a407166d2e911b5ca3d6ec0 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Fri, 17 Oct 2025 15:26:32 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/DeviceServiceImpl.java | 16 +++--- .../service/impl/GbChannelServiceImpl.java | 53 +++++++------------ .../NotifyRequestForCatalogProcessor.java | 32 +---------- 3 files changed, 26 insertions(+), 75 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 3628933d0..4189db945 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -11,7 +11,6 @@ import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; @@ -44,7 +43,6 @@ import jakarta.validation.constraints.NotNull; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; -import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.annotation.Order; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -122,9 +120,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { @Autowired private DeviceStatusTaskRunner deviceStatusTaskRunner; - @Autowired - private ApplicationEventPublisher applicationEventPublisher; - private Device getDeviceByDeviceIdFromDb(String deviceId) { return deviceMapper.getDeviceByDeviceId(deviceId); } @@ -428,8 +423,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { deviceChannelMapper.offlineByDeviceId(device.getId()); // 发送通道离线通知 eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.OFF); - // 发送离线通知 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForOfflineList(this, channelList)); } private boolean isDevice(String deviceId) { @@ -831,8 +824,13 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { deviceStatusTaskRunner.removeTask(deviceId); } List commonGBChannels = commonGBChannelMapper.queryByGbDeviceIds(1, List.of(device.getId())); - // 发送删除通知 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForDeleteList(this, commonGBChannels)); + + try { + // 发送catalog + eventPublisher.catalogEventPublish(null, commonGBChannels, CatalogEvent.DEL); + } catch (Exception e) { + log.warn("[多个通道删除] 发送失败,数量:{}", commonGBChannels.size(), e); + } platformChannelMapper.delChannelForDeviceId(deviceId); deviceChannelMapper.cleanChannelsByDeviceId(device.getId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index d49be9432..f1416c937 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.gb28181.dao.GroupMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.RegionMapper; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; @@ -21,7 +20,6 @@ import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; @@ -53,9 +51,6 @@ public class GbChannelServiceImpl implements IGbChannelService { @Autowired private GroupMapper groupMapper; - @Autowired - private ApplicationEventPublisher applicationEventPublisher; - @Override public CommonGBChannel queryByDeviceId(String gbDeviceId) { return commonGBChannelMapper.queryByDeviceId(gbDeviceId); @@ -73,9 +68,12 @@ public class GbChannelServiceImpl implements IGbChannelService { commonGBChannel.setCreateTime(DateUtil.getNow()); commonGBChannel.setUpdateTime(DateUtil.getNow()); int result = commonGBChannelMapper.insert(commonGBChannel); - // 发送通道新增通知 - // 发送通知 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForAdd(this, commonGBChannel)); + try { + // 发送通知 + eventPublisher.catalogEventPublish(null, commonGBChannel, CatalogEvent.ADD); + } catch (Exception e) { + log.warn("[通道移除通知] 发送失败,{}", commonGBChannel.getGbDeviceId(), e); + } return result; } @@ -98,8 +96,6 @@ public class GbChannelServiceImpl implements IGbChannelService { } catch (Exception e) { log.warn("[通道移除通知] 发送失败,{}", channel.getGbDeviceId(), e); } - // 发送通知 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForDelete(this, channel)); } return 1; } @@ -117,9 +113,13 @@ public class GbChannelServiceImpl implements IGbChannelService { if (channelListInDb.isEmpty()) { return; } - // 发送通知 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForDeleteList(this, channelListInDb)); commonGBChannelMapper.batchDelete(channelListInDb); + try { + // 发送通知 + eventPublisher.catalogEventPublish(null, channelListInDb, CatalogEvent.DEL); + } catch (Exception e) { + log.warn("[通道移除通知] 发送失败", e); + } } @Override @@ -133,9 +133,6 @@ public class GbChannelServiceImpl implements IGbChannelService { int result = commonGBChannelMapper.update(commonGBChannel); if (result > 0) { try { - // 发送通知 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForUpdate(this, commonGBChannel)); - // 发送通知 eventPublisher.catalogEventPublish(null, commonGBChannel, CatalogEvent.UPDATE); } catch (Exception e) { @@ -154,8 +151,6 @@ public class GbChannelServiceImpl implements IGbChannelService { int result = commonGBChannelMapper.updateStatusById(commonGBChannel.getGbId(), "OFF"); if (result > 0) { try { - // 发送通知 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForOffline(this, commonGBChannel)); // 发送通知 eventPublisher.catalogEventPublish(null, commonGBChannel, CatalogEvent.OFF); } catch (Exception e) { @@ -173,8 +168,6 @@ public class GbChannelServiceImpl implements IGbChannelService { return 0; } log.info("[通道离线] 共 {} 个", commonGBChannelList.size()); - // 发送通知 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForOfflineList(this, commonGBChannelList)); int limitCount = 1000; int result = 0; if (commonGBChannelList.size() > limitCount) { @@ -208,8 +201,6 @@ public class GbChannelServiceImpl implements IGbChannelService { int result = commonGBChannelMapper.updateStatusById(commonGBChannel.getGbId(), "ON"); if (result > 0) { try { - // 发送通知 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForOnline(this, commonGBChannel)); // 发送通知 eventPublisher.catalogEventPublish(null, commonGBChannel, CatalogEvent.ON); } catch (Exception e) { @@ -241,8 +232,6 @@ public class GbChannelServiceImpl implements IGbChannelService { result += commonGBChannelMapper.updateStatusForListById(commonGBChannelList, "ON"); } try { - // 发送通知 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForOnlineList(this, commonGBChannelList)); // 发送catalog eventPublisher.catalogEventPublish(null, commonGBChannelList, CatalogEvent.ON); } catch (Exception e) { @@ -273,8 +262,12 @@ public class GbChannelServiceImpl implements IGbChannelService { } else { result += commonGBChannelMapper.batchAdd(commonGBChannels); } - // 发送通知 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForAddList(this, commonGBChannels)); + try { + // 发送catalog + eventPublisher.catalogEventPublish(null, commonGBChannels, CatalogEvent.ADD); + } catch (Exception e) { + log.warn("[多个通道新增] 发送失败,数量:{}", commonGBChannels.size(), e); + } log.warn("[新增多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result); } @@ -301,8 +294,6 @@ public class GbChannelServiceImpl implements IGbChannelService { log.info("[更新多个通道] 通道数量为{},成功保存:{}", commonGBChannels.size(), result); // 发送通过更新通知 try { - // 发送通知 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForUpdateList(this, commonGBChannels)); // 发送通知 eventPublisher.catalogEventPublish(null, commonGBChannels, CatalogEvent.UPDATE); } catch (Exception e) { @@ -333,8 +324,6 @@ public class GbChannelServiceImpl implements IGbChannelService { log.warn("[更新多个通道状态] 通道数量为{},成功保存:{}", commonGBChannels.size(), result); // 发送通过更新通知 try { - // 发送通知 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForUpdateList(this, commonGBChannels)); // 发送通知 eventPublisher.catalogEventPublish(null, commonGBChannels, CatalogEvent.UPDATE); } catch (Exception e) { @@ -399,10 +388,6 @@ public class GbChannelServiceImpl implements IGbChannelService { CommonGBChannel channelNew = getOne(id); // 发送通过更新通知 try { - - // 发送通知 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForUpdate(this, channelNew)); - // 发送通知 eventPublisher.catalogEventPublish(null, channelNew, CatalogEvent.UPDATE); } catch (Exception e) { @@ -589,8 +574,6 @@ public class GbChannelServiceImpl implements IGbChannelService { } // 发送catalog try { - // 发送通知 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForUpdateList(this, channelList)); eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.UPDATE); } catch (Exception e) { log.warn("[多个通道业务分组] 发送失败,数量:{}", channelList.size(), e); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index 90d8f8e02..ce61535d6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; @@ -15,7 +14,6 @@ import lombok.extern.slf4j.Slf4j; import org.dom4j.DocumentException; import org.dom4j.Element; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationEventPublisher; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -51,10 +49,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent @Autowired private IDeviceChannelService deviceChannelService; - - @Autowired - private ApplicationEventPublisher applicationEventPublisher; - + // @Scheduled(fixedRate = 2000) //每400毫秒执行一次 // public void showSize(){ // log.warn("[notify-目录订阅] 待处理消息数量: {}", taskQueue.size() ); @@ -151,9 +146,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent channel.setStatus("ON"); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); - // 发送通道修改消息 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForOnline(this, channel)); - if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), true); @@ -167,9 +159,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent } else { channel.setStatus("OFF"); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); - - // 发送通道修改消息 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForOffline(this, channel)); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false); @@ -185,9 +174,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent channel.setStatus("OFF"); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); - // 发送通道修改消息 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForOffline(this, channel)); - if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false); @@ -203,9 +189,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent channel.setStatus("OFF"); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.STATUS_CHANGED, channel)); - // 发送通道修改消息 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForOffline(this, channel)); - if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false); @@ -224,16 +207,11 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent channel.setUpdateTime(DateUtil.getNow()); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel)); - // 发送通道修改消息 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForUpdate(this, channel)); - } else { catalogChannelEvent.getChannel().setUpdateTime(DateUtil.getNow()); catalogChannelEvent.getChannel().setCreateTime(DateUtil.getNow()); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel)); - // 发送通道修改消息 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForAdd(this, channel)); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), true); @@ -246,9 +224,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent log.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId()); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.DELETE, channel)); - // 发送通道修改消息 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForDelete(this, channel)); - if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), false); @@ -265,17 +240,12 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent channel.setUpdateTime(DateUtil.getNow()); channel.setUpdateTime(DateUtil.getNow()); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.UPDATE, channel)); - // 发送通道修改消息 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForUpdate(this, channel)); } else { catalogChannelEvent.getChannel().setCreateTime(DateUtil.getNow()); catalogChannelEvent.getChannel().setUpdateTime(DateUtil.getNow()); channelList.add(NotifyCatalogChannel.getInstance(NotifyCatalogChannel.Type.ADD, channel)); - // 发送通道修改消息 - applicationEventPublisher.publishEvent(ChannelEvent.getInstanceForAdd(this, channel)); - if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), catalogChannelEvent.getChannel().getDeviceId(), true);