diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index d19b8f051..a35285bdd 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -175,4 +175,9 @@ public class VideoManagerConstants { */ public static final String REDIS_RECORD_INFO_RES_COUNT_PRE = "GB_RECORD_INFO_RES_COUNT:"; + //************************** 1078 **************************************** + + + public static final String INVITE_INFO_1078 = "INVITE_INFO_1078:"; + } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java b/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java index d50722227..9dfcd368e 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java @@ -1,18 +1,28 @@ package com.genersoft.iot.vmp.jt1078.config; +import com.genersoft.iot.vmp.common.InviteSessionType; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.security.JwtUtils; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.jt1078.bean.JTDevice; import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; import com.genersoft.iot.vmp.jt1078.proc.response.*; import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; +import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import com.genersoft.iot.vmp.vmanager.gb28181.play.PlayController; import com.github.pagehelper.PageInfo; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.security.SecurityRequirement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.web.bind.annotation.*; +import org.springframework.web.context.request.async.DeferredResult; import javax.annotation.Resource; import java.util.List; @@ -29,31 +39,39 @@ import java.util.List; @RequestMapping("/api/jt1078") public class JT1078Controller { + private final static Logger logger = LoggerFactory.getLogger(JT1078Controller.class); + @Resource JT1078Template jt1078Template; @Resource Ijt1078Service service; + @Autowired + UserSetting userSetting; + /** * jt1078Template 调用示例 */ @GetMapping("/start/live/{deviceId}/{channelId}") - public WVPResult startLive(@PathVariable String deviceId, @PathVariable String channelId) { - J9101 j9101 = new J9101(); - j9101.setChannel(Integer.valueOf(channelId)); - j9101.setIp("192.168.1.3"); - j9101.setRate(1); - j9101.setTcpPort(7618); - j9101.setUdpPort(7618); - j9101.setType(0); - // TODO 分配ZLM,获取IP、端口 - String s = jt1078Template.startLive(deviceId, j9101, 6); - // TODO 设备响应成功后,封装拉流结果集 - WVPResult wvpResult = new WVPResult<>(); - wvpResult.setCode(200); - wvpResult.setData(String.format("http://192.168.1.1/rtp/%s_%s.live.mp4", deviceId, channelId)); - return wvpResult; + public DeferredResult> startLive(@PathVariable String deviceId, @PathVariable String channelId) { + DeferredResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); + + result.onTimeout(()->{ + logger.info("[1078-点播等待超时] deviceId:{}, channelId:{}, ", deviceId, channelId); + // 释放rtpserver + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(ErrorCode.ERROR100.getCode()); + wvpResult.setMsg("点播超时"); + result.setResult(wvpResult); + service.stopPlay(deviceId, channelId); + }); + + service.play(deviceId, channelId, (code, msg, data) -> { + + }); + + return result; } @Operation(summary = "分页查询部标设备", security = @SecurityRequirement(name = JwtUtils.HEADER)) diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java index 85b38428e..7406099df 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/Ijt1078Service.java @@ -1,5 +1,8 @@ package com.genersoft.iot.vmp.jt1078.service; +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.common.GeneralCallback; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.jt1078.bean.JTDevice; import com.github.pagehelper.PageInfo; @@ -17,4 +20,8 @@ public interface Ijt1078Service { void deleteDeviceByDeviceId(String deviceId); void updateDeviceStatus(boolean connected, String terminalId); + + void play(String deviceId, String channelId, GeneralCallback callback); + + void stopPlay(String deviceId, String channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java index 86be01f70..f60acc768 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java @@ -1,22 +1,77 @@ package com.genersoft.iot.vmp.jt1078.service.impl; +import com.genersoft.iot.vmp.common.GeneralCallback; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.jt1078.bean.JTDevice; +import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; +import com.genersoft.iot.vmp.jt1078.config.JT1078Controller; import com.genersoft.iot.vmp.jt1078.dao.JTDeviceMapper; +import com.genersoft.iot.vmp.jt1078.proc.response.J9101; import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam; +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IMediaService; +import com.genersoft.iot.vmp.service.bean.ErrorCallback; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; +import javax.annotation.Resource; +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import java.text.ParseException; +import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; @Service public class jt1078ServiceImpl implements Ijt1078Service { + private final static Logger logger = LoggerFactory.getLogger(jt1078ServiceImpl.class); + @Autowired private JTDeviceMapper jtDeviceMapper; + @Resource + JT1078Template jt1078Template; + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private ZlmHttpHookSubscribe subscribe; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private IMediaService mediaService; + + @Autowired + private DynamicTask dynamicTask; + + @Autowired + private UserSetting userSetting; + @Override public JTDevice getDevice(String terminalId) { @@ -52,4 +107,71 @@ public class jt1078ServiceImpl implements Ijt1078Service { public void updateDeviceStatus(boolean connected, String terminalId) { jtDeviceMapper.updateDeviceStatus(connected, terminalId); } + + private final Map>> inviteErrorCallbackMap = new ConcurrentHashMap<>(); + + @Override + public void play(String deviceId, String channelId, GeneralCallback callback) { + + // 检查流是否已经存在,存在则返回 + String playKey = VideoManagerConstants.INVITE_INFO_1078 + deviceId + ":" + channelId; + List> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>()); + errorCallbacks.add(callback); + StreamInfo streamInfo = (StreamInfo)redisTemplate.opsForValue().get(playKey); + if (streamInfo != null) { + logger.info("[1078-点播] 点播已经存在,直接返回, deviceId: {}, channelId: {}", deviceId, channelId); + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + } + return; + } + String stream = deviceId + "-" + channelId; + MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); + + // 设置hook监听 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); + subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { + OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam; + logger.info("[1078-点播] 点播成功, deviceId: {}, channelId: {}", deviceId, channelId); + StreamInfo info = onPublishHandler(mediaServerItemInUse, streamChangedHookParam, deviceId, channelId); + + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info); + } + subscribe.removeSubscribe(hookSubscribe); + }); + // 设置超时监听 + dynamicTask.startDelay(playKey, () -> { + logger.info("[1078-点播] 超时, deviceId: {}, channelId: {}", deviceId, channelId); + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), + InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null); + } + + }, userSetting.getPlayTimeout()); + + // 开启收流端口 + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, false, false, 0, false, false, 0); + J9101 j9101 = new J9101(); + j9101.setChannel(Integer.valueOf(channelId)); + j9101.setIp(mediaServerItem.getSdpIp()); + j9101.setRate(1); + j9101.setTcpPort(ssrcInfo.getPort()); + j9101.setUdpPort(ssrcInfo.getPort()); + j9101.setType(0); + String s = jt1078Template.startLive(deviceId, j9101, 6); + System.out.println("ssss=== " + s); + } + + public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) { + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", hookParam.getStream(), hookParam.getTracks(), null); + streamInfo.setDeviceID(deviceId); + streamInfo.setChannelId(channelId); + return streamInfo; + } + + @Override + public void stopPlay(String deviceId, String channelId) { + + } }