diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/VideoServerApplication.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/VideoServerApplication.java index 7314f13b..7b24b418 100644 --- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/VideoServerApplication.java +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/VideoServerApplication.java @@ -1,12 +1,27 @@ package com.viewsh.module.video; +import com.viewsh.framework.quartz.config.ViewshAsyncAutoConfiguration; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * Video 视频模块的启动类 + * + *

排除框架的 {@link ViewshAsyncAutoConfiguration},因为它用的是默认 + * {@code @EnableAsync}(JDK 动态代理)。在 WVP 迁移过来的 SIP/EventListener/@Async 代码里, + * 大量 Bean 是按"具体类注入 + 按具体类查方法"的写法(而不是面向接口), + * JDK 代理会报 "cannot be cast to concrete class" / 找不到方法等异常。

+ * + *

改由 {@link com.viewsh.module.video.framework.config.ThreadPoolTaskConfig} 里的 + * {@code @EnableAsync(proxyTargetClass = true)} + TtlRunnable 装饰器接管, + * 同时那里复制了框架的 threadPoolTaskExecutorBeanPostProcessor 逻辑。

+ * + *

TODO(框架侧):在 {@link ViewshAsyncAutoConfiguration} 上加 + * {@code @ConditionalOnProperty} 开关(例如 {@code viewsh.async.enabled})并把 + * {@code @EnableAsync} 改成 {@code proxyTargetClass = true}, + * 这样 video 模块只需要关一下开关,不用在业务模块里抄代码。

*/ -@SpringBootApplication +@SpringBootApplication(exclude = ViewshAsyncAutoConfiguration.class) public class VideoServerApplication { public static void main(String[] args) { diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/DynamicTask.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/DynamicTask.java index d8d3122e..1f2f557b 100644 --- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/DynamicTask.java +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/DynamicTask.java @@ -1,5 +1,6 @@ package com.viewsh.module.video.framework.config; +import com.alibaba.ttl.TtlRunnable; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; import org.springframework.scheduling.annotation.Scheduled; @@ -7,6 +8,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import java.time.Instant; import java.util.Date; import java.util.Map; @@ -38,6 +40,32 @@ public class DynamicTask { threadPoolTaskScheduler.initialize(); } + /** + * 上下文关闭时显式 shutdown 调度器。 + *

threadPoolTaskScheduler 是 @PostConstruct 手工 new 出来的(不是 Spring Bean), + * 所以 setWaitForTasksToCompleteOnShutdown / setAwaitTerminationSeconds 不会被 Spring 自动触发。 + * 如果不加这段,关闭后本调度器的任务仍会继续调度数据库查询, + * 而 DataSource 已先关闭,产生 CannotGetJdbcConnectionException 噪音日志。

+ */ + @PreDestroy + public void destroy() { + if (threadPoolTaskScheduler == null) { + return; + } + int pending = futureMap.size(); + log.info("[动态任务] 关闭调度器,待清理任务数 {}", pending); + long startNanos = System.nanoTime(); + // 先取消所有调度,避免 shutdown 等待间隙中再触发一次 + futureMap.values().forEach(future -> future.cancel(false)); + futureMap.clear(); + runnableMap.clear(); + // shutdown 会遵守上面 setWaitForTasksToCompleteOnShutdown(true) + awaitTermination 10s + threadPoolTaskScheduler.shutdown(); + long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000L; + // 如果接近 awaitTerminationSeconds(10) 的 10 000ms,说明有长任务压根没按期退出,需要排查 + log.info("[动态任务] 调度器关闭完成,用时 {} ms,已清理任务数 {}", elapsedMs, pending); + } + /** * 循环执行的任务 * @param key 任务ID @@ -59,8 +87,11 @@ public class DynamicTask { } } // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔 - - future = threadPoolTaskScheduler.scheduleAtFixedRate(task, new Date(System.currentTimeMillis() + cycleForCatalog), cycleForCatalog); + // 用 TtlRunnable 包一层:捕获提交时调用线程的 TransmittableThreadLocal 快照 + // (包括 TenantContextHolder/ProjectContextHolder 的 tenantId/projectId/ignore 四个字段), + // 在调度线程执行时自动还原,解决"调用方有租户上下文,但 5s/10s 后回调线程没有"这类问题。 + // 注意:runnableMap 仍然放原始 task,保证 get(key) 语义不变 + future = threadPoolTaskScheduler.scheduleAtFixedRate(TtlRunnable.get(task), new Date(System.currentTimeMillis() + cycleForCatalog), cycleForCatalog); if (future != null){ futureMap.put(key, future); runnableMap.put(key, task); @@ -96,7 +127,8 @@ public class DynamicTask { } } // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔 - future = threadPoolTaskScheduler.schedule(task, startInstant); + // 同 startCron:用 TtlRunnable 透传调用线程的 TTL 快照,避免 delay 回调在调度线程丢上下文 + future = threadPoolTaskScheduler.schedule(TtlRunnable.get(task), startInstant); if (future != null){ futureMap.put(key, future); runnableMap.put(key, task); diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/ThreadPoolTaskConfig.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/ThreadPoolTaskConfig.java index d69f6acf..357ea251 100644 --- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/ThreadPoolTaskConfig.java +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/config/ThreadPoolTaskConfig.java @@ -1,8 +1,12 @@ package com.viewsh.module.video.framework.config; +import com.alibaba.ttl.TtlRunnable; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.annotation.Order; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -64,4 +68,28 @@ public class ThreadPoolTaskConfig { executor.initialize(); return executor; } + + /** + * 给线程池中的任务包装 TtlRunnable,保证 TransmittableThreadLocal 透传 + * (从框架 ViewshAsyncAutoConfiguration 迁移过来,配合主类 exclude 使用) + */ + @Bean + public BeanPostProcessor threadPoolTaskExecutorBeanPostProcessor() { + return new BeanPostProcessor() { + + @Override + public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + if (bean instanceof ThreadPoolTaskExecutor executor) { + executor.setTaskDecorator(TtlRunnable::get); + return executor; + } + if (bean instanceof SimpleAsyncTaskExecutor executor) { + executor.setTaskDecorator(TtlRunnable::get); + return executor; + } + return bean; + } + + }; + } } diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/util/VideoContextUtils.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/util/VideoContextUtils.java new file mode 100644 index 00000000..0a37e916 --- /dev/null +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/framework/util/VideoContextUtils.java @@ -0,0 +1,42 @@ +package com.viewsh.module.video.framework.util; + +import com.viewsh.framework.tenant.core.util.ProjectUtils; +import com.viewsh.framework.tenant.core.util.TenantUtils; + +import java.util.concurrent.Callable; + +/** + * Video 模块专用的上下文执行工具。 + * + *

Video 模块的定时任务、启动初始化、Hook 事件回调等场景都跑在没有租户/项目上下文的线程里, + * 需要跨租户、跨项目操作数据。之前在各处重复写 + *

+ *     TenantUtils.executeIgnore(() -> ProjectUtils.executeIgnore(() -> { ... }));
+ * 
+ * 现在统一收到这里,读写都更容易。

+ * + *

不要把这段逻辑往框架层 {@code TenantUtils} 里塞—— + * 是否要同时 ignore project 是业务决策(Ops 模块里不用 project 概念时就不该强推), + * 所以留在业务模块里。

+ */ +public final class VideoContextUtils { + + private VideoContextUtils() {} + + /** + * 同时 ignore 租户和项目上下文执行 runnable。 + * + *

语义等同于 {@code TenantUtils.executeIgnore(() -> ProjectUtils.executeIgnore(runnable))}, + * 注意嵌套顺序:外层 tenant、内层 project,保持与原手写代码一致,避免拦截器读取顺序变化带来隐性 bug。

+ */ + public static void executeIgnoreTenantAndProject(Runnable runnable) { + TenantUtils.executeIgnore(() -> ProjectUtils.executeIgnore(runnable)); + } + + /** + * 带返回值版本,方便给 {@code queryXxx} 这类读操作用。 + */ + public static V executeIgnoreTenantAndProject(Callable callable) { + return TenantUtils.executeIgnore(() -> ProjectUtils.executeIgnore(callable)); + } +} diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/DeviceServiceImpl.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/DeviceServiceImpl.java index 552f0feb..93694863 100644 --- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/DeviceServiceImpl.java +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/DeviceServiceImpl.java @@ -39,6 +39,7 @@ import com.viewsh.module.video.vmanager.bean.WVPResult; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.viewsh.framework.common.pojo.PageResult; +import com.viewsh.module.video.framework.util.VideoContextUtils; import gov.nist.javax.sip.message.SIPResponse; import jakarta.validation.constraints.NotNull; import lombok.extern.slf4j.Slf4j; @@ -126,7 +127,11 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { @Override public void run(String... args) throws Exception { + // 启动时主线程没有租户/项目上下文,需要全量加载/巡检所有设备(跨租户跨项目) + VideoContextUtils.executeIgnoreTenantAndProject(this::initOnStartup); + } + private void initOnStartup() { // 清理数据库不存在但是redis中存在的数据 List devicesInDb = getAll(); if (devicesInDb.isEmpty()) { diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/GbChannelServiceImpl.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/GbChannelServiceImpl.java index 2ab396bc..928a8546 100644 --- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/GbChannelServiceImpl.java +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/GbChannelServiceImpl.java @@ -25,6 +25,7 @@ import com.viewsh.module.video.vmanager.bean.ErrorCode; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.viewsh.framework.common.pojo.PageResult; +import com.viewsh.module.video.framework.util.VideoContextUtils; import com.google.common.base.CaseFormat; import lombok.extern.slf4j.Slf4j; import no.ecc.vectortile.VectorTileEncoder; @@ -86,6 +87,12 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne @Override public void run(String... args) throws Exception { + // 启动时主线程没有租户/项目上下文,需要跨租户加载通道以重建抽稀图层 + // 参照 DeviceServiceImpl.run 的写法,统一用 executeIgnore 包裹,避免 ProjectContextHolder 抛错阻断启动 + VideoContextUtils.executeIgnoreTenantAndProject(this::rebuildVectorTileOnStartup); + } + + private void rebuildVectorTileOnStartup() { // 启动时重新发布抽稀图层 List channelList = commonGBChannelMapper.queryAllWithPosition(); Map> zoomCameraMap = new ConcurrentHashMap<>(); diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/PlatformServiceImpl.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/PlatformServiceImpl.java index e059cf7b..b6b35f0f 100644 --- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/PlatformServiceImpl.java +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/gb28181/service/impl/PlatformServiceImpl.java @@ -36,6 +36,7 @@ import com.viewsh.module.video.vmanager.bean.ErrorCode; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.viewsh.framework.common.pojo.PageResult; +import com.viewsh.module.video.framework.util.VideoContextUtils; import gov.nist.javax.sip.message.SIPResponse; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -115,7 +116,12 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner @Override public void run(String... args) throws Exception { + // 启动时主线程没有租户/项目上下文,统一跨租户清理级联状态 + // 参照 DeviceServiceImpl.run 的写法,避免 ProjectContextHolder 抛错阻断启动 + VideoContextUtils.executeIgnoreTenantAndProject(this::initOnStartup); + } + private void initOnStartup() { // 查找国标推流 List sendRtpItems = redisCatchStorage.queryAllSendRTPServer(); if (!sendRtpItems.isEmpty()) { diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/MobilePositionServiceImpl.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/MobilePositionServiceImpl.java index 48fbe079..91ea2d9a 100644 --- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/MobilePositionServiceImpl.java +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/MobilePositionServiceImpl.java @@ -10,11 +10,13 @@ import com.viewsh.module.video.gb28181.dao.DeviceMapper; import com.viewsh.module.video.gb28181.dao.DeviceMobilePositionMapper; import com.viewsh.module.video.gb28181.dao.PlatformMapper; import com.viewsh.module.video.gb28181.utils.Coordtransform; +import com.viewsh.module.video.framework.util.VideoContextUtils; import com.viewsh.module.video.service.IMobilePositionService; import com.viewsh.module.video.utils.DateUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Lazy; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -49,6 +51,15 @@ public class MobilePositionServiceImpl implements IMobilePositionService { @Qualifier("videoRedisTemplateForMobilePosition") private RedisTemplate redisTemplate; + /** + * 自身代理。@Scheduled 入口方法里从 Redis 拉取队列后,需要调用 {@link #persistPositions(List)} 完成 + * 落库,这一步必须走代理(self.persistPositions)才能让方法上的 @Transactional 生效; + * 直接 this.persistPositions 是 Spring 自调用,事务注解会被绕过。 + */ + @Autowired + @Lazy + private MobilePositionServiceImpl self; + private final String REDIS_MOBILE_POSITION_LIST = "redis_mobile_position_list"; @Override @@ -95,13 +106,27 @@ public class MobilePositionServiceImpl implements IMobilePositionService { } @Scheduled(fixedDelay = 1000) - @Transactional public void executeTaskQueue() { - int countLimit = 3000; - List mobilePositions = get(countLimit); - if (mobilePositions == null || mobilePositions.isEmpty()) { - return; - } + // 定时任务线程没有租户/项目上下文,移动位置属于跨租户跨项目批量处理。 + // 先从 Redis 拉队列(不需要事务),空队列直接返回,避免每秒一次的空事务; + // 拿到数据后再通过 self 代理进入 @Transactional 方法,保证 batchadd + 通道位置批量更新原子。 + VideoContextUtils.executeIgnoreTenantAndProject(() -> { + int countLimit = 3000; + List mobilePositions = get(countLimit); + if (mobilePositions == null || mobilePositions.isEmpty()) { + return; + } + self.persistPositions(mobilePositions); + }); + } + + /** + * 落库 + 批量更新通道位置。必须通过 self 代理调用,否则 @Transactional 不生效。 + *

把 mobilePositionMapper.batchadd 和 channelMapper.batchUpdatePosition 放进同一个事务, + * 避免"轨迹表写入成功但通道位置未更新"的数据不一致。

+ */ + @Transactional + public void persistPositions(List mobilePositions) { if (userSetting.getSavePositionHistory()) { mobilePositionMapper.batchadd(mobilePositions); } diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/RecordPlanServiceImpl.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/RecordPlanServiceImpl.java index 913067d1..dc10f878 100644 --- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/RecordPlanServiceImpl.java +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/impl/RecordPlanServiceImpl.java @@ -18,6 +18,7 @@ import com.viewsh.module.video.vmanager.bean.ErrorCode; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.viewsh.framework.common.pojo.PageResult; +import com.viewsh.module.video.framework.util.VideoContextUtils; import com.google.common.base.Joiner; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -82,6 +83,11 @@ public class RecordPlanServiceImpl implements IRecordPlanService { @Scheduled(fixedRate = 1, timeUnit = TimeUnit.MINUTES) public void execution() { + // 定时任务线程没有租户/项目上下文,录像计划需要跨租户跨项目执行 + VideoContextUtils.executeIgnoreTenantAndProject(this::doExecution); + } + + private void doExecution() { // 查询现在需要录像的通道Id List startChannelIdList = queryCurrentChannelRecord(); diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/redisMsg/RedisPushStreamStatusMsgListener.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/redisMsg/RedisPushStreamStatusMsgListener.java index bae1faf0..bfab8ad1 100644 --- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/redisMsg/RedisPushStreamStatusMsgListener.java +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/service/redisMsg/RedisPushStreamStatusMsgListener.java @@ -2,6 +2,7 @@ package com.viewsh.module.video.service.redisMsg; import com.alibaba.fastjson2.JSON; import com.viewsh.module.video.common.VideoManagerConstants; +import com.viewsh.module.video.framework.util.VideoContextUtils; import com.viewsh.module.video.framework.config.DynamicTask; import com.viewsh.module.video.framework.config.UserSetting; import com.viewsh.module.video.service.bean.PushStreamStatusChangeFromRedisDto; @@ -102,6 +103,11 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic @Override public void run(ApplicationArguments args) throws Exception { + // 启动时主线程没有租户/项目上下文,查询推流设备需要跨租户;与 DeviceServiceImpl.run 对齐 + VideoContextUtils.executeIgnoreTenantAndProject(this::initOnStartup); + } + + private void initOnStartup() { if (userSetting.getUsePushingAsStatus()) { return; } @@ -112,6 +118,8 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic } // 启动时设置所有推流通道离线,发起查询请求 redisCatchStorage.sendStreamPushRequestedMsgForStatus(); + // DynamicTask 内部已用 TtlRunnable 包装,会自动把当前线程(此刻处于 executeIgnore 作用域内) + // 的 TenantContextHolder/ProjectContextHolder 快照透传到 5s 后的调度线程 dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, () -> { log.info("[REDIS消息]未收到redis回复推流设备状态,执行推流设备离线"); // 五秒收不到请求就设置通道离线,然后通知上级离线 diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamProxy/service/impl/StreamProxyServiceImpl.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamProxy/service/impl/StreamProxyServiceImpl.java index 2e407087..c808ea2a 100644 --- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamProxy/service/impl/StreamProxyServiceImpl.java +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamProxy/service/impl/StreamProxyServiceImpl.java @@ -28,6 +28,7 @@ import com.viewsh.module.video.vmanager.bean.ResourceBaseInfo; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.viewsh.framework.common.pojo.PageResult; +import com.viewsh.module.video.framework.util.VideoContextUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; @@ -88,9 +89,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Transactional @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { - if ("rtsp".equals(event.getSchema())) { - streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), true); - } + VideoContextUtils.executeIgnoreTenantAndProject(() -> { + if ("rtsp".equals(event.getSchema())) { + streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), true); + } + }); } /** @@ -100,9 +103,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @EventListener @Transactional public void onApplicationEvent(MediaDepartureEvent event) { - if ("rtsp".equals(event.getSchema())) { - streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), false); - } + VideoContextUtils.executeIgnoreTenantAndProject(() -> { + if ("rtsp".equals(event.getSchema())) { + streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), false); + } + }); } /** @@ -111,16 +116,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaNotFoundEvent event) { - if ("rtp".equals(event.getApp())) { - return; - } - // 拉流代理 - StreamProxy streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream()); - if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) { - startByAppAndStream(event.getApp(), event.getStream(), ((code, msg, data) -> { - log.info("[拉流代理] 自动点播成功, app: {}, stream: {}", event.getApp(), event.getStream()); - })); - } + VideoContextUtils.executeIgnoreTenantAndProject(() -> { + if ("rtp".equals(event.getApp())) { + return; + } + // 拉流代理 + StreamProxy streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream()); + if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) { + startByAppAndStream(event.getApp(), event.getStream(), ((code, msg, data) -> { + log.info("[拉流代理] 自动点播成功, app: {}, stream: {}", event.getApp(), event.getStream()); + })); + } + }); } /** @@ -130,7 +137,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @EventListener @Transactional public void onApplicationEvent(MediaServerOnlineEvent event) { - zlmServerOnline(event.getMediaServer()); + VideoContextUtils.executeIgnoreTenantAndProject(() -> zlmServerOnline(event.getMediaServer())); } /** @@ -140,7 +147,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @EventListener @Transactional public void onApplicationEvent(MediaServerOfflineEvent event) { - zlmServerOffline(event.getMediaServer()); + VideoContextUtils.executeIgnoreTenantAndProject(() -> zlmServerOffline(event.getMediaServer())); } diff --git a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamPush/service/impl/StreamPushServiceImpl.java b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamPush/service/impl/StreamPushServiceImpl.java index 7bdb043a..f75cf556 100644 --- a/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamPush/service/impl/StreamPushServiceImpl.java +++ b/viewsh-module-video/viewsh-module-video-server/src/main/java/com/viewsh/module/video/streamPush/service/impl/StreamPushServiceImpl.java @@ -27,6 +27,7 @@ import com.viewsh.module.video.vmanager.bean.ResourceBaseInfo; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.viewsh.framework.common.pojo.PageResult; +import com.viewsh.module.video.framework.util.VideoContextUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; @@ -68,50 +69,52 @@ public class StreamPushServiceImpl implements IStreamPushService { @EventListener @Transactional public void onApplicationEvent(MediaArrivalEvent event) { - MediaInfo mediaInfo = event.getMediaInfo(); - if (mediaInfo == null) { - return; - } - if (mediaInfo.getOriginType() != OriginType.RTMP_PUSH.ordinal() - && mediaInfo.getOriginType() != OriginType.RTSP_PUSH.ordinal() - && mediaInfo.getOriginType() != OriginType.RTC_PUSH.ordinal()) { - return; - } + VideoContextUtils.executeIgnoreTenantAndProject(() -> { + MediaInfo mediaInfo = event.getMediaInfo(); + if (mediaInfo == null) { + return; + } + if (mediaInfo.getOriginType() != OriginType.RTMP_PUSH.ordinal() + && mediaInfo.getOriginType() != OriginType.RTSP_PUSH.ordinal() + && mediaInfo.getOriginType() != OriginType.RTC_PUSH.ordinal()) { + return; + } - StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream()); - if (streamAuthorityInfo == null) { - streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(event); - } else { - streamAuthorityInfo.setOriginType(mediaInfo.getOriginType()); - } - redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo); + StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream()); + if (streamAuthorityInfo == null) { + streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(event); + } else { + streamAuthorityInfo.setOriginType(mediaInfo.getOriginType()); + } + redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo); - StreamPush streamPushInDb = getPush(event.getApp(), event.getStream()); - if (streamPushInDb == null) { - StreamPush streamPush = StreamPush.getInstance(event, userSetting.getServerId()); - streamPush.setPushing(true); - streamPush.setServerId(userSetting.getServerId()); - streamPush.setPushTime(DateUtil.getNow()); - add(streamPush); - }else { - streamPushInDb.setPushing(true); - streamPushInDb.setServerId(userSetting.getServerId()); - streamPushInDb.setMediaServerId(mediaInfo.getMediaServer().getId()); - updatePushStatus(streamPushInDb); - } - // 冗余数据,自己系统中自用 - if (!"broadcast".equals(event.getApp()) && !"talk".equals(event.getApp())) { - redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event.getMediaInfo()); - } + StreamPush streamPushInDb = getPush(event.getApp(), event.getStream()); + if (streamPushInDb == null) { + StreamPush streamPush = StreamPush.getInstance(event, userSetting.getServerId()); + streamPush.setPushing(true); + streamPush.setServerId(userSetting.getServerId()); + streamPush.setPushTime(DateUtil.getNow()); + add(streamPush); + }else { + streamPushInDb.setPushing(true); + streamPushInDb.setServerId(userSetting.getServerId()); + streamPushInDb.setMediaServerId(mediaInfo.getMediaServer().getId()); + updatePushStatus(streamPushInDb); + } + // 冗余数据,自己系统中自用 + if (!"broadcast".equals(event.getApp()) && !"talk".equals(event.getApp())) { + redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event.getMediaInfo()); + } - // 发送流变化redis消息 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", event.getApp()); - jsonObject.put("stream", event.getStream()); - jsonObject.put("register", true); - jsonObject.put("mediaServerId", event.getMediaServer().getId()); - redisCatchStorage.sendStreamChangeMsg(OriginType.values()[event.getMediaInfo().getOriginType()].getType(), jsonObject); + // 发送流变化redis消息 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", event.getApp()); + jsonObject.put("stream", event.getStream()); + jsonObject.put("register", true); + jsonObject.put("mediaServerId", event.getMediaServer().getId()); + redisCatchStorage.sendStreamChangeMsg(OriginType.values()[event.getMediaInfo().getOriginType()].getType(), jsonObject); + }); } /** @@ -121,34 +124,35 @@ public class StreamPushServiceImpl implements IStreamPushService { @EventListener @Transactional public void onApplicationEvent(MediaDepartureEvent event) { + VideoContextUtils.executeIgnoreTenantAndProject(() -> { + // 兼容流注销时类型从redis记录获取 + MediaInfo mediaInfo = redisCatchStorage.getPushListItem(event.getApp(), event.getStream()); - // 兼容流注销时类型从redis记录获取 - MediaInfo mediaInfo = redisCatchStorage.getPushListItem(event.getApp(), event.getStream()); - - if (mediaInfo != null) { - log.info("[推流信息] 查询到redis存在推流缓存, 开始清理,{}/{}", event.getApp(), event.getStream()); - String type = OriginType.values()[mediaInfo.getOriginType()].getType(); - // 冗余数据,自己系统中自用 - redisCatchStorage.removePushListItem(event.getApp(), event.getStream(), event.getMediaServer().getId()); - // 发送流变化redis消息 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetting.getServerId()); - jsonObject.put("app", event.getApp()); - jsonObject.put("stream", event.getStream()); - jsonObject.put("register", false); - jsonObject.put("mediaServerId", event.getMediaServer().getId()); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); - } - StreamPush streamPush = getPush(event.getApp(), event.getStream()); - if (streamPush == null) { - return; - } - if (streamPush.getGbDeviceId() != null) { - streamPush.setPushing(false); - updatePushStatus(streamPush); - }else { - deleteByAppAndStream(event.getApp(), event.getStream()); - } + if (mediaInfo != null) { + log.info("[推流信息] 查询到redis存在推流缓存, 开始清理,{}/{}", event.getApp(), event.getStream()); + String type = OriginType.values()[mediaInfo.getOriginType()].getType(); + // 冗余数据,自己系统中自用 + redisCatchStorage.removePushListItem(event.getApp(), event.getStream(), event.getMediaServer().getId()); + // 发送流变化redis消息 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetting.getServerId()); + jsonObject.put("app", event.getApp()); + jsonObject.put("stream", event.getStream()); + jsonObject.put("register", false); + jsonObject.put("mediaServerId", event.getMediaServer().getId()); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + } + StreamPush streamPush = getPush(event.getApp(), event.getStream()); + if (streamPush == null) { + return; + } + if (streamPush.getGbDeviceId() != null) { + streamPush.setPushing(false); + updatePushStatus(streamPush); + }else { + deleteByAppAndStream(event.getApp(), event.getStream()); + } + }); } /** @@ -158,7 +162,7 @@ public class StreamPushServiceImpl implements IStreamPushService { @EventListener @Transactional public void onApplicationEvent(MediaServerOnlineEvent event) { - zlmServerOnline(event.getMediaServer()); + VideoContextUtils.executeIgnoreTenantAndProject(() -> zlmServerOnline(event.getMediaServer())); } /** @@ -168,7 +172,7 @@ public class StreamPushServiceImpl implements IStreamPushService { @EventListener @Transactional public void onApplicationEvent(MediaServerOfflineEvent event) { - zlmServerOffline(event.getMediaServer()); + VideoContextUtils.executeIgnoreTenantAndProject(() -> zlmServerOffline(event.getMediaServer())); } @Override