1078-优化点播

This commit is contained in:
648540858
2024-04-03 16:33:49 +08:00
parent 603ce18573
commit abc3766556
5 changed files with 68 additions and 18 deletions

View File

@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.jt1078.cmd;
import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd;
import com.genersoft.iot.vmp.jt1078.proc.response.*;
import com.genersoft.iot.vmp.jt1078.session.SessionManager;
import org.springframework.stereotype.Component;
import java.util.Random;
@@ -11,6 +12,7 @@ import java.util.Random;
* @date 2023/4/27 18:58
* @email qingtaij@163.com
*/
@Component
public class JT1078Template {
private final Random random = new Random();

View File

@@ -20,7 +20,11 @@ import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@@ -30,7 +34,8 @@ import java.util.concurrent.TimeUnit;
* @email qingtaij@163.com
*/
public class TcpServer {
public class TcpServer{
private final static Logger log = LoggerFactory.getLogger(TcpServer.class);
private final Integer port;

View File

@@ -31,9 +31,4 @@ public class JT1078AutoConfiguration {
public TcpServer jt1078Server(@Value("${jt1078.port}") Integer port) {
return new TcpServer(port, applicationEventPublisher, service);
}
@Bean
public JT1078Template jt1078Template() {
return new JT1078Template();
}
}

View File

@@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.jt1078.bean.JTDevice;
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
import com.genersoft.iot.vmp.jt1078.proc.response.*;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
@@ -25,6 +26,9 @@ import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
/**
@@ -41,9 +45,6 @@ public class JT1078Controller {
private final static Logger logger = LoggerFactory.getLogger(JT1078Controller.class);
@Resource
JT1078Template jt1078Template;
@Resource
Ijt1078Service service;
@@ -54,7 +55,7 @@ public class JT1078Controller {
* jt1078Template 调用示例
*/
@GetMapping("/start/live/{deviceId}/{channelId}")
public DeferredResult<WVPResult<StreamContent>> startLive(@PathVariable String deviceId, @PathVariable String channelId) {
public DeferredResult<WVPResult<StreamContent>> startLive(HttpServletRequest request, @PathVariable String deviceId, @PathVariable String channelId) {
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
result.onTimeout(()->{
@@ -67,8 +68,33 @@ public class JT1078Controller {
service.stopPlay(deviceId, channelId);
});
service.play(deviceId, channelId, (code, msg, data) -> {
service.play(deviceId, channelId, (code, msg, streamInfo) -> {
WVPResult<StreamContent> wvpResult = new WVPResult<>();
if (code == InviteErrorCode.SUCCESS.getCode()) {
wvpResult.setCode(ErrorCode.SUCCESS.getCode());
wvpResult.setMsg(ErrorCode.SUCCESS.getMsg());
if (streamInfo != null) {
if (userSetting.getUseSourceIpAsStreamIp()) {
streamInfo=streamInfo.clone();//深拷贝
String host;
try {
URL url=new URL(request.getRequestURL().toString());
host=url.getHost();
} catch (MalformedURLException e) {
host=request.getLocalAddr();
}
streamInfo.channgeStreamIp(host);
}
wvpResult.setData(new StreamContent(streamInfo));
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
}
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
}
});
return result;

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.jt1078.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.GeneralCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
@@ -12,6 +13,7 @@ import com.genersoft.iot.vmp.jt1078.config.JT1078Controller;
import com.genersoft.iot.vmp.jt1078.dao.JTDeviceMapper;
import com.genersoft.iot.vmp.jt1078.proc.response.J9101;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
@@ -51,8 +53,8 @@ public class jt1078ServiceImpl implements Ijt1078Service {
@Autowired
private JTDeviceMapper jtDeviceMapper;
@Resource
JT1078Template jt1078Template;
@Autowired
private JT1078Template jt1078Template;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@@ -72,6 +74,9 @@ public class jt1078ServiceImpl implements Ijt1078Service {
@Autowired
private UserSetting userSetting;
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
@Override
public JTDevice getDevice(String terminalId) {
@@ -119,11 +124,24 @@ public class jt1078ServiceImpl implements Ijt1078Service {
errorCallbacks.add(callback);
StreamInfo streamInfo = (StreamInfo)redisTemplate.opsForValue().get(playKey);
if (streamInfo != null) {
logger.info("[1078-点播] 点播已经存在,直接返回, deviceId {} channelId {}", deviceId, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
String mediaServerId = streamInfo.getMediaServerId();
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null) {
// 查询流是否存在,不存在则删除缓存数据
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, "rtp", "rtsp", streamInfo.getStream());
if (mediaInfo != null && mediaInfo.getInteger("code") == 0 ) {
Boolean online = mediaInfo.getBoolean("online");
if (online != null && online) {
logger.info("[1078-点播] 点播已经存在,直接返回, deviceId {} channelId {}", deviceId, channelId);
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
return;
}
}
}
return;
// 清理数据
redisTemplate.delete(playKey);
}
String stream = deviceId + "-" + channelId;
MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
@@ -132,6 +150,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
dynamicTask.stop(playKey);
logger.info("[1078-点播] 点播成功, deviceId {} channelId {}", deviceId, channelId);
StreamInfo info = onPublishHandler(mediaServerItemInUse, streamChangedHookParam, deviceId, channelId);
@@ -139,6 +158,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info);
}
subscribe.removeSubscribe(hookSubscribe);
redisTemplate.opsForValue().set(playKey, info);
});
// 设置超时监听
dynamicTask.startDelay(playKey, () -> {
@@ -151,7 +171,8 @@ public class jt1078ServiceImpl implements Ijt1078Service {
}, userSetting.getPlayTimeout());
// 开启收流端口
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, false, false, 0, false, false, 0);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, false, false, 0, false, false, 1);
logger.info("[1078-点播] deviceId {} channelId {} 端口: {}", deviceId, channelId, ssrcInfo.getPort());
J9101 j9101 = new J9101();
j9101.setChannel(Integer.valueOf(channelId));
j9101.setIp(mediaServerItem.getSdpIp());
@@ -161,6 +182,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
j9101.setType(0);
String s = jt1078Template.startLive(deviceId, j9101, 6);
System.out.println("ssss=== " + s);
}
public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) {