From 87ecf683327bcd4085ab2aaabfd224944a40e0f3 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 29 May 2024 16:55:07 +0800 Subject: [PATCH] =?UTF-8?q?1078-=E5=8F=8C=E5=90=91=E5=AF=B9=E8=AE=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/VideoManagerConstants.java | 2 +- .../jt1078/controller/JT1078Controller.java | 20 ++- .../vmp/jt1078/service/Ijt1078Service.java | 4 +- .../service/impl/jt1078ServiceImpl.java | 142 ++++++++++++++---- .../iot/vmp/jt1078/util/SSRCUtil.java | 8 + .../service/IMediaNodeServerService.java | 2 +- .../media/service/IMediaServerService.java | 2 +- .../service/impl/MediaServerServiceImpl.java | 4 +- .../media/zlm/ZLMMediaNodeServerService.java | 3 +- 9 files changed, 143 insertions(+), 44 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/jt1078/util/SSRCUtil.java diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index e917e4a50..42d1381c9 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -183,7 +183,7 @@ public class VideoManagerConstants { public static final String INVITE_INFO_1078_PLAY = "INVITE_INFO_1078_PLAY:"; public static final String INVITE_INFO_1078_PLAYBACK = "INVITE_INFO_1078_PLAYBACK:"; - public static final String INVITE_INFO_1078_BROADCAST = "INVITE_INFO_1078_BROADCAST:"; + public static final String INVITE_INFO_1078_TALK = "INVITE_INFO_1078_TALK:"; public static final String RECORD_LIST_1078 = "RECORD_LIST_1078:"; diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java b/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java index fbc58d16b..7f1db1a7a 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java @@ -15,7 +15,6 @@ import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.security.SecurityRequirement; import io.swagger.v3.oas.annotations.tags.Tag; -import org.apache.commons.io.monitor.FileAlterationListener; import org.apache.commons.io.monitor.FileAlterationListenerAdaptor; import org.apache.commons.io.monitor.FileAlterationMonitor; import org.apache.commons.io.monitor.FileAlterationObserver; @@ -137,8 +136,8 @@ public class JT1078Controller { @Parameter(name = "app", description = "推流应用名", required = true) @Parameter(name = "stream", description = "推流ID", required = true) @Parameter(name = "mediaServerId", description = "流媒体ID", required = true) - @GetMapping("/broadcast") - public DeferredResult> broadcast(HttpServletRequest request, + @GetMapping("/talk/start") + public DeferredResult> startTalk(HttpServletRequest request, @Parameter(required = true) String deviceId, @Parameter(required = true) String channelId, @Parameter(required = true) String app, @@ -159,7 +158,7 @@ public class JT1078Controller { service.stopPlay(deviceId, finalChannelId); }); - service.broadcast(deviceId, channelId, app, stream, mediaServerId, (code, msg, streamInfo) -> { + service.startTalk(deviceId, channelId, app, stream, mediaServerId, (code, msg, streamInfo) -> { WVPResult wvpResult = new WVPResult<>(); if (code == InviteErrorCode.SUCCESS.getCode()) { wvpResult.setCode(ErrorCode.SUCCESS.getCode()); @@ -192,6 +191,19 @@ public class JT1078Controller { return result; } + @Operation(summary = "1078-结束对讲", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Parameter(name = "deviceId", description = "设备国标编号", required = true) + @Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true) + @GetMapping("/talk/stop") + public void stopTalk(HttpServletRequest request, + @Parameter(required = true) String deviceId, + @Parameter(required = false) String channelId) { + if (ObjectUtils.isEmpty(channelId)) { + channelId = "1"; + } + service.stopTalk(deviceId, channelId); + } + @Operation(summary = "1078-暂停点播", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "deviceId", description = "设备国标编号", required = true) diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java index 170760e46..236dc8764 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java @@ -107,5 +107,7 @@ public interface Ijt1078Service { JTMediaAttribute queryMediaAttribute(String deviceId); - void broadcast(String deviceId, String channelId, String app, String stream, String mediaServerId, GeneralCallback callback); + void startTalk(String deviceId, String channelId, String app, String stream, String mediaServerId, GeneralCallback callback); + + void stopTalk(String deviceId, String channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java index d62975adc..dfea3ec35 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java @@ -6,6 +6,9 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.ControllerException; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.jt1078.bean.*; import com.genersoft.iot.vmp.jt1078.bean.common.ConfigAttribute; import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; @@ -14,26 +17,37 @@ import com.genersoft.iot.vmp.jt1078.event.CallbackManager; import com.genersoft.iot.vmp.jt1078.proc.request.J1205; import com.genersoft.iot.vmp.jt1078.proc.response.*; import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service; +import com.genersoft.iot.vmp.jt1078.util.SSRCUtil; +import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookData; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; +import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; +import org.apache.commons.lang3.ObjectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; import java.lang.reflect.Field; +import java.text.ParseException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -74,6 +88,9 @@ public class jt1078ServiceImpl implements Ijt1078Service { @Autowired private CallbackManager callbackManager; + @Autowired + private IRedisCatchStorage redisCatchStorage; + @Override public JTDevice getDevice(String terminalId) { @@ -213,7 +230,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { } // 清理回调 List> generalCallbacks = inviteErrorCallbackMap.get(playKey); - if (!generalCallbacks.isEmpty()) { + if (generalCallbacks != null && !generalCallbacks.isEmpty()) { for (GeneralCallback callback : generalCallbacks) { callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null); } @@ -377,7 +394,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { } // 清理回调 List> generalCallbacks = inviteErrorCallbackMap.get(playKey); - if (!generalCallbacks.isEmpty()) { + if (generalCallbacks != null && !generalCallbacks.isEmpty()) { for (GeneralCallback callback : generalCallbacks) { callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null); } @@ -751,46 +768,74 @@ public class jt1078ServiceImpl implements Ijt1078Service { return (JTMediaAttribute)jt1078Template.queryMediaAttribute(deviceId, j9003, 300); } + /** + * 监听发流停止 + */ + @EventListener + public void onApplicationEvent(MediaSendRtpStoppedEvent event) { + List sendRtpItems = redisCatchStorage.querySendRTPServerByStream(event.getStream()); + for (SendRtpItem sendRtpItem : sendRtpItems) { + if (!sendRtpItem.isOnlyAudio() + || ObjectUtils.isEmpty(sendRtpItem.getDeviceId()) + || ObjectUtils.isEmpty(sendRtpItem.getChannelId())) { + continue; + } + if (!sendRtpItem.getSsrc().contains("_")) { + continue; + } + redisCatchStorage.deleteSendRTPServer(sendRtpItem); + String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + sendRtpItem.getDeviceId() + ":" + sendRtpItem.getChannelId(); + redisTemplate.delete(playKey); + } + } + @Override - public void broadcast(String deviceId, String channelId, String app, String stream, String mediaServerId, GeneralCallback callback) { + public void startTalk(String deviceId, String channelId, String app, String stream, String mediaServerId, GeneralCallback callback) { // 检查流是否已经存在,存在则返回 - String playKey = VideoManagerConstants.INVITE_INFO_1078_BROADCAST + deviceId + ":" + channelId; + String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + deviceId + ":" + channelId; List> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>()); errorCallbacks.add(callback); StreamInfo streamInfo = (StreamInfo)redisTemplate.opsForValue().get(playKey); if (streamInfo != null) { - String mediaServerId = streamInfo.getMediaServerId(); - MediaServer mediaServer = mediaServerService.getOne(mediaServerId); - if (mediaServer != null) { - // 查询流是否存在,不存在则删除缓存数据 - JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServer, "rtp", "rtsp", streamInfo.getStream()); - if (mediaInfo != null && mediaInfo.getInteger("code") == 0 ) { - Boolean online = mediaInfo.getBoolean("online"); - if (online != null && online) { - logger.info("[1078-点播] 点播已经存在,直接返回, deviceId: {}, channelId: {}", deviceId, channelId); - for (GeneralCallback errorCallback : errorCallbacks) { - errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); - } - return; - } - } - } - // 清理数据 - redisTemplate.delete(playKey); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "对讲进行中"); } - String stream = deviceId + "_" + channelId; - MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + + String reciveStream = deviceId + "_" + channelId; + MediaServer mediaServer = mediaServerService.getOne(mediaServerId); if (mediaServer == null) { for (GeneralCallback errorCallback : errorCallbacks) { errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo); } return; } + // 检查待发送的流是否存在, + MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, app, stream); + if (mediaInfo == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), app + "/" + stream + "流不存在"); + } + // 开启收流端口, zlm发送1078的rtp流需要将ssrc字段设置为 imei_channel格式 + String ssrc = deviceId + "_" + channelId; + SendRtpItem sendRtpItem = new SendRtpItem(); + sendRtpItem.setMediaServerId(mediaServerId); + sendRtpItem.setPort(0); + sendRtpItem.setSsrc(ssrc); + sendRtpItem.setDeviceId(deviceId); + sendRtpItem.setChannelId(channelId); + sendRtpItem.setRtcp(false); + sendRtpItem.setApp(app); + sendRtpItem.setStream(stream); + sendRtpItem.setTcp(true); + sendRtpItem.setTcpActive(true); + sendRtpItem.setUsePs(false); + sendRtpItem.setOnlyAudio(true); + sendRtpItem.setReceiveStream(reciveStream); + sendRtpItem.setPlatformId(deviceId); + // 设置hook监听 - Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId()); + Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", reciveStream, mediaServer.getId()); subscribe.addSubscribe(hook, (hookData) -> { dynamicTask.stop(playKey); - logger.info("[1078-点播] 点播成功, deviceId: {}, channelId: {}", deviceId, channelId); + logger.info("[1078-对讲] 对讲成功, deviceId: {}, channelId: {}", deviceId, channelId); StreamInfo info = onPublishHandler(mediaServer, hookData, deviceId, channelId); for (GeneralCallback errorCallback : errorCallbacks) { @@ -798,10 +843,12 @@ public class jt1078ServiceImpl implements Ijt1078Service { } subscribe.removeSubscribe(hook); redisTemplate.opsForValue().set(playKey, info); + // 存储发流信息 + redisCatchStorage.updateSendRTPSever(sendRtpItem); }); // 设置超时监听 dynamicTask.startDelay(playKey, () -> { - logger.info("[1078-点播] 超时, deviceId: {}, channelId: {}", deviceId, channelId); + logger.info("[1078-对讲] 超时, deviceId: {}, channelId: {}", deviceId, channelId); for (GeneralCallback errorCallback : errorCallbacks) { errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); @@ -809,17 +856,46 @@ public class jt1078ServiceImpl implements Ijt1078Service { }, userSetting.getPlayTimeout()); - // 开启收流端口 - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, null, false, false, 0, false, false, false,1); - logger.info("[1078-点播] deviceId: {}, channelId: {}, 端口: {}", deviceId, channelId, ssrcInfo.getPort()); + Integer localPort = mediaServerService.startSendRtpPassive(mediaServer, sendRtpItem, 15000); + + logger.info("[1078-对讲] deviceId: {}, channelId: {}, 收发端口: {}, app: {}, stream: {}", + deviceId, channelId, localPort, app, stream); J9101 j9101 = new J9101(); j9101.setChannel(Integer.valueOf(channelId)); j9101.setIp(mediaServer.getSdpIp()); j9101.setRate(1); - j9101.setTcpPort(ssrcInfo.getPort()); - j9101.setUdpPort(ssrcInfo.getPort()); - j9101.setType(type); + j9101.setTcpPort(localPort); + j9101.setUdpPort(localPort); + j9101.setType(2); Object s = jt1078Template.startLive(deviceId, j9101, 6); System.out.println("ssss=== " + s); } + + @Override + public void stopTalk(String deviceId, String channelId) { + String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + deviceId + ":" + channelId; + dynamicTask.stop(playKey); + StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); + // 发送停止命令 + J9102 j9102 = new J9102(); + j9102.setChannel(Integer.valueOf(channelId)); + j9102.setCommand(0); + j9102.setCloseType(0); + j9102.setStreamType(1); + jt1078Template.stopLive(deviceId, j9102, 6); + logger.info("[1078-停止对讲] deviceId: {}, channelId: {}", deviceId, channelId); + // 删除缓存数据 + if (streamInfo != null) { + redisTemplate.delete(playKey); + // 关闭rtpServer + mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream()); + } + // 清理回调 + List> generalCallbacks = inviteErrorCallbackMap.get(playKey); + if (generalCallbacks != null && !generalCallbacks.isEmpty()) { + for (GeneralCallback callback : generalCallbacks) { + callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null); + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/util/SSRCUtil.java b/src/main/java/com/genersoft/iot/vmp/jt1078/util/SSRCUtil.java new file mode 100644 index 000000000..4cba7b472 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/util/SSRCUtil.java @@ -0,0 +1,8 @@ +package com.genersoft.iot.vmp.jt1078.util; + +public class SSRCUtil { + + public static String randomSSRC(){ + return String.format("%010d", Math.round(Math.random()*10000000000L)); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java index 2566e0bee..0efc0b926 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java @@ -57,7 +57,7 @@ public interface IMediaNodeServerService { Map getFFmpegCMDs(MediaServer mediaServer); - void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout); + Integer startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout); void startSendRtpStream(MediaServer mediaServer, SendRtpItem sendRtpItem); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index a8c53381e..943ea3df2 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -140,7 +140,7 @@ public interface IMediaServerService { Boolean isStreamReady(MediaServer mediaServer, String rtp, String streamId); - void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout); + Integer startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout); void startSendRtp(MediaServer mediaServer, SendRtpItem sendRtpItem); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index a0b334137..24eb59055 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -824,13 +824,13 @@ public class MediaServerServiceImpl implements IMediaServerService { } @Override - public void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout) { + public Integer startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout) { IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); if (mediaNodeServerService == null) { logger.info("[startSendRtpPassive] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类"); } - mediaNodeServerService.startSendRtpPassive(mediaServer, sendRtpItem, timeout); + return mediaNodeServerService.startSendRtpPassive(mediaServer, sendRtpItem, timeout); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index d9a6c6b0f..f220c1ecd 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -327,7 +327,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { } @Override - public void startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout) { + public Integer startSendRtpPassive(MediaServer mediaServer, SendRtpItem sendRtpItem, Integer timeout) { Map param = new HashMap<>(12); param.put("vhost","__defaultVhost__"); param.put("app", sendRtpItem.getApp()); @@ -359,6 +359,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { logger.info("调用ZLM-TCP被动推流接口, 结果: {}", jsonObject); logger.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " , sendRtpItem.getApp(), sendRtpItem.getStream(), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + return jsonObject.getInteger("local_port"); } @Override