[1078] 临时提交

This commit is contained in:
lin
2025-07-09 16:16:04 +08:00
parent 48eff320fd
commit 3c6b715ce1
7 changed files with 63 additions and 34 deletions

View File

@@ -26,7 +26,7 @@ public class Jt808Encoder extends MessageToByteEncoder<Rs> {
List<ByteBuf> encodeList = Jt808EncoderCmd.encode(msg, session, session.nextSerialNo());
if(encodeList!=null && !encodeList.isEmpty()){
for (ByteBuf byteBuf : encodeList) {
log.debug("< {} hex:{}", session, ByteBufUtil.hexDump(byteBuf));
log.info("< {} hex:{}", session, ByteBufUtil.hexDump(byteBuf));
out.writeBytes(byteBuf);
}
}

View File

@@ -30,7 +30,7 @@ public class J0200 extends Re {
// JTPositionAdditionalInfo positionAdditionalInfo = new JTPositionAdditionalInfo();
// Map<Integer, byte[]> additionalMsg = new HashMap<>();
// getAdditionalMsg(buf, positionAdditionalInfo);
log.info("[JT-位置汇报]: {}", positionInfo.toString());
log.debug("[JT-位置汇报]: {}", positionInfo.toString());
return null;
}

View File

@@ -356,6 +356,16 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
@Override
public void playback(String phoneNumber, Integer channelId, String startTime, String endTime, Integer type,
Integer rate, Integer playbackType, Integer playbackSpeed, CommonCallback<WVPResult<StreamInfo>> callback) {
JTDevice device = jt1078Service.getDevice(phoneNumber);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在");
}
jt1078Template.checkTerminalStatus(phoneNumber);
JTChannel channel = jt1078Service.getChannel(device.getId(), channelId);
if (channel == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "通道不存在");
}
log.info("[JT-回放] 回放,设备:{} 通道: {} 开始时间: {} 结束时间: {} 音视频类型: {} 码流类型: {} " +
"回放方式: {} 快进或快退倍数: {}", phoneNumber, channelId, startTime, endTime, type, rate, playbackType, playbackSpeed);
// 检查流是否已经存在,存在则返回
@@ -365,24 +375,14 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
String logInfo = String.format("phoneNumber:%s, channelId:%s, startTime:%s, endTime:%s", phoneNumber, channelId, startTime, endTime);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playbackKey);
if (streamInfo != null) {
MediaServer mediaServer = streamInfo.getMediaServer();
if (mediaServer != null) {
// 查询流是否存在,不存在则删除缓存数据
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, "1078", streamInfo.getStream());
if (mediaInfo != null) {
log.info("[JT-回放] 回放已经存在,直接返回, logInfo {}", logInfo);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo));
}
return;
}
}
mediaServerService.closeJTTServer(streamInfo.getMediaServer(), streamInfo.getStream(), null);
// 清理数据
redisTemplate.delete(playbackKey);
}
String startTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime);
String endTimeParam = DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime);
String stream = "jt_" + phoneNumber + "_" + channelId + "_" + startTimeParam + "_" + endTimeParam;
String app = "1078";
String stream = phoneNumber + "_" + channelId;
MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
if (mediaServer == null) {
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
@@ -391,7 +391,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
return;
}
// 设置hook监听
Hook hookSubscribe = Hook.getInstance(HookType.on_media_arrival, "1078", stream, mediaServer.getId());
Hook hookSubscribe = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServer.getId());
subscribe.addSubscribe(hookSubscribe, (hookData) -> {
dynamicTask.stop(playbackKey);
log.info("[JT-回放] 回放成功, logInfo {}", logInfo);
@@ -413,11 +413,12 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null));
}
mediaServerService.closeJTTServer(mediaServer, stream, null);
subscribe.removeSubscribe(hookSubscribe);
}, userSetting.getPlayTimeout());
// 开启收流端口
SSRCInfo ssrcInfo = mediaServerService.openJTTServer(mediaServer, stream, null, false, false, 1);
SSRCInfo ssrcInfo = mediaServerService.openJTTServer(mediaServer, stream, null, false, !channel.isHasAudio(), 1);
log.info("[JT-回放] logInfo {} 端口: {}", logInfo, ssrcInfo.getPort());
J9201 j9201 = new J9201();
j9201.setChannel(channelId);
@@ -443,11 +444,12 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
@Override
public void playbackControl(String phoneNumber, Integer channelId, Integer command, Integer playbackSpeed, String time) {
log.info("[JT-回放控制] phoneNumber {} channelId {} command {} playbackSpeed {} time {}",
phoneNumber, channelId, command, playbackSpeed, time);
long l = System.currentTimeMillis();
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAYBACK + phoneNumber + ":" + channelId;
dynamicTask.stop(playKey);
if (command == 2) {
log.info("[JT-停止回放] phoneNumber {} channelId {} command {} playbackSpeed {} time {}",
phoneNumber, channelId, command, playbackSpeed, time);
// 结束回放
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
// 删除缓存数据
@@ -465,18 +467,26 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
callback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null));
}
}
}else {
log.info("[JT-回放控制] phoneNumber {} channelId {} command {} playbackSpeed {} time {}",
phoneNumber, channelId, command, playbackSpeed, time);
}
System.out.println("清理回调 " + (System.currentTimeMillis() - l));
l = System.currentTimeMillis();
// 发送停止命令
J9202 j9202 = new J9202();
j9202.setChannel(channelId);
j9202.setPlaybackType(command);
if (playbackSpeed != null) {
j9202.setPlaybackSpeed(playbackSpeed);
}
if (!ObjectUtils.isEmpty(time)) {
j9202.setPlaybackTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(time));
}
jt1078Template.controlBackLive(phoneNumber, j9202, 6);
jt1078Template.controlBackLive(phoneNumber, j9202, 4);
System.out.println("发送指令 " + (System.currentTimeMillis() - l));
}
@Override

