From f8e5e8f0573510ebaf72874802803b5b0fa9aa85 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Tue, 1 Jul 2025 17:07:10 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8B=89=E6=B5=81=E4=BB=A3?= =?UTF-8?q?=E7=90=86=E6=92=AD=E6=94=BE=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/controller/MediaController.java | 69 +++++---- .../service/IMediaNodeServerService.java | 2 +- .../media/service/IMediaServerService.java | 2 +- .../service/impl/MediaServerServiceImpl.java | 4 +- .../media/zlm/ZLMMediaNodeServerService.java | 15 +- .../redisMsg/IRedisRpcPlayService.java | 2 +- .../RedisRpcStreamProxyController.java | 12 +- .../service/RedisRpcPlayServiceImpl.java | 15 +- .../controller/StreamProxyController.java | 99 +++++++++---- .../streamProxy/dao/StreamProxyMapper.java | 2 +- .../service/IStreamProxyPlayService.java | 8 +- .../service/IStreamProxyService.java | 5 +- .../impl/StreamProxyPlayServiceImpl.java | 133 ++++++------------ .../service/impl/StreamProxyServiceImpl.java | 18 +-- 14 files changed, 196 insertions(+), 190 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MediaController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MediaController.java index af2062b59..a292f58ff 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MediaController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MediaController.java @@ -7,19 +7,26 @@ import com.genersoft.iot.vmp.conf.security.SecurityUtils; import com.genersoft.iot.vmp.conf.security.dto.LoginUser; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.security.SecurityRequirement; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; +import org.springframework.web.context.request.async.DeferredResult; import javax.servlet.http.HttpServletRequest; +import java.net.MalformedURLException; +import java.net.URL; @Tag(name = "媒体流相关") @@ -52,11 +59,12 @@ public class MediaController { @Parameter(name = "useSourceIpAsStreamIp", description = "是否使用请求IP作为返回的地址IP") @GetMapping(value = "/stream_info_by_app_and_stream") @ResponseBody - public StreamContent getStreamInfoByAppAndStream(HttpServletRequest request, @RequestParam String app, - @RequestParam String stream, - @RequestParam(required = false) String mediaServerId, - @RequestParam(required = false) String callId, - @RequestParam(required = false) Boolean useSourceIpAsStreamIp){ + public DeferredResult> getStreamInfoByAppAndStream(HttpServletRequest request, @RequestParam String app, + @RequestParam String stream, + @RequestParam(required = false) String mediaServerId, + @RequestParam(required = false) String callId, + @RequestParam(required = false) Boolean useSourceIpAsStreamIp){ + DeferredResult> result = new DeferredResult<>(); boolean authority = false; if (callId != null) { // 权限校验 @@ -75,9 +83,7 @@ public class MediaController { authority = true; } } - StreamInfo streamInfo; - if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) { String host = request.getHeader("Host"); String localAddr = host.split(":")[0]; @@ -88,30 +94,37 @@ public class MediaController { } if (streamInfo != null){ - return new StreamContent(streamInfo); + WVPResult wvpResult = WVPResult.success(); + wvpResult.setData(new StreamContent(streamInfo)); + result.setResult(wvpResult); }else { + ErrorCallback callback = (code, msg, streamInfoStoStart) -> { + if (code == InviteErrorCode.SUCCESS.getCode()) { + WVPResult wvpResult = WVPResult.success(); + if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) { + String host; + try { + URL url=new URL(request.getRequestURL().toString()); + host=url.getHost(); + } catch (MalformedURLException e) { + host=request.getLocalAddr(); + } + streamInfoStoStart.changeStreamIp(host); + } + if (!ObjectUtils.isEmpty(streamInfoStoStart.getMediaServer().getTranscodeSuffix()) + && !"null".equalsIgnoreCase(streamInfoStoStart.getMediaServer().getTranscodeSuffix())) { + streamInfoStoStart.setStream(streamInfoStoStart.getStream() + "_" + streamInfoStoStart.getMediaServer().getTranscodeSuffix()); + } + wvpResult.setData(new StreamContent(streamInfoStoStart)); + result.setResult(wvpResult); + }else { + result.setResult(WVPResult.fail(code, msg)); + } + }; //获取流失败,重启拉流后重试一次 - streamProxyService.stopByAppAndStream(app,stream); - boolean start = streamProxyService.startByAppAndStream(app, stream); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - log.error("[线程休眠失败], {}", e.getMessage()); - } - if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) { - String host = request.getHeader("Host"); - String localAddr = host.split(":")[0]; - log.info("使用{}作为返回流的ip", localAddr); - streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, localAddr, authority); - }else { - streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority); - } - if (streamInfo != null){ - return new StreamContent(streamInfo); - }else { - throw new ControllerException(ErrorCode.ERROR100); - } + streamProxyService.startByAppAndStream(app, stream, callback); } + return result; } /** * 获取推流播放地址 diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java index e3256d18e..a1c5b9d25 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java @@ -64,7 +64,7 @@ public interface IMediaNodeServerService { Long updateDownloadProcess(MediaServer mediaServer, String app, String stream); - StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy); + void startProxy(MediaServer mediaServer, StreamProxy streamProxy); void stopProxy(MediaServer mediaServer, String streamKey); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index 4eda22704..7f92bd221 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -150,7 +150,7 @@ public interface IMediaServerService { Long updateDownloadProcess(MediaServer mediaServerItem, String app, String stream); - StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy); + void startProxy(MediaServer mediaServer, StreamProxy streamProxy); void stopProxy(MediaServer mediaServer, String streamKey); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 8d4ef0a5c..09bc1199d 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -952,13 +952,13 @@ public class MediaServerServiceImpl implements IMediaServerService { } @Override - public StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy) { + public void startProxy(MediaServer mediaServer, StreamProxy streamProxy) { IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); if (mediaNodeServerService == null) { log.info("[startProxy] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类"); } - return mediaNodeServerService.startProxy(mediaServer, streamProxy); + mediaNodeServerService.startProxy(mediaServer, streamProxy); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index 6786404aa..755a7fcec 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -425,7 +425,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { } @Override - public StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy) { + public void startProxy(MediaServer mediaServer, StreamProxy streamProxy) { String dstUrl; if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())) { @@ -463,10 +463,6 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { MediaInfo mediaInfo = getMediaInfo(mediaServer, streamProxy.getApp(), streamProxy.getStream()); if (mediaInfo != null) { - if (mediaInfo.getOriginUrl() != null && mediaInfo.getOriginUrl().equals(streamProxy.getSrcUrl())) { - log.info("[启动拉流代理] 已存在, 直接返回, app: {}, stream: {}", mediaInfo.getApp(), streamProxy.getStream()); - return getStreamInfoByAppAndStream(mediaServer, streamProxy.getApp(), streamProxy.getStream(), mediaInfo, null, true); - } closeStreams(mediaServer, streamProxy.getApp(), streamProxy.getStream()); } @@ -490,15 +486,6 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { JSONObject data = jsonObject.getJSONObject("data"); if (data == null) { throw new ControllerException(jsonObject.getInteger("code"), "代理结果异常: " + jsonObject); - }else { - streamProxy.setStreamKey(data.getString("key")); - // 由于此时流未注册,手动拼装流信息 - mediaInfo = new MediaInfo(); - mediaInfo.setApp(streamProxy.getApp()); - mediaInfo.setStream(streamProxy.getStream()); - mediaInfo.setOriginType(4); - mediaInfo.setOriginTypeStr("pull"); - return getStreamInfoByAppAndStream(mediaServer, streamProxy.getApp(), streamProxy.getStream(), mediaInfo, null, true); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java index db1035e3e..9b5ecb3b3 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcPlayService.java @@ -28,7 +28,7 @@ public interface IRedisRpcPlayService { void playPush(String serverId, Integer id, ErrorCallback callback); - StreamInfo playProxy(String serverId, int id); + void playProxy(String serverId, int id, ErrorCallback callback); void stopProxy(String serverId, int id); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamProxyController.java index 55764c531..4a688c3df 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamProxyController.java @@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.service.redisMsg.control; import com.alibaba.fastjson2.JSONObject; -import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; @@ -63,10 +62,13 @@ public class RedisRpcStreamProxyController extends RpcController { response.setBody("param error"); return response; } - StreamInfo streamInfo = streamProxyPlayService.startProxy(streamProxy); - response.setStatusCode(ErrorCode.SUCCESS.getCode()); - response.setBody(JSONObject.toJSONString(streamInfo)); - return response; + streamProxyPlayService.startProxy(streamProxy, (code, msg, streamInfo) -> { + response.setStatusCode(code); + response.setBody(JSONObject.toJSONString(streamInfo)); + sendResponse(response); + }); + + return null; } /** diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java index 5f369efaf..42e171372 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcPlayServiceImpl.java @@ -212,13 +212,20 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService { } @Override - public StreamInfo playProxy(String serverId, int id) { + public void playProxy(String serverId, int id, ErrorCallback callback) { RedisRpcRequest request = buildRequest("streamProxy/play", id); + request.setToId(serverId); RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.SECONDS); - if (response != null && response.getStatusCode() == ErrorCode.SUCCESS.getCode()) { - return JSON.parseObject(response.getBody().toString(), StreamInfo.class); + if (response == null) { + callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null); + }else { + if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) { + StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class); + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + }else { + callback.run(response.getStatusCode(), response.getBody().toString(), null); + } } - return null; } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java index 7d651219e..331f8f3a4 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/controller/StreamProxyController.java @@ -7,12 +7,15 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.security.JwtUtils; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam; import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService; import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageInfo; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -20,8 +23,10 @@ import io.swagger.v3.oas.annotations.security.SecurityRequirement; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; +import org.springframework.web.context.request.async.DeferredResult; import javax.servlet.http.HttpServletRequest; import java.net.MalformedURLException; @@ -89,7 +94,7 @@ public class StreamProxyController { }) @PostMapping(value = "/save") @ResponseBody - public StreamContent save(@RequestBody StreamProxyParam param){ + public DeferredResult> save(HttpServletRequest request, @RequestBody StreamProxyParam param){ log.info("添加代理: " + JSONObject.toJSONString(param)); if (ObjectUtils.isEmpty(param.getMediaServerId())) { param.setMediaServerId("auto"); @@ -97,18 +102,39 @@ public class StreamProxyController { if (ObjectUtils.isEmpty(param.getType())) { param.setType("default"); } + DeferredResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + ErrorCallback callback = (code, msg, streamInfo) -> { + if (code == InviteErrorCode.SUCCESS.getCode()) { + WVPResult wvpResult = WVPResult.success(); + if (streamInfo != null) { + if (userSetting.getUseSourceIpAsStreamIp()) { + streamInfo=streamInfo.clone();//深拷贝 + String host; + try { + URL url=new URL(request.getRequestURL().toString()); + host=url.getHost(); + } catch (MalformedURLException e) { + host=request.getLocalAddr(); + } + streamInfo.changeStreamIp(host); + } + if (!ObjectUtils.isEmpty(streamInfo.getMediaServer().getTranscodeSuffix()) + && !"null".equalsIgnoreCase(streamInfo.getMediaServer().getTranscodeSuffix())) { + streamInfo.setStream(streamInfo.getStream() + "_" + streamInfo.getMediaServer().getTranscodeSuffix()); + } + wvpResult.setData(new StreamContent(streamInfo)); + }else { + wvpResult.setCode(code); + wvpResult.setMsg(msg); + } - StreamInfo streamInfo = streamProxyService.save(param); - if (param.isEnable()) { - if (streamInfo == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg()); + result.setResult(wvpResult); }else { - return new StreamContent(streamInfo); + result.setResult(WVPResult.fail(code, msg)); } - }else { - return null; - } - + }; + streamProxyService.save(param, callback); + return result; } @Operation(summary = "新增代理", security = @SecurityRequirement(name = JwtUtils.HEADER), parameters = { @@ -193,25 +219,46 @@ public class StreamProxyController { @ResponseBody @Operation(summary = "启用代理", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "id", description = "代理Id", required = true) - public StreamContent start(HttpServletRequest request, int id){ + public DeferredResult> start(HttpServletRequest request, int id){ log.info("播放代理: {}", id); - StreamInfo streamInfo = streamProxyPlayService.start(id, null, null); - if (streamInfo == null) { - throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg()); - }else { - if (userSetting.getUseSourceIpAsStreamIp()) { - streamInfo=streamInfo.clone();//深拷贝 - String host; - try { - URL url=new URL(request.getRequestURL().toString()); - host=url.getHost(); - } catch (MalformedURLException e) { - host=request.getLocalAddr(); + StreamProxy streamProxy = streamProxyService.getStreamProxy(id); + Assert.notNull(streamProxy, "代理信息不存在"); + + DeferredResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + + ErrorCallback callback = (code, msg, streamInfo) -> { + if (code == InviteErrorCode.SUCCESS.getCode()) { + WVPResult wvpResult = WVPResult.success(); + if (streamInfo != null) { + if (userSetting.getUseSourceIpAsStreamIp()) { + streamInfo=streamInfo.clone();//深拷贝 + String host; + try { + URL url=new URL(request.getRequestURL().toString()); + host=url.getHost(); + } catch (MalformedURLException e) { + host=request.getLocalAddr(); + } + streamInfo.changeStreamIp(host); + } + if (!ObjectUtils.isEmpty(streamInfo.getMediaServer().getTranscodeSuffix()) + && !"null".equalsIgnoreCase(streamInfo.getMediaServer().getTranscodeSuffix())) { + streamInfo.setStream(streamInfo.getStream() + "_" + streamInfo.getMediaServer().getTranscodeSuffix()); + } + wvpResult.setData(new StreamContent(streamInfo)); + }else { + wvpResult.setCode(code); + wvpResult.setMsg(msg); } - streamInfo.changeStreamIp(host); + + result.setResult(wvpResult); + }else { + result.setResult(WVPResult.fail(code, msg)); } - return new StreamContent(streamInfo); - } + }; + + streamProxyPlayService.start(id, null, callback); + return result; } @GetMapping(value = "/stop") diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java index 4258458b7..db19e5961 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/dao/StreamProxyMapper.java @@ -92,5 +92,5 @@ public interface StreamProxyMapper { " SET pulling=#{pulling}, media_server_id = #{mediaServerId}, " + " stream_key = #{streamKey} " + " WHERE id=#{id}") - void addStream(StreamProxy streamProxy); + void updateStream(StreamProxy streamProxy); } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyPlayService.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyPlayService.java index 284c90bf2..7836e80f6 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyPlayService.java @@ -4,13 +4,13 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; +import javax.validation.constraints.NotNull; + public interface IStreamProxyPlayService { - StreamInfo start(int id, Boolean record, ErrorCallback callback); + void start(int id, Boolean record, ErrorCallback callback); - void start(int id, ErrorCallback callback); - - StreamInfo startProxy(StreamProxy streamProxy); + void startProxy(@NotNull StreamProxy streamProxy, ErrorCallback callback); void stop(int id); diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java index 1d848b64d..3b0cdbe0d 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/IStreamProxyService.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.streamProxy.service; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam; import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; @@ -15,7 +16,7 @@ public interface IStreamProxyService { * 保存视频代理 * @param param */ - StreamInfo save(StreamProxyParam param); + void save(StreamProxyParam param, ErrorCallback callback); /** * 分页查询 @@ -38,7 +39,7 @@ public interface IStreamProxyService { * @param stream * @return */ - boolean startByAppAndStream(String app, String stream); + void startByAppAndStream(String app, String stream, ErrorCallback callback); /** * 停用用视频代理 diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java index a9ff6dc23..797126b90 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyPlayServiceImpl.java @@ -4,12 +4,10 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; -import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; @@ -20,16 +18,12 @@ import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.event.EventListener; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; -import javax.sip.message.Response; +import javax.validation.constraints.NotNull; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; /** * 视频代理业务 @@ -56,107 +50,42 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { @Autowired private IRedisRpcPlayService redisRpcPlayService; - private ConcurrentHashMap> callbackMap = new ConcurrentHashMap<>(); - - private ConcurrentHashMap streamInfoMap = new ConcurrentHashMap<>(); - - /** - * 流到来的处理 - */ - @Async("taskExecutor") - @Transactional - @EventListener - public void onApplicationEvent(MediaArrivalEvent event) { - if ("rtsp".equals(event.getSchema())) { - StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(event.getApp(), event.getStream()); - if (streamProxy != null) { - ErrorCallback callback = callbackMap.remove(streamProxy.getId()); - StreamInfo streamInfo = streamInfoMap.remove(streamProxy.getId()); - if (callback != null && streamInfo != null) { - callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - } - } - } - } - @Override - public void start(int id, ErrorCallback callback) { - StreamProxy streamProxy = streamProxyMapper.select(id); - if (streamProxy == null) { - throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); - } - StreamInfo streamInfo = startProxy(streamProxy); - if (streamInfo == null) { - callback.run(Response.BUSY_HERE, "busy here", null); - return; - } - callbackMap.put(id, callback); - streamInfoMap.put(id, streamInfo); - - MediaServer mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId()); - if (mediaServer != null) { - MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, streamProxy.getApp(), streamProxy.getStream()); - if (mediaInfo != null) { - callbackMap.remove(id); - streamInfoMap.remove(id); - callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - } - } - } - - @Override - public StreamInfo start(int id, Boolean record, ErrorCallback callback) { + public void start(int id, Boolean record, ErrorCallback callback) { log.info("[拉流代理], 开始拉流,ID:{}", id); StreamProxy streamProxy = streamProxyMapper.select(id); if (streamProxy == null) { throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); } + log.info("[拉流代理] 类型: {}, app:{}, stream: {}, 流地址: {}", streamProxy.getType(), streamProxy.getApp(), streamProxy.getStream(), streamProxy.getSrcUrl()); if (record != null) { streamProxy.setEnableMp4(record); } - if (streamProxy.getMediaServerId() != null) { - StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(streamProxy.getApp(), streamProxy.getStream(), streamProxy.getMediaServerId(), false); - if (streamInfo != null) { - callbackMap.remove(id); - streamInfoMap.remove(id); - callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - return streamInfo; - } - } - StreamInfo streamInfo = startProxy(streamProxy); - if (callback != null) { - // 设置流超时的定时任务 - String timeOutTaskKey = UUID.randomUUID().toString(); - Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), streamInfo.getMediaServer().getId()); - dynamicTask.startDelay(timeOutTaskKey, () -> { - log.info("[拉流代理], 收流超时,ID:{}", id); - // 收流超时 - subscribe.removeSubscribe(rtpHook); - callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), streamInfo); - }, userSetting.getPlayTimeout()); - - // 开启流到来的监听 - subscribe.addSubscribe(rtpHook, (hookData) -> { - dynamicTask.stop(timeOutTaskKey); - // hook响应 - callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - subscribe.removeSubscribe(rtpHook); - }); - } - return streamInfo; + startProxy(streamProxy, callback); } @Override - public StreamInfo startProxy(StreamProxy streamProxy){ + public void startProxy(@NotNull StreamProxy streamProxy, ErrorCallback callback){ if (!streamProxy.isEnable()) { - return null; + callback.run(ErrorCode.ERROR100.getCode(), "代理未启用", null); + return; } if (streamProxy.getServerId() == null) { streamProxy.setServerId(userSetting.getServerId()); } if (!userSetting.getServerId().equals(streamProxy.getServerId())) { - return redisRpcPlayService.playProxy(streamProxy.getServerId(), streamProxy.getId()); + log.info("[拉流代理] 由其他服务{}管理", streamProxy.getServerId()); + redisRpcPlayService.playProxy(streamProxy.getServerId(), streamProxy.getId(), callback); + return; + } + + if (streamProxy.getMediaServerId() != null) { + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(streamProxy.getApp(), streamProxy.getStream(), streamProxy.getMediaServerId(), null, false); + if (streamInfo != null) { + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + return; + } } MediaServer mediaServer; @@ -169,12 +98,32 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService { if (mediaServer == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), mediaServerId == null?"未找到可用的媒体节点":"未找到节点" + mediaServerId); } - StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy); + + // 设置流超时的定时任务 + String timeOutTaskKey = UUID.randomUUID().toString(); + Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId()); + dynamicTask.startDelay(timeOutTaskKey, () -> { + log.info("[拉流代理] 收流超时,app:{},stream: {}", streamProxy.getApp(), streamProxy.getStream()); + // 收流超时 + subscribe.removeSubscribe(rtpHook); + callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); + }, userSetting.getPlayTimeout()); + + // 开启流到来的监听 + subscribe.addSubscribe(rtpHook, (hookData) -> { + log.info("[拉流代理] 收流成功,app:{},stream: {}", hookData.getApp(), hookData.getStream()); + dynamicTask.stop(timeOutTaskKey); + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServer, hookData.getApp(), hookData.getStream(), hookData.getMediaInfo(), null); + // hook响应 + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + subscribe.removeSubscribe(rtpHook); + }); + + mediaServerService.startProxy(mediaServer, streamProxy); if (mediaServerId == null || !mediaServerId.equals(mediaServer.getId())) { streamProxy.setMediaServerId(mediaServer.getId()); - streamProxyMapper.addStream(streamProxy); + streamProxyMapper.updateStream(streamProxy); } - return streamInfo; } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java index 6c89b02d8..b6d8261b3 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java @@ -15,6 +15,7 @@ import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam; @@ -109,7 +110,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { // 拉流代理 StreamProxy streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream()); if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) { - startByAppAndStream(event.getApp(), event.getStream()); + startByAppAndStream(event.getApp(), event.getStream(), ((code, msg, data) -> { + log.info("[拉流代理] 自动点播成功, app: {}, stream: {}", event.getApp(), event.getStream()); + })); } } @@ -136,7 +139,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override @Transactional - public StreamInfo save(StreamProxyParam param) { + public void save(StreamProxyParam param, ErrorCallback callback) { // 兼容旧接口 StreamProxy streamProxyInDb = getStreamProxyByAppAndStream(param.getApp(), param.getStream()); if (streamProxyInDb != null && streamProxyInDb.getPulling() != null && streamProxyInDb.getPulling()) { @@ -159,9 +162,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { } if (param.isEnable()) { - return playService.startProxy(streamProxy); - } else { - return null; + playService.startProxy(streamProxy, callback); } } @@ -247,13 +248,12 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override - public boolean startByAppAndStream(String app, String stream) { + public void startByAppAndStream(String app, String stream, ErrorCallback callback) { StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream); if (streamProxy == null) { throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到"); } - StreamInfo streamInfo = playService.startProxy(streamProxy); - return streamInfo != null; + playService.startProxy(streamProxy, callback); } @Override @@ -406,7 +406,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { streamProxy.setPulling(status); streamProxy.setMediaServerId(mediaServerId); streamProxy.setUpdateTime(DateUtil.getNow()); - streamProxyMapper.addStream(streamProxy); + streamProxyMapper.updateStream(streamProxy); streamProxy.setGbStatus(status ? "ON" : "OFF"); if (streamProxy.getGbId() > 0) {