[1078] 适配新的数据库结构

This commit is contained in:
lin
2025-03-24 17:53:43 +08:00
parent fd5079a652
commit f0eb101130
17 changed files with 216 additions and 972 deletions

View File

@@ -8,7 +8,9 @@ public enum ChannelDataType {
GB28181(1,"国标28181"),
STREAM_PUSH(2,"推流设备"),
STREAM_PROXY(3,"拉流代理");
STREAM_PROXY(3,"拉流代理"),
JT_1078(200,"部标设备"),
;
public final int value;

View File

@@ -144,9 +144,6 @@ public class CommonGBChannel {
@Schema(description = "关联的设备ID")
private Integer dataDeviceId;
@Schema(description = "关联的部标标通道ID")
private Integer jtChannelId;
@Schema(description = "创建时间")
private String createTime;

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.jt1078.bean;
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@@ -38,6 +39,8 @@ public class JTChannel extends CommonGBChannel {
@Schema(description = "流信息")
private String stream;
private Integer dataType = ChannelDataType.JT_1078.value;
@Override
public String toString() {
return "JTChannel{" +
@@ -58,7 +61,6 @@ public class JTChannel extends CommonGBChannel {
if (ObjectUtils.isEmpty(this.getGbName())) {
this.setGbName(this.getName());
}
this.setJtChannelId(this.getId());
return this;
}

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.jt1078.controller;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.ftpServer.FtpSetting;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
@@ -96,11 +97,12 @@ public class JT1078Controller {
jt1078PlayService.stopPlay(phoneNumber, channelId);
});
jt1078PlayService.play(phoneNumber, channelId, type, (code, msg, streamInfo) -> {
WVPResult<StreamContent> wvpResult = new WVPResult<>();
if (code == InviteErrorCode.SUCCESS.getCode()) {
wvpResult.setCode(ErrorCode.SUCCESS.getCode());
wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
jt1078PlayService.play(phoneNumber, channelId, type, wvpResult -> {
WVPResult<StreamContent> wvpResultForFinish = new WVPResult<>();
wvpResultForFinish.setCode(wvpResult.getCode());
wvpResultForFinish.setMsg(wvpResult.getMsg());
if (wvpResult.getCode() == InviteErrorCode.SUCCESS.getCode()) {
StreamInfo streamInfo = wvpResult.getData();
if (streamInfo != null) {
if (userSetting.getUseSourceIpAsStreamIp()) {
@@ -114,16 +116,10 @@ public class JT1078Controller {
}
streamInfo.changeStreamIp(host);
}
wvpResult.setData(new StreamContent(streamInfo));
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
wvpResultForFinish.setData(new StreamContent(streamInfo));
}
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
}
result.setResult(wvpResult);
result.setResult(wvpResultForFinish);
});
return result;
@@ -165,15 +161,16 @@ public class JT1078Controller {
jt1078PlayService.stopPlay(phoneNumber, channelId);
});
jt1078PlayService.startTalk(phoneNumber, channelId, app, stream, mediaServerId, onlySend, (code, msg, streamInfo) -> {
WVPResult<StreamContent> wvpResult = new WVPResult<>();
if (code == InviteErrorCode.SUCCESS.getCode()) {
wvpResult.setCode(ErrorCode.SUCCESS.getCode());
wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
jt1078PlayService.startTalk(phoneNumber, channelId, app, stream, mediaServerId, onlySend, wvpResult -> {
WVPResult<StreamContent> wvpResultForFinish = new WVPResult<>();
wvpResultForFinish.setCode(wvpResult.getCode());
wvpResultForFinish.setMsg(wvpResult.getMsg());
if (wvpResult.getCode() == InviteErrorCode.SUCCESS.getCode()) {
if (streamInfo != null) {
if (wvpResult.getData() != null) {
StreamInfo streamInfo = wvpResult.getData();
if (userSetting.getUseSourceIpAsStreamIp()) {
streamInfo=streamInfo.clone();//深拷贝
streamInfo = wvpResult.getData().clone();//深拷贝
String host;
try {
URL url=new URL(request.getRequestURL().toString());
@@ -183,16 +180,10 @@ public class JT1078Controller {
}
streamInfo.changeStreamIp(host);
}
wvpResult.setData(new StreamContent(streamInfo));
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
wvpResultForFinish.setData(new StreamContent(streamInfo));
}
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
}
result.setResult(wvpResult);
result.setResult(wvpResultForFinish);
});
return result;
@@ -293,15 +284,14 @@ public class JT1078Controller {
jt1078PlayService.stopPlay(phoneNumber, channelId);
});
jt1078PlayService.playback(phoneNumber, channelId, startTime, endTime,type, rate, playbackType, playbackSpeed, (code, msg, streamInfo) -> {
WVPResult<StreamContent> wvpResult = new WVPResult<>();
if (code == InviteErrorCode.SUCCESS.getCode()) {
wvpResult.setCode(ErrorCode.SUCCESS.getCode());
wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
jt1078PlayService.playback(phoneNumber, channelId, startTime, endTime,type, rate, playbackType, playbackSpeed, wvpResult -> {
WVPResult<StreamContent> wvpResultForFinish = new WVPResult<>();
wvpResultForFinish.setCode(wvpResult.getCode());
wvpResultForFinish.setMsg(wvpResult.getMsg());
if (wvpResult.getCode() == InviteErrorCode.SUCCESS.getCode()) {
StreamInfo streamInfo = wvpResult.getData();
if (streamInfo != null) {
if (userSetting.getUseSourceIpAsStreamIp()) {
streamInfo=streamInfo.clone();//深拷贝
String host;
try {
URL url=new URL(request.getRequestURL().toString());
@@ -311,16 +301,10 @@ public class JT1078Controller {
}
streamInfo.changeStreamIp(host);
}
wvpResult.setData(new StreamContent(streamInfo));
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
wvpResultForFinish.setData(new StreamContent(streamInfo));
}
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
}
result.setResult(wvpResult);
result.setResult(wvpResultForFinish);
});
return result;
@@ -385,8 +369,8 @@ public class JT1078Controller {
response.setContentType(MediaType.APPLICATION_OCTET_STREAM_VALUE);
response.addHeader("Content-Disposition", "attachment;filename=" + URLEncoder.encode(phoneNumber + "_" + channelId + ".mp4", "UTF-8"));
response.setStatus(HttpServletResponse.SC_OK);
service.recordDownload(phoneNumber, channelId, startTime, endTime, alarmSign, mediaType, streamType, storageType, (code, msg, data) -> {
String filePath = "ftp" + data;
service.recordDownload(phoneNumber, channelId, startTime, endTime, alarmSign, mediaType, streamType, storageType, wvpResult -> {
String filePath = "ftp" + wvpResult.getData();
File file = new File(filePath);
if (!file.exists()) {
log.warn("[下载录像] 收到通知时未找到录像文件: {}", filePath);

View File

@@ -5,10 +5,10 @@ import java.util.Map;
public class JTChannelProvider {
public final static String BASE_SQL =
"SELECT jc.*, jc.id as jt_channel_id, wdc.*, wdc.id as gb_id " +
"SELECT jc.*, jc.id as data_device_id, wdc.*, wdc.id as gb_id " +
" from wvp_jt_channel jc " +
" LEFT join wvp_device_channel wdc " +
" on jc.id = wdc.jt_channel_id ";
" on jc.id = wdc.data_device_id and wdc.data_type = 200 ";
public String selectChannelByChannelId(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();

View File

@@ -1,69 +0,0 @@
package com.genersoft.iot.vmp.jt1078.event;
import com.genersoft.iot.vmp.common.GeneralCallback;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 管理callback回调支持设置超时时间未设置则按照五分钟超时自动移除
*/
@Component
public class CallbackManager {
private final long expire = 5 * 60 * 1000;
static class ManagerCallBack {
public String key;
public GeneralCallback<?> callback;
public long createTime;
public long expire;
}
private final Map<String, ManagerCallBack> allCallbacks = new ConcurrentHashMap<>();
public void addCallback(String key, GeneralCallback<?> callback) {
ManagerCallBack managerCallBack = new ManagerCallBack();
managerCallBack.callback = callback;
managerCallBack.key = key;
managerCallBack.createTime = System.currentTimeMillis();
managerCallBack.expire = expire;
allCallbacks.put(key, managerCallBack);
}
public void addCallback(String key, GeneralCallback<Object> callback, long timeout) {
ManagerCallBack managerCallBack = new ManagerCallBack();
managerCallBack.callback = callback;
managerCallBack.key = key;
managerCallBack.createTime = System.currentTimeMillis();
managerCallBack.expire = timeout;
allCallbacks.put(key, managerCallBack);
}
public GeneralCallback<?> getCallback(String key){
ManagerCallBack managerCallBack = allCallbacks.get(key);
if (managerCallBack != null) {
return managerCallBack.callback;
}else {
return null;
}
}
public void removeCallback(String key){
allCallbacks.remove(key);
}
/**
* 对订阅数据进行过期清理
*/
@Scheduled(fixedRate=expire) //每5分钟执行一次
public void execute(){
for (ManagerCallBack callBack : allCallbacks.values()) {
if ((System.currentTimeMillis() - callBack.createTime - callBack.expire) > 0) {
allCallbacks.remove(callBack.key);
}
}
}
}

View File

@@ -1,20 +1,20 @@
package com.genersoft.iot.vmp.jt1078.service;
import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.jt1078.bean.*;
import com.genersoft.iot.vmp.jt1078.proc.request.J1205;
import com.github.pagehelper.PageInfo;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import java.util.List;
public interface Ijt1078PlayService {
JTMediaStreamType checkStreamFromJt(String stream);
void play(String phoneNumber, Integer channelId, int type, GeneralCallback<StreamInfo> callback);
void play(String phoneNumber, Integer channelId, int type, CommonCallback<WVPResult<StreamInfo>> callback);
void playback(String phoneNumber, Integer channelId, String startTime, String endTime, Integer type,
Integer rate, Integer playbackType, Integer playbackSpeed, GeneralCallback<StreamInfo> callback);
Integer rate, Integer playbackType, Integer playbackSpeed, CommonCallback<WVPResult<StreamInfo>> callback);
void stopPlay(String phoneNumber, Integer channelId);
@@ -26,7 +26,7 @@ public interface Ijt1078PlayService {
void stopPlayback(String phoneNumber, Integer channelId);
void startTalk(String phoneNumber, Integer channelId, String app, String stream, String mediaServerId, Boolean onlySend, GeneralCallback<StreamInfo> callback);
void startTalk(String phoneNumber, Integer channelId, String app, String stream, String mediaServerId, Boolean onlySend, CommonCallback<WVPResult<StreamInfo>> callback);
void stopTalk(String phoneNumber, Integer channelId);

View File

@@ -1,9 +1,10 @@
package com.genersoft.iot.vmp.jt1078.service;
import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.jt1078.bean.*;
import com.genersoft.iot.vmp.jt1078.proc.request.J1205;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import java.util.List;
@@ -32,7 +33,7 @@ public interface Ijt1078Service {
void wiper(String phoneNumber, Integer channelId, String command);
JTDeviceConfig queryConfig(String phoneNumber, String[] params, GeneralCallback<StreamInfo> callback);
JTDeviceConfig queryConfig(String phoneNumber, String[] params, CommonCallback<WVPResult<StreamInfo>> callback);
void setConfig(String phoneNumber, JTDeviceConfig config);
@@ -100,7 +101,7 @@ public interface Ijt1078Service {
void changeStreamType(String phoneNumber, Integer channelId, Integer streamType);
void recordDownload(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType, GeneralCallback<String> fileCallback);
void recordDownload(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType, CommonCallback<WVPResult<String>> fileCallback);
PageInfo<JTChannel> getChannelList(int page, int count, int deviceId, String query);

View File

@@ -1,6 +1,6 @@
package com.genersoft.iot.vmp.jt1078.service.impl;
import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
@@ -10,8 +10,6 @@ import com.genersoft.iot.vmp.conf.ftpServer.FtpSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.jt1078.bean.*;
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
import com.genersoft.iot.vmp.jt1078.dao.JTChannelMapper;
import com.genersoft.iot.vmp.jt1078.dao.JTTerminalMapper;
import com.genersoft.iot.vmp.jt1078.event.FtpUploadEvent;
import com.genersoft.iot.vmp.jt1078.proc.request.J1205;
import com.genersoft.iot.vmp.jt1078.proc.response.*;
@@ -35,6 +33,7 @@ import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.MediaServerUtils;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -160,10 +159,10 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
}
}
private final Map<String, List<GeneralCallback<StreamInfo>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
private final Map<String, List<CommonCallback<WVPResult<StreamInfo>>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
@Override
public void play(String phoneNumber, Integer channelId, int type, GeneralCallback<StreamInfo> callback) {
public void play(String phoneNumber, Integer channelId, int type, CommonCallback<WVPResult<StreamInfo>> callback) {
JTDevice device = jt1078Service.getDevice(phoneNumber);
if (device == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备不存在");
@@ -175,7 +174,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
}
// 检查流是否已经存在,存在则返回
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId;
List<GeneralCallback<StreamInfo>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>());
List<CommonCallback<WVPResult<StreamInfo>>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>());
errorCallbacks.add(callback);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
if (streamInfo != null) {
@@ -185,8 +184,8 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, "rtp", streamInfo.getStream());
if (mediaInfo != null) {
log.info("[1078-点播] 点播已经存在,直接返回, phoneNumber {} channelId {}", phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo));
}
return;
}
@@ -197,8 +196,8 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
String stream = "jt_" + phoneNumber + "_" + channelId;
MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
if (mediaServer == null) {
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo));
}
return;
}
@@ -210,11 +209,11 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
// TODO 发送9105 实时音视频传输状态通知, 通知丢包率
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
if (errorCallback == null) {
continue;
}
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info);
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info));
}
subscribe.removeSubscribe(hook);
redisTemplate.opsForValue().set(playKey, info);
@@ -240,9 +239,9 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
// 设置超时监听
dynamicTask.startDelay(playKey, () -> {
log.info("[1078-点播] 超时, phoneNumber {} channelId {}", phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null));
}
mediaServerService.closeRTPServer(mediaServer, stream);
subscribe.removeSubscribe(hook);
@@ -271,10 +270,10 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId;
dynamicTask.stop(playKey);
// 清理回调
List<GeneralCallback<StreamInfo>> generalCallbacks = inviteErrorCallbackMap.get(playKey);
List<CommonCallback<WVPResult<StreamInfo>>> generalCallbacks = inviteErrorCallbackMap.get(playKey);
if (generalCallbacks != null && !generalCallbacks.isEmpty()) {
for (GeneralCallback<StreamInfo> callback : generalCallbacks) {
callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null);
for (CommonCallback<WVPResult<StreamInfo>> callback : generalCallbacks) {
callback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null));
}
}
jt1078Template.checkTerminalStatus(phoneNumber);
@@ -355,12 +354,12 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
@Override
public void playback(String phoneNumber, Integer channelId, String startTime, String endTime, Integer type,
Integer rate, Integer playbackType, Integer playbackSpeed, GeneralCallback<StreamInfo> callback) {
Integer rate, Integer playbackType, Integer playbackSpeed, CommonCallback<WVPResult<StreamInfo>> callback) {
log.info("[1078-回放] 回放,设备:{} 通道: {} 开始时间: {} 结束时间: {} 音视频类型: {} 码流类型: {} " +
"回放方式: {} 快进或快退倍数: {}", phoneNumber, channelId, startTime, endTime, type, rate, playbackType, playbackSpeed);
// 检查流是否已经存在,存在则返回
String playbackKey = VideoManagerConstants.INVITE_INFO_1078_PLAYBACK + phoneNumber + ":" + channelId;
List<GeneralCallback<StreamInfo>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playbackKey, k -> new ArrayList<>());
List<CommonCallback<WVPResult<StreamInfo>>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playbackKey, k -> new ArrayList<>());
errorCallbacks.add(callback);
String logInfo = String.format("phoneNumber:%s, channelId:%s, startTime:%s, endTime:%s", phoneNumber, channelId, startTime, endTime);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playbackKey);
@@ -371,8 +370,8 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, "rtp", streamInfo.getStream());
if (mediaInfo != null) {
log.info("[1078-回放] 回放已经存在,直接返回, logInfo {}", logInfo);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo));
}
return;
}
@@ -385,8 +384,8 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
String stream = "jt_" + phoneNumber + "_" + channelId + "_" + startTimeParam + "_" + endTimeParam;
MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
if (mediaServer == null) {
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo));
}
return;
}
@@ -397,11 +396,11 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
log.info("[1078-回放] 回放成功, logInfo {}", logInfo);
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
if (errorCallback == null) {
continue;
}
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info);
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info));
}
subscribe.removeSubscribe(hookSubscribe);
redisTemplate.opsForValue().set(playbackKey, info);
@@ -409,9 +408,9 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
// 设置超时监听
dynamicTask.startDelay(playbackKey, () -> {
log.info("[1078-回放] 回放超时, logInfo {}", logInfo);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null));
}
}, userSetting.getPlayTimeout());
@@ -456,13 +455,13 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
mediaServerService.closeRTPServer(streamInfo.getMediaServer(), streamInfo.getStream());
}
// 清理回调
List<GeneralCallback<StreamInfo>> generalCallbacks = inviteErrorCallbackMap.get(playKey);
List<CommonCallback<WVPResult<StreamInfo>>> generalCallbacks = inviteErrorCallbackMap.get(playKey);
if (generalCallbacks != null && !generalCallbacks.isEmpty()) {
for (GeneralCallback<StreamInfo> callback : generalCallbacks) {
for (CommonCallback<WVPResult<StreamInfo>> callback : generalCallbacks) {
if (callback == null) {
continue;
}
callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null);
callback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null));
}
}
}
@@ -484,7 +483,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
playbackControl(phoneNumber, channelId, 2, null, null);
}
private Map<String, GeneralCallback<String>> fileUploadMap = new ConcurrentHashMap<>();
private Map<String, CommonCallback<WVPResult<String>>> fileUploadMap = new ConcurrentHashMap<>();
@EventListener
public void onApplicationEvent(FtpUploadEvent event) {
@@ -495,9 +494,9 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
if (!event.getFileName().contains(key)) {
return;
}
GeneralCallback<String> callback = fileUploadMap.get(key);
CommonCallback<WVPResult<String>> callback = fileUploadMap.get(key);
if (callback != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), event.getFileName());
callback.run(new WVPResult<>(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), event.getFileName()));
fileUploadMap.remove(key);
}
});
@@ -528,10 +527,10 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
@Override
public void startTalk(String phoneNumber, Integer channelId, String app, String stream, String mediaServerId, Boolean onlySend,
GeneralCallback<StreamInfo> callback) {
CommonCallback<WVPResult<StreamInfo>> callback) {
// 检查流是否已经存在,存在则返回
String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + phoneNumber + ":" + channelId;
List<GeneralCallback<StreamInfo>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>());
List<CommonCallback<WVPResult<StreamInfo>>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>());
errorCallbacks.add(callback);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
if (streamInfo != null) {
@@ -541,8 +540,8 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
String receiveStream = "jt_" + phoneNumber + "_" + channelId + "_talk";
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
if (mediaServer == null) {
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo));
}
return;
}
@@ -568,8 +567,8 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
log.info("[1078-对讲] 对讲成功, phoneNumber {} channelId {}", phoneNumber, channelId);
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info));
}
subscribe.removeSubscribe(hook);
redisTemplate.opsForValue().set(playKey, info);
@@ -584,9 +583,9 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
// 设置超时监听
dynamicTask.startDelay(playKey, () -> {
log.info("[1078-对讲] 超时, phoneNumber {} channelId {}", phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null));
}
}, userSetting.getPlayTimeout());
@@ -604,8 +603,8 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
jt1078Template.startLive(phoneNumber, j9101, 6);
if (onlySend != null && onlySend) {
log.info("[1078-对讲] 对讲成功, phoneNumber {} channelId {}", phoneNumber, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), null);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), null));
}
// 存储发流信息
sendRtpServerService.update(sendRtpInfo);
@@ -632,10 +631,10 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
mediaServerService.closeRTPServer(streamInfo.getMediaServer(), streamInfo.getStream());
}
// 清理回调
List<GeneralCallback<StreamInfo>> generalCallbacks = inviteErrorCallbackMap.get(playKey);
List<CommonCallback<WVPResult<StreamInfo>>> generalCallbacks = inviteErrorCallbackMap.get(playKey);
if (generalCallbacks != null && !generalCallbacks.isEmpty()) {
for (GeneralCallback<StreamInfo> callback : generalCallbacks) {
callback.run(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null);
for (CommonCallback<WVPResult<StreamInfo>> callback : generalCallbacks) {
callback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_FINISH.getCode(), InviteErrorCode.ERROR_FOR_FINISH.getMsg(), null));
}
}
}

View File

@@ -1,6 +1,6 @@
package com.genersoft.iot.vmp.jt1078.service.impl;
import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
@@ -18,19 +18,13 @@ import com.genersoft.iot.vmp.jt1078.proc.response.*;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.MediaServerUtils;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate;
@@ -154,7 +148,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
jtDeviceMapper.updateDeviceStatus(connected, phoneNumber);
}
private Map<String, GeneralCallback<String>> fileUploadMap = new ConcurrentHashMap<>();
private Map<String, CommonCallback<WVPResult<String>>> fileUploadMap = new ConcurrentHashMap<>();
@EventListener
public void onApplicationEvent(FtpUploadEvent event) {
@@ -165,16 +159,16 @@ public class jt1078ServiceImpl implements Ijt1078Service {
if (!event.getFileName().contains(key)) {
return;
}
GeneralCallback<String> callback = fileUploadMap.get(key);
CommonCallback<WVPResult<String>> callback = fileUploadMap.get(key);
if (callback != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), event.getFileName());
callback.run(new WVPResult<>(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), event.getFileName()));
fileUploadMap.remove(key);
}
});
}
@Override
public void recordDownload(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType, GeneralCallback<String> fileCallback) {
public void recordDownload(String phoneNumber, Integer channelId, String startTime, String endTime, Integer alarmSign, Integer mediaType, Integer streamType, Integer storageType, CommonCallback<WVPResult<String>> fileCallback) {
String filePath = UUID.randomUUID().toString();
fileUploadMap.put(filePath, fileCallback);
dynamicTask.startDelay(filePath, ()->{
@@ -307,7 +301,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
}
@Override
public JTDeviceConfig queryConfig(String phoneNumber, String[] params, GeneralCallback<StreamInfo> callback) {
public JTDeviceConfig queryConfig(String phoneNumber, String[] params, CommonCallback<WVPResult<StreamInfo>> callback) {
if (phoneNumber == null) {
return null;
}