View File

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.jt1078.session;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
@@ -44,7 +45,9 @@ public class Session {
public void writeObject(Object message) {
log.info("<<<<<<<<<< cmd{},{}", this, message);
channel.writeAndFlush(message);
System.out.println(message);
ChannelFuture channelFuture = channel.writeAndFlush(message);
System.out.println(222);
}
/**

View File

@@ -65,14 +65,14 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public void closeRtpServer(MediaServer serverItem, String streamId, CommonCallback<Boolean> callback) {
if (serverItem == null) {
public void closeRtpServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
if (mediaServer == null) {
return;
}
Map<String, Object> param = new HashMap<>();
param.put("stream_id", streamId);
param.put("force", 1);
JSONObject jsonObject = ablresTfulUtils.closeStreams(serverItem, "rtp", streamId);
JSONObject jsonObject = ablresTfulUtils.closeStreams(mediaServer, "rtp", streamId);
logger.info("关闭RTP Server " + jsonObject);
if (jsonObject != null ) {
if (jsonObject.getInteger("code") != 0) {
@@ -92,7 +92,19 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
@Override
public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
closeRtpServer(mediaServer, streamId, callback);
if (mediaServer == null) {
return;
}
JSONObject jsonObject = ablresTfulUtils.closeStreams(mediaServer, "1078", streamId);
logger.info("关闭RTP Server " + jsonObject);
if (jsonObject != null ) {
if (jsonObject.getInteger("code") != 0) {
logger.error("[closeRtpServer] 失败: " + jsonObject.getString("msg"));
}
}else {
// 检查ZLM状态
logger.error("[closeRtpServer] 失败: 请检查ZLM服务");
}
}
@Override

View File

@@ -301,7 +301,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
log.info("[closeJTTServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return;
}
mediaNodeServerService.closeRtpServer(mediaServer, streamId, callback);
mediaNodeServerService.closeJTTServer(mediaServer, streamId, callback);
}
@Override

View File

@@ -119,7 +119,9 @@ public class ZLMServerFactory {
public void closeRtpServer(MediaServer serverItem, String streamId, CommonCallback<Boolean> callback) {
if (serverItem == null) {
callback.run(false);
if (callback != null) {
callback.run(false);
}
return;
}
Map<String, Object> param = new HashMap<>();
@@ -127,7 +129,9 @@ public class ZLMServerFactory {
zlmresTfulUtils.closeRtpServer(serverItem, param, jsonObject -> {
if (jsonObject != null ) {
if (jsonObject.getInteger("code") == 0) {
callback.run(jsonObject.getInteger("hit") == 1);
if (callback != null) {
callback.run(jsonObject.getInteger("hit") == 1);
}
return;
}else {
log.error("关闭RTP Server 失败: " + jsonObject.getString("msg"));
@@ -136,10 +140,10 @@ public class ZLMServerFactory {
// 检查ZLM状态
log.error("关闭RTP Server 失败: 请检查ZLM服务");
}
callback.run(false);
if (callback != null) {
callback.run(false);
}
});
}