diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/cmd/JT1078Template.java b/src/main/java/com/genersoft/iot/vmp/jt1078/cmd/JT1078Template.java index c55c62760..98af47a9d 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/cmd/JT1078Template.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/cmd/JT1078Template.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.jt1078.cmd; import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd; import com.genersoft.iot.vmp.jt1078.proc.response.*; import com.genersoft.iot.vmp.jt1078.session.SessionManager; +import org.springframework.stereotype.Component; import java.util.Random; @@ -11,6 +12,7 @@ import java.util.Random; * @date 2023/4/27 18:58 * @email qingtaij@163.com */ +@Component public class JT1078Template { private final Random random = new Random(); diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/TcpServer.java b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/TcpServer.java index ccbcc134a..44b734c78 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/TcpServer.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/TcpServer.java @@ -20,7 +20,11 @@ import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.CommandLineRunner; +import org.springframework.context.ApplicationContextInitializer; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; @@ -30,7 +34,8 @@ import java.util.concurrent.TimeUnit; * @email qingtaij@163.com */ -public class TcpServer { + +public class TcpServer{ private final static Logger log = LoggerFactory.getLogger(TcpServer.class); private final Integer port; diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078AutoConfiguration.java b/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078AutoConfiguration.java index 9b1117d75..ca72683b3 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078AutoConfiguration.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078AutoConfiguration.java @@ -31,9 +31,4 @@ public class JT1078AutoConfiguration { public TcpServer jt1078Server(@Value("${jt1078.port}") Integer port) { return new TcpServer(port, applicationEventPublisher, service); } - - @Bean - public JT1078Template jt1078Template() { - return new JT1078Template(); - } } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java b/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java index 9dfcd368e..a09b98f1b 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java @@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.jt1078.bean.JTDevice; import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; import com.genersoft.iot.vmp.jt1078.proc.response.*; import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.StreamContent; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; @@ -25,6 +26,9 @@ import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; import javax.annotation.Resource; +import javax.servlet.http.HttpServletRequest; +import java.net.MalformedURLException; +import java.net.URL; import java.util.List; /** @@ -41,9 +45,6 @@ public class JT1078Controller { private final static Logger logger = LoggerFactory.getLogger(JT1078Controller.class); - @Resource - JT1078Template jt1078Template; - @Resource Ijt1078Service service; @@ -54,7 +55,7 @@ public class JT1078Controller { * jt1078Template 调用示例 */ @GetMapping("/start/live/{deviceId}/{channelId}") - public DeferredResult> startLive(@PathVariable String deviceId, @PathVariable String channelId) { + public DeferredResult> startLive(HttpServletRequest request, @PathVariable String deviceId, @PathVariable String channelId) { DeferredResult> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue()); result.onTimeout(()->{ @@ -67,8 +68,33 @@ public class JT1078Controller { service.stopPlay(deviceId, channelId); }); - service.play(deviceId, channelId, (code, msg, data) -> { + service.play(deviceId, channelId, (code, msg, streamInfo) -> { + WVPResult wvpResult = new WVPResult<>(); + if (code == InviteErrorCode.SUCCESS.getCode()) { + wvpResult.setCode(ErrorCode.SUCCESS.getCode()); + wvpResult.setMsg(ErrorCode.SUCCESS.getMsg()); + if (streamInfo != null) { + if (userSetting.getUseSourceIpAsStreamIp()) { + streamInfo=streamInfo.clone();//深拷贝 + String host; + try { + URL url=new URL(request.getRequestURL().toString()); + host=url.getHost(); + } catch (MalformedURLException e) { + host=request.getLocalAddr(); + } + streamInfo.channgeStreamIp(host); + } + wvpResult.setData(new StreamContent(streamInfo)); + }else { + wvpResult.setCode(code); + wvpResult.setMsg(msg); + } + }else { + wvpResult.setCode(code); + wvpResult.setMsg(msg); + } }); return result; diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java index f60acc768..45e47767c 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.jt1078.service.impl; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.GeneralCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; @@ -12,6 +13,7 @@ import com.genersoft.iot.vmp.jt1078.config.JT1078Controller; import com.genersoft.iot.vmp.jt1078.dao.JTDeviceMapper; import com.genersoft.iot.vmp.jt1078.proc.response.J9101; import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service; +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; @@ -51,8 +53,8 @@ public class jt1078ServiceImpl implements Ijt1078Service { @Autowired private JTDeviceMapper jtDeviceMapper; - @Resource - JT1078Template jt1078Template; + @Autowired + private JT1078Template jt1078Template; @Autowired private RedisTemplate redisTemplate; @@ -72,6 +74,9 @@ public class jt1078ServiceImpl implements Ijt1078Service { @Autowired private UserSetting userSetting; + @Autowired + private ZLMRESTfulUtils zlmresTfulUtils; + @Override public JTDevice getDevice(String terminalId) { @@ -119,11 +124,24 @@ public class jt1078ServiceImpl implements Ijt1078Service { errorCallbacks.add(callback); StreamInfo streamInfo = (StreamInfo)redisTemplate.opsForValue().get(playKey); if (streamInfo != null) { - logger.info("[1078-点播] 点播已经存在,直接返回, deviceId: {}, channelId: {}", deviceId, channelId); - for (GeneralCallback errorCallback : errorCallbacks) { - errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + String mediaServerId = streamInfo.getMediaServerId(); + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + if (mediaServerItem != null) { + // 查询流是否存在,不存在则删除缓存数据 + JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, "rtp", "rtsp", streamInfo.getStream()); + if (mediaInfo != null && mediaInfo.getInteger("code") == 0 ) { + Boolean online = mediaInfo.getBoolean("online"); + if (online != null && online) { + logger.info("[1078-点播] 点播已经存在,直接返回, deviceId: {}, channelId: {}", deviceId, channelId); + for (GeneralCallback errorCallback : errorCallbacks) { + errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); + } + return; + } + } } - return; + // 清理数据 + redisTemplate.delete(playKey); } String stream = deviceId + "-" + channelId; MediaServerItem mediaServerItem = mediaServerService.getMediaServerForMinimumLoad(null); @@ -132,6 +150,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> { OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam) hookParam; + dynamicTask.stop(playKey); logger.info("[1078-点播] 点播成功, deviceId: {}, channelId: {}", deviceId, channelId); StreamInfo info = onPublishHandler(mediaServerItemInUse, streamChangedHookParam, deviceId, channelId); @@ -139,6 +158,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info); } subscribe.removeSubscribe(hookSubscribe); + redisTemplate.opsForValue().set(playKey, info); }); // 设置超时监听 dynamicTask.startDelay(playKey, () -> { @@ -151,7 +171,8 @@ public class jt1078ServiceImpl implements Ijt1078Service { }, userSetting.getPlayTimeout()); // 开启收流端口 - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, false, false, 0, false, false, 0); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, stream, null, false, false, 0, false, false, 1); + logger.info("[1078-点播] deviceId: {}, channelId: {}, 端口: {}", deviceId, channelId, ssrcInfo.getPort()); J9101 j9101 = new J9101(); j9101.setChannel(Integer.valueOf(channelId)); j9101.setIp(mediaServerItem.getSdpIp()); @@ -161,6 +182,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { j9101.setType(0); String s = jt1078Template.startLive(deviceId, j9101, 6); System.out.println("ssss=== " + s); + } public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, OnStreamChangedHookParam hookParam, String deviceId, String channelId) {