1078-添加点播支持
This commit is contained in:
@@ -175,4 +175,9 @@ public class VideoManagerConstants {
|
||||
*/
|
||||
public static final String REDIS_RECORD_INFO_RES_COUNT_PRE = "GB_RECORD_INFO_RES_COUNT:";
|
||||
|
||||
//************************** 1078 ****************************************
|
||||
|
||||
|
||||
public static final String INVITE_INFO_1078 = "INVITE_INFO_1078:";
|
||||
|
||||
}
|
||||
|
||||
@@ -1,18 +1,28 @@
|
||||
package com.genersoft.iot.vmp.jt1078.config;
|
||||
|
||||
import com.genersoft.iot.vmp.common.InviteSessionType;
|
||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.conf.security.JwtUtils;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.jt1078.bean.JTDevice;
|
||||
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
|
||||
import com.genersoft.iot.vmp.jt1078.proc.response.*;
|
||||
import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
|
||||
import com.genersoft.iot.vmp.vmanager.gb28181.play.PlayController;
|
||||
import com.github.pagehelper.PageInfo;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import org.springframework.web.context.request.async.DeferredResult;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
@@ -29,31 +39,39 @@ import java.util.List;
|
||||
@RequestMapping("/api/jt1078")
|
||||
public class JT1078Controller {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(JT1078Controller.class);
|
||||
|
||||
@Resource
|
||||
JT1078Template jt1078Template;
|
||||
|
||||
@Resource
|
||||
Ijt1078Service service;
|
||||
|
||||
@Autowired
|
||||
UserSetting userSetting;
|
||||
|
||||
/**
|
||||
* jt1078Template 调用示例
|
||||
*/
|
||||
@GetMapping("/start/live/{deviceId}/{channelId}")
|
||||
public WVPResult<?> startLive(@PathVariable String deviceId, @PathVariable String channelId) {
|
||||
J9101 j9101 = new J9101();
|
||||
j9101.setChannel(Integer.valueOf(channelId));
|
||||
j9101.setIp("192.168.1.3");
|
||||
j9101.setRate(1);
|
||||
j9101.setTcpPort(7618);
|
||||
j9101.setUdpPort(7618);
|
||||
j9101.setType(0);
|
||||
// TODO 分配ZLM,获取IP、端口
|
||||
String s = jt1078Template.startLive(deviceId, j9101, 6);
|
||||
// TODO 设备响应成功后,封装拉流结果集
|
||||
WVPResult<String> wvpResult = new WVPResult<>();
|
||||
wvpResult.setCode(200);
|
||||
wvpResult.setData(String.format("http://192.168.1.1/rtp/%s_%s.live.mp4", deviceId, channelId));
|
||||
return wvpResult;
|
||||
public DeferredResult<WVPResult<StreamContent>> startLive(@PathVariable String deviceId, @PathVariable String channelId) {
|
||||
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
|
||||
|
||||
result.onTimeout(()->{
|
||||
logger.info("[1078-点播等待超时] deviceId:{}, channelId:{}, ", deviceId, channelId);
|
||||
// 释放rtpserver
|
||||
WVPResult<StreamContent> wvpResult = new WVPResult<>();
|
||||
wvpResult.setCode(ErrorCode.ERROR100.getCode());
|
||||
wvpResult.setMsg("点播超时");
|
||||
result.setResult(wvpResult);
|
||||
service.stopPlay(deviceId, channelId);
|
||||
});
|
||||
|
||||
service.play(deviceId, channelId, (code, msg, data) -> {
|
||||
|
||||
});
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@Operation(summary = "分页查询部标设备", security = @SecurityRequirement(name = JwtUtils.HEADER))
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
package com.genersoft.iot.vmp.jt1078.service;
|
||||
|
||||
import com.genersoft.iot.vmp.common.CommonCallback;
|
||||
import com.genersoft.iot.vmp.common.GeneralCallback;
|
||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||
import com.genersoft.iot.vmp.jt1078.bean.JTDevice;
|
||||
import com.github.pagehelper.PageInfo;
|
||||
|
||||
@@ -17,4 +20,8 @@ public interface Ijt1078Service {
|
||||
void deleteDeviceByDeviceId(String deviceId);
|
||||
|
||||
void updateDeviceStatus(boolean connected, String terminalId);
|
||||
|
||||
void play(String deviceId, String channelId, GeneralCallback<StreamInfo> callback);
|
||||
|
||||
void stopPlay(String deviceId, String channelId);
|
||||
}
|
||||
|
||||
@@ -1,22 +1,77 @@
|
||||
package com.genersoft.iot.vmp.jt1078.service.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.common.GeneralCallback;
|
||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||
import com.genersoft.iot.vmp.conf.DynamicTask;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
|
||||
import com.genersoft.iot.vmp.jt1078.bean.JTDevice;
|
||||
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
|
||||
import com.genersoft.iot.vmp.jt1078.config.JT1078Controller;
|
||||
import com.genersoft.iot.vmp.jt1078.dao.JTDeviceMapper;
|
||||
import com.genersoft.iot.vmp.jt1078.proc.response.J9101;
|
||||
import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
|
||||
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
|
||||
import com.genersoft.iot.vmp.service.IMediaServerService;
|
||||
import com.genersoft.iot.vmp.service.IMediaService;
|
||||
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
||||
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
|
||||
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
||||
import com.github.pagehelper.PageHelper;
|
||||
import com.github.pagehelper.PageInfo;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.sip.InvalidArgumentException;
|
||||
import javax.sip.SipException;
|
||||
import java.text.ParseException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Service
|
||||
public class jt1078ServiceImpl implements Ijt1078Service {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(jt1078ServiceImpl.class);
|
||||
|
||||
@Autowired
|
||||
private JTDeviceMapper jtDeviceMapper;
|
||||
|
||||
@Resource
|
||||
JT1078Template jt1078Template;
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<Object, Object> redisTemplate;
|
||||
|
||||
@Autowired
|
||||
private ZlmHttpHookSubscribe subscribe;
|
||||
|
||||
@Autowired
|
||||
private IMediaServerService mediaServerService;
|
||||
|
||||
@Autowired
|
||||
private IMediaService mediaService;
|
||||
|
||||
@Autowired
|
||||
private DynamicTask dynamicTask;
|
||||
|
||||
@Autowired
|
||||
private UserSetting userSetting;
|
||||
|
||||
|
||||
@Override
|
||||
public JTDevice getDevice(String terminalId) {
|
||||
@@ -52,4 +107,71 @@ public class jt1078ServiceImpl implements Ijt1078Service {
|
||||
public void updateDeviceStatus(boolean connected, String terminalId) {
|
||||
jtDeviceMapper.updateDeviceStatus(connected, terminalId);
|
||||
}
|
||||
|
||||
private final Map<String, List<GeneralCallback<StreamInfo>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public void play(String deviceId, String channelId, GeneralCallback<StreamInfo> callback) {
|
||||
|
||||
// 检查流是否已经存在,存在则返回
|
||||
String playKey = VideoManagerConstants.INVITE_INFO_1078 + deviceId + ":" + channelId;
|
||||
List<GeneralCallback<StreamInfo>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>());
|
||||
errorCallbacks.add(callback);
|
||||
StreamInfo streamInfo = (StreamInfo)redisTemplate.opsForValue().get(playKey);
|
||||
if (streamInfo != null) {
|
||||
logger.info("[1078-点播] 点播已经存在,直接返回, deviceId: {}, channelId: {}", deviceId, channelId);
|
||||
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
|
||||
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
|
||||
}
|
||||
return;
|
||||
}
|
||||
String stream = deviceId + "-" + channelId;
|
||||
MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null);
|
||||
|
||||
// 设置hook监听
|
||||
HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId());
|
||||
subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
|
||||
OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam;
|
||||
logger.info("[1078-点播] 点播成功, deviceId: {}, channelId: {}", deviceId, channelId);
|
||||
StreamInfo info = onPublishHandler(mediaServerItemInUse, streamChangedHookParam, deviceId, channelId);
|
||||
|
||||
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
|
||||
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info);
|
||||
}
|
||||
subscribe.removeSubscribe(hookSubscribe);
|
||||
});
|
||||
// 设置超时监听
|
||||
dynamicTask.startDelay(playKey, () -> {
|
||||
logger.info("[1078-点播] 超时, deviceId: {}, channelId: {}", deviceId, channelId);
|
||||
for (GeneralCallback<StreamInfo> errorCallback : errorCallbacks) {
|
||||
errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
|
||||
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null);
|
||||
}
|
||||
|
||||
}, userSetting.getPlayTimeout());
|
||||
|
||||
// 开启收流端口
|
||||
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, false, false, 0, false, false, 0);
|
||||
J9101 j9101 = new J9101();
|
||||
j9101.setChannel(Integer.valueOf(channelId));
|
||||
j9101.setIp(mediaServerItem.getSdpIp());
|
||||
j9101.setRate(1);
|
||||
j9101.setTcpPort(ssrcInfo.getPort());
|
||||
j9101.setUdpPort(ssrcInfo.getPort());
|
||||
j9101.setType(0);
|
||||
String s = jt1078Template.startLive(deviceId, j9101, 6);
|
||||
System.out.println("ssss=== " + s);
|
||||
}
|
||||
|
||||
public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) {
|
||||
StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), hookParam.getTracks(), null);
|
||||
streamInfo.setDeviceID(deviceId);
|
||||
streamInfo.setChannelId(channelId);
|
||||
return streamInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopPlay(String deviceId, String channelId) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user