From 6b50254a96ece7d91fd36fbcbd553a9d4c7a1076 Mon Sep 17 00:00:00 2001
From: lzh
Date: Thu, 23 Apr 2026 15:03:26 +0800
Subject: [PATCH] =?UTF-8?q?feat(video):=20=E9=80=82=E9=85=8D=E5=A4=9A?=
=?UTF-8?q?=E7=A7=9F=E6=88=B7=E4=B8=8A=E4=B8=8B=E6=96=87=E9=80=8F=E4=BC=A0?=
=?UTF-8?q?=20+=20=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=20TTL=20=E8=A3=85?=
=?UTF-8?q?=E9=A5=B0=20+=20=E7=94=9F=E5=91=BD=E5=91=A8=E6=9C=9F=E6=94=B6?=
=?UTF-8?q?=E5=8F=A3?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
问题背景:
- WVP 原生代码的 SIP / @Scheduled / Hook EventListener 都跑在没有
TenantContextHolder / ProjectContextHolder 的线程上,接入多租户
框架后会漏 tenant_id / project_id 过滤或抛 NPE。
- 框架默认 @EnableAsync 走 JDK 动态代理,但 WVP 代码大量"按具体类
注入 / 按具体类查方法",JDK 代理下会 cast/查方法失败。
本次改动:
- VideoServerApplication 排除框架 ViewshAsyncAutoConfiguration,
由 ThreadPoolTaskConfig 本地 @EnableAsync(proxyTargetClass=true)
+ TtlRunnable 装饰器接管(从框架抄一份 BeanPostProcessor)。
- DynamicTask 的 scheduleAtFixedRate / schedule 入口都用 TtlRunnable
包一层,把调用线程的 TTL 快照透传到调度线程;新增 @PreDestroy
在容器关停时取消 future、shutdown scheduler、记录耗时,
避免关停阶段调度线程继续借 DataSource 触发噪音日志。
- 新增 VideoContextUtils.executeIgnoreTenantAndProject,
统一封装 TenantUtils.executeIgnore(ProjectUtils.executeIgnore(…))
的嵌套模板,覆盖启动初始化 / 定时任务 / Hook 事件 / Redis 消息
四种场景共 8 个调用点。
- MobilePositionServiceImpl 拆出 @Transactional persistPositions,
通过 @Lazy self 代理调用,避免外层方法 this 自调用让事务失效;
外层只做 Redis 拉队列 + 空队列短路,不再开空事务。
- StreamProxyServiceImpl / StreamPushServiceImpl 的 @EventListener
在业务体内包裹 executeIgnoreTenantAndProject,保证 ZLM Hook
到来时仍能跨租户落库。
---
.../module/video/VideoServerApplication.java | 17 ++-
.../video/framework/config/DynamicTask.java | 38 ++++-
.../config/ThreadPoolTaskConfig.java | 28 ++++
.../framework/util/VideoContextUtils.java | 42 +++++
.../service/impl/DeviceServiceImpl.java | 5 +
.../service/impl/GbChannelServiceImpl.java | 7 +
.../service/impl/PlatformServiceImpl.java | 6 +
.../impl/MobilePositionServiceImpl.java | 37 ++++-
.../service/impl/RecordPlanServiceImpl.java | 6 +
.../RedisPushStreamStatusMsgListener.java | 8 +
.../service/impl/StreamProxyServiceImpl.java | 43 +++---
.../service/impl/StreamPushServiceImpl.java | 144 +++++++++---------
12 files changed, 283 insertions(+), 98 deletions(-)
create mode 100644 viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/util/VideoContextUtils.java
diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/VideoServerApplication.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/VideoServerApplication.java
index 7314f13b..7b24b418 100644
--- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/VideoServerApplication.java
+++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/VideoServerApplication.java
@@ -1,12 +1,27 @@
package com.viewsh.module.video;
+import com.viewsh.framework.quartz.config.ViewshAsyncAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Video 视频模块的启动类
+ *
+ * 排除框架的 {@link ViewshAsyncAutoConfiguration},因为它用的是默认
+ * {@code @EnableAsync}(JDK 动态代理)。在 WVP 迁移过来的 SIP/EventListener/@Async 代码里,
+ * 大量 Bean 是按"具体类注入 + 按具体类查方法"的写法(而不是面向接口),
+ * JDK 代理会报 "cannot be cast to concrete class" / 找不到方法等异常。
+ *
+ * 改由 {@link com.viewsh.module.video.framework.config.ThreadPoolTaskConfig} 里的
+ * {@code @EnableAsync(proxyTargetClass = true)} + TtlRunnable 装饰器接管,
+ * 同时那里复制了框架的 threadPoolTaskExecutorBeanPostProcessor 逻辑。
+ *
+ * TODO(框架侧):在 {@link ViewshAsyncAutoConfiguration} 上加
+ * {@code @ConditionalOnProperty} 开关(例如 {@code viewsh.async.enabled})并把
+ * {@code @EnableAsync} 改成 {@code proxyTargetClass = true},
+ * 这样 video 模块只需要关一下开关,不用在业务模块里抄代码。
*/
-@SpringBootApplication
+@SpringBootApplication(exclude = ViewshAsyncAutoConfiguration.class)
public class VideoServerApplication {
public static void main(String[] args) {
diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/DynamicTask.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/DynamicTask.java
index d8d3122e..1f2f557b 100644
--- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/DynamicTask.java
+++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/DynamicTask.java
@@ -1,5 +1,6 @@
package com.viewsh.module.video.framework.config;
+import com.alibaba.ttl.TtlRunnable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.scheduling.annotation.Scheduled;
@@ -7,6 +8,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;
import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
import java.time.Instant;
import java.util.Date;
import java.util.Map;
@@ -38,6 +40,32 @@ public class DynamicTask {
threadPoolTaskScheduler.initialize();
}
+ /**
+ * 上下文关闭时显式 shutdown 调度器。
+ * threadPoolTaskScheduler 是 @PostConstruct 手工 new 出来的(不是 Spring Bean),
+ * 所以 setWaitForTasksToCompleteOnShutdown / setAwaitTerminationSeconds 不会被 Spring 自动触发。
+ * 如果不加这段,关闭后本调度器的任务仍会继续调度数据库查询,
+ * 而 DataSource 已先关闭,产生 CannotGetJdbcConnectionException 噪音日志。
+ */
+ @PreDestroy
+ public void destroy() {
+ if (threadPoolTaskScheduler == null) {
+ return;
+ }
+ int pending = futureMap.size();
+ log.info("[动态任务] 关闭调度器,待清理任务数 {}", pending);
+ long startNanos = System.nanoTime();
+ // 先取消所有调度,避免 shutdown 等待间隙中再触发一次
+ futureMap.values().forEach(future -> future.cancel(false));
+ futureMap.clear();
+ runnableMap.clear();
+ // shutdown 会遵守上面 setWaitForTasksToCompleteOnShutdown(true) + awaitTermination 10s
+ threadPoolTaskScheduler.shutdown();
+ long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000L;
+ // 如果接近 awaitTerminationSeconds(10) 的 10 000ms,说明有长任务压根没按期退出,需要排查
+ log.info("[动态任务] 调度器关闭完成,用时 {} ms,已清理任务数 {}", elapsedMs, pending);
+ }
+
/**
* 循环执行的任务
* @param key 任务ID
@@ -59,8 +87,11 @@ public class DynamicTask {
}
}
// scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
-
- future = threadPoolTaskScheduler.scheduleAtFixedRate(task, new Date(System.currentTimeMillis() + cycleForCatalog), cycleForCatalog);
+ // 用 TtlRunnable 包一层:捕获提交时调用线程的 TransmittableThreadLocal 快照
+ // (包括 TenantContextHolder/ProjectContextHolder 的 tenantId/projectId/ignore 四个字段),
+ // 在调度线程执行时自动还原,解决"调用方有租户上下文,但 5s/10s 后回调线程没有"这类问题。
+ // 注意:runnableMap 仍然放原始 task,保证 get(key) 语义不变
+ future = threadPoolTaskScheduler.scheduleAtFixedRate(TtlRunnable.get(task), new Date(System.currentTimeMillis() + cycleForCatalog), cycleForCatalog);
if (future != null){
futureMap.put(key, future);
runnableMap.put(key, task);
@@ -96,7 +127,8 @@ public class DynamicTask {
}
}
// scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
- future = threadPoolTaskScheduler.schedule(task, startInstant);
+ // 同 startCron:用 TtlRunnable 透传调用线程的 TTL 快照,避免 delay 回调在调度线程丢上下文
+ future = threadPoolTaskScheduler.schedule(TtlRunnable.get(task), startInstant);
if (future != null){
futureMap.put(key, future);
runnableMap.put(key, task);
diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/ThreadPoolTaskConfig.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/ThreadPoolTaskConfig.java
index d69f6acf..357ea251 100644
--- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/ThreadPoolTaskConfig.java
+++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/ThreadPoolTaskConfig.java
@@ -1,8 +1,12 @@
package com.viewsh.module.video.framework.config;
+import com.alibaba.ttl.TtlRunnable;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -64,4 +68,28 @@ public class ThreadPoolTaskConfig {
executor.initialize();
return executor;
}
+
+ /**
+ * 给线程池中的任务包装 TtlRunnable,保证 TransmittableThreadLocal 透传
+ * (从框架 ViewshAsyncAutoConfiguration 迁移过来,配合主类 exclude 使用)
+ */
+ @Bean
+ public BeanPostProcessor threadPoolTaskExecutorBeanPostProcessor() {
+ return new BeanPostProcessor() {
+
+ @Override
+ public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
+ if (bean instanceof ThreadPoolTaskExecutor executor) {
+ executor.setTaskDecorator(TtlRunnable::get);
+ return executor;
+ }
+ if (bean instanceof SimpleAsyncTaskExecutor executor) {
+ executor.setTaskDecorator(TtlRunnable::get);
+ return executor;
+ }
+ return bean;
+ }
+
+ };
+ }
}
diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/util/VideoContextUtils.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/util/VideoContextUtils.java
new file mode 100644
index 00000000..0a37e916
--- /dev/null
+++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/util/VideoContextUtils.java
@@ -0,0 +1,42 @@
+package com.viewsh.module.video.framework.util;
+
+import com.viewsh.framework.tenant.core.util.ProjectUtils;
+import com.viewsh.framework.tenant.core.util.TenantUtils;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Video 模块专用的上下文执行工具。
+ *
+ * Video 模块的定时任务、启动初始化、Hook 事件回调等场景都跑在没有租户/项目上下文的线程里,
+ * 需要跨租户、跨项目操作数据。之前在各处重复写
+ *
+ * TenantUtils.executeIgnore(() -> ProjectUtils.executeIgnore(() -> { ... }));
+ *
+ * 现在统一收到这里,读写都更容易。
+ *
+ * 不要把这段逻辑往框架层 {@code TenantUtils} 里塞——
+ * 是否要同时 ignore project 是业务决策(Ops 模块里不用 project 概念时就不该强推),
+ * 所以留在业务模块里。
+ */
+public final class VideoContextUtils {
+
+ private VideoContextUtils() {}
+
+ /**
+ * 同时 ignore 租户和项目上下文执行 runnable。
+ *
+ * 语义等同于 {@code TenantUtils.executeIgnore(() -> ProjectUtils.executeIgnore(runnable))},
+ * 注意嵌套顺序:外层 tenant、内层 project,保持与原手写代码一致,避免拦截器读取顺序变化带来隐性 bug。
+ */
+ public static void executeIgnoreTenantAndProject(Runnable runnable) {
+ TenantUtils.executeIgnore(() -> ProjectUtils.executeIgnore(runnable));
+ }
+
+ /**
+ * 带返回值版本,方便给 {@code queryXxx} 这类读操作用。
+ */
+ public static V executeIgnoreTenantAndProject(Callable callable) {
+ return TenantUtils.executeIgnore(() -> ProjectUtils.executeIgnore(callable));
+ }
+}
diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/DeviceServiceImpl.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/DeviceServiceImpl.java
index 552f0feb..93694863 100644
--- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/DeviceServiceImpl.java
+++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/DeviceServiceImpl.java
@@ -39,6 +39,7 @@ import com.viewsh.module.video.vmanager.bean.WVPResult;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.viewsh.framework.common.pojo.PageResult;
+import com.viewsh.module.video.framework.util.VideoContextUtils;
import gov.nist.javax.sip.message.SIPResponse;
import jakarta.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
@@ -126,7 +127,11 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Override
public void run(String... args) throws Exception {
+ // 启动时主线程没有租户/项目上下文,需要全量加载/巡检所有设备(跨租户跨项目)
+ VideoContextUtils.executeIgnoreTenantAndProject(this::initOnStartup);
+ }
+ private void initOnStartup() {
// 清理数据库不存在但是redis中存在的数据
List devicesInDb = getAll();
if (devicesInDb.isEmpty()) {
diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/GbChannelServiceImpl.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/GbChannelServiceImpl.java
index 2ab396bc..928a8546 100644
--- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/GbChannelServiceImpl.java
+++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/GbChannelServiceImpl.java
@@ -25,6 +25,7 @@ import com.viewsh.module.video.vmanager.bean.ErrorCode;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.viewsh.framework.common.pojo.PageResult;
+import com.viewsh.module.video.framework.util.VideoContextUtils;
import com.google.common.base.CaseFormat;
import lombok.extern.slf4j.Slf4j;
import no.ecc.vectortile.VectorTileEncoder;
@@ -86,6 +87,12 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
@Override
public void run(String... args) throws Exception {
+ // 启动时主线程没有租户/项目上下文,需要跨租户加载通道以重建抽稀图层
+ // 参照 DeviceServiceImpl.run 的写法,统一用 executeIgnore 包裹,避免 ProjectContextHolder 抛错阻断启动
+ VideoContextUtils.executeIgnoreTenantAndProject(this::rebuildVectorTileOnStartup);
+ }
+
+ private void rebuildVectorTileOnStartup() {
// 启动时重新发布抽稀图层
List channelList = commonGBChannelMapper.queryAllWithPosition();
Map> zoomCameraMap = new ConcurrentHashMap<>();
diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/PlatformServiceImpl.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/PlatformServiceImpl.java
index e059cf7b..b6b35f0f 100644
--- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/PlatformServiceImpl.java
+++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/PlatformServiceImpl.java
@@ -36,6 +36,7 @@ import com.viewsh.module.video.vmanager.bean.ErrorCode;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.viewsh.framework.common.pojo.PageResult;
+import com.viewsh.module.video.framework.util.VideoContextUtils;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -115,7 +116,12 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
@Override
public void run(String... args) throws Exception {
+ // 启动时主线程没有租户/项目上下文,统一跨租户清理级联状态
+ // 参照 DeviceServiceImpl.run 的写法,避免 ProjectContextHolder 抛错阻断启动
+ VideoContextUtils.executeIgnoreTenantAndProject(this::initOnStartup);
+ }
+ private void initOnStartup() {
// 查找国标推流
List sendRtpItems = redisCatchStorage.queryAllSendRTPServer();
if (!sendRtpItems.isEmpty()) {
diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/MobilePositionServiceImpl.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/MobilePositionServiceImpl.java
index 48fbe079..91ea2d9a 100644
--- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/MobilePositionServiceImpl.java
+++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/MobilePositionServiceImpl.java
@@ -10,11 +10,13 @@ import com.viewsh.module.video.gb28181.dao.DeviceMapper;
import com.viewsh.module.video.gb28181.dao.DeviceMobilePositionMapper;
import com.viewsh.module.video.gb28181.dao.PlatformMapper;
import com.viewsh.module.video.gb28181.utils.Coordtransform;
+import com.viewsh.module.video.framework.util.VideoContextUtils;
import com.viewsh.module.video.service.IMobilePositionService;
import com.viewsh.module.video.utils.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -49,6 +51,15 @@ public class MobilePositionServiceImpl implements IMobilePositionService {
@Qualifier("videoRedisTemplateForMobilePosition")
private RedisTemplate redisTemplate;
+ /**
+ * 自身代理。@Scheduled 入口方法里从 Redis 拉取队列后,需要调用 {@link #persistPositions(List)} 完成
+ * 落库,这一步必须走代理(self.persistPositions)才能让方法上的 @Transactional 生效;
+ * 直接 this.persistPositions 是 Spring 自调用,事务注解会被绕过。
+ */
+ @Autowired
+ @Lazy
+ private MobilePositionServiceImpl self;
+
private final String REDIS_MOBILE_POSITION_LIST = "redis_mobile_position_list";
@Override
@@ -95,13 +106,27 @@ public class MobilePositionServiceImpl implements IMobilePositionService {
}
@Scheduled(fixedDelay = 1000)
- @Transactional
public void executeTaskQueue() {
- int countLimit = 3000;
- List mobilePositions = get(countLimit);
- if (mobilePositions == null || mobilePositions.isEmpty()) {
- return;
- }
+ // 定时任务线程没有租户/项目上下文,移动位置属于跨租户跨项目批量处理。
+ // 先从 Redis 拉队列(不需要事务),空队列直接返回,避免每秒一次的空事务;
+ // 拿到数据后再通过 self 代理进入 @Transactional 方法,保证 batchadd + 通道位置批量更新原子。
+ VideoContextUtils.executeIgnoreTenantAndProject(() -> {
+ int countLimit = 3000;
+ List mobilePositions = get(countLimit);
+ if (mobilePositions == null || mobilePositions.isEmpty()) {
+ return;
+ }
+ self.persistPositions(mobilePositions);
+ });
+ }
+
+ /**
+ * 落库 + 批量更新通道位置。必须通过 self 代理调用,否则 @Transactional 不生效。
+ * 把 mobilePositionMapper.batchadd 和 channelMapper.batchUpdatePosition 放进同一个事务,
+ * 避免"轨迹表写入成功但通道位置未更新"的数据不一致。
+ */
+ @Transactional
+ public void persistPositions(List mobilePositions) {
if (userSetting.getSavePositionHistory()) {
mobilePositionMapper.batchadd(mobilePositions);
}
diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/RecordPlanServiceImpl.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/RecordPlanServiceImpl.java
index 913067d1..dc10f878 100644
--- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/RecordPlanServiceImpl.java
+++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/RecordPlanServiceImpl.java
@@ -18,6 +18,7 @@ import com.viewsh.module.video.vmanager.bean.ErrorCode;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.viewsh.framework.common.pojo.PageResult;
+import com.viewsh.module.video.framework.util.VideoContextUtils;
import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -82,6 +83,11 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
@Scheduled(fixedRate = 1, timeUnit = TimeUnit.MINUTES)
public void execution() {
+ // 定时任务线程没有租户/项目上下文,录像计划需要跨租户跨项目执行
+ VideoContextUtils.executeIgnoreTenantAndProject(this::doExecution);
+ }
+
+ private void doExecution() {
// 查询现在需要录像的通道Id
List startChannelIdList = queryCurrentChannelRecord();
diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/redisMsg/RedisPushStreamStatusMsgListener.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/redisMsg/RedisPushStreamStatusMsgListener.java
index bae1faf0..bfab8ad1 100644
--- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/redisMsg/RedisPushStreamStatusMsgListener.java
+++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/redisMsg/RedisPushStreamStatusMsgListener.java
@@ -2,6 +2,7 @@ package com.viewsh.module.video.service.redisMsg;
import com.alibaba.fastjson2.JSON;
import com.viewsh.module.video.common.VideoManagerConstants;
+import com.viewsh.module.video.framework.util.VideoContextUtils;
import com.viewsh.module.video.framework.config.DynamicTask;
import com.viewsh.module.video.framework.config.UserSetting;
import com.viewsh.module.video.service.bean.PushStreamStatusChangeFromRedisDto;
@@ -102,6 +103,11 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
@Override
public void run(ApplicationArguments args) throws Exception {
+ // 启动时主线程没有租户/项目上下文,查询推流设备需要跨租户;与 DeviceServiceImpl.run 对齐
+ VideoContextUtils.executeIgnoreTenantAndProject(this::initOnStartup);
+ }
+
+ private void initOnStartup() {
if (userSetting.getUsePushingAsStatus()) {
return;
}
@@ -112,6 +118,8 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
}
// 启动时设置所有推流通道离线,发起查询请求
redisCatchStorage.sendStreamPushRequestedMsgForStatus();
+ // DynamicTask 内部已用 TtlRunnable 包装,会自动把当前线程(此刻处于 executeIgnore 作用域内)
+ // 的 TenantContextHolder/ProjectContextHolder 快照透传到 5s 后的调度线程
dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, () -> {
log.info("[REDIS消息]未收到redis回复推流设备状态,执行推流设备离线");
// 五秒收不到请求就设置通道离线,然后通知上级离线
diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamProxy/service/impl/StreamProxyServiceImpl.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamProxy/service/impl/StreamProxyServiceImpl.java
index 2e407087..c808ea2a 100644
--- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamProxy/service/impl/StreamProxyServiceImpl.java
+++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamProxy/service/impl/StreamProxyServiceImpl.java
@@ -28,6 +28,7 @@ import com.viewsh.module.video.vmanager.bean.ResourceBaseInfo;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.viewsh.framework.common.pojo.PageResult;
+import com.viewsh.module.video.framework.util.VideoContextUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
@@ -88,9 +89,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Transactional
@org.springframework.context.event.EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
- if ("rtsp".equals(event.getSchema())) {
- streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), true);
- }
+ VideoContextUtils.executeIgnoreTenantAndProject(() -> {
+ if ("rtsp".equals(event.getSchema())) {
+ streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), true);
+ }
+ });
}
/**
@@ -100,9 +103,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@EventListener
@Transactional
public void onApplicationEvent(MediaDepartureEvent event) {
- if ("rtsp".equals(event.getSchema())) {
- streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), false);
- }
+ VideoContextUtils.executeIgnoreTenantAndProject(() -> {
+ if ("rtsp".equals(event.getSchema())) {
+ streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), false);
+ }
+ });
}
/**
@@ -111,16 +116,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaNotFoundEvent event) {
- if ("rtp".equals(event.getApp())) {
- return;
- }
- // 拉流代理
- StreamProxy streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream());
- if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) {
- startByAppAndStream(event.getApp(), event.getStream(), ((code, msg, data) -> {
- log.info("[拉流代理] 自动点播成功, app: {}, stream: {}", event.getApp(), event.getStream());
- }));
- }
+ VideoContextUtils.executeIgnoreTenantAndProject(() -> {
+ if ("rtp".equals(event.getApp())) {
+ return;
+ }
+ // 拉流代理
+ StreamProxy streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream());
+ if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) {
+ startByAppAndStream(event.getApp(), event.getStream(), ((code, msg, data) -> {
+ log.info("[拉流代理] 自动点播成功, app: {}, stream: {}", event.getApp(), event.getStream());
+ }));
+ }
+ });
}
/**
@@ -130,7 +137,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@EventListener
@Transactional
public void onApplicationEvent(MediaServerOnlineEvent event) {
- zlmServerOnline(event.getMediaServer());
+ VideoContextUtils.executeIgnoreTenantAndProject(() -> zlmServerOnline(event.getMediaServer()));
}
/**
@@ -140,7 +147,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@EventListener
@Transactional
public void onApplicationEvent(MediaServerOfflineEvent event) {
- zlmServerOffline(event.getMediaServer());
+ VideoContextUtils.executeIgnoreTenantAndProject(() -> zlmServerOffline(event.getMediaServer()));
}
diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamPush/service/impl/StreamPushServiceImpl.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamPush/service/impl/StreamPushServiceImpl.java
index 7bdb043a..f75cf556 100644
--- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamPush/service/impl/StreamPushServiceImpl.java
+++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamPush/service/impl/StreamPushServiceImpl.java
@@ -27,6 +27,7 @@ import com.viewsh.module.video.vmanager.bean.ResourceBaseInfo;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.viewsh.framework.common.pojo.PageResult;
+import com.viewsh.module.video.framework.util.VideoContextUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
@@ -68,50 +69,52 @@ public class StreamPushServiceImpl implements IStreamPushService {
@EventListener
@Transactional
public void onApplicationEvent(MediaArrivalEvent event) {
- MediaInfo mediaInfo = event.getMediaInfo();
- if (mediaInfo == null) {
- return;
- }
- if (mediaInfo.getOriginType() != OriginType.RTMP_PUSH.ordinal()
- && mediaInfo.getOriginType() != OriginType.RTSP_PUSH.ordinal()
- && mediaInfo.getOriginType() != OriginType.RTC_PUSH.ordinal()) {
- return;
- }
+ VideoContextUtils.executeIgnoreTenantAndProject(() -> {
+ MediaInfo mediaInfo = event.getMediaInfo();
+ if (mediaInfo == null) {
+ return;
+ }
+ if (mediaInfo.getOriginType() != OriginType.RTMP_PUSH.ordinal()
+ && mediaInfo.getOriginType() != OriginType.RTSP_PUSH.ordinal()
+ && mediaInfo.getOriginType() != OriginType.RTC_PUSH.ordinal()) {
+ return;
+ }
- StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream());
- if (streamAuthorityInfo == null) {
- streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(event);
- } else {
- streamAuthorityInfo.setOriginType(mediaInfo.getOriginType());
- }
- redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo);
+ StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream());
+ if (streamAuthorityInfo == null) {
+ streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(event);
+ } else {
+ streamAuthorityInfo.setOriginType(mediaInfo.getOriginType());
+ }
+ redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo);
- StreamPush streamPushInDb = getPush(event.getApp(), event.getStream());
- if (streamPushInDb == null) {
- StreamPush streamPush = StreamPush.getInstance(event, userSetting.getServerId());
- streamPush.setPushing(true);
- streamPush.setServerId(userSetting.getServerId());
- streamPush.setPushTime(DateUtil.getNow());
- add(streamPush);
- }else {
- streamPushInDb.setPushing(true);
- streamPushInDb.setServerId(userSetting.getServerId());
- streamPushInDb.setMediaServerId(mediaInfo.getMediaServer().getId());
- updatePushStatus(streamPushInDb);
- }
- // 冗余数据,自己系统中自用
- if (!"broadcast".equals(event.getApp()) && !"talk".equals(event.getApp())) {
- redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event.getMediaInfo());
- }
+ StreamPush streamPushInDb = getPush(event.getApp(), event.getStream());
+ if (streamPushInDb == null) {
+ StreamPush streamPush = StreamPush.getInstance(event, userSetting.getServerId());
+ streamPush.setPushing(true);
+ streamPush.setServerId(userSetting.getServerId());
+ streamPush.setPushTime(DateUtil.getNow());
+ add(streamPush);
+ }else {
+ streamPushInDb.setPushing(true);
+ streamPushInDb.setServerId(userSetting.getServerId());
+ streamPushInDb.setMediaServerId(mediaInfo.getMediaServer().getId());
+ updatePushStatus(streamPushInDb);
+ }
+ // 冗余数据,自己系统中自用
+ if (!"broadcast".equals(event.getApp()) && !"talk".equals(event.getApp())) {
+ redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event.getMediaInfo());
+ }
- // 发送流变化redis消息
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("serverId", userSetting.getServerId());
- jsonObject.put("app", event.getApp());
- jsonObject.put("stream", event.getStream());
- jsonObject.put("register", true);
- jsonObject.put("mediaServerId", event.getMediaServer().getId());
- redisCatchStorage.sendStreamChangeMsg(OriginType.values()[event.getMediaInfo().getOriginType()].getType(), jsonObject);
+ // 发送流变化redis消息
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("serverId", userSetting.getServerId());
+ jsonObject.put("app", event.getApp());
+ jsonObject.put("stream", event.getStream());
+ jsonObject.put("register", true);
+ jsonObject.put("mediaServerId", event.getMediaServer().getId());
+ redisCatchStorage.sendStreamChangeMsg(OriginType.values()[event.getMediaInfo().getOriginType()].getType(), jsonObject);
+ });
}
/**
@@ -121,34 +124,35 @@ public class StreamPushServiceImpl implements IStreamPushService {
@EventListener
@Transactional
public void onApplicationEvent(MediaDepartureEvent event) {
+ VideoContextUtils.executeIgnoreTenantAndProject(() -> {
+ // 兼容流注销时类型从redis记录获取
+ MediaInfo mediaInfo = redisCatchStorage.getPushListItem(event.getApp(), event.getStream());
- // 兼容流注销时类型从redis记录获取
- MediaInfo mediaInfo = redisCatchStorage.getPushListItem(event.getApp(), event.getStream());
-
- if (mediaInfo != null) {
- log.info("[推流信息] 查询到redis存在推流缓存, 开始清理,{}/{}", event.getApp(), event.getStream());
- String type = OriginType.values()[mediaInfo.getOriginType()].getType();
- // 冗余数据,自己系统中自用
- redisCatchStorage.removePushListItem(event.getApp(), event.getStream(), event.getMediaServer().getId());
- // 发送流变化redis消息
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("serverId", userSetting.getServerId());
- jsonObject.put("app", event.getApp());
- jsonObject.put("stream", event.getStream());
- jsonObject.put("register", false);
- jsonObject.put("mediaServerId", event.getMediaServer().getId());
- redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
- }
- StreamPush streamPush = getPush(event.getApp(), event.getStream());
- if (streamPush == null) {
- return;
- }
- if (streamPush.getGbDeviceId() != null) {
- streamPush.setPushing(false);
- updatePushStatus(streamPush);
- }else {
- deleteByAppAndStream(event.getApp(), event.getStream());
- }
+ if (mediaInfo != null) {
+ log.info("[推流信息] 查询到redis存在推流缓存, 开始清理,{}/{}", event.getApp(), event.getStream());
+ String type = OriginType.values()[mediaInfo.getOriginType()].getType();
+ // 冗余数据,自己系统中自用
+ redisCatchStorage.removePushListItem(event.getApp(), event.getStream(), event.getMediaServer().getId());
+ // 发送流变化redis消息
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("serverId", userSetting.getServerId());
+ jsonObject.put("app", event.getApp());
+ jsonObject.put("stream", event.getStream());
+ jsonObject.put("register", false);
+ jsonObject.put("mediaServerId", event.getMediaServer().getId());
+ redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
+ }
+ StreamPush streamPush = getPush(event.getApp(), event.getStream());
+ if (streamPush == null) {
+ return;
+ }
+ if (streamPush.getGbDeviceId() != null) {
+ streamPush.setPushing(false);
+ updatePushStatus(streamPush);
+ }else {
+ deleteByAppAndStream(event.getApp(), event.getStream());
+ }
+ });
}
/**
@@ -158,7 +162,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
@EventListener
@Transactional
public void onApplicationEvent(MediaServerOnlineEvent event) {
- zlmServerOnline(event.getMediaServer());
+ VideoContextUtils.executeIgnoreTenantAndProject(() -> zlmServerOnline(event.getMediaServer()));
}
/**
@@ -168,7 +172,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
@EventListener
@Transactional
public void onApplicationEvent(MediaServerOfflineEvent event) {
- zlmServerOffline(event.getMediaServer());
+ VideoContextUtils.executeIgnoreTenantAndProject(() -> zlmServerOffline(event.getMediaServer()));
}
@Override