refactor(video): @Scheduled 转 xxl-job,对齐 ops 模块定时任务约定

WVP 原生单体采用 @Scheduled 做周期任务,位于 ServiceImpl 上的任务
Spring 用 JDK 代理无法匹配到接口方法,导致启动失败。按 ops 模块
约定(QueueSyncJob 模板),把 6 个 ServiceImpl 的周期任务转为
独立 @XxlJob Job 类;2 个 IMessageHandler 的高频轮询拆为独立
无接口 @Component。

新建 6 个 Job 类(framework/job/):
- InviteStreamCleanupJob (10s)    清理 Redis 错误 Invite 数据
- DeviceSubscribeLostCheckJob (10s) 设备订阅丢失检查
- DeviceStatusLostCheckJob (30s)   设备状态丢失检查
- PlatformStatusLostCheckJob (20s) 平台注册状态检查
- PlatformAutoRegisterJob (2s)     级联平台自动注册监听
- AiEdgeDeviceOfflineCheckJob (90s) AI 边缘设备离线标记

接口变更(让 Job 类通过 JDK 代理正常调用):
- IInviteStreamService 新增 cleanInvalidInviteCache()
- IDeviceService 新增 lostCheckForSubscribe() / lostCheckForStatus()
- IPlatformService 新增 statusLostCheck() / cascadePlatformAutoRegister()
- PlatformServiceImpl.execute() 重命名为 cascadePlatformAutoRegister()

ServiceImpl 调整:
- InviteStream/Device/Platform/AiEdgeDevice ServiceImpl 删除
  @Scheduled 注解,方法体保留
- 清理 @Scheduled / TimeUnit 无用 import

新建 2 个高频 Scheduler @Component(保持 100-200ms 毫秒级轮询):
- AlarmNotifyMessageQueueScheduler (200ms)
- KeepaliveNotifyMessageQueueScheduler (100ms)
这两个消息处理器是 SIP 协议栈内部机制,不适合走 xxl-job 中心调度,
拆到独立无接口 Component 后 Spring 自动走 CGLIB,代理正常。
原 MessageHandler 删除 @Scheduled 注解,executeTaskQueue() 保留
为 public 供新 Scheduler 调用。

配置:本地 xxl.job.enabled=false 已配置(与 ops 对齐)

编译通过(mvn compile BUILD SUCCESS)。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
lzh
2026-04-22 14:29:51 +08:00
parent 4f0c8f7162
commit d876d0387a
17 changed files with 296 additions and 16 deletions

View File

@@ -10,7 +10,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.viewsh.framework.common.pojo.PageResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
@@ -79,7 +78,6 @@ public class AiEdgeDeviceServiceImpl implements IAiEdgeDeviceService {
}
@Override
@Scheduled(fixedRate = 90000) // 每90秒检查一次
public void checkOffline() {
LocalDateTime now = LocalDateTime.now();
LocalDateTime threshold = LocalDateTime.now().minusSeconds(90);

View File

@@ -0,0 +1,35 @@
package com.viewsh.module.video.framework.job;
import cn.hutool.core.util.StrUtil;
import com.viewsh.module.video.aiot.service.IAiEdgeDeviceService;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* AI 边缘设备离线标记 Job
*
* <p>检查超过90秒未上报心跳的边缘设备将其状态标记为离线。
* 建议 xxl-job 配置每90秒执行一次CRON: 0 * * * * ?,即每分钟;或按需自定义)
*
* @author lzh
*/
@Component
@Slf4j
public class AiEdgeDeviceOfflineCheckJob {
@Resource
private IAiEdgeDeviceService aiEdgeDeviceService;
@XxlJob("aiEdgeDeviceOfflineCheckJob")
public String execute() {
try {
aiEdgeDeviceService.checkOffline();
return "AI 边缘设备离线检查完成";
} catch (Exception e) {
log.error("AI 边缘设备离线检查失败", e);
return StrUtil.format("失败: {}", e.getMessage());
}
}
}

View File

@@ -0,0 +1,35 @@
package com.viewsh.module.video.framework.job;
import cn.hutool.core.util.StrUtil;
import com.viewsh.module.video.gb28181.service.IDeviceService;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 设备状态丢失检查 Job
*
* <p>检测设备状态任务是否存活,丢失则执行设备离线。
* 建议 xxl-job 配置每30秒执行一次CRON: 0/30 * * * * ?
*
* @author lzh
*/
@Component
@Slf4j
public class DeviceStatusLostCheckJob {
@Resource
private IDeviceService deviceService;
@XxlJob("deviceStatusLostCheckJob")
public String execute() {
try {
deviceService.lostCheckForStatus();
return "设备状态丢失检查完成";
} catch (Exception e) {
log.error("设备状态丢失检查失败", e);
return StrUtil.format("失败: {}", e.getMessage());
}
}
}

