feat(video): 适配多租户上下文透传 + 定时任务 TTL 装饰 + 生命周期收口
问题背景:
- WVP 原生代码的 SIP / @Scheduled / Hook EventListener 都跑在没有
TenantContextHolder / ProjectContextHolder 的线程上,接入多租户
框架后会漏 tenant_id / project_id 过滤或抛 NPE。
- 框架默认 @EnableAsync 走 JDK 动态代理,但 WVP 代码大量"按具体类
注入 / 按具体类查方法",JDK 代理下会 cast/查方法失败。
本次改动:
- VideoServerApplication 排除框架 ViewshAsyncAutoConfiguration,
由 ThreadPoolTaskConfig 本地 @EnableAsync(proxyTargetClass=true)
+ TtlRunnable 装饰器接管(从框架抄一份 BeanPostProcessor)。
- DynamicTask 的 scheduleAtFixedRate / schedule 入口都用 TtlRunnable
包一层,把调用线程的 TTL 快照透传到调度线程;新增 @PreDestroy
在容器关停时取消 future、shutdown scheduler、记录耗时,
避免关停阶段调度线程继续借 DataSource 触发噪音日志。
- 新增 VideoContextUtils.executeIgnoreTenantAndProject,
统一封装 TenantUtils.executeIgnore(ProjectUtils.executeIgnore(…))
的嵌套模板,覆盖启动初始化 / 定时任务 / Hook 事件 / Redis 消息
四种场景共 8 个调用点。
- MobilePositionServiceImpl 拆出 @Transactional persistPositions,
通过 @Lazy self 代理调用,避免外层方法 this 自调用让事务失效;
外层只做 Redis 拉队列 + 空队列短路,不再开空事务。
- StreamProxyServiceImpl / StreamPushServiceImpl 的 @EventListener
在业务体内包裹 executeIgnoreTenantAndProject,保证 ZLM Hook
到来时仍能跨租户落库。
This commit is contained in:
@@ -1,12 +1,27 @@
|
||||
package com.viewsh.module.video;
|
||||
|
||||
import com.viewsh.framework.quartz.config.ViewshAsyncAutoConfiguration;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
/**
|
||||
* Video 视频模块的启动类
|
||||
*
|
||||
* <p>排除框架的 {@link ViewshAsyncAutoConfiguration},因为它用的是默认
|
||||
* {@code @EnableAsync}(JDK 动态代理)。在 WVP 迁移过来的 SIP/EventListener/@Async 代码里,
|
||||
* 大量 Bean 是按"具体类注入 + 按具体类查方法"的写法(而不是面向接口),
|
||||
* JDK 代理会报 "cannot be cast to concrete class" / 找不到方法等异常。</p>
|
||||
*
|
||||
* <p>改由 {@link com.viewsh.module.video.framework.config.ThreadPoolTaskConfig} 里的
|
||||
* {@code @EnableAsync(proxyTargetClass = true)} + TtlRunnable 装饰器接管,
|
||||
* 同时那里复制了框架的 threadPoolTaskExecutorBeanPostProcessor 逻辑。</p>
|
||||
*
|
||||
* <p>TODO(框架侧):在 {@link ViewshAsyncAutoConfiguration} 上加
|
||||
* {@code @ConditionalOnProperty} 开关(例如 {@code viewsh.async.enabled})并把
|
||||
* {@code @EnableAsync} 改成 {@code proxyTargetClass = true},
|
||||
* 这样 video 模块只需要关一下开关,不用在业务模块里抄代码。</p>
|
||||
*/
|
||||
@SpringBootApplication
|
||||
@SpringBootApplication(exclude = ViewshAsyncAutoConfiguration.class)
|
||||
public class VideoServerApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.viewsh.module.video.framework.config;
|
||||
|
||||
import com.alibaba.ttl.TtlRunnable;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
@@ -7,6 +8,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import java.time.Instant;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
@@ -38,6 +40,32 @@ public class DynamicTask {
|
||||
threadPoolTaskScheduler.initialize();
|
||||
}
|
||||
|
||||
/**
|
||||
* 上下文关闭时显式 shutdown 调度器。
|
||||
* <p>threadPoolTaskScheduler 是 @PostConstruct 手工 new 出来的(不是 Spring Bean),
|
||||
* 所以 setWaitForTasksToCompleteOnShutdown / setAwaitTerminationSeconds 不会被 Spring 自动触发。
|
||||
* 如果不加这段,关闭后本调度器的任务仍会继续调度数据库查询,
|
||||
* 而 DataSource 已先关闭,产生 CannotGetJdbcConnectionException 噪音日志。</p>
|
||||
*/
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
if (threadPoolTaskScheduler == null) {
|
||||
return;
|
||||
}
|
||||
int pending = futureMap.size();
|
||||
log.info("[动态任务] 关闭调度器,待清理任务数 {}", pending);
|
||||
long startNanos = System.nanoTime();
|
||||
// 先取消所有调度,避免 shutdown 等待间隙中再触发一次
|
||||
futureMap.values().forEach(future -> future.cancel(false));
|
||||
futureMap.clear();
|
||||
runnableMap.clear();
|
||||
// shutdown 会遵守上面 setWaitForTasksToCompleteOnShutdown(true) + awaitTermination 10s
|
||||
threadPoolTaskScheduler.shutdown();
|
||||
long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000L;
|
||||
// 如果接近 awaitTerminationSeconds(10) 的 10 000ms,说明有长任务压根没按期退出,需要排查
|
||||
log.info("[动态任务] 调度器关闭完成,用时 {} ms,已清理任务数 {}", elapsedMs, pending);
|
||||
}
|
||||
|
||||
/**
|
||||
* 循环执行的任务
|
||||
* @param key 任务ID
|
||||
@@ -59,8 +87,11 @@ public class DynamicTask {
|
||||
}
|
||||
}
|
||||
// scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
|
||||
|
||||
future = threadPoolTaskScheduler.scheduleAtFixedRate(task, new Date(System.currentTimeMillis() + cycleForCatalog), cycleForCatalog);
|
||||
// 用 TtlRunnable 包一层:捕获提交时调用线程的 TransmittableThreadLocal 快照
|
||||
// (包括 TenantContextHolder/ProjectContextHolder 的 tenantId/projectId/ignore 四个字段),
|
||||
// 在调度线程执行时自动还原,解决"调用方有租户上下文,但 5s/10s 后回调线程没有"这类问题。
|
||||
// 注意:runnableMap 仍然放原始 task,保证 get(key) 语义不变
|
||||
future = threadPoolTaskScheduler.scheduleAtFixedRate(TtlRunnable.get(task), new Date(System.currentTimeMillis() + cycleForCatalog), cycleForCatalog);
|
||||
if (future != null){
|
||||
futureMap.put(key, future);
|
||||
runnableMap.put(key, task);
|
||||
@@ -96,7 +127,8 @@ public class DynamicTask {
|
||||
}
|
||||
}
|
||||
// scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
|
||||
future = threadPoolTaskScheduler.schedule(task, startInstant);
|
||||
// 同 startCron:用 TtlRunnable 透传调用线程的 TTL 快照,避免 delay 回调在调度线程丢上下文
|
||||
future = threadPoolTaskScheduler.schedule(TtlRunnable.get(task), startInstant);
|
||||
if (future != null){
|
||||
futureMap.put(key, future);
|
||||
runnableMap.put(key, task);
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
package com.viewsh.module.video.framework.config;
|
||||
|
||||
import com.alibaba.ttl.TtlRunnable;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.core.task.SimpleAsyncTaskExecutor;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
@@ -64,4 +68,28 @@ public class ThreadPoolTaskConfig {
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* 给线程池中的任务包装 TtlRunnable,保证 TransmittableThreadLocal 透传
|
||||
* (从框架 ViewshAsyncAutoConfiguration 迁移过来,配合主类 exclude 使用)
|
||||
*/
|
||||
@Bean
|
||||
public BeanPostProcessor threadPoolTaskExecutorBeanPostProcessor() {
|
||||
return new BeanPostProcessor() {
|
||||
|
||||
@Override
|
||||
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
|
||||
if (bean instanceof ThreadPoolTaskExecutor executor) {
|
||||
executor.setTaskDecorator(TtlRunnable::get);
|
||||
return executor;
|
||||
}
|
||||
if (bean instanceof SimpleAsyncTaskExecutor executor) {
|
||||
executor.setTaskDecorator(TtlRunnable::get);
|
||||
return executor;
|
||||
}
|
||||
return bean;
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
package com.viewsh.module.video.framework.util;
|
||||
|
||||
import com.viewsh.framework.tenant.core.util.ProjectUtils;
|
||||
import com.viewsh.framework.tenant.core.util.TenantUtils;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
* Video 模块专用的上下文执行工具。
|
||||
*
|
||||
* <p>Video 模块的定时任务、启动初始化、Hook 事件回调等场景都跑在没有租户/项目上下文的线程里,
|
||||
* 需要跨租户、跨项目操作数据。之前在各处重复写
|
||||
* <pre>
|
||||
* TenantUtils.executeIgnore(() -> ProjectUtils.executeIgnore(() -> { ... }));
|
||||
* </pre>
|
||||
* 现在统一收到这里,读写都更容易。</p>
|
||||
*
|
||||
* <p>不要把这段逻辑往框架层 {@code TenantUtils} 里塞——
|
||||
* 是否要同时 ignore project 是业务决策(Ops 模块里不用 project 概念时就不该强推),
|
||||
* 所以留在业务模块里。</p>
|
||||
*/
|
||||
public final class VideoContextUtils {
|
||||
|
||||
private VideoContextUtils() {}
|
||||
|
||||
/**
|
||||
* 同时 ignore 租户和项目上下文执行 runnable。
|
||||
*
|
||||
* <p>语义等同于 {@code TenantUtils.executeIgnore(() -> ProjectUtils.executeIgnore(runnable))},
|
||||
* 注意嵌套顺序:外层 tenant、内层 project,保持与原手写代码一致,避免拦截器读取顺序变化带来隐性 bug。</p>
|
||||
*/
|
||||
public static void executeIgnoreTenantAndProject(Runnable runnable) {
|
||||
TenantUtils.executeIgnore(() -> ProjectUtils.executeIgnore(runnable));
|
||||
}
|
||||
|
||||
/**
|
||||
* 带返回值版本,方便给 {@code queryXxx} 这类读操作用。
|
||||
*/
|
||||
public static <V> V executeIgnoreTenantAndProject(Callable<V> callable) {
|
||||
return TenantUtils.executeIgnore(() -> ProjectUtils.executeIgnore(callable));
|
||||
}
|
||||
}
|
||||
@@ -39,6 +39,7 @@ import com.viewsh.module.video.vmanager.bean.WVPResult;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.viewsh.framework.common.pojo.PageResult;
|
||||
import com.viewsh.module.video.framework.util.VideoContextUtils;
|
||||
import gov.nist.javax.sip.message.SIPResponse;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -126,7 +127,11 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
// 启动时主线程没有租户/项目上下文,需要全量加载/巡检所有设备(跨租户跨项目)
|
||||
VideoContextUtils.executeIgnoreTenantAndProject(this::initOnStartup);
|
||||
}
|
||||
|
||||
private void initOnStartup() {
|
||||
// 清理数据库不存在但是redis中存在的数据
|
||||
List<Device> devicesInDb = getAll();
|
||||
if (devicesInDb.isEmpty()) {
|
||||
|
||||
@@ -25,6 +25,7 @@ import com.viewsh.module.video.vmanager.bean.ErrorCode;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.viewsh.framework.common.pojo.PageResult;
|
||||
import com.viewsh.module.video.framework.util.VideoContextUtils;
|
||||
import com.google.common.base.CaseFormat;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import no.ecc.vectortile.VectorTileEncoder;
|
||||
@@ -86,6 +87,12 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
// 启动时主线程没有租户/项目上下文,需要跨租户加载通道以重建抽稀图层
|
||||
// 参照 DeviceServiceImpl.run 的写法,统一用 executeIgnore 包裹,避免 ProjectContextHolder 抛错阻断启动
|
||||
VideoContextUtils.executeIgnoreTenantAndProject(this::rebuildVectorTileOnStartup);
|
||||
}
|
||||
|
||||
private void rebuildVectorTileOnStartup() {
|
||||
// 启动时重新发布抽稀图层
|
||||
List<CommonGBChannel> channelList = commonGBChannelMapper.queryAllWithPosition();
|
||||
Map<Integer, List<CommonGBChannel>> zoomCameraMap = new ConcurrentHashMap<>();
|
||||
|
||||
@@ -36,6 +36,7 @@ import com.viewsh.module.video.vmanager.bean.ErrorCode;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.viewsh.framework.common.pojo.PageResult;
|
||||
import com.viewsh.module.video.framework.util.VideoContextUtils;
|
||||
import gov.nist.javax.sip.message.SIPResponse;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -115,7 +116,12 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
// 启动时主线程没有租户/项目上下文,统一跨租户清理级联状态
|
||||
// 参照 DeviceServiceImpl.run 的写法,避免 ProjectContextHolder 抛错阻断启动
|
||||
VideoContextUtils.executeIgnoreTenantAndProject(this::initOnStartup);
|
||||
}
|
||||
|
||||
private void initOnStartup() {
|
||||
// 查找国标推流
|
||||
List<SendRtpInfo> sendRtpItems = redisCatchStorage.queryAllSendRTPServer();
|
||||
if (!sendRtpItems.isEmpty()) {
|
||||
|
||||
@@ -10,11 +10,13 @@ import com.viewsh.module.video.gb28181.dao.DeviceMapper;
|
||||
import com.viewsh.module.video.gb28181.dao.DeviceMobilePositionMapper;
|
||||
import com.viewsh.module.video.gb28181.dao.PlatformMapper;
|
||||
import com.viewsh.module.video.gb28181.utils.Coordtransform;
|
||||
import com.viewsh.module.video.framework.util.VideoContextUtils;
|
||||
import com.viewsh.module.video.service.IMobilePositionService;
|
||||
import com.viewsh.module.video.utils.DateUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -49,6 +51,15 @@ public class MobilePositionServiceImpl implements IMobilePositionService {
|
||||
@Qualifier("videoRedisTemplateForMobilePosition")
|
||||
private RedisTemplate<String, MobilePosition> redisTemplate;
|
||||
|
||||
/**
|
||||
* 自身代理。@Scheduled 入口方法里从 Redis 拉取队列后,需要调用 {@link #persistPositions(List)} 完成
|
||||
* 落库,这一步必须走代理(self.persistPositions)才能让方法上的 @Transactional 生效;
|
||||
* 直接 this.persistPositions 是 Spring 自调用,事务注解会被绕过。
|
||||
*/
|
||||
@Autowired
|
||||
@Lazy
|
||||
private MobilePositionServiceImpl self;
|
||||
|
||||
private final String REDIS_MOBILE_POSITION_LIST = "redis_mobile_position_list";
|
||||
|
||||
@Override
|
||||
@@ -95,13 +106,27 @@ public class MobilePositionServiceImpl implements IMobilePositionService {
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelay = 1000)
|
||||
@Transactional
|
||||
public void executeTaskQueue() {
|
||||
int countLimit = 3000;
|
||||
List<MobilePosition> mobilePositions = get(countLimit);
|
||||
if (mobilePositions == null || mobilePositions.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
// 定时任务线程没有租户/项目上下文,移动位置属于跨租户跨项目批量处理。
|
||||
// 先从 Redis 拉队列(不需要事务),空队列直接返回,避免每秒一次的空事务;
|
||||
// 拿到数据后再通过 self 代理进入 @Transactional 方法,保证 batchadd + 通道位置批量更新原子。
|
||||
VideoContextUtils.executeIgnoreTenantAndProject(() -> {
|
||||
int countLimit = 3000;
|
||||
List<MobilePosition> mobilePositions = get(countLimit);
|
||||
if (mobilePositions == null || mobilePositions.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
self.persistPositions(mobilePositions);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 落库 + 批量更新通道位置。必须通过 self 代理调用,否则 @Transactional 不生效。
|
||||
* <p>把 mobilePositionMapper.batchadd 和 channelMapper.batchUpdatePosition 放进同一个事务,
|
||||
* 避免"轨迹表写入成功但通道位置未更新"的数据不一致。</p>
|
||||
*/
|
||||
@Transactional
|
||||
public void persistPositions(List<MobilePosition> mobilePositions) {
|
||||
if (userSetting.getSavePositionHistory()) {
|
||||
mobilePositionMapper.batchadd(mobilePositions);
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import com.viewsh.module.video.vmanager.bean.ErrorCode;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.viewsh.framework.common.pojo.PageResult;
|
||||
import com.viewsh.module.video.framework.util.VideoContextUtils;
|
||||
import com.google.common.base.Joiner;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -82,6 +83,11 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
|
||||
|
||||
@Scheduled(fixedRate = 1, timeUnit = TimeUnit.MINUTES)
|
||||
public void execution() {
|
||||
// 定时任务线程没有租户/项目上下文,录像计划需要跨租户跨项目执行
|
||||
VideoContextUtils.executeIgnoreTenantAndProject(this::doExecution);
|
||||
}
|
||||
|
||||
private void doExecution() {
|
||||
// 查询现在需要录像的通道Id
|
||||
List<Long> startChannelIdList = queryCurrentChannelRecord();
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.viewsh.module.video.service.redisMsg;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.viewsh.module.video.common.VideoManagerConstants;
|
||||
import com.viewsh.module.video.framework.util.VideoContextUtils;
|
||||
import com.viewsh.module.video.framework.config.DynamicTask;
|
||||
import com.viewsh.module.video.framework.config.UserSetting;
|
||||
import com.viewsh.module.video.service.bean.PushStreamStatusChangeFromRedisDto;
|
||||
@@ -102,6 +103,11 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
|
||||
|
||||
@Override
|
||||
public void run(ApplicationArguments args) throws Exception {
|
||||
// 启动时主线程没有租户/项目上下文,查询推流设备需要跨租户;与 DeviceServiceImpl.run 对齐
|
||||
VideoContextUtils.executeIgnoreTenantAndProject(this::initOnStartup);
|
||||
}
|
||||
|
||||
private void initOnStartup() {
|
||||
if (userSetting.getUsePushingAsStatus()) {
|
||||
return;
|
||||
}
|
||||
@@ -112,6 +118,8 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
|
||||
}
|
||||
// 启动时设置所有推流通道离线,发起查询请求
|
||||
redisCatchStorage.sendStreamPushRequestedMsgForStatus();
|
||||
// DynamicTask 内部已用 TtlRunnable 包装,会自动把当前线程(此刻处于 executeIgnore 作用域内)
|
||||
// 的 TenantContextHolder/ProjectContextHolder 快照透传到 5s 后的调度线程
|
||||
dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, () -> {
|
||||
log.info("[REDIS消息]未收到redis回复推流设备状态,执行推流设备离线");
|
||||
// 五秒收不到请求就设置通道离线,然后通知上级离线
|
||||
|
||||
@@ -28,6 +28,7 @@ import com.viewsh.module.video.vmanager.bean.ResourceBaseInfo;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.viewsh.framework.common.pojo.PageResult;
|
||||
import com.viewsh.module.video.framework.util.VideoContextUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.event.EventListener;
|
||||
@@ -88,9 +89,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
@Transactional
|
||||
@org.springframework.context.event.EventListener
|
||||
public void onApplicationEvent(MediaArrivalEvent event) {
|
||||
if ("rtsp".equals(event.getSchema())) {
|
||||
streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), true);
|
||||
}
|
||||
VideoContextUtils.executeIgnoreTenantAndProject(() -> {
|
||||
if ("rtsp".equals(event.getSchema())) {
|
||||
streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), true);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -100,9 +103,11 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaDepartureEvent event) {
|
||||
if ("rtsp".equals(event.getSchema())) {
|
||||
streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), false);
|
||||
}
|
||||
VideoContextUtils.executeIgnoreTenantAndProject(() -> {
|
||||
if ("rtsp".equals(event.getSchema())) {
|
||||
streamChangeHandler(event.getApp(), event.getStream(), event.getMediaServer().getId(), false);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -111,16 +116,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
@Async("taskExecutor")
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaNotFoundEvent event) {
|
||||
if ("rtp".equals(event.getApp())) {
|
||||
return;
|
||||
}
|
||||
// 拉流代理
|
||||
StreamProxy streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream());
|
||||
if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) {
|
||||
startByAppAndStream(event.getApp(), event.getStream(), ((code, msg, data) -> {
|
||||
log.info("[拉流代理] 自动点播成功, app: {}, stream: {}", event.getApp(), event.getStream());
|
||||
}));
|
||||
}
|
||||
VideoContextUtils.executeIgnoreTenantAndProject(() -> {
|
||||
if ("rtp".equals(event.getApp())) {
|
||||
return;
|
||||
}
|
||||
// 拉流代理
|
||||
StreamProxy streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream());
|
||||
if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) {
|
||||
startByAppAndStream(event.getApp(), event.getStream(), ((code, msg, data) -> {
|
||||
log.info("[拉流代理] 自动点播成功, app: {}, stream: {}", event.getApp(), event.getStream());
|
||||
}));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -130,7 +137,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaServerOnlineEvent event) {
|
||||
zlmServerOnline(event.getMediaServer());
|
||||
VideoContextUtils.executeIgnoreTenantAndProject(() -> zlmServerOnline(event.getMediaServer()));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -140,7 +147,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaServerOfflineEvent event) {
|
||||
zlmServerOffline(event.getMediaServer());
|
||||
VideoContextUtils.executeIgnoreTenantAndProject(() -> zlmServerOffline(event.getMediaServer()));
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ import com.viewsh.module.video.vmanager.bean.ResourceBaseInfo;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.viewsh.framework.common.pojo.PageResult;
|
||||
import com.viewsh.module.video.framework.util.VideoContextUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.event.EventListener;
|
||||
@@ -68,50 +69,52 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaArrivalEvent event) {
|
||||
MediaInfo mediaInfo = event.getMediaInfo();
|
||||
if (mediaInfo == null) {
|
||||
return;
|
||||
}
|
||||
if (mediaInfo.getOriginType() != OriginType.RTMP_PUSH.ordinal()
|
||||
&& mediaInfo.getOriginType() != OriginType.RTSP_PUSH.ordinal()
|
||||
&& mediaInfo.getOriginType() != OriginType.RTC_PUSH.ordinal()) {
|
||||
return;
|
||||
}
|
||||
VideoContextUtils.executeIgnoreTenantAndProject(() -> {
|
||||
MediaInfo mediaInfo = event.getMediaInfo();
|
||||
if (mediaInfo == null) {
|
||||
return;
|
||||
}
|
||||
if (mediaInfo.getOriginType() != OriginType.RTMP_PUSH.ordinal()
|
||||
&& mediaInfo.getOriginType() != OriginType.RTSP_PUSH.ordinal()
|
||||
&& mediaInfo.getOriginType() != OriginType.RTC_PUSH.ordinal()) {
|
||||
return;
|
||||
}
|
||||
|
||||
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream());
|
||||
if (streamAuthorityInfo == null) {
|
||||
streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(event);
|
||||
} else {
|
||||
streamAuthorityInfo.setOriginType(mediaInfo.getOriginType());
|
||||
}
|
||||
redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo);
|
||||
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream());
|
||||
if (streamAuthorityInfo == null) {
|
||||
streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(event);
|
||||
} else {
|
||||
streamAuthorityInfo.setOriginType(mediaInfo.getOriginType());
|
||||
}
|
||||
redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo);
|
||||
|
||||
StreamPush streamPushInDb = getPush(event.getApp(), event.getStream());
|
||||
if (streamPushInDb == null) {
|
||||
StreamPush streamPush = StreamPush.getInstance(event, userSetting.getServerId());
|
||||
streamPush.setPushing(true);
|
||||
streamPush.setServerId(userSetting.getServerId());
|
||||
streamPush.setPushTime(DateUtil.getNow());
|
||||
add(streamPush);
|
||||
}else {
|
||||
streamPushInDb.setPushing(true);
|
||||
streamPushInDb.setServerId(userSetting.getServerId());
|
||||
streamPushInDb.setMediaServerId(mediaInfo.getMediaServer().getId());
|
||||
updatePushStatus(streamPushInDb);
|
||||
}
|
||||
// 冗余数据,自己系统中自用
|
||||
if (!"broadcast".equals(event.getApp()) && !"talk".equals(event.getApp())) {
|
||||
redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event.getMediaInfo());
|
||||
}
|
||||
StreamPush streamPushInDb = getPush(event.getApp(), event.getStream());
|
||||
if (streamPushInDb == null) {
|
||||
StreamPush streamPush = StreamPush.getInstance(event, userSetting.getServerId());
|
||||
streamPush.setPushing(true);
|
||||
streamPush.setServerId(userSetting.getServerId());
|
||||
streamPush.setPushTime(DateUtil.getNow());
|
||||
add(streamPush);
|
||||
}else {
|
||||
streamPushInDb.setPushing(true);
|
||||
streamPushInDb.setServerId(userSetting.getServerId());
|
||||
streamPushInDb.setMediaServerId(mediaInfo.getMediaServer().getId());
|
||||
updatePushStatus(streamPushInDb);
|
||||
}
|
||||
// 冗余数据,自己系统中自用
|
||||
if (!"broadcast".equals(event.getApp()) && !"talk".equals(event.getApp())) {
|
||||
redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event.getMediaInfo());
|
||||
}
|
||||
|
||||
// 发送流变化redis消息
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put("serverId", userSetting.getServerId());
|
||||
jsonObject.put("app", event.getApp());
|
||||
jsonObject.put("stream", event.getStream());
|
||||
jsonObject.put("register", true);
|
||||
jsonObject.put("mediaServerId", event.getMediaServer().getId());
|
||||
redisCatchStorage.sendStreamChangeMsg(OriginType.values()[event.getMediaInfo().getOriginType()].getType(), jsonObject);
|
||||
// 发送流变化redis消息
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put("serverId", userSetting.getServerId());
|
||||
jsonObject.put("app", event.getApp());
|
||||
jsonObject.put("stream", event.getStream());
|
||||
jsonObject.put("register", true);
|
||||
jsonObject.put("mediaServerId", event.getMediaServer().getId());
|
||||
redisCatchStorage.sendStreamChangeMsg(OriginType.values()[event.getMediaInfo().getOriginType()].getType(), jsonObject);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -121,34 +124,35 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaDepartureEvent event) {
|
||||
VideoContextUtils.executeIgnoreTenantAndProject(() -> {
|
||||
// 兼容流注销时类型从redis记录获取
|
||||
MediaInfo mediaInfo = redisCatchStorage.getPushListItem(event.getApp(), event.getStream());
|
||||
|
||||
// 兼容流注销时类型从redis记录获取
|
||||
MediaInfo mediaInfo = redisCatchStorage.getPushListItem(event.getApp(), event.getStream());
|
||||
|
||||
if (mediaInfo != null) {
|
||||
log.info("[推流信息] 查询到redis存在推流缓存, 开始清理,{}/{}", event.getApp(), event.getStream());
|
||||
String type = OriginType.values()[mediaInfo.getOriginType()].getType();
|
||||
// 冗余数据,自己系统中自用
|
||||
redisCatchStorage.removePushListItem(event.getApp(), event.getStream(), event.getMediaServer().getId());
|
||||
// 发送流变化redis消息
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put("serverId", userSetting.getServerId());
|
||||
jsonObject.put("app", event.getApp());
|
||||
jsonObject.put("stream", event.getStream());
|
||||
jsonObject.put("register", false);
|
||||
jsonObject.put("mediaServerId", event.getMediaServer().getId());
|
||||
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
|
||||
}
|
||||
StreamPush streamPush = getPush(event.getApp(), event.getStream());
|
||||
if (streamPush == null) {
|
||||
return;
|
||||
}
|
||||
if (streamPush.getGbDeviceId() != null) {
|
||||
streamPush.setPushing(false);
|
||||
updatePushStatus(streamPush);
|
||||
}else {
|
||||
deleteByAppAndStream(event.getApp(), event.getStream());
|
||||
}
|
||||
if (mediaInfo != null) {
|
||||
log.info("[推流信息] 查询到redis存在推流缓存, 开始清理,{}/{}", event.getApp(), event.getStream());
|
||||
String type = OriginType.values()[mediaInfo.getOriginType()].getType();
|
||||
// 冗余数据,自己系统中自用
|
||||
redisCatchStorage.removePushListItem(event.getApp(), event.getStream(), event.getMediaServer().getId());
|
||||
// 发送流变化redis消息
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put("serverId", userSetting.getServerId());
|
||||
jsonObject.put("app", event.getApp());
|
||||
jsonObject.put("stream", event.getStream());
|
||||
jsonObject.put("register", false);
|
||||
jsonObject.put("mediaServerId", event.getMediaServer().getId());
|
||||
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
|
||||
}
|
||||
StreamPush streamPush = getPush(event.getApp(), event.getStream());
|
||||
if (streamPush == null) {
|
||||
return;
|
||||
}
|
||||
if (streamPush.getGbDeviceId() != null) {
|
||||
streamPush.setPushing(false);
|
||||
updatePushStatus(streamPush);
|
||||
}else {
|
||||
deleteByAppAndStream(event.getApp(), event.getStream());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -158,7 +162,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaServerOnlineEvent event) {
|
||||
zlmServerOnline(event.getMediaServer());
|
||||
VideoContextUtils.executeIgnoreTenantAndProject(() -> zlmServerOnline(event.getMediaServer()));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -168,7 +172,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaServerOfflineEvent event) {
|
||||
zlmServerOffline(event.getMediaServer());
|
||||
VideoContextUtils.executeIgnoreTenantAndProject(() -> zlmServerOffline(event.getMediaServer()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user