1078-优化点播以及通道表结构

This commit is contained in:
648540858
2024-06-08 00:46:08 +08:00
parent 6ae623d639
commit 595d9be283
10 changed files with 85 additions and 39 deletions

View File

@@ -20,7 +20,7 @@ public class JTChannel {
* 设备的数据库ID
*/
@Schema(description = "设备的数据库ID")
private int terminalId;
private int terminalDbId;
/**
* 通道ID
@@ -41,6 +41,9 @@ public class JTChannel {
@Schema(description = "更新时间")
private String updateTime;
@Schema(description = "流信息")
private String stream;
public int getId() {
return id;
}
@@ -57,12 +60,12 @@ public class JTChannel {
this.name = name;
}
public int getTerminalId() {
return terminalId;
public int getTerminalDbId() {
return terminalDbId;
}
public void setTerminalId(int terminalId) {
this.terminalId = terminalId;
public void setTerminalDbId(int terminalDbId) {
this.terminalDbId = terminalDbId;
}
public String getCreateTime() {
@@ -97,12 +100,20 @@ public class JTChannel {
this.hasAudio = hasAudio;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
@Override
public String toString() {
return "JTChannel{" +
"id=" + id +
", name='" + name + '\'' +
", terminalId=" + terminalId +
", terminalDbId=" + terminalDbId +
", channelId=" + channelId +
", createTime='" + createTime + '\'' +
", updateTime='" + updateTime + '\'' +

View File

@@ -14,18 +14,18 @@ public interface JTChannelMapper {
"from " +
"wvp_jt_channel jc " +
"WHERE " +
"terminal_id = #{terminalId}" +
"terminal_db_id = #{terminalDbId}" +
" <if test='query != null'> AND " +
"jc.name LIKE concat('%',#{query},'%') " +
"</if> " +
"ORDER BY jc.channel_id " +
" </script>"})
List<JTChannel> getAll(@Param("terminalId") int terminalId, @Param("query") String query);
List<JTChannel> getAll(@Param("terminalDbId") int terminalDbId, @Param("query") String query);
@Update(value = {" <script>" +
"UPDATE wvp_jt_channel " +
"SET update_time=#{updateTime}" +
"<if test=\"terminalId != null\">, terminal_id=#{terminalId}</if>" +
"<if test=\"terminalDbId != null\">, terminal_db_id=#{terminalDbId}</if>" +
"<if test=\"hasAudio != null\">, has_audio=#{hasAudio}</if>" +
"<if test=\"name != null\">, name=#{name}</if>" +
"<if test=\"channelId != null\">, channel_id=#{channelId}</if>" +
@@ -34,14 +34,14 @@ public interface JTChannelMapper {
void update(JTChannel channel);
@Insert("INSERT INTO wvp_jt_channel (" +
"terminal_id,"+
"terminal_db_id,"+
"channel_id,"+
"name,"+
"has_audio,"+
"create_time,"+
"update_time"+
") VALUES (" +
"#{terminalId}," +
"#{terminalDbId}," +
"#{channelId}," +
"#{name}," +
"#{hasAudio}," +
@@ -53,4 +53,13 @@ public interface JTChannelMapper {
@Delete("delete from wvp_jt_channel where id = #{id}")
void delete(@Param("id") int id);
@Select(value = {" <script>" +
"SELECT * " +
"from " +
"wvp_jt_channel jc " +
"WHERE " +
"terminal_db_id = #{terminalDbId} and channel_id = #{channelId}" +
" </script>"})
JTChannel getChannel(@Param("terminalDbId") int terminalDbId, @Param("channelId") Integer channelId);
}

View File

@@ -19,6 +19,7 @@ import com.genersoft.iot.vmp.jt1078.event.FtpUploadEvent;
import com.genersoft.iot.vmp.jt1078.proc.request.J1205;
import com.genersoft.iot.vmp.jt1078.proc.response.*;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
import com.genersoft.iot.vmp.jt1078.session.SessionManager;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
@@ -136,7 +137,13 @@ public class jt1078ServiceImpl implements Ijt1078Service {
@Override
public void play(String phoneNumber, String channelId, int type, GeneralCallback<StreamInfo> callback) {
JTDevice device = getDevice(phoneNumber);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在");
}
if (SessionManager.INSTANCE.get(phoneNumber) == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备离线");
}
// 检查流是否已经存在,存在则返回
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId;
List<GeneralCallback<StreamInfo>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>());
@@ -149,14 +156,11 @@ public class jt1078ServiceImpl implements Ijt1078Service {
// 查询流是否存在,不存在则删除缓存数据
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServer, "rtp", "rtsp", streamInfo.getStream());
if (mediaInfo != null && mediaInfo.getInteger("code") == 0) {
Boolean online = mediaInfo.getBoolean("online");
if (online != null && online) {
logger.info("[1078-点播] 点播已经存在,直接返回, phoneNumber {} channelId {}", phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
return;
logger.info("[1078-点播] 点播已经存在,直接返回, phoneNumber {} channelId {}", phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
return;
}
}
// 清理数据
@@ -174,7 +178,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId());
subscribe.addSubscribe(hook, (hookData) -> {
dynamicTask.stop(playKey);
logger.info("[1078-点播] 点播成功, phoneNumber {} channelId {}", phoneNumber, channelId);
logger.info("[1078-点播] 点播成功, 手机号: {} 通道 {}", phoneNumber, channelId);
// TODO 发送9105 实时音视频传输状态通知, 通知丢包率
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
@@ -184,6 +188,8 @@ public class jt1078ServiceImpl implements Ijt1078Service {
subscribe.removeSubscribe(hook);
redisTemplate.opsForValue().set(playKey, info);
});
// 开启收流端口
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, "000", false, false, 0, false, false, false, 1);
// 设置超时监听
dynamicTask.startDelay(playKey, () -> {
logger.info("[1078-点播] 超时, phoneNumber {} channelId {}", phoneNumber, channelId);
@@ -191,11 +197,10 @@ public class jt1078ServiceImpl implements Ijt1078Service {
errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
}
mediaServerService.closeRTPServer(mediaServer, stream);
subscribe.removeSubscribe(hook);
}, userSetting.getPlayTimeout());
// 开启收流端口
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, null, false, false, 0, false, false, false, 1);
logger.info("[1078-点播] phoneNumber {} channelId {} 端口: {}", phoneNumber, channelId, ssrcInfo.getPort());
J9101 j9101 = new J9101();
j9101.setChannel(Integer.valueOf(channelId));
@@ -204,9 +209,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
j9101.setTcpPort(ssrcInfo.getPort());
j9101.setUdpPort(ssrcInfo.getPort());
j9101.setType(type);
Object s = jt1078Template.startLive(phoneNumber, j9101, 6);
System.out.println("ssss=== " + s);
jt1078Template.startLive(phoneNumber, j9101, 6);
}
public StreamInfo onPublishHandler(MediaServer mediaServerItem, HookData hookData, String phoneNumber, String channelId) {
@@ -1011,8 +1014,21 @@ public class jt1078ServiceImpl implements Ijt1078Service {
@Override
public PageInfo<JTChannel> getChannelList(int page, int count, int deviceId, String query) {
JTDevice device = getDeviceById(deviceId);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在");
}
PageHelper.startPage(page, count);
List<JTChannel> all = jtChannelMapper.getAll(deviceId, query);
PageInfo<JTChannel> jtChannelPageInfo = new PageInfo<>(all);
for (JTChannel jtChannel : jtChannelPageInfo.getList()) {
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + device.getPhoneNumber() + ":" + jtChannel.getChannelId();
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
if (streamInfo != null) {
jtChannel.setStream(streamInfo.getStream());
}
}
return new PageInfo<>(all);
}
@@ -1024,6 +1040,10 @@ public class jt1078ServiceImpl implements Ijt1078Service {
@Override
public void addChannel(JTChannel channel) {
JTChannel channelInDb = jtChannelMapper.getChannel(channel.getTerminalDbId(), channel.getChannelId());
if (channelInDb != null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "通道已存在");
}
channel.setCreateTime(DateUtil.getNow());
channel.setUpdateTime(DateUtil.getNow());
jtChannelMapper.add(channel);