refactor(video): SIP 高频消息队列消费拆独立 Scheduler + 修 Catalog 事务自调用
背景:
- 原先 Alarm / Keepalive / Catalog 三个业务 Handler 自带 @Scheduled,
@Scheduled 会让 Spring 对业务类生成 CGLIB 代理,
与 @EventListener / @Async 等场景叠加时容易出现"找不到方法"/
重复包装等怪异行为。
- CatalogResponseMessageHandler.executeTaskQueue 上标 @Transactional
会导致空队列 50ms 触发一次空事务,浪费连接池;与此同时
this.saveData(...) 是自调用,saveData 上的 @Transactional
又根本不生效,事务语义双重翻车。
本次改动:
- 新增 CatalogResponseMessageQueueScheduler 无接口 @Component,
@Scheduled(fixedDelay=50) 驱动业务 Handler 的 executeTaskQueue。
- Alarm / Keepalive 两个 QueueScheduler 补 @ConditionalOnProperty
(video.sip-queue.enabled, matchIfMissing=true),三个调度器统一
开关;注释对齐,标明关闭后消息会在内存堆积、需运维兜底。
- CatalogResponseMessageHandler:
· 去掉 executeTaskQueue 上的 @Scheduled + @Transactional,
入口处 taskQueue.isEmpty() 直接 return,不再开空事务;
· @Autowired @Lazy 注入自身代理 self,把 this.saveData(...)
改成 self.saveData(...),让 saveData 上的 @Transactional
真正生效(MyBatis 落库 + 区域/分组批写回到同一事务)。
This commit is contained in:
@@ -2,19 +2,25 @@ package com.viewsh.module.video.gb28181.transmit.event.request.impl.message.noti
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 报警消息队列消费调度器
|
||||
*
|
||||
* <p>高频 SIP 消息队列消费(200ms 轮询),不适合 xxl-job 调度,
|
||||
* 保持本机 @Scheduled 轮询;独立为无接口的 @Component 以避免 JDK 代理问题。
|
||||
* <p>高频 SIP 消息队列消费(200ms 轮询),不适合 xxl-job 调度,保持本机 @Scheduled 轮询。
|
||||
* 独立为无接口的 @Component:避免业务 Handler 被 Spring AOP(CGLIB/JDK)代理后,
|
||||
* 在 @EventListener / @Async 等场景下出现找不到方法或重复包装的问题。</p>
|
||||
*
|
||||
* <p>通过 {@code video.sip-queue.enabled} 统一开关(默认开启)。关闭后消息队列会在内存堆积,
|
||||
* 运维侧需确认有监控告警兜底,否则不要关。</p>
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
@ConditionalOnProperty(prefix = "video.sip-queue", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class AlarmNotifyMessageQueueScheduler {
|
||||
|
||||
@Autowired
|
||||
|
||||
@@ -2,19 +2,25 @@ package com.viewsh.module.video.gb28181.transmit.event.request.impl.message.noti
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 心跳消息队列消费调度器
|
||||
*
|
||||
* <p>高频 SIP 消息队列消费(100ms 轮询),不适合 xxl-job 调度,
|
||||
* 保持本机 @Scheduled 轮询;独立为无接口的 @Component 以避免 JDK 代理问题。
|
||||
* <p>高频 SIP 消息队列消费(100ms 轮询),不适合 xxl-job 调度,保持本机 @Scheduled 轮询。
|
||||
* 独立为无接口的 @Component:避免业务 Handler 被 Spring AOP(CGLIB/JDK)代理后,
|
||||
* 在 @EventListener / @Async 等场景下出现找不到方法或重复包装的问题。</p>
|
||||
*
|
||||
* <p>通过 {@code video.sip-queue.enabled} 统一开关(默认开启)。关闭后消息队列会在内存堆积,
|
||||
* 运维侧需确认有监控告警兜底,否则不要关。</p>
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
@ConditionalOnProperty(prefix = "video.sip-queue", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class KeepaliveNotifyMessageQueueScheduler {
|
||||
|
||||
@Autowired
|
||||
|
||||
@@ -16,7 +16,7 @@ import org.dom4j.DocumentException;
|
||||
import org.dom4j.Element;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
@@ -60,6 +60,16 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
|
||||
@Autowired
|
||||
private SipConfig sipConfig;
|
||||
|
||||
/**
|
||||
* 通过 @Lazy 注入自身代理,用来在 {@link #executeTaskQueue()} 内部调用 {@link #saveData(Device, int)}
|
||||
* 时走 Spring AOP 代理,从而让 saveData 上的 @Transactional 真正生效。
|
||||
* <p>背景:executeTaskQueue → this.saveData(...) 是典型的 Spring 自调用,this 不是代理对象,
|
||||
* 事务注解会静默失效。用 self.saveData(...) 才能触发事务拦截器开启 / 提交 / 回滚事务。</p>
|
||||
*/
|
||||
@Autowired
|
||||
@Lazy
|
||||
private CatalogResponseMessageHandler self;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
responseMessageHandler.addHandler(cmdType, this);
|
||||
@@ -76,8 +86,12 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
|
||||
}
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelay = 50)
|
||||
@Transactional
|
||||
/**
|
||||
* 消费目录同步消息队列。
|
||||
* <p>由独立的 {@link CatalogResponseMessageQueueScheduler} 以 50ms fixedDelay 驱动,
|
||||
* 不在方法上加 {@link Transactional}:空队列时立即 return,避免每 50ms 一次的空事务浪费连接;
|
||||
* 真正的落库事务边界收敛到 {@link #saveData(Device, int)}。</p>
|
||||
*/
|
||||
public void executeTaskQueue(){
|
||||
if (taskQueue.isEmpty()) {
|
||||
return;
|
||||
@@ -189,7 +203,8 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
|
||||
&& catalogDataCatch.size(deviceId, sn) == catalogDataCatch.sumNum(deviceId, sn)) {
|
||||
// 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理,
|
||||
// 目前支持设备通道上线通知时和设备上线时向上级通知
|
||||
boolean resetChannelsResult = saveData(take.getDevice(), sn);
|
||||
// 必须走 self(代理对象),this.saveData 是自调用,@Transactional 不生效
|
||||
boolean resetChannelsResult = self.saveData(take.getDevice(), sn);
|
||||
if (!resetChannelsResult) {
|
||||
String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, sn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId(), sn).size() + "条";
|
||||
catalogDataCatch.setChannelSyncEnd(deviceId, sn, errorMsg);
|
||||
|
||||
@@ -0,0 +1,38 @@
|
||||
package com.viewsh.module.video.gb28181.transmit.event.request.impl.message.response.cmd;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 目录响应消息队列消费调度器
|
||||
*
|
||||
* <p>高频 SIP 目录消息队列消费(50ms 轮询),不适合 xxl-job 调度,保持本机 @Scheduled 轮询。
|
||||
* 独立为无接口的 @Component:避免业务 Handler 被 Spring AOP(CGLIB/JDK)代理后,
|
||||
* 在 @EventListener / @Async 等场景下出现找不到方法或重复包装的问题。</p>
|
||||
*
|
||||
* <p>通过 {@code video.sip-queue.enabled} 统一开关(默认开启)。关闭后消息队列会在内存堆积,
|
||||
* 运维侧需确认有监控告警兜底,否则不要关。</p>
|
||||
*
|
||||
* <p>不再在消费入口加 @Transactional:空队列 50ms 触发一次,加事务会浪费连接池。
|
||||
* 真正的落库事务边界下沉到
|
||||
* {@link CatalogResponseMessageHandler#saveData(com.viewsh.module.video.gb28181.bean.Device, int)},
|
||||
* 由 @Lazy self 代理确保自调用时事务仍然生效。</p>
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
@ConditionalOnProperty(prefix = "video.sip-queue", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class CatalogResponseMessageQueueScheduler {
|
||||
|
||||
@Autowired
|
||||
private CatalogResponseMessageHandler catalogResponseMessageHandler;
|
||||
|
||||
@Scheduled(fixedDelay = 50)
|
||||
public void consume() {
|
||||
catalogResponseMessageHandler.executeTaskQueue();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user