diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTDevice.java b/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTDevice.java index 3318c20df..c19923f31 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTDevice.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/bean/JTDevice.java @@ -12,89 +12,48 @@ public class JTDevice { private int id; - /** - * 省域ID - */ @Schema(description = "省域ID") private String provinceId; - /** - * 省域文字描述 - */ @Schema(description = "省域文字描述") private String provinceText; - /** - * 市县域ID - */ @Schema(description = "市县域ID") private String cityId; - /** - * 市县域文字描述 - */ @Schema(description = "市县域文字描述") private String cityText; - /** - * 制造商ID - */ @Schema(description = "制造商ID") private String makerId; - /** - * 终端型号 - */ @Schema(description = "终端型号") private String model; - /** - * 终端手机号 - */ @Schema(description = "终端手机号") private String phoneNumber; - /** - * 终端ID - */ @Schema(description = "终端ID") private String terminalId; - /** - * 车牌颜色 - */ @Schema(description = "车牌颜色") private int plateColor; - /** - * 车牌 - */ @Schema(description = "车牌") private String plateNo; - /** - * 鉴权码 - */ @Schema(description = "鉴权码") private String authenticationCode; - /** - * 经度 - */ @Schema(description = "经度") private Double longitude; - /** - * 纬度 - */ @Schema(description = "纬度") private Double latitude; - @Schema(description = "注册时间") private String registerTime; - @Schema(description = "创建时间") private String createTime; @@ -104,6 +63,15 @@ public class JTDevice { @Schema(description = "状态") private boolean status; + @Schema(description = "设备使用的媒体id, 默认为null") + private String mediaServerId; + + @Schema(description = "地理坐标系, 目前支持 WGS84,GCJ02") + private String geoCoordSys; + + @Schema(description = "收流IP") + private String sdpIp; + @Override public String toString() { return "JTDevice{" + diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/bean/config/JTChanel.java b/src/main/java/com/genersoft/iot/vmp/jt1078/bean/config/JTChanelConfig.java similarity index 87% rename from src/main/java/com/genersoft/iot/vmp/jt1078/bean/config/JTChanel.java rename to src/main/java/com/genersoft/iot/vmp/jt1078/bean/config/JTChanelConfig.java index cead16a8d..860dc8257 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/bean/config/JTChanel.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/bean/config/JTChanelConfig.java @@ -5,14 +5,12 @@ import io.netty.buffer.Unpooled; import lombok.Getter; import lombok.Setter; -import java.util.Stack; - /** * 音视频通道 */ @Setter @Getter -public class JTChanel implements JTDeviceSubConfig{ +public class JTChanelConfig implements JTDeviceSubConfig{ /** * 物理通道号 单独 @@ -47,8 +45,8 @@ public class JTChanel implements JTDeviceSubConfig{ return byteBuf; } - public static JTChanel decode(ByteBuf byteBuf) { - JTChanel jtChanel = new JTChanel(); + public static JTChanelConfig decode(ByteBuf byteBuf) { + JTChanelConfig jtChanel = new JTChanelConfig(); jtChanel.setPhysicalChannelId(byteBuf.readUnsignedByte()); jtChanel.setLogicChannelId(byteBuf.readUnsignedByte()); jtChanel.setChannelType(byteBuf.readUnsignedByte()); diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/bean/config/JTChannelListParam.java b/src/main/java/com/genersoft/iot/vmp/jt1078/bean/config/JTChannelListParam.java index 70c8f3f05..ff03faa1a 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/bean/config/JTChannelListParam.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/bean/config/JTChannelListParam.java @@ -30,7 +30,7 @@ public class JTChannelListParam implements JTDeviceSubConfig{ */ private int videoCount; - private List chanelList; + private List chanelList; @Override @@ -39,7 +39,7 @@ public class JTChannelListParam implements JTDeviceSubConfig{ byteBuf.writeByte(videoAndAudioCount); byteBuf.writeByte(audioCount); byteBuf.writeByte(videoCount); - for (JTChanel jtChanel : chanelList) { + for (JTChanelConfig jtChanel : chanelList) { byteBuf.writeBytes(jtChanel.encode()); } return byteBuf; @@ -51,9 +51,9 @@ public class JTChannelListParam implements JTDeviceSubConfig{ channelListParam.setAudioCount(byteBuf.readUnsignedByte()); channelListParam.setVideoCount(byteBuf.readUnsignedByte()); int total = channelListParam.getVideoAndAudioCount() + channelListParam.getVideoCount() + channelListParam.getAudioCount(); - List chanelList = new ArrayList<>(total); + List chanelList = new ArrayList<>(total); for (int i = 0; i < total; i++) { - chanelList.add(JTChanel.decode(byteBuf)); + chanelList.add(JTChanelConfig.decode(byteBuf)); } channelListParam.setChanelList(chanelList); return channelListParam; diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java b/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java index e7600f35b..129ba7b1d 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/controller/JT1078Controller.java @@ -135,55 +135,24 @@ public class JT1078Controller { @Operation(summary = "JT-语音对讲", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Parameter(name = "phoneNumber", description = "设备手机号", required = true) @Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true) - @Parameter(name = "app", description = "推流应用名", required = true) - @Parameter(name = "stream", description = "推流ID", required = true) - @Parameter(name = "mediaServerId", description = "流媒体ID", required = true) - @Parameter(name = "onlySend", description = "是否只发送", required = false) @GetMapping("/talk/start") - public DeferredResult> startTalk(HttpServletRequest request, + public StreamContent startTalk(HttpServletRequest request, @Parameter(required = true) String phoneNumber, - @Parameter(required = true) Integer channelId, - @Parameter(required = true) String app, - @Parameter(required = true) String stream, - @Parameter(required = true) String mediaServerId, - @Parameter(required = false) Boolean onlySend) { - DeferredResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); - result.onTimeout(()->{ - log.info("[JT-语音对讲超时] phoneNumber:{}, channelId:{}, ", phoneNumber, channelId); - // 释放rtpserver - WVPResult wvpResult = new WVPResult<>(); - wvpResult.setCode(ErrorCode.ERROR100.getCode()); - wvpResult.setMsg("超时"); - result.setResult(wvpResult); - jt1078PlayService.stopPlay(phoneNumber, channelId); - }); + @Parameter(required = true) Integer channelId) { - jt1078PlayService.startTalk(phoneNumber, channelId, app, stream, mediaServerId, onlySend, wvpResult -> { - WVPResult wvpResultForFinish = new WVPResult<>(); - wvpResultForFinish.setCode(wvpResult.getCode()); - wvpResultForFinish.setMsg(wvpResult.getMsg()); - if (wvpResult.getCode() == InviteErrorCode.SUCCESS.getCode()) { - - if (wvpResult.getData() != null) { - StreamInfo streamInfo = wvpResult.getData(); - if (userSetting.getUseSourceIpAsStreamIp()) { - streamInfo = wvpResult.getData().clone();//深拷贝 - String host; - try { - URL url=new URL(request.getRequestURL().toString()); - host=url.getHost(); - } catch (MalformedURLException e) { - host=request.getLocalAddr(); - } - streamInfo.changeStreamIp(host); - } - wvpResultForFinish.setData(new StreamContent(streamInfo)); - } + StreamInfo streamInfo = jt1078PlayService.startTalk(phoneNumber, channelId); + if (userSetting.getUseSourceIpAsStreamIp()) { + String host; + try { + URL url=new URL(request.getRequestURL().toString()); + host=url.getHost(); + } catch (MalformedURLException e) { + host=request.getLocalAddr(); } - result.setResult(wvpResultForFinish); - }); - - return result; + streamInfo.changeStreamIp(host); + } + streamInfo.setIp("localhost"); + return new StreamContent(streamInfo); } @Operation(summary = "JT-结束对讲", security = @SecurityRequirement(name = JwtUtils.HEADER)) diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078PlayService.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078PlayService.java index 8823beb9e..9c7e8cb52 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078PlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078PlayService.java @@ -27,7 +27,7 @@ public interface Ijt1078PlayService { void stopPlayback(String phoneNumber, Integer channelId); - void startTalk(String phoneNumber, Integer channelId, String app, String stream, String mediaServerId, Boolean onlySend, CommonCallback> callback); + StreamInfo startTalk(String phoneNumber, Integer channelId); void stopTalk(String phoneNumber, Integer channelId); diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java index a372420dc..f549894c8 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java @@ -7,6 +7,8 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.ftpServer.FtpSetting; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; import com.genersoft.iot.vmp.jt1078.bean.*; import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; @@ -41,6 +43,7 @@ import org.springframework.context.event.EventListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; +import org.springframework.util.Assert; import java.util.ArrayList; import java.util.List; @@ -51,6 +54,8 @@ import java.util.concurrent.ConcurrentHashMap; @Slf4j public class jt1078PlayServiceImpl implements Ijt1078PlayService { + public static final String talkApp = "jt_talk"; + @Autowired private ISendRtpServerService sendRtpServerService; @@ -81,7 +86,27 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaArrivalEvent event) { + if (event.getApp().equals(talkApp) && event.getStream().endsWith("_talk")) { + // 收到对JT讲的流 + if (event.getStream().indexOf("_") <= 0) { + log.info("[JT-对讲流到来] 流格式有误,stream应该为jt_[phoneNumber]_[channelId]_talk"); + return; + } + String[] streamArray = event.getStream().split("_"); + if (streamArray.length != 4) { + log.info("[JT-对讲流到来] 流格式有误,stream应该为jt_[phoneNumber]_[channelId]_talk"); + return; + } + String phoneNumber = streamArray[1]; + String channelId = streamArray[2]; + JTDevice device = jt1078Service.getDevice(phoneNumber); + if (device == null) { + log.info("[JT-对讲流到来] 未找到设备{}", phoneNumber); + return; + } + sendTalk(device, Integer.valueOf(channelId), event.getMediaServer(), event.getApp(), event.getStream()); + } } /** @@ -191,7 +216,12 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { redisTemplate.delete(playKey); } - MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + MediaServer mediaServer; + if (org.springframework.util.ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) { + mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + } else { + mediaServer = mediaServerService.getOne(device.getMediaServerId()); + } if (mediaServer == null) { for (CommonCallback> errorCallback : errorCallbacks) { errorCallback.run(new WVPResult<>(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo)); @@ -376,7 +406,12 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { String app = "1078"; String stream = String.format("%s_%s_%s_%s", phoneNumber, channelId, DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(startTime), DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(endTime)); - MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + MediaServer mediaServer; + if (org.springframework.util.ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) { + mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + } else { + mediaServer = mediaServerService.getOne(device.getMediaServerId()); + } if (mediaServer == null) { for (CommonCallback> errorCallback : errorCallbacks) { errorCallback.run(new WVPResult<>(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo)); @@ -483,25 +518,6 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { playbackControl(phoneNumber, channelId, 2, null, null); } - private final Map>> fileUploadMap = new ConcurrentHashMap<>(); - - @EventListener - public void onApplicationEvent(FtpUploadEvent event) { - if (fileUploadMap.isEmpty()) { - return; - } - fileUploadMap.keySet().forEach(key -> { - if (!event.getFileName().contains(key)) { - return; - } - CommonCallback> callback = fileUploadMap.get(key); - if (callback != null) { - callback.run(new WVPResult<>(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), event.getFileName())); - fileUploadMap.remove(key); - } - }); - } - /** * 监听发流停止 */ @@ -525,90 +541,82 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { } } + @Override - public void startTalk(String phoneNumber, Integer channelId, String app, String stream, String mediaServerId, Boolean onlySend, - CommonCallback> callback) { + public StreamInfo startTalk(String phoneNumber, Integer channelId) { // 检查流是否已经存在,存在则返回 String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + phoneNumber + ":" + channelId; - List>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>()); - errorCallbacks.add(callback); StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); + if (streamInfo != null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "对讲进行中"); } - String receiveStream = "jt_" + phoneNumber + "_" + channelId + "_talk"; - MediaServer mediaServer = mediaServerService.getOne(mediaServerId); - if (mediaServer == null) { - for (CommonCallback> errorCallback : errorCallbacks) { - errorCallback.run(new WVPResult<>(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo)); - } - return; + JTDevice device = jt1078Service.getDevice(phoneNumber); + Assert.notNull(device, "部标设备不存在"); + + String stream = "jt_" + phoneNumber + "_" + channelId + "_talk"; + + MediaServer mediaServer; + if (org.springframework.util.ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) { + mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); + } else { + mediaServer = mediaServerService.getOne(device.getMediaServerId()); } + + // 检查待发送的流是否存在, + MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, talkApp, stream); + Assert.isNull(mediaInfo, "对讲已经存在"); + return mediaServerService.getStreamInfoByAppAndStream(mediaServer, talkApp, stream, null, null, null, false); + + } + private void sendTalk(JTDevice device, Integer channelId, MediaServer mediaServer, String app, String stream) { // 检查待发送的流是否存在, MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, app, stream); if (mediaInfo == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), app + "/" + stream + "流不存在"); } + + String phoneNumber = device.getPhoneNumber(); + // 开启收流端口, zlm发送1078的rtp流需要将ssrc字段设置为 imei_channel格式 - String ssrc = phoneNumber + "_" + channelId; - SendRtpInfo sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServer, null, null, ssrc, phoneNumber, app, stream, channelId, true, false); + String ssrc = device.getPhoneNumber() + "_" + channelId; + SendRtpInfo sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServer, null, null, ssrc, phoneNumber, talkApp, stream, channelId, true, false); sendRtpInfo.setTcpActive(true); sendRtpInfo.setUsePs(false); sendRtpInfo.setOnlyAudio(true); - if (onlySend == null || !onlySend) { - sendRtpInfo.setReceiveStream(receiveStream); - } - if (onlySend == null || !onlySend) { - // 设置hook监听 - Hook hook = Hook.getInstance(HookType.on_media_arrival, "1078", receiveStream, mediaServer.getId()); - subscribe.addSubscribe(hook, (hookData) -> { - dynamicTask.stop(playKey); - log.info("[JT-对讲] 对讲成功, phoneNumber: {}, channelId: {}", phoneNumber, channelId); - StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId); + sendRtpInfo.setReceiveStream(stream + "_talk"); - for (CommonCallback> errorCallback : errorCallbacks) { - errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info)); - } - subscribe.removeSubscribe(hook); - redisTemplate.opsForValue().set(playKey, info); - // 存储发流信息 - sendRtpServerService.update(sendRtpInfo); - }); - Hook hookForDeparture = Hook.getInstance(HookType.on_media_departure, "1078", receiveStream, mediaServer.getId()); - subscribe.addSubscribe(hookForDeparture, (hookData) -> { - log.info("[JT-对讲] 对讲时源流注销, app: {}. stream: {}, phoneNumber: {}, channelId: {}", app, stream, phoneNumber, channelId); - stopTalk(phoneNumber, channelId); - }); - // 设置超时监听 - dynamicTask.startDelay(playKey, () -> { - log.info("[JT-对讲] 超时, phoneNumber: {}, channelId: {}", phoneNumber, channelId); - for (CommonCallback> errorCallback : errorCallbacks) { - errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), - InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null)); - } + // 设置hook监听 + Hook hook = Hook.getInstance(HookType.on_media_arrival, "1078", sendRtpInfo.getReceiveStream(), mediaServer.getId()); + subscribe.addSubscribe(hook, (hookData) -> { + log.info("[JT-对讲] 对讲连接建立, phoneNumber: {}, channelId: {}", phoneNumber, channelId); + subscribe.removeSubscribe(hook); + // 存储发流信息 + sendRtpServerService.update(sendRtpInfo); + }); + Hook hookForDeparture = Hook.getInstance(HookType.on_media_departure, app, stream, mediaServer.getId()); + subscribe.addSubscribe(hookForDeparture, (hookData) -> { + log.info("[JT-对讲] 对讲时源流注销, app: {}. stream: {}, phoneNumber: {}, channelId: {}", app, stream, phoneNumber, channelId); + stopTalk(phoneNumber, channelId); + }); - }, userSetting.getPlayTimeout()); - } + Integer localPort = mediaServerService.startSendRtpPassive(mediaServer, sendRtpInfo, userSetting.getPlayTimeout()); log.info("[JT-对讲] phoneNumber: {}, channelId: {}, 收发端口: {}, app: {}, stream: {}", - phoneNumber, channelId, sendRtpInfo.getLocalPort(), app, stream); + phoneNumber, channelId, localPort, app, stream); J9101 j9101 = new J9101(); - j9101.setChannel(Integer.valueOf(channelId)); + j9101.setChannel(channelId); j9101.setIp(mediaServer.getSdpIp()); j9101.setRate(1); j9101.setTcpPort(sendRtpInfo.getLocalPort()); j9101.setUdpPort(sendRtpInfo.getLocalPort()); - j9101.setType(2); + j9101.setType(4); jt1078Template.startLive(phoneNumber, j9101, 6); - if (onlySend != null && onlySend) { - log.info("[JT-对讲] 对讲成功, phoneNumber: {}, channelId: {}", phoneNumber, channelId); - for (CommonCallback> errorCallback : errorCallbacks) { - errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), null)); - } - // 存储发流信息 - sendRtpServerService.update(sendRtpInfo); - } + + log.info("[JT-对讲] 对讲消息下发成功, phoneNumber: {}, channelId: {}", phoneNumber, channelId); + // 存储发流信息 +// sendRtpServerService.update(sendRtpInfo); } @Override @@ -618,7 +626,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); // 发送停止命令 J9102 j9102 = new J9102(); - j9102.setChannel(Integer.valueOf(channelId)); + j9102.setChannel(channelId); j9102.setCommand(4); j9102.setCloseType(0); j9102.setStreamType(1); diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java index 12ea96280..b86b8e19b 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java @@ -362,6 +362,12 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService { return 0; } + @Override + public Integer startSendRtpTalk(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout) { + logger.warn("[abl-startSendRtpTalk] 未实现"); + return 0; + } + @Override public void startSendRtpStream(MediaServer mediaServer, SendRtpInfo sendRtpItem) { logger.warn("[abl-startSendRtpStream] 未实现"); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java index 50e61cd5c..3f3d142ed 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java @@ -66,6 +66,8 @@ public interface IMediaNodeServerService { void startSendRtpStream(MediaServer mediaServer, SendRtpInfo sendRtpItem); + Integer startSendRtpTalk(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout); + Long updateDownloadProcess(MediaServer mediaServer, String app, String stream); void startProxy(MediaServer mediaServer, StreamProxy streamProxy); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index dd828c6f7..1e47d4ef3 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -148,6 +148,8 @@ public interface IMediaServerService { Integer startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout); + Integer startSendRtpTalk(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout); + void startSendRtp(MediaServer mediaServer, SendRtpInfo sendRtpItem); MediaServer getMediaServerByAppAndStream(String app, String stream); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 0cd6a9609..71709db94 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -936,6 +936,16 @@ public class MediaServerServiceImpl implements IMediaServerService { return mediaNodeServerService.startSendRtpPassive(mediaServer, sendRtpItem, timeout); } + @Override + public Integer startSendRtpTalk(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout) { + IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); + if (mediaNodeServerService == null) { + log.info("[startSendRtpPassive] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类"); + } + return mediaNodeServerService.startSendRtpTalk(mediaServer, sendRtpItem, timeout); + } + @Override public void startSendRtp(MediaServer mediaServer, SendRtpInfo sendRtpItem) { IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index c13543b46..fe55bd592 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -425,6 +425,29 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { log.info("[推流结果]:{} ,参数: {}",jsonObject, JSONObject.toJSONString(param)); } + @Override + public Integer startSendRtpTalk(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout) { + Map param = new HashMap<>(12); + param.put("vhost","__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStream()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("pt", sendRtpItem.getPt()); + param.put("type", sendRtpItem.isUsePs() ? "1" : "0"); + param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); + param.put("recv_stream_id", sendRtpItem.getReceiveStream()); + + JSONObject jsonObject = zlmServerFactory.startSendRtpTalk(mediaServer, param, null); + if (jsonObject == null || jsonObject.getInteger("code") != 0 ) { + log.error("启动监听TCP被动推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param)); + throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg")); + } + log.info("调用ZLM-TCP被动推流接口, 结果: {}", jsonObject); + log.info("启动监听TCP被动推流成功[ {}/{} ],{}->{}:{}, " , sendRtpItem.getApp(), sendRtpItem.getStream(), + jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + return jsonObject.getInteger("local_port"); + } + @Override public Long updateDownloadProcess(MediaServer mediaServer, String app, String stream) { MediaInfo mediaInfo = getMediaInfo(mediaServer, app, stream); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 1219aa0c3..ba19e1421 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -331,6 +331,10 @@ public class ZLMRESTfulUtils { return sendPost(mediaServerItem, "startSendRtpPassive",param, callback); } + public JSONObject startSendRtpTalk(MediaServer mediaServerItem, Map param, RequestCallback callback) { + return sendPost(mediaServerItem, "startSendRtpTalk",param, callback); + } + public JSONObject stopSendRtp(MediaServer mediaServerItem, Map param) { return sendPost(mediaServerItem, "stopSendRtp",param, null); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index 401b3f5b8..9b643dd10 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -162,10 +162,14 @@ public class ZLMServerFactory { return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param); } - public JSONObject startSendRtpPassive(MediaServer mediaServerItem, Mapparam, ZLMRESTfulUtils.RequestCallback callback) { + public JSONObject startSendRtpPassive(MediaServer mediaServerItem, Map param, ZLMRESTfulUtils.RequestCallback callback) { return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param, callback); } + public JSONObject startSendRtpTalk(MediaServer mediaServer, Map param, ZLMRESTfulUtils.RequestCallback callback) { + return zlmresTfulUtils.startSendRtpTalk(mediaServer, param, callback); + } + /** * 查询待转推的流是否就绪 */ @@ -268,4 +272,6 @@ public class ZLMServerFactory { param.put("ssrc", sendRtpItem.getSsrc()); return zlmresTfulUtils.stopSendRtp(mediaServerItem, param); } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index d78e684af..ed57ab4dd 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -106,6 +106,12 @@ public class MediaServiceImpl implements IMediaService { result.setEnable_audio(true); return result; } + if ("jt_talk".equals(app) && stream.endsWith("_talk")) { + ResultForOnPublish result = new ResultForOnPublish(); + result.setEnable_mp4(false); + result.setEnable_audio(true); + return result; + } StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream); if (streamProxyItem != null) { ResultForOnPublish result = new ResultForOnPublish(); diff --git a/web/src/api/jtDevice.js b/web/src/api/jtDevice.js index efb3b4fd3..0f1ef7876 100644 --- a/web/src/api/jtDevice.js +++ b/web/src/api/jtDevice.js @@ -340,5 +340,25 @@ export function shooting(data) { data: data }) } +export function startTalk({ phoneNumber, channelId }) { + return request({ + method: 'get', + url: '/api/jt1078/talk/start', + params: { + phoneNumber: phoneNumber, + channelId: channelId + } + }) +} +export function stopTalk({ phoneNumber, channelId }) { + return request({ + method: 'get', + url: '/api/jt1078/talk/stop', + params: { + phoneNumber: phoneNumber, + channelId: channelId + } + }) +} diff --git a/web/src/store/modules/jtDevice.js b/web/src/store/modules/jtDevice.js index 711e18578..65810b802 100644 --- a/web/src/store/modules/jtDevice.js +++ b/web/src/store/modules/jtDevice.js @@ -23,9 +23,9 @@ import { reset, sendTextMessage, setConfig, setPhoneBook, shooting, - startPlayback, + startPlayback, startTalk, stopPlay, - stopPlayback, + stopPlayback, stopTalk, telephoneCallback, update, updateChannel, @@ -372,6 +372,26 @@ const actions = { reject(error) }) }) + }, + startTalk({ commit }, param) { + return new Promise((resolve, reject) => { + startTalk(param).then(response => { + const { data } = response + resolve(data) + }).catch(error => { + reject(error) + }) + }) + }, + stopTalk({ commit }, param) { + return new Promise((resolve, reject) => { + stopTalk(param).then(response => { + const { data } = response + resolve(data) + }).catch(error => { + reject(error) + }) + }) } } diff --git a/web/src/views/dialog/devicePlayer.vue b/web/src/views/dialog/devicePlayer.vue index fa1c5b143..4bbe8702b 100755 --- a/web/src/views/dialog/devicePlayer.vue +++ b/web/src/views/dialog/devicePlayer.vue @@ -619,7 +619,7 @@ export default { // 获取推流鉴权Key this.$store.dispatch('user/getUserInfo') .then((data) => { - if (data == null) { + if (data === null) { this.broadcastStatus = -1 return } diff --git a/web/src/views/jtDevice/dialog/jtDevicePlayer.vue b/web/src/views/jtDevice/dialog/jtDevicePlayer.vue index eb79461d7..68fc2962a 100755 --- a/web/src/views/jtDevice/dialog/jtDevicePlayer.vue +++ b/web/src/views/jtDevice/dialog/jtDevicePlayer.vue @@ -327,7 +327,7 @@ -
+
{ - if (res.data.code === 0) { - const streamInfo = res.data.data.streamInfo + this.$store.dispatch('jtDevice/startTalk', { + phoneNumber: this.deviceId, + channelId: this.channelId + }).then(data => { + const streamInfo = data if (document.location.protocol.includes('https')) { this.startBroadcast(streamInfo.rtcs) } else { this.startBroadcast(streamInfo.rtc) } - } else { - this.$message({ - showClose: true, - message: res.data.msg, - type: 'error' - }) - } - }) + }).catch(error => { + this.$message.error(error) + this.broadcastStatus = -1 + }) } else if (this.broadcastStatus === 1) { this.broadcastStatus = -1 this.broadcastRtc.close() @@ -608,7 +603,7 @@ export default { // 获取推流鉴权Key this.$store.dispatch('user/getUserInfo') .then((data) => { - if (data == null) { + if (data === null) { this.broadcastStatus = -1 return } @@ -686,7 +681,10 @@ export default { stopBroadcast() { this.broadcastRtc.close() this.broadcastStatus = -1 - this.$store.dispatch('play/broadcastStop', [this.deviceId, this.channelId]) + this.$store.dispatch('jtDevice/stopTalk', { + phoneNumber: this.deviceId, + channelId: this.channelId + }) } } } diff --git a/数据库/2.7.4/初始化-mysql-2.7.4.sql b/数据库/2.7.4/初始化-mysql-2.7.4.sql index 3d25e5927..f8e3d625b 100644 --- a/数据库/2.7.4/初始化-mysql-2.7.4.sql +++ b/数据库/2.7.4/初始化-mysql-2.7.4.sql @@ -464,3 +464,41 @@ create table IF NOT EXISTS wvp_record_plan_item update_time character varying(50) ); +drop table IF EXISTS wvp_jt_terminal; +create table IF NOT EXISTS wvp_jt_terminal ( + id serial primary key, + phone_number character varying(50), + terminal_id character varying(50), + province_id character varying(50), + province_text character varying(100), + city_id character varying(50), + city_text character varying(100), + maker_id character varying(50), + model character varying(50), + plate_color character varying(50), + plate_no character varying(50), + authentication_code character varying(255), + longitude double precision, + latitude double precision, + status bool default false, + register_time character varying(50) default null, + update_time character varying(50) not null, + create_time character varying(50) not null, + geo_coord_sys character varying(50), + media_server_id character varying(50) default 'auto', + sdp_ip character varying(50), + constraint uk_jt_device_id_device_id unique (id, phone_number) +); + +drop table IF EXISTS wvp_jt_channel; +create table IF NOT EXISTS wvp_jt_channel ( + id serial primary key, + terminal_db_id integer, + channel_id integer, + has_audio bool default false, + name character varying(255), + update_time character varying(50) not null, + create_time character varying(50) not null, + constraint uk_jt_device_id_device_id unique (terminal_db_id, channel_id) +); + diff --git a/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql b/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql index 524fdbeff..b3e245e5f 100644 --- a/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql +++ b/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql @@ -465,3 +465,40 @@ create table IF NOT EXISTS wvp_record_plan_item update_time character varying(50) ); +drop table IF EXISTS wvp_jt_terminal; +create table IF NOT EXISTS wvp_jt_terminal ( + id serial primary key, + phone_number character varying(50), + terminal_id character varying(50), + province_id character varying(50), + province_text character varying(100), + city_id character varying(50), + city_text character varying(100), + maker_id character varying(50), + model character varying(50), + plate_color character varying(50), + plate_no character varying(50), + authentication_code character varying(255), + longitude double precision, + latitude double precision, + status bool default false, + register_time character varying(50) default null, + update_time character varying(50) not null, + create_time character varying(50) not null, + geo_coord_sys character varying(50), + media_server_id character varying(50) default 'auto', + sdp_ip character varying(50), + constraint uk_jt_device_id_device_id unique (id, phone_number) +); +drop table IF EXISTS wvp_jt_channel; +create table IF NOT EXISTS wvp_jt_channel ( + id serial primary key, + terminal_db_id integer, + channel_id integer, + has_audio bool default false, + name character varying(255), + update_time character varying(50) not null, + create_time character varying(50) not null, + constraint uk_jt_device_id_device_id unique (terminal_db_id, channel_id) +); + diff --git a/数据库/2.7.4/更新-mysql-2.7.4.sql b/数据库/2.7.4/更新-mysql-2.7.4.sql index 1129ddc1d..a14db8183 100644 --- a/数据库/2.7.4/更新-mysql-2.7.4.sql +++ b/数据库/2.7.4/更新-mysql-2.7.4.sql @@ -18,6 +18,9 @@ create table IF NOT EXISTS wvp_jt_terminal ( register_time character varying(50) default null, update_time character varying(50) not null, create_time character varying(50) not null, + geo_coord_sys character varying(50), + media_server_id character varying(50) default 'auto', + sdp_ip character varying(50), constraint uk_jt_device_id_device_id unique (id, phone_number) ); @@ -30,52 +33,6 @@ create table IF NOT EXISTS wvp_jt_channel ( name character varying(255), update_time character varying(50) not null, create_time character varying(50) not null, - gb_device_id character varying(255), - gb_name character varying(255), - gb_manufacturer character varying(255), - gb_model character varying(255), - gb_civil_code character varying(8), - gb_block character varying(255), - gb_address character varying(255), - gb_parental bool default false, - gb_parent_id character varying(255), - gb_register_way integer default 1, - gb_security_level_code character varying(255), - gb_secrecy integer default 0, - gb_ip_address character varying(255), - gb_port integer, - gb_password character varying(255), - gb_status bool default false, - gb_longitude double precision, - gb_latitude double precision, - gb_business_group_id character varying(255), - gb_ptz_type integer, - gb_photoelectric_imaging_type integer, - gb_capture_position_type integer, - gb_room_type integer, - gb_supply_light_type integer default 1, - gb_direction_type integer, - gb_resolution character varying(255), - gb_stream_number_list character varying(255), - gb_download_speed character varying(255), - gb_svc_space_support_mode integer, - gb_svc_time_support_mode integer, - gb_ssvc_ratio_support_list character varying(255), - gb_mobile_device_type integer, - gb_horizontal_field_angle double precision, - gb_vertical_field_angle double precision, - gb_max_view_distance double precision, - gb_grassroots_code character varying(255), - gb_point_type integer, - gb_point_common_name character varying(255), - gb_mac character varying(255), - gb_function_type character varying(255), - gb_encode_type character varying(255), - gb_install_time character varying(255), - gb_management_unit character varying(255), - gb_contact_info character varying(255), - gb_record_save_days character varying(255), - gb_industrial_classification character varying(255), constraint uk_jt_device_id_device_id unique (terminal_db_id, channel_id) ); diff --git a/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql b/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql index 616fdfa94..af7accf6a 100644 --- a/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql +++ b/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql @@ -18,6 +18,9 @@ create table IF NOT EXISTS wvp_jt_terminal ( register_time character varying(50) default null, update_time character varying(50) not null, create_time character varying(50) not null, + geo_coord_sys character varying(50), + media_server_id character varying(50) default 'auto', + sdp_ip character varying(50), constraint uk_jt_device_id_device_id unique (id, phone_number) ); drop table IF EXISTS wvp_jt_channel;