diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageQueueScheduler.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageQueueScheduler.java index b87b9660..6bd8f0c8 100644 --- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageQueueScheduler.java +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageQueueScheduler.java @@ -2,19 +2,25 @@ package com.viewsh.module.video.gb28181.transmit.event.request.impl.message.noti import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * 报警消息队列消费调度器 * - *

高频 SIP 消息队列消费(200ms 轮询),不适合 xxl-job 调度, - * 保持本机 @Scheduled 轮询;独立为无接口的 @Component 以避免 JDK 代理问题。 + *

高频 SIP 消息队列消费(200ms 轮询),不适合 xxl-job 调度,保持本机 @Scheduled 轮询。 + * 独立为无接口的 @Component:避免业务 Handler 被 Spring AOP(CGLIB/JDK)代理后, + * 在 @EventListener / @Async 等场景下出现找不到方法或重复包装的问题。

+ * + *

通过 {@code video.sip-queue.enabled} 统一开关(默认开启)。关闭后消息队列会在内存堆积, + * 运维侧需确认有监控告警兜底,否则不要关。

* * @author lzh */ @Component @Slf4j +@ConditionalOnProperty(prefix = "video.sip-queue", name = "enabled", havingValue = "true", matchIfMissing = true) public class AlarmNotifyMessageQueueScheduler { @Autowired diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageQueueScheduler.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageQueueScheduler.java index 32d39b18..4fdb2f0c 100644 --- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageQueueScheduler.java +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageQueueScheduler.java @@ -2,19 +2,25 @@ package com.viewsh.module.video.gb28181.transmit.event.request.impl.message.noti import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * 心跳消息队列消费调度器 * - *

高频 SIP 消息队列消费(100ms 轮询),不适合 xxl-job 调度, - * 保持本机 @Scheduled 轮询;独立为无接口的 @Component 以避免 JDK 代理问题。 + *

高频 SIP 消息队列消费(100ms 轮询),不适合 xxl-job 调度,保持本机 @Scheduled 轮询。 + * 独立为无接口的 @Component:避免业务 Handler 被 Spring AOP(CGLIB/JDK)代理后, + * 在 @EventListener / @Async 等场景下出现找不到方法或重复包装的问题。

+ * + *

通过 {@code video.sip-queue.enabled} 统一开关(默认开启)。关闭后消息队列会在内存堆积, + * 运维侧需确认有监控告警兜底,否则不要关。

* * @author lzh */ @Component @Slf4j +@ConditionalOnProperty(prefix = "video.sip-queue", name = "enabled", havingValue = "true", matchIfMissing = true) public class KeepaliveNotifyMessageQueueScheduler { @Autowired diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 9a9cb040..9eff333f 100644 --- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -16,7 +16,7 @@ import org.dom4j.DocumentException; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -60,6 +60,16 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @Autowired private SipConfig sipConfig; + /** + * 通过 @Lazy 注入自身代理,用来在 {@link #executeTaskQueue()} 内部调用 {@link #saveData(Device, int)} + * 时走 Spring AOP 代理,从而让 saveData 上的 @Transactional 真正生效。 + *

背景:executeTaskQueue → this.saveData(...) 是典型的 Spring 自调用,this 不是代理对象, + * 事务注解会静默失效。用 self.saveData(...) 才能触发事务拦截器开启 / 提交 / 回滚事务。

+ */ + @Autowired + @Lazy + private CatalogResponseMessageHandler self; + @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -76,8 +86,12 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } } - @Scheduled(fixedDelay = 50) - @Transactional + /** + * 消费目录同步消息队列。 + *

由独立的 {@link CatalogResponseMessageQueueScheduler} 以 50ms fixedDelay 驱动, + * 不在方法上加 {@link Transactional}:空队列时立即 return,避免每 50ms 一次的空事务浪费连接; + * 真正的落库事务边界收敛到 {@link #saveData(Device, int)}。

+ */ public void executeTaskQueue(){ if (taskQueue.isEmpty()) { return; @@ -189,7 +203,8 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp && catalogDataCatch.size(deviceId, sn) == catalogDataCatch.sumNum(deviceId, sn)) { // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, // 目前支持设备通道上线通知时和设备上线时向上级通知 - boolean resetChannelsResult = saveData(take.getDevice(), sn); + // 必须走 self(代理对象),this.saveData 是自调用,@Transactional 不生效 + boolean resetChannelsResult = self.saveData(take.getDevice(), sn); if (!resetChannelsResult) { String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, sn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId(), sn).size() + "条"; catalogDataCatch.setChannelSyncEnd(deviceId, sn, errorMsg); diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageQueueScheduler.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageQueueScheduler.java new file mode 100644 index 00000000..8ea3a534 --- /dev/null +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageQueueScheduler.java @@ -0,0 +1,38 @@ +package com.viewsh.module.video.gb28181.transmit.event.request.impl.message.response.cmd; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +/** + * 目录响应消息队列消费调度器 + * + *

高频 SIP 目录消息队列消费(50ms 轮询),不适合 xxl-job 调度,保持本机 @Scheduled 轮询。 + * 独立为无接口的 @Component:避免业务 Handler 被 Spring AOP(CGLIB/JDK)代理后, + * 在 @EventListener / @Async 等场景下出现找不到方法或重复包装的问题。

+ * + *

通过 {@code video.sip-queue.enabled} 统一开关(默认开启)。关闭后消息队列会在内存堆积, + * 运维侧需确认有监控告警兜底,否则不要关。

+ * + *

不再在消费入口加 @Transactional:空队列 50ms 触发一次,加事务会浪费连接池。 + * 真正的落库事务边界下沉到 + * {@link CatalogResponseMessageHandler#saveData(com.viewsh.module.video.gb28181.bean.Device, int)}, + * 由 @Lazy self 代理确保自调用时事务仍然生效。

+ * + * @author lzh + */ +@Component +@Slf4j +@ConditionalOnProperty(prefix = "video.sip-queue", name = "enabled", havingValue = "true", matchIfMissing = true) +public class CatalogResponseMessageQueueScheduler { + + @Autowired + private CatalogResponseMessageHandler catalogResponseMessageHandler; + + @Scheduled(fixedDelay = 50) + public void consume() { + catalogResponseMessageHandler.executeTaskQueue(); + } +}