diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTMediaStreamType.java b/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTMediaStreamType.java new file mode 100644 index 000000000..a5833ee70 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTMediaStreamType.java @@ -0,0 +1,5 @@ +package com.genersoft.iot.vmp.jt1078.bean; + +public enum JTMediaStreamType { + PLAY,PLAYBACK,TALK +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java index 368284755..c1d34ff35 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java @@ -9,6 +9,8 @@ import com.github.pagehelper.PageInfo; import java.util.List; public interface Ijt1078Service { + JTMediaStreamType checkStreamFromJt(String stream); + JTDevice getDevice(String phoneNumber); JTChannel getChannel(Integer terminalDbId, Integer channelId); diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java index ba7b3e7f6..7dbec911d 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.jt1078.service.impl; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.GeneralCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; @@ -14,7 +13,6 @@ import com.genersoft.iot.vmp.jt1078.bean.common.ConfigAttribute; import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; import com.genersoft.iot.vmp.jt1078.dao.JTChannelMapper; import com.genersoft.iot.vmp.jt1078.dao.JTTerminalMapper; -import com.genersoft.iot.vmp.jt1078.event.CallbackManager; import com.genersoft.iot.vmp.jt1078.event.FtpUploadEvent; import com.genersoft.iot.vmp.jt1078.proc.request.J1205; import com.genersoft.iot.vmp.jt1078.proc.response.*; @@ -25,14 +23,16 @@ import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookData; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; +import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; +import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; +import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.utils.MediaServerUtils; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; @@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.lang.reflect.Field; @@ -71,27 +72,95 @@ public class jt1078ServiceImpl implements Ijt1078Service { @Autowired private IMediaServerService mediaServerService; - @Autowired - private IMediaService mediaService; - @Autowired private DynamicTask dynamicTask; @Autowired private UserSetting userSetting; - @Autowired - private ZLMRESTfulUtils zlmresTfulUtils; - - @Autowired - private CallbackManager callbackManager; - @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private FtpSetting ftpSetting; + /** + * 流到来的处理 + */ + @Async("taskExecutor") + @org.springframework.context.event.EventListener + public void onApplicationEvent(MediaArrivalEvent event) { + + } + + /** + * 流离开的处理 + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaDepartureEvent event) { + + } + + /** + * 流未找到的处理 + */ + @Async("taskExecutor") + @EventListener + public void onApplicationEvent(MediaNotFoundEvent event) { + if (!userSetting.isAutoApplyPlay()) { + return; + } + JTMediaStreamType jtMediaStreamType = checkStreamFromJt(event.getStream()); + if (jtMediaStreamType == null){ + return; + } + String[] streamParamArray = event.getStream().split("_"); + String phoneNumber = streamParamArray[1]; + int channelId = Integer.parseInt(streamParamArray[2]); + String params = event.getParams(); + Map paramMap = MediaServerUtils.urlParamToMap(params); + int type = 0; + try { + type = Integer.parseInt(paramMap.get("type")); + }catch (NumberFormatException ignored) {} + if (jtMediaStreamType.equals(JTMediaStreamType.PLAY)) { + play(phoneNumber, channelId, 0, null); + }else if (jtMediaStreamType.equals(JTMediaStreamType.PLAYBACK)) { + String startTimeParam = DateUtil.jt1078Toyyyy_MM_dd_HH_mm_ss(streamParamArray[3]); + String endTimeParam = DateUtil.jt1078Toyyyy_MM_dd_HH_mm_ss(streamParamArray[4]); + int rate = 0; + int playbackType = 0; + int playbackSpeed = 0; + try { + rate = Integer.parseInt(paramMap.get("rate")); + playbackType = Integer.parseInt(paramMap.get("playbackType")); + playbackSpeed = Integer.parseInt(paramMap.get("playbackSpeed")); + }catch (NumberFormatException ignored) {} + playback(phoneNumber, channelId, startTimeParam, endTimeParam, type, rate, playbackType, playbackSpeed, null); + } + } + + + /** + * 校验流是否是属于部标的 + */ + @Override + public JTMediaStreamType checkStreamFromJt(String stream) { + if (!stream.startsWith("jt_")) { + return null; + } + String[] streamParamArray = stream.split("_"); + if (streamParamArray.length == 3) { + return JTMediaStreamType.PLAY; + }else if (streamParamArray.length == 5) { + return JTMediaStreamType.PLAYBACK; + }else if (streamParamArray.length == 4) { + return JTMediaStreamType.TALK; + }else { + return null; + } + } @Override public JTDevice getDevice(String phoneNumber) { @@ -172,7 +241,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { // 清理数据 redisTemplate.delete(playKey); } - String stream = "jt_play_" + phoneNumber + "_" + channelId; + String stream = "jt_" + phoneNumber + "_" + channelId; MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); if (mediaServer == null) { for (GeneralCallback errorCallback : errorCallbacks) { @@ -189,6 +258,9 @@ public class jt1078ServiceImpl implements Ijt1078Service { StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId); for (GeneralCallback errorCallback : errorCallbacks) { + if (errorCallback == null) { + continue; + } errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info); } subscribe.removeSubscribe(hook); @@ -208,6 +280,10 @@ public class jt1078ServiceImpl implements Ijt1078Service { }); // 开启收流端口 SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, "000", false, false, 0, false, !channel.getHasAudio(), false, 1); + if (ssrcInfo == null) { + stopPlay(phoneNumber, channelId); + return; + } // 设置超时监听 dynamicTask.startDelay(playKey, () -> { logger.info("[1078-点播] 超时, phoneNumber: {}, channelId: {}", phoneNumber, channelId); @@ -354,7 +430,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { } String startTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime); String endTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime); - String stream = phoneNumber + "_" + channelId + "_" + startTimeParam + "_" + endTimeParam; + String stream = "jt_" + phoneNumber + "_" + channelId + "_" + startTimeParam + "_" + endTimeParam; MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); if (mediaServer == null) { for (GeneralCallback errorCallback : errorCallbacks) { @@ -904,7 +980,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { throw new ControllerException(ErrorCode.ERROR100.getCode(), "对讲进行中"); } - String receiveStream = "1078" + "_" + phoneNumber + "_" + channelId; + String receiveStream = "jt_" + phoneNumber + "_" + channelId + "_talk"; MediaServer mediaServer = mediaServerService.getOne(mediaServerId); if (mediaServer == null) { for (GeneralCallback errorCallback : errorCallbacks) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaEvent.java index b6fb89022..1ddcdd459 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaEvent.java @@ -20,6 +20,16 @@ public class MediaEvent extends ApplicationEvent { private String schema; + private String params; + + public String getParams() { + return params; + } + + public void setParams(String params) { + this.params = params; + } + public String getApp() { return app; diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaNotFoundEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaNotFoundEvent.java index 2415566a9..803e44103 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaNotFoundEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaNotFoundEvent.java @@ -17,6 +17,7 @@ public class MediaNotFoundEvent extends MediaEvent { mediaDepartureEven.setStream(hookParam.getStream()); mediaDepartureEven.setSchema(hookParam.getSchema()); mediaDepartureEven.setMediaServer(mediaServer); + mediaDepartureEven.setParams(hookParam.getParams()); return mediaDepartureEven; } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaPublishEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaPublishEvent.java index 0d9f032c7..4b5fa7052 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaPublishEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/media/MediaPublishEvent.java @@ -21,13 +21,4 @@ public class MediaPublishEvent extends MediaEvent { return mediaPublishEvent; } - private String params; - - public String getParams() { - return params; - } - - public void setParams(String params) { - this.params = params; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index efadfc6e4..87312a465 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -10,6 +10,8 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.jt1078.bean.JTMediaStreamType; +import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.ResultForOnPublish; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; @@ -72,6 +74,9 @@ public class MediaServiceImpl implements IMediaService { @Autowired private ISIPCommander commander; + @Autowired + private Ijt1078Service ijt1078Service; + @Override public boolean authenticatePlay(String app, String stream, String callId) { if (app == null || stream == null) { @@ -267,10 +272,19 @@ public class MediaServiceImpl implements IMediaService { inviteInfo.getChannelId(), inviteInfo.getStream()); storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId()); return result; + }else { + // 判断是否是1078点播 + JTMediaStreamType jtMediaStreamType = ijt1078Service.checkStreamFromJt(stream); + if (jtMediaStreamType != null) { + String[] streamParamArray = stream.split("_"); + if (jtMediaStreamType.equals(JTMediaStreamType.PLAY)) { + ijt1078Service.stopPlay(streamParamArray[1], Integer.parseInt(streamParamArray[2])); + }else if (jtMediaStreamType.equals(JTMediaStreamType.PLAYBACK)) { + ijt1078Service.stopPlayback(streamParamArray[1], Integer.parseInt(streamParamArray[2])); + } + } } - if (stream.startsWith("1078")) { - return false; - } + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, stream, null); if (sendRtpItem != null && "talk".equals(sendRtpItem.getApp() )) { return false;