View File

@@ -0,0 +1,35 @@
package com.viewsh.module.video.framework.job;
import cn.hutool.core.util.StrUtil;
import com.viewsh.module.video.gb28181.service.IDeviceService;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 设备目录/移动位置订阅丢失检查 Job
*
* <p>检测目录订阅和移动位置订阅任务是否存活,丢失则重新发起订阅。
* 建议 xxl-job 配置每10秒执行一次CRON: 0/10 * * * * ?
*
* @author lzh
*/
@Component
@Slf4j
public class DeviceSubscribeLostCheckJob {
@Resource
private IDeviceService deviceService;
@XxlJob("deviceSubscribeLostCheckJob")
public String execute() {
try {
deviceService.lostCheckForSubscribe();
return "设备订阅丢失检查完成";
} catch (Exception e) {
log.error("设备订阅丢失检查失败", e);
return StrUtil.format("失败: {}", e.getMessage());
}
}
}

View File

@@ -0,0 +1,35 @@
package com.viewsh.module.video.framework.job;
import cn.hutool.core.util.StrUtil;
import com.viewsh.module.video.gb28181.service.IInviteStreamService;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 清理 Invite 会话缓存中的错误数据 Job
*
* <p>防止错误的 Redis Invite 数据导致点播不可用。
* 建议 xxl-job 配置每10秒执行一次CRON: 0/10 * * * * ?
*
* @author lzh
*/
@Component
@Slf4j
public class InviteStreamCleanupJob {
@Resource
private IInviteStreamService inviteStreamService;
@XxlJob("inviteStreamCleanupJob")
public String execute() {
try {
inviteStreamService.cleanInvalidInviteCache();
return "清理 Invite 缓存成功";
} catch (Exception e) {
log.error("清理 Invite 缓存失败", e);
return StrUtil.format("失败: {}", e.getMessage());
}
}
}

View File

@@ -0,0 +1,35 @@
package com.viewsh.module.video.framework.job;
import cn.hutool.core.util.StrUtil;
import com.viewsh.module.video.gb28181.service.IPlatformService;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 级联平台自动注册监听 Job
*
* <p>监听国标级联所在的 WVP 服务是否正常,异常时选择新节点接管注册。
* 建议 xxl-job 配置每2秒执行一次CRON: 0/2 * * * * ?
*
* @author lzh
*/
@Component
@Slf4j
public class PlatformAutoRegisterJob {
@Resource
private IPlatformService platformService;
@XxlJob("platformAutoRegisterJob")
public String execute() {
try {
platformService.cascadePlatformAutoRegister();
return "级联平台自动注册监听完成";
} catch (Exception e) {
log.error("级联平台自动注册监听失败", e);
return StrUtil.format("失败: {}", e.getMessage());
}
}
}

View File

@@ -0,0 +1,35 @@
package com.viewsh.module.video.framework.job;
import cn.hutool.core.util.StrUtil;
import com.viewsh.module.video.gb28181.service.IPlatformService;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 上级级联平台注册状态丢失检查 Job
*
* <p>检测启用但未注册的平台,发现后重新发起注册。
* 建议 xxl-job 配置每20秒执行一次CRON: 0/20 * * * * ?
*
* @author lzh
*/
@Component
@Slf4j
public class PlatformStatusLostCheckJob {
@Resource
private IPlatformService platformService;
@XxlJob("platformStatusLostCheckJob")
public String execute() {
try {
platformService.statusLostCheck();
return "平台注册状态检查完成";
} catch (Exception e) {
log.error("平台注册状态检查失败", e);
return StrUtil.format("失败: {}", e.getMessage());
}
}
}

View File

@@ -91,6 +91,16 @@ public interface IDeviceService {
List<Device> getAllByStatus(Boolean status);
/**
* 订阅丢失检查:检测目录/移动位置订阅任务是否存活,丢失则重新发起
*/
void lostCheckForSubscribe();
/**
* 设备状态丢失检查:检测设备状态任务是否存活,丢失则执行离线
*/
void lostCheckForStatus();
/**
* 判断是否注册已经失效
* @param device 设备信息

View File

@@ -82,4 +82,9 @@ public interface IInviteStreamService {
* 更新ssrc
*/
InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrcInResponse);
/**
* 清理 Redis 中错误的 Invite 会话缓存(防止错误数据导致点播不可用)
*/
void cleanInvalidInviteCache();
}

View File

@@ -81,4 +81,14 @@ public interface IPlatformService {
List<Platform> queryAll(String serverId);
/**
* 平台注册状态丢失检查:检测启用但未注册的平台并重新发起注册
*/
void statusLostCheck();
/**
* 监听国标级联服务存活,异常时选择新的 WVP 节点接管
*/
void cascadePlatformAutoRegister();
}

View File

@@ -45,7 +45,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
@@ -432,7 +431,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
// 订阅丢失检查
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS)
@Override
public void lostCheckForSubscribe(){
// 获取所有设备
List<Device> deviceList = redisCatchStorage.getAllDevices();
@@ -455,7 +454,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
// 设备状态丢失检查
@Scheduled(fixedDelay = 30, timeUnit = TimeUnit.SECONDS)
@Override
public void lostCheckForStatus(){
// 获取所有设备
List<Device> deviceList = redisCatchStorage.getAllDevices();

View File

@@ -17,7 +17,6 @@ import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
@@ -332,8 +331,8 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
return inviteInfoInDb;
}
@Scheduled(fixedRate = 10000) //定时检测,清理错误的redis数据,防止因为错误数据导致的点播不可用
public void execute(){
@Override
public void cleanInvalidInviteCache(){
String key = VideoManagerConstants.INVITE_PREFIX;
if(redisTemplate.opsForHash().size(key) == 0) {
return;

View File

@@ -43,7 +43,6 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
@@ -164,7 +163,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
// 启动时所有平台默认离线
platformMapper.offlineAll(userSetting.getServerId());
}
@Scheduled(fixedDelay = 20, timeUnit = TimeUnit.SECONDS) //每3秒执行一次
@Override
public void statusLostCheck(){
// 每隔20秒检测是否存在启用但是未注册的平台存在则发起注册
// 获取所有在线并且启用的平台
@@ -211,8 +210,8 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
}
// 定时监听国标级联所进行的WVP服务是否正常 如果异常则选择新的wvp执行
@Scheduled(fixedDelay = 2, timeUnit = TimeUnit.SECONDS) //每3秒执行一次
public void execute(){
@Override
public void cascadePlatformAutoRegister(){
if (!userSetting.isAutoRegisterPlatform()) {
return;
}

View File

@@ -20,7 +20,6 @@ import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
@@ -81,7 +80,6 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
}
@Scheduled(fixedDelay = 200)
public void executeTaskQueue() {
if (taskQueue.isEmpty()) {
return;

View File

@@ -0,0 +1,27 @@
package com.viewsh.module.video.gb28181.transmit.event.request.impl.message.notify.cmd;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* 报警消息队列消费调度器
*
* <p>高频 SIP 消息队列消费200ms 轮询),不适合 xxl-job 调度,
* 保持本机 @Scheduled 轮询;独立为无接口的 @Component 以避免 JDK 代理问题。
*
* @author lzh
*/
@Component
@Slf4j
public class AlarmNotifyMessageQueueScheduler {
@Autowired
private AlarmNotifyMessageHandler alarmNotifyMessageHandler;
@Scheduled(fixedDelay = 200)
public void consume() {
alarmNotifyMessageHandler.executeTaskQueue();
}
}

View File

@@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
@@ -72,7 +71,6 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
}
@Scheduled(fixedDelay = 100)
public void executeTaskQueue() {
if (taskQueue.isEmpty()) {
return;

View File

@@ -0,0 +1,27 @@
package com.viewsh.module.video.gb28181.transmit.event.request.impl.message.notify.cmd;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* 心跳消息队列消费调度器
*
* <p>高频 SIP 消息队列消费100ms 轮询),不适合 xxl-job 调度,
* 保持本机 @Scheduled 轮询;独立为无接口的 @Component 以避免 JDK 代理问题。
*
* @author lzh
*/
@Component
@Slf4j
public class KeepaliveNotifyMessageQueueScheduler {
@Autowired
private KeepaliveNotifyMessageHandler keepaliveNotifyMessageHandler;
@Scheduled(fixedDelay = 100)
public void consume() {
keepaliveNotifyMessageHandler.executeTaskQueue();
}
}