diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 5e3ca47d4..e917e4a50 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -183,6 +183,7 @@ public class VideoManagerConstants { public static final String INVITE_INFO_1078_PLAY = "INVITE_INFO_1078_PLAY:"; public static final String INVITE_INFO_1078_PLAYBACK = "INVITE_INFO_1078_PLAYBACK:"; + public static final String INVITE_INFO_1078_BROADCAST = "INVITE_INFO_1078_BROADCAST:"; public static final String RECORD_LIST_1078 = "RECORD_LIST_1078:"; diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java b/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java index f18df1bc3..fbc58d16b 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java @@ -61,13 +61,13 @@ public class JT1078Controller { @Operation(summary = "1078-开始点播", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "deviceId", description = "设备国标编号", required = true) @Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true) - @Parameter(name = "type", description = "类型:0:音视频,1:视频,2:音频", required = true) + @Parameter(name = "type", description = "类型:0:音视频,1:视频,3:音频", required = true) @GetMapping("/live/start") public DeferredResult> startLive(HttpServletRequest request, @Parameter(required = true) String deviceId, @Parameter(required = false) String channelId, @Parameter(required = false) Integer type) { - if (type == null || (type != 0 && type != 1 && type != 2)) { + if (type == null || (type != 0 && type != 1 && type != 3)) { type = 0; } DeferredResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); @@ -80,7 +80,7 @@ public class JT1078Controller { // 释放rtpserver WVPResult wvpResult = new WVPResult<>(); wvpResult.setCode(ErrorCode.ERROR100.getCode()); - wvpResult.setMsg("点播超时"); + wvpResult.setMsg("超时"); result.setResult(wvpResult); service.stopPlay(deviceId, finalChannelId); }); @@ -131,6 +131,68 @@ public class JT1078Controller { service.stopPlay(deviceId, channelId); } + @Operation(summary = "1078-语音对讲", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Parameter(name = "deviceId", description = "设备国标编号", required = true) + @Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true) + @Parameter(name = "app", description = "推流应用名", required = true) + @Parameter(name = "stream", description = "推流ID", required = true) + @Parameter(name = "mediaServerId", description = "流媒体ID", required = true) + @GetMapping("/broadcast") + public DeferredResult> broadcast(HttpServletRequest request, + @Parameter(required = true) String deviceId, + @Parameter(required = true) String channelId, + @Parameter(required = true) String app, + @Parameter(required = true) String stream, + @Parameter(required = true) String mediaServerId) { + DeferredResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + if (ObjectUtils.isEmpty(channelId)) { + channelId = "1"; + } + String finalChannelId = channelId; + result.onTimeout(()->{ + logger.info("[1078-语音对讲超时] deviceId:{}, channelId:{}, ", deviceId, finalChannelId); + // 释放rtpserver + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(ErrorCode.ERROR100.getCode()); + wvpResult.setMsg("超时"); + result.setResult(wvpResult); + service.stopPlay(deviceId, finalChannelId); + }); + + service.broadcast(deviceId, channelId, app, stream, mediaServerId, (code, msg, streamInfo) -> { + WVPResult wvpResult = new WVPResult<>(); + if (code == InviteErrorCode.SUCCESS.getCode()) { + wvpResult.setCode(ErrorCode.SUCCESS.getCode()); + wvpResult.setMsg(ErrorCode.SUCCESS.getMsg()); + + if (streamInfo != null) { + if (userSetting.getUseSourceIpAsStreamIp()) { + streamInfo=streamInfo.clone();//深拷贝 + String host; + try { + URL url=new URL(request.getRequestURL().toString()); + host=url.getHost(); + } catch (MalformedURLException e) { + host=request.getLocalAddr(); + } + streamInfo.channgeStreamIp(host); + } + wvpResult.setData(new StreamContent(streamInfo)); + }else { + wvpResult.setCode(code); + wvpResult.setMsg(msg); + } + }else { + wvpResult.setCode(code); + wvpResult.setMsg(msg); + } + result.setResult(wvpResult); + }); + + return result; + } + + @Operation(summary = "1078-暂停点播", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "deviceId", description = "设备国标编号", required = true) @Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true) diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java index b52256a88..170760e46 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java @@ -106,4 +106,6 @@ public interface Ijt1078Service { void uploadMediaDataForSingle(String deviceId, Long mediaId, Integer delete); JTMediaAttribute queryMediaAttribute(String deviceId); + + void broadcast(String deviceId, String channelId, String app, String stream, String mediaServerId, GeneralCallback callback); } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java index 0af1713b0..d62975adc 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java @@ -140,7 +140,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { // 清理数据 redisTemplate.delete(playKey); } - String stream = deviceId + "-" + channelId; + String stream = deviceId + "_" + channelId; MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); if (mediaServer == null) { for (GeneralCallback errorCallback : errorCallbacks) { @@ -750,4 +750,76 @@ public class jt1078ServiceImpl implements Ijt1078Service { J9003 j9003 = new J9003(); return (JTMediaAttribute)jt1078Template.queryMediaAttribute(deviceId, j9003, 300); } + + @Override + public void broadcast(String deviceId, String channelId, String app, String stream, String mediaServerId, GeneralCallback callback) { + // 检查流是否已经存在,存在则返回 + String playKey = VideoManagerConstants.INVITE_INFO_1078_BROADCAST + deviceId + ":" + channelId; + List> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>()); + errorCallbacks.add(callback); + StreamInfo streamInfo = (StreamInfo)redisTemplate.opsForValue().get(playKey); + if (streamInfo != null) { + String mediaServerId = streamInfo.getMediaServerId(); + MediaServer mediaServer = mediaServerService.getOne(mediaServerId); + if (mediaServer != null) { + // 查询流是否存在,不存在则删除缓存数据 + JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServer, "rtp", "rtsp", streamInfo.getStream()); + if (mediaInfo != null && mediaInfo.getInteger("code") == 0 ) { + Boolean online = mediaInfo.getBoolean("online"); + if (online != null && online) { + logger.info("[1078-点播] 点播已经存在,直接返回, deviceId: {}, channelId: {}", deviceId, channelId); + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + } + return; + } + } + } + // 清理数据 + redisTemplate.delete(playKey); + } + String stream = deviceId + "_" + channelId; + MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + if (mediaServer == null) { + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo); + } + return; + } + // 设置hook监听 + Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId()); + subscribe.addSubscribe(hook, (hookData) -> { + dynamicTask.stop(playKey); + logger.info("[1078-点播] 点播成功, deviceId: {}, channelId: {}", deviceId, channelId); + StreamInfo info = onPublishHandler(mediaServer, hookData, deviceId, channelId); + + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info); + } + subscribe.removeSubscribe(hook); + redisTemplate.opsForValue().set(playKey, info); + }); + // 设置超时监听 + dynamicTask.startDelay(playKey, () -> { + logger.info("[1078-点播] 超时, deviceId: {}, channelId: {}", deviceId, channelId); + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), + InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); + } + + }, userSetting.getPlayTimeout()); + + // 开启收流端口 + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, null, false, false, 0, false, false, false,1); + logger.info("[1078-点播] deviceId: {}, channelId: {}, 端口: {}", deviceId, channelId, ssrcInfo.getPort()); + J9101 j9101 = new J9101(); + j9101.setChannel(Integer.valueOf(channelId)); + j9101.setIp(mediaServer.getSdpIp()); + j9101.setRate(1); + j9101.setTcpPort(ssrcInfo.getPort()); + j9101.setUdpPort(ssrcInfo.getPort()); + j9101.setType(type); + Object s = jt1078Template.startLive(deviceId, j9101, 6); + System.out.println("ssss=== " + s); + } }