Merge branch 'wvp-28181-2.0' into wvp-28181-2.0-multi-network

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
This commit is contained in:
648540858
2022-11-17 11:31:30 +08:00
33 changed files with 377 additions and 267 deletions

View File

@@ -1,19 +1,19 @@
package com.genersoft.iot.vmp.media.zlm;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -24,18 +24,15 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @description:针对 ZLMediaServer的hook事件监听
@@ -186,7 +183,7 @@ public class ZLMHttpHookListener {
if (!"rtp".equals(param.getApp())) {
if (userSetting.getPushAuthority()) {
// 推流鉴权
// 推流鉴权
if (param.getParams() == null) {
logger.info("推流鉴权失败: 缺少不要参数sign=md5(user表的pushKey)");
ret.put("code", 401);
@@ -571,6 +568,8 @@ public class ZLMHttpHookListener {
public JSONObject onServerStarted(HttpServletRequest request, @RequestBody JSONObject jsonObject){
jsonObject.put("ip", request.getRemoteAddr());
System.out.println(jsonObject.toJSONString()
);
ZLMServerConfig zlmServerConfig = JSON.to(ZLMServerConfig.class, jsonObject);
zlmServerConfig.setIp(request.getRemoteAddr());
logger.info("[ZLM HOOK] zlm 启动 " + zlmServerConfig.getGeneralMediaServerId());
@@ -627,6 +626,32 @@ public class ZLMHttpHookListener {
return ret;
}
/**
* rtpServer收流超时
*/
@ResponseBody
@PostMapping(value = "/on_rtp_server_timeout", produces = "application/json;charset=UTF-8")
public JSONObject onRtpServerTimeout(HttpServletRequest request, @RequestBody OnRtpServerTimeoutHookParam param){
System.out.println(param);
logger.info("[ZLM HOOK] rtpServer收流超时{}->{}({})", param.getMediaServerId(), param.getStream_id(), param.getSsrc());
JSONObject ret = new JSONObject();
ret.put("code", 0);
ret.put("msg", "success");
taskExecutor.execute(()->{
JSONObject json = (JSONObject) JSON.toJSON(param);
List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_rtp_server_timeout);
if (subscribes != null && subscribes.size() > 0) {
for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, json);
}
}
});
return ret;
}
private Map<String, String> urlParamToMap(String params) {
HashMap<String, String> map = new HashMap<>();
if (ObjectUtils.isEmpty(params)) {

View File

@@ -1,16 +1,15 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.*;
@@ -25,6 +24,9 @@ public class ZLMRTPServerFactory {
@Autowired
private UserSetting userSetting;
@Autowired
private ZlmHttpHookSubscribe hookSubscribe;
private int[] portRangeArray = new int[2];
public int getFreePort(MediaServerItem mediaServerItem, int startPort, int endPort, List<Integer> usedFreelist) {
@@ -142,7 +144,7 @@ public class ZLMRTPServerFactory {
return result;
}
public boolean closeRTPServer(MediaServerItem serverItem, String streamId) {
public boolean closeRtpServer(MediaServerItem serverItem, String streamId) {
boolean result = false;
if (serverItem !=null){
Map<String, Object> param = new HashMap<>();
@@ -162,32 +164,6 @@ public class ZLMRTPServerFactory {
return result;
}
// private int getPortFromportRange(MediaServerItem mediaServerItem) {
// int currentPort = mediaServerItem.getCurrentPort();
// if (currentPort == 0) {
// String[] portRangeStrArray = mediaServerItem.getSendRtpPortRange().split(",");
// if (portRangeStrArray.length != 2) {
// portRangeArray[0] = 30000;
// portRangeArray[1] = 30500;
// }else {
// portRangeArray[0] = Integer.parseInt(portRangeStrArray[0]);
// portRangeArray[1] = Integer.parseInt(portRangeStrArray[1]);
// }
// }
//
// if (currentPort == 0 || currentPort++ > portRangeArray[1]) {
// currentPort = portRangeArray[0];
// mediaServerItem.setCurrentPort(currentPort);
// return portRangeArray[0];
// } else {
// if (currentPort % 2 == 1) {
// currentPort++;
// }
// currentPort++;
// mediaServerItem.setCurrentPort(currentPort);
// return currentPort;
// }
// }
/**
* 创建一个国标推流
@@ -201,21 +177,15 @@ public class ZLMRTPServerFactory {
*/
public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp){
// 使用RTPServer 功能找一个可用的端口
String sendRtpPortRange = serverItem.getSendRtpPortRange();
if (ObjectUtils.isEmpty(sendRtpPortRange)) {
return null;
}
String[] portRangeStrArray = serverItem.getSendRtpPortRange().split(",");
int localPort = -1;
if (portRangeStrArray.length != 2) {
localPort = getFreePort(serverItem, 30000, 30500, null);
}else {
localPort = getFreePort(serverItem, Integer.parseInt(portRangeStrArray[0]), Integer.parseInt(portRangeStrArray[1]), null);
}
if (localPort == -1) {
logger.error("没有可用的端口");
return null;
// 默认为随机端口
int localPort = 0;
if (userSetting.getGbSendStreamStrict()) {
if (userSetting.getGbSendStreamStrict()) {
localPort = keepPort(serverItem, ssrc);
if (localPort == 0) {
return null;
}
}
}
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setIp(ip);
@@ -243,21 +213,13 @@ public class ZLMRTPServerFactory {
* @return SendRtpItem
*/
public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp){
// 使用RTPServer 功能找一个可用的端口
String sendRtpPortRange = serverItem.getSendRtpPortRange();
if (ObjectUtils.isEmpty(sendRtpPortRange)) {
return null;
}
String[] portRangeStrArray = serverItem.getSendRtpPortRange().split(",");
int localPort = -1;
if (portRangeStrArray.length != 2) {
localPort = getFreePort(serverItem, 30000, 30500, null);
}else {
localPort = getFreePort(serverItem, Integer.parseInt(portRangeStrArray[0]), Integer.parseInt(portRangeStrArray[1]), null);
}
if (localPort == -1) {
logger.error("没有可用的端口");
return null;
// 默认为随机端口
int localPort = 0;
if (userSetting.getGbSendStreamStrict()) {
localPort = keepPort(serverItem, ssrc);
if (localPort == 0) {
return null;
}
}
SendRtpItem sendRtpItem = new SendRtpItem();
sendRtpItem.setIp(ip);
@@ -274,6 +236,42 @@ public class ZLMRTPServerFactory {
return sendRtpItem;
}
/**
* 保持端口,直到需要需要发流时再释放
*/
public int keepPort(MediaServerItem serverItem, String ssrc) {
int localPort = 0;
Map<String, Object> param = new HashMap<>(3);
param.put("port", 0);
param.put("enable_tcp", 1);
param.put("stream_id", ssrc);
JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param);
if (jsonObject.getInteger("code") == 0) {
localPort = jsonObject.getInteger("port");
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout,
(MediaServerItem mediaServerItem, JSONObject response)->{
logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc);
keepPort(serverItem, ssrc);
});
}
logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort);
return localPort;
}
/**
* 释放保持的端口
*/
public boolean releasePort(MediaServerItem serverItem, String ssrc) {
logger.info("[上级点播] {}->释放监听端口", ssrc);
boolean closeRTPServerResult = closeRtpServer(serverItem, ssrc);
HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.removeSubscribe(hookSubscribeForRtpServerTimeout);
return closeRTPServerResult;
}
/**
* 调用zlm RESTFUL API —— startSendRtp
*/
@@ -294,7 +292,8 @@ public class ZLMRTPServerFactory {
*/
public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) {
JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId);
return (mediaInfo.getInteger("code") == 0
return mediaInfo != null && (mediaInfo.getInteger("code") == 0
&& mediaInfo.getJSONArray("data") != null
&& mediaInfo.getJSONArray("data").size() > 0);
}
@@ -333,7 +332,7 @@ public class ZLMRTPServerFactory {
result= true;
logger.info("[停止RTP推流] 成功");
} else {
logger.error("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"),jsonObject.toJSONString(param));
logger.error("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"), JSON.toJSON(param), jsonObject);
}
return result;
}

View File

@@ -18,7 +18,11 @@ import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Component
@Order(value=2)
@@ -73,8 +77,6 @@ public class ZLMRunner implements CommandLineRunner {
}
});
// 获取zlm信息
logger.info("[zlm] 等待默认zlm中...");
@@ -87,7 +89,7 @@ public class ZLMRunner implements CommandLineRunner {
}
for (MediaServerItem mediaServerItem : all) {
if (startGetMedia == null) {
startGetMedia = new HashMap<>();
startGetMedia = new ConcurrentHashMap<>();
}
startGetMedia.put(mediaServerItem.getId(), true);
connectZlmServer(mediaServerItem);
@@ -95,7 +97,7 @@ public class ZLMRunner implements CommandLineRunner {
}
String taskKey = "zlm-connect-timeout";
dynamicTask.startDelay(taskKey, ()->{
if (startGetMedia != null) {
if (startGetMedia != null && startGetMedia.size() > 0) {
Set<String> allZlmId = startGetMedia.keySet();
for (String id : allZlmId) {
logger.error("[ {} ]]主动连接失败,不再尝试连接", id);

View File

@@ -66,7 +66,7 @@ public class ZLMServerConfig {
private String hookAdminParams;
@JSONField(name = "hook.alive_interval")
private int hookAliveInterval;
private Float hookAliveInterval;
@JSONField(name = "hook.enable")
private String hookEnable;
@@ -798,11 +798,11 @@ public class ZLMServerConfig {
this.shellPhell = shellPhell;
}
public int getHookAliveInterval() {
public Float getHookAliveInterval() {
return hookAliveInterval;
}
public void setHookAliveInterval(int hookAliveInterval) {
public void setHookAliveInterval(Float hookAliveInterval) {
this.hookAliveInterval = hookAliveInterval;
}

View File

@@ -24,6 +24,17 @@ public class HookSubscribeFactory {
return hookSubscribe;
}
public static HookSubscribeForRtpServerTimeout on_rtp_server_timeout(String stream, String ssrc, String mediaServerId) {
HookSubscribeForRtpServerTimeout hookSubscribe = new HookSubscribeForRtpServerTimeout();
JSONObject subscribeKey = new com.alibaba.fastjson2.JSONObject();
subscribeKey.put("stream_id", stream);
subscribeKey.put("ssrc", ssrc);
subscribeKey.put("mediaServerId", mediaServerId);
hookSubscribe.setContent(subscribeKey);
return hookSubscribe;
}
public static HookSubscribeForServerStarted on_server_started() {
HookSubscribeForServerStarted hookSubscribe = new HookSubscribeForServerStarted();
hookSubscribe.setContent(new JSONObject());

View File

@@ -0,0 +1,44 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.annotation.JSONField;
import java.time.Instant;
/**
* hook订阅-收流超时
* @author lin
*/
public class HookSubscribeForRtpServerTimeout implements IHookSubscribe{
private HookType hookType = HookType.on_rtp_server_timeout;
private JSONObject content;
@JSONField(format="yyyy-MM-dd HH:mm:ss")
private Instant expires;
@Override
public HookType getHookType() {
return hookType;
}
@Override
public JSONObject getContent() {
return content;
}
public void setContent(JSONObject content) {
this.content = content;
}
@Override
public Instant getExpires() {
return expires;
}
@Override
public void setExpires(Instant expires) {
this.expires = expires;
}
}

View File

@@ -15,6 +15,7 @@ public class HookSubscribeForStreamChange implements IHookSubscribe{
private JSONObject content;
@JSONField(format="yyyy-MM-dd HH:mm:ss")
private Instant expires;
@Override

View File

@@ -19,5 +19,7 @@ public enum HookType {
on_stream_none_reader,
on_stream_not_found,
on_server_started,
on_rtp_server_timeout,
on_server_keepalive
}

View File

@@ -5,7 +5,6 @@ import com.genersoft.iot.vmp.gb28181.session.SsrcConfig;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import io.swagger.v3.oas.annotations.media.Schema;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.HashMap;
@@ -55,7 +54,7 @@ public class MediaServerItem{
private String secret;
@Schema(description = "keepalive hook触发间隔,单位秒")
private int hookAliveInterval;
private Float hookAliveInterval;
@Schema(description = "是否使用多端口模式")
private boolean rtpEnable;
@@ -66,9 +65,6 @@ public class MediaServerItem{
@Schema(description = "多端口RTP收流端口范围")
private String rtpPortRange;
@Schema(description = "RTP发流端口范围")
private String sendRtpPortRange;
@Schema(description = "assist服务端口")
private int recordAssistPort;
@@ -119,7 +115,6 @@ public class MediaServerItem{
hookAliveInterval = zlmServerConfig.getHookAliveInterval();
rtpEnable = false; // 默认使用单端口;直到用户自己设置开启多端口
rtpPortRange = zlmServerConfig.getPortRange().replace("_",","); // 默认使用30000,30500作为级联时发送流的端口号
sendRtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号
recordAssistPort = 0; // 默认关闭
}
@@ -324,19 +319,11 @@ public class MediaServerItem{
this.lastKeepaliveTime = lastKeepaliveTime;
}
public String getSendRtpPortRange() {
return sendRtpPortRange;
}
public void setSendRtpPortRange(String sendRtpPortRange) {
this.sendRtpPortRange = sendRtpPortRange;
}
public int getHookAliveInterval() {
public Float getHookAliveInterval() {
return hookAliveInterval;
}
public void setHookAliveInterval(int hookAliveInterval) {
public void setHookAliveInterval(Float hookAliveInterval) {
this.hookAliveInterval = hookAliveInterval;
}
}

View File

@@ -0,0 +1,53 @@
package com.genersoft.iot.vmp.media.zlm.dto.hook;
/**
* zlm hook事件中的on_rtp_server_timeout事件的参数
* @author lin
*/
public class OnRtpServerTimeoutHookParam extends HookParam{
private int local_port;
private String stream_id;
private int tcpMode;
private boolean re_use_port;
private String ssrc;
public int getLocal_port() {
return local_port;
}
public void setLocal_port(int local_port) {
this.local_port = local_port;
}
public String getStream_id() {
return stream_id;
}
public void setStream_id(String stream_id) {
this.stream_id = stream_id;
}
public int getTcpMode() {
return tcpMode;
}
public void setTcpMode(int tcpMode) {
this.tcpMode = tcpMode;
}
public boolean isRe_use_port() {
return re_use_port;
}
public void setRe_use_port(boolean re_use_port) {
this.re_use_port = re_use_port;
}
public String getSsrc() {
return ssrc;
}
public void setSsrc(String ssrc) {
this.ssrc = ssrc;
}
}