From 3c6b715ce11b5fd4e186489765c28e01fc12ddfe Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Wed, 9 Jul 2025 16:16:04 +0800 Subject: [PATCH] =?UTF-8?q?[1078]=20=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/jt1078/codec/encode/Jt808Encoder.java | 2 +- .../iot/vmp/jt1078/proc/request/J0200.java | 2 +- .../service/impl/jt1078PlayServiceImpl.java | 52 +++++++++++-------- .../iot/vmp/jt1078/session/Session.java | 5 +- .../media/abl/ABLMediaNodeServerService.java | 20 +++++-- .../service/impl/MediaServerServiceImpl.java | 2 +- .../iot/vmp/media/zlm/ZLMServerFactory.java | 14 +++-- 7 files changed, 63 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808Encoder.java b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808Encoder.java index 6868f642a..949177db6 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808Encoder.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808Encoder.java @@ -26,7 +26,7 @@ public class Jt808Encoder extends MessageToByteEncoder { List encodeList = Jt808EncoderCmd.encode(msg, session, session.nextSerialNo()); if(encodeList!=null && !encodeList.isEmpty()){ for (ByteBuf byteBuf : encodeList) { - log.debug("< {} hex:{}", session, ByteBufUtil.hexDump(byteBuf)); + log.info("< {} hex:{}", session, ByteBufUtil.hexDump(byteBuf)); out.writeBytes(byteBuf); } } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java index 85f7b4725..30bc973cc 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java @@ -30,7 +30,7 @@ public class J0200 extends Re { // JTPositionAdditionalInfo positionAdditionalInfo = new JTPositionAdditionalInfo(); // Map additionalMsg = new HashMap<>(); // getAdditionalMsg(buf, positionAdditionalInfo); - log.info("[JT-位置汇报]: {}", positionInfo.toString()); + log.debug("[JT-位置汇报]: {}", positionInfo.toString()); return null; } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java index 392c2da11..dc1de0234 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java @@ -356,6 +356,16 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { @Override public void playback(String phoneNumber, Integer channelId, String startTime, String endTime, Integer type, Integer rate, Integer playbackType, Integer playbackSpeed, CommonCallback> callback) { + JTDevice device = jt1078Service.getDevice(phoneNumber); + if (device == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在"); + } + jt1078Template.checkTerminalStatus(phoneNumber); + JTChannel channel = jt1078Service.getChannel(device.getId(), channelId); + if (channel == null) { + throw new ControllerException(ErrorCode.ERROR100.getCode(), "通道不存在"); + } + log.info("[JT-回放] 回放,设备:{}, 通道: {}, 开始时间: {}, 结束时间: {}, 音视频类型: {}, 码流类型: {}, " + "回放方式: {}, 快进或快退倍数: {}", phoneNumber, channelId, startTime, endTime, type, rate, playbackType, playbackSpeed); // 检查流是否已经存在,存在则返回 @@ -365,24 +375,14 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { String logInfo = String.format("phoneNumber:%s, channelId:%s, startTime:%s, endTime:%s", phoneNumber, channelId, startTime, endTime); StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playbackKey); if (streamInfo != null) { - MediaServer mediaServer = streamInfo.getMediaServer(); - if (mediaServer != null) { - // 查询流是否存在,不存在则删除缓存数据 - MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, "1078", streamInfo.getStream()); - if (mediaInfo != null) { - log.info("[JT-回放] 回放已经存在,直接返回, logInfo: {}", logInfo); - for (CommonCallback> errorCallback : errorCallbacks) { - errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo)); - } - return; - } - } + + mediaServerService.closeJTTServer(streamInfo.getMediaServer(), streamInfo.getStream(), null); // 清理数据 redisTemplate.delete(playbackKey); } - String startTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime); - String endTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime); - String stream = "jt_" + phoneNumber + "_" + channelId + "_" + startTimeParam + "_" + endTimeParam; + + String app = "1078"; + String stream = phoneNumber + "_" + channelId; MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); if (mediaServer == null) { for (CommonCallback> errorCallback : errorCallbacks) { @@ -391,7 +391,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { return; } // 设置hook监听 - Hook hookSubscribe = Hook.getInstance(HookType.on_media_arrival, "1078", stream, mediaServer.getId()); + Hook hookSubscribe = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServer.getId()); subscribe.addSubscribe(hookSubscribe, (hookData) -> { dynamicTask.stop(playbackKey); log.info("[JT-回放] 回放成功, logInfo: {}", logInfo); @@ -413,11 +413,12 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null)); } - + mediaServerService.closeJTTServer(mediaServer, stream, null); + subscribe.removeSubscribe(hookSubscribe); }, userSetting.getPlayTimeout()); // 开启收流端口 - SSRCInfo ssrcInfo = mediaServerService.openJTTServer(mediaServer, stream, null, false, false, 1); + SSRCInfo ssrcInfo = mediaServerService.openJTTServer(mediaServer, stream, null, false, !channel.isHasAudio(), 1); log.info("[JT-回放] logInfo: {}, 端口: {}", logInfo, ssrcInfo.getPort()); J9201 j9201 = new J9201(); j9201.setChannel(channelId); @@ -443,11 +444,12 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { @Override public void playbackControl(String phoneNumber, Integer channelId, Integer command, Integer playbackSpeed, String time) { - log.info("[JT-回放控制] phoneNumber: {}, channelId: {}, command: {}, playbackSpeed: {}, time: {}", - phoneNumber, channelId, command, playbackSpeed, time); + long l = System.currentTimeMillis(); String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAYBACK + phoneNumber + ":" + channelId; dynamicTask.stop(playKey); if (command == 2) { + log.info("[JT-停止回放] phoneNumber: {}, channelId: {}, command: {}, playbackSpeed: {}, time: {}", + phoneNumber, channelId, command, playbackSpeed, time); // 结束回放 StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); // 删除缓存数据 @@ -465,18 +467,26 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { callback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null)); } } + }else { + log.info("[JT-回放控制] phoneNumber: {}, channelId: {}, command: {}, playbackSpeed: {}, time: {}", + phoneNumber, channelId, command, playbackSpeed, time); } + System.out.println("清理回调 " + (System.currentTimeMillis() - l)); + l = System.currentTimeMillis(); // 发送停止命令 J9202 j9202 = new J9202(); j9202.setChannel(channelId); j9202.setPlaybackType(command); + if (playbackSpeed != null) { j9202.setPlaybackSpeed(playbackSpeed); + } if (!ObjectUtils.isEmpty(time)) { j9202.setPlaybackTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(time)); } - jt1078Template.controlBackLive(phoneNumber, j9202, 6); + jt1078Template.controlBackLive(phoneNumber, j9202, 4); + System.out.println("发送指令 " + (System.currentTimeMillis() - l)); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java b/src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java index 392f911d5..9faea3b74 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.jt1078.session; import com.genersoft.iot.vmp.jt1078.proc.Header; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.util.AttributeKey; import lombok.extern.slf4j.Slf4j; @@ -44,7 +45,9 @@ public class Session { public void writeObject(Object message) { log.info("<<<<<<<<<< cmd{},{}", this, message); - channel.writeAndFlush(message); + System.out.println(message); + ChannelFuture channelFuture = channel.writeAndFlush(message); + System.out.println(222); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java index 6ec567b77..12ea96280 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java @@ -65,14 +65,14 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService { } @Override - public void closeRtpServer(MediaServer serverItem, String streamId, CommonCallback callback) { - if (serverItem == null) { + public void closeRtpServer(MediaServer mediaServer, String streamId, CommonCallback callback) { + if (mediaServer == null) { return; } Map param = new HashMap<>(); param.put("stream_id", streamId); param.put("force", 1); - JSONObject jsonObject = ablresTfulUtils.closeStreams(serverItem, "rtp", streamId); + JSONObject jsonObject = ablresTfulUtils.closeStreams(mediaServer, "rtp", streamId); logger.info("关闭RTP Server " + jsonObject); if (jsonObject != null ) { if (jsonObject.getInteger("code") != 0) { @@ -92,7 +92,19 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService { @Override public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback callback) { - closeRtpServer(mediaServer, streamId, callback); + if (mediaServer == null) { + return; + } + JSONObject jsonObject = ablresTfulUtils.closeStreams(mediaServer, "1078", streamId); + logger.info("关闭RTP Server " + jsonObject); + if (jsonObject != null ) { + if (jsonObject.getInteger("code") != 0) { + logger.error("[closeRtpServer] 失败: " + jsonObject.getString("msg")); + } + }else { + // 检查ZLM状态 + logger.error("[closeRtpServer] 失败: 请检查ZLM服务"); + } } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index bd105c1b7..0cd6a9609 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -301,7 +301,7 @@ public class MediaServerServiceImpl implements IMediaServerService { log.info("[closeJTTServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); return; } - mediaNodeServerService.closeRtpServer(mediaServer, streamId, callback); + mediaNodeServerService.closeJTTServer(mediaServer, streamId, callback); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index 4b4ba2e4e..733146382 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -119,7 +119,9 @@ public class ZLMServerFactory { public void closeRtpServer(MediaServer serverItem, String streamId, CommonCallback callback) { if (serverItem == null) { - callback.run(false); + if (callback != null) { + callback.run(false); + } return; } Map param = new HashMap<>(); @@ -127,7 +129,9 @@ public class ZLMServerFactory { zlmresTfulUtils.closeRtpServer(serverItem, param, jsonObject -> { if (jsonObject != null ) { if (jsonObject.getInteger("code") == 0) { - callback.run(jsonObject.getInteger("hit") == 1); + if (callback != null) { + callback.run(jsonObject.getInteger("hit") == 1); + } return; }else { log.error("关闭RTP Server 失败: " + jsonObject.getString("msg")); @@ -136,10 +140,10 @@ public class ZLMServerFactory { // 检查ZLM状态 log.error("关闭RTP Server 失败: 请检查ZLM服务"); } - callback.run(false); + if (callback != null) { + callback.run(false); + } }); - - }