diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java b/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java index 7f1db1a7a..7556d713a 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java @@ -136,13 +136,15 @@ public class JT1078Controller { @Parameter(name = "app", description = "推流应用名", required = true) @Parameter(name = "stream", description = "推流ID", required = true) @Parameter(name = "mediaServerId", description = "流媒体ID", required = true) + @Parameter(name = "onlySend", description = "是否只发送", required = false) @GetMapping("/talk/start") public DeferredResult> startTalk(HttpServletRequest request, @Parameter(required = true) String deviceId, @Parameter(required = true) String channelId, @Parameter(required = true) String app, @Parameter(required = true) String stream, - @Parameter(required = true) String mediaServerId) { + @Parameter(required = true) String mediaServerId, + @Parameter(required = false) Boolean onlySend) { DeferredResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); if (ObjectUtils.isEmpty(channelId)) { channelId = "1"; @@ -158,7 +160,7 @@ public class JT1078Controller { service.stopPlay(deviceId, finalChannelId); }); - service.startTalk(deviceId, channelId, app, stream, mediaServerId, (code, msg, streamInfo) -> { + service.startTalk(deviceId, channelId, app, stream, mediaServerId, onlySend, (code, msg, streamInfo) -> { WVPResult wvpResult = new WVPResult<>(); if (code == InviteErrorCode.SUCCESS.getCode()) { wvpResult.setCode(ErrorCode.SUCCESS.getCode()); diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java index 236dc8764..f39b7f730 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java @@ -107,7 +107,7 @@ public interface Ijt1078Service { JTMediaAttribute queryMediaAttribute(String deviceId); - void startTalk(String deviceId, String channelId, String app, String stream, String mediaServerId, GeneralCallback callback); + void startTalk(String deviceId, String channelId, String app, String stream, String mediaServerId, Boolean onlySend, GeneralCallback callback); void stopTalk(String deviceId, String channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java index 5977b1a8f..1311047b1 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java @@ -790,7 +790,8 @@ public class jt1078ServiceImpl implements Ijt1078Service { } @Override - public void startTalk(String deviceId, String channelId, String app, String stream, String mediaServerId, GeneralCallback callback) { + public void startTalk(String deviceId, String channelId, String app, String stream, String mediaServerId, Boolean onlySend, + GeneralCallback callback) { // 检查流是否已经存在,存在则返回 String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + deviceId + ":" + channelId; List> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>()); @@ -828,39 +829,41 @@ public class jt1078ServiceImpl implements Ijt1078Service { sendRtpItem.setTcpActive(true); sendRtpItem.setUsePs(false); sendRtpItem.setOnlyAudio(true); - sendRtpItem.setReceiveStream(receiveStream); + if (onlySend == null || !onlySend) { + sendRtpItem.setReceiveStream(receiveStream); + } sendRtpItem.setPlatformId(deviceId); + if (onlySend != null && onlySend) { + // 设置hook监听 + Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", receiveStream, mediaServer.getId()); + subscribe.addSubscribe(hook, (hookData) -> { + dynamicTask.stop(playKey); + logger.info("[1078-对讲] 对讲成功, deviceId: {}, channelId: {}", deviceId, channelId); + StreamInfo info = onPublishHandler(mediaServer, hookData, deviceId, channelId); - // 设置hook监听 - Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", receiveStream, mediaServer.getId()); - subscribe.addSubscribe(hook, (hookData) -> { - dynamicTask.stop(playKey); - logger.info("[1078-对讲] 对讲成功, deviceId: {}, channelId: {}", deviceId, channelId); - StreamInfo info = onPublishHandler(mediaServer, hookData, deviceId, channelId); + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info); + } + subscribe.removeSubscribe(hook); + redisTemplate.opsForValue().set(playKey, info); + // 存储发流信息 + redisCatchStorage.updateSendRTPSever(sendRtpItem); + }); + Hook hookForDeparture = Hook.getInstance(HookType.on_media_departure, "rtp", receiveStream, mediaServer.getId()); + subscribe.addSubscribe(hookForDeparture, (hookData) -> { + logger.info("[1078-对讲] 对讲时源流注销, app: {}. stream: {}, deviceId: {}, channelId: {}",app, stream, deviceId, channelId); + stopTalk(deviceId, channelId); + }); + // 设置超时监听 + dynamicTask.startDelay(playKey, () -> { + logger.info("[1078-对讲] 超时, deviceId: {}, channelId: {}", deviceId, channelId); + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), + InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); + } - for (GeneralCallback errorCallback : errorCallbacks) { - errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info); - } - subscribe.removeSubscribe(hook); - redisTemplate.opsForValue().set(playKey, info); - // 存储发流信息 - redisCatchStorage.updateSendRTPSever(sendRtpItem); - }); - Hook hookForDeparture = Hook.getInstance(HookType.on_media_departure, "rtp", receiveStream, mediaServer.getId()); - subscribe.addSubscribe(hookForDeparture, (hookData) -> { - logger.info("[1078-对讲] 对讲时源流注销, app: {}. stream: {}, deviceId: {}, channelId: {}",app, stream, deviceId, channelId); - stopTalk(deviceId, channelId); - }); - - // 设置超时监听 - dynamicTask.startDelay(playKey, () -> { - logger.info("[1078-对讲] 超时, deviceId: {}, channelId: {}", deviceId, channelId); - for (GeneralCallback errorCallback : errorCallbacks) { - errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), - InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); - } - - }, userSetting.getPlayTimeout()); + }, userSetting.getPlayTimeout()); + } Integer localPort = mediaServerService.startSendRtpPassive(mediaServer, sendRtpItem, 15000); @@ -873,8 +876,15 @@ public class jt1078ServiceImpl implements Ijt1078Service { j9101.setTcpPort(localPort); j9101.setUdpPort(localPort); j9101.setType(2); - Object s = jt1078Template.startLive(deviceId, j9101, 6); - System.out.println("ssss=== " + s); + jt1078Template.startLive(deviceId, j9101, 6); + if (onlySend != null && onlySend) { + logger.info("[1078-对讲] 对讲成功, deviceId: {}, channelId: {}", deviceId, channelId); + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), null); + } + // 存储发流信息 + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } } @Override