From e080534b156a1f55ea5a1693252c97b1dd2c65b3 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Wed, 5 Mar 2025 16:04:05 +0800 Subject: [PATCH] =?UTF-8?q?[=E5=9B=BD=E6=A0=87=E7=BA=A7=E8=81=94]=20?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=82=B9=E6=92=AD=E6=9C=AA=E5=BC=80=E5=A7=8B?= =?UTF-8?q?=E7=9A=84=E6=8B=89=E6=B5=81=E4=BB=A3=E7=90=86=E9=80=9A=E9=81=93?= =?UTF-8?q?=E6=97=B6=E6=8E=A8=E6=B5=81=E5=A4=B1=E8=B4=A5=E7=9A=84BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/StreamProxyPlayServiceImpl.java | 35 +++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java index d31fca5ed..f4ea79d0a 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java @@ -2,9 +2,14 @@ package com.genersoft.iot.vmp.streamProxy.service.impl; import com.baomidou.dynamic.datasource.annotation.DS; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; +import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; @@ -23,6 +28,7 @@ import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import javax.sip.message.Response; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** @@ -39,6 +45,15 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { @Autowired private IMediaServerService mediaServerService; + @Autowired + private HookSubscribe subscribe; + + @Autowired + private DynamicTask dynamicTask; + + @Autowired + private UserSetting userSetting; + private ConcurrentHashMap> callbackMap = new ConcurrentHashMap<>(); private ConcurrentHashMap streamInfoMap = new ConcurrentHashMap<>(); @@ -96,9 +111,25 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { if (record != null) { streamProxy.setEnableMp4(record); } + StreamInfo streamInfo = startProxy(streamProxy); - if (streamInfo != null && callback != null) { - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + if (callback != null) { + // 设置流超时的定时任务 + String timeOutTaskKey = UUID.randomUUID().toString(); + Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), streamInfo.getMediaServer().getId()); + dynamicTask.startDelay(timeOutTaskKey, () -> { + // 收流超时 + subscribe.removeSubscribe(rtpHook); + callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), streamInfo); + }, userSetting.getPlayTimeout()); + + // 开启流到来的监听 + subscribe.addSubscribe(rtpHook, (hookData) -> { + dynamicTask.stop(timeOutTaskKey); + // hook响应 + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + subscribe.removeSubscribe(rtpHook); + }); } return streamInfo; }