1078-语音对讲

This commit is contained in:
648540858
2024-05-29 00:02:31 +08:00
parent 9ece77d54a
commit a945078788
4 changed files with 141 additions and 4 deletions

View File

@@ -183,6 +183,7 @@ public class VideoManagerConstants {
public static final String INVITE_INFO_1078_PLAY = "INVITE_INFO_1078_PLAY:";
public static final String INVITE_INFO_1078_PLAYBACK = "INVITE_INFO_1078_PLAYBACK:";
public static final String INVITE_INFO_1078_BROADCAST = "INVITE_INFO_1078_BROADCAST:";
public static final String RECORD_LIST_1078 = "RECORD_LIST_1078:";

View File

@@ -61,13 +61,13 @@ public class JT1078Controller {
@Operation(summary = "1078-开始点播", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true)
@Parameter(name = "type", description = "类型0:音视频,1:视频,2:音频", required = true)
@Parameter(name = "type", description = "类型0:音视频,1:视频,3:音频", required = true)
@GetMapping("/live/start")
public DeferredResult<WVPResult<StreamContent>> startLive(HttpServletRequest request,
@Parameter(required = true) String deviceId,
@Parameter(required = false) String channelId,
@Parameter(required = false) Integer type) {
if (type == null || (type != 0 && type != 1 && type != 2)) {
if (type == null || (type != 0 && type != 1 && type != 3)) {
type = 0;
}
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
@@ -80,7 +80,7 @@ public class JT1078Controller {
// 释放rtpserver
WVPResult<StreamContent> wvpResult = new WVPResult<>();
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("点播超时");
wvpResult.setMsg("超时");
result.setResult(wvpResult);
service.stopPlay(deviceId, finalChannelId);
});
@@ -131,6 +131,68 @@ public class JT1078Controller {
service.stopPlay(deviceId, channelId);
}
@Operation(summary = "1078-语音对讲", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true)
@Parameter(name = "app", description = "推流应用名", required = true)
@Parameter(name = "stream", description = "推流ID", required = true)
@Parameter(name = "mediaServerId", description = "流媒体ID", required = true)
@GetMapping("/broadcast")
public DeferredResult<WVPResult<StreamContent>> broadcast(HttpServletRequest request,
@Parameter(required = true) String deviceId,
@Parameter(required = true) String channelId,
@Parameter(required = true) String app,
@Parameter(required = true) String stream,
@Parameter(required = true) String mediaServerId) {
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
if (ObjectUtils.isEmpty(channelId)) {
channelId = "1";
}
String finalChannelId = channelId;
result.onTimeout(()->{
logger.info("[1078-语音对讲超时] deviceId{}, channelId{}, ", deviceId, finalChannelId);
// 释放rtpserver
WVPResult<StreamContent> wvpResult = new WVPResult<>();
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("超时");
result.setResult(wvpResult);
service.stopPlay(deviceId, finalChannelId);
});
service.broadcast(deviceId, channelId, app, stream, mediaServerId, (code, msg, streamInfo) -> {
WVPResult<StreamContent> wvpResult = new WVPResult<>();
if (code == InviteErrorCode.SUCCESS.getCode()) {
wvpResult.setCode(ErrorCode.SUCCESS.getCode());
wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
if (streamInfo != null) {
if (userSetting.getUseSourceIpAsStreamIp()) {
streamInfo=streamInfo.clone();//深拷贝
String host;
try {
URL url=new URL(request.getRequestURL().toString());
host=url.getHost();
} catch (MalformedURLException e) {
host=request.getLocalAddr();
}
streamInfo.channgeStreamIp(host);
}
wvpResult.setData(new StreamContent(streamInfo));
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
}
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
}
result.setResult(wvpResult);
});
return result;
}
@Operation(summary = "1078-暂停点播", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true)

View File

@@ -106,4 +106,6 @@ public interface Ijt1078Service {
void uploadMediaDataForSingle(String deviceId, Long mediaId, Integer delete);
JTMediaAttribute queryMediaAttribute(String deviceId);
void broadcast(String deviceId, String channelId, String app, String stream, String mediaServerId, GeneralCallback<StreamInfo> callback);
}

View File

@@ -140,7 +140,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
// 清理数据
redisTemplate.delete(playKey);
}
String stream = deviceId + "-" + channelId;
String stream = deviceId + "_" + channelId;
MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
if (mediaServer == null) {
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
@@ -750,4 +750,76 @@ public class jt1078ServiceImpl implements Ijt1078Service {
J9003 j9003 = new J9003();
return (JTMediaAttribute)jt1078Template.queryMediaAttribute(deviceId, j9003, 300);
}
@Override
public void broadcast(String deviceId, String channelId, String app, String stream, String mediaServerId, GeneralCallback<StreamInfo> callback) {
// 检查流是否已经存在,存在则返回
String playKey = VideoManagerConstants.INVITE_INFO_1078_BROADCAST + deviceId + ":" + channelId;
List<GeneralCallback<StreamInfo>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>());
errorCallbacks.add(callback);
StreamInfo streamInfo = (StreamInfo)redisTemplate.opsForValue().get(playKey);
if (streamInfo != null) {
String mediaServerId = streamInfo.getMediaServerId();
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
if (mediaServer != null) {
// 查询流是否存在,不存在则删除缓存数据
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServer, "rtp", "rtsp", streamInfo.getStream());
if (mediaInfo != null && mediaInfo.getInteger("code") == 0 ) {
Boolean online = mediaInfo.getBoolean("online");
if (online != null && online) {
logger.info("[1078-点播] 点播已经存在,直接返回, deviceId {} channelId {}", deviceId, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
return;
}
}
}
// 清理数据
redisTemplate.delete(playKey);
}
String stream = deviceId + "_" + channelId;
MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
if (mediaServer == null) {
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo);
}
return;
}
// 设置hook监听
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServer.getId());
subscribe.addSubscribe(hook, (hookData) -> {
dynamicTask.stop(playKey);
logger.info("[1078-点播] 点播成功, deviceId {} channelId {}", deviceId, channelId);
StreamInfo info = onPublishHandler(mediaServer, hookData, deviceId, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info);
}
subscribe.removeSubscribe(hook);
redisTemplate.opsForValue().set(playKey, info);
});
// 设置超时监听
dynamicTask.startDelay(playKey, () -> {
logger.info("[1078-点播] 超时, deviceId {} channelId {}", deviceId, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
}
}, userSetting.getPlayTimeout());
// 开启收流端口
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, null, false, false, 0, false, false, false,1);
logger.info("[1078-点播] deviceId {} channelId {} 端口: {}", deviceId, channelId, ssrcInfo.getPort());
J9101 j9101 = new J9101();
j9101.setChannel(Integer.valueOf(channelId));
j9101.setIp(mediaServer.getSdpIp());
j9101.setRate(1);
j9101.setTcpPort(ssrcInfo.getPort());
j9101.setUdpPort(ssrcInfo.getPort());
j9101.setType(type);
Object s = jt1078Template.startLive(deviceId, j9101, 6);
System.out.println("ssss=== " + s);
}
}