添加推流列表和拉流代理,下一步与国标关联

This commit is contained in:
panlinlin
2021-03-30 18:46:34 +08:00
parent f8fe76add2
commit 56859d09df
37 changed files with 1417 additions and 217 deletions

View File

@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.media.zlm;
import java.util.List;
import java.util.UUID;
import com.alibaba.fastjson.JSON;
@@ -272,26 +273,35 @@ public class ZLMHttpHookListener {
}
String streamId = json.getString("stream");
String app = json.getString("app");
StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
JSONObject ret = new JSONObject();
ret.put("code", 0);
ret.put("close", true);
if (streamInfo != null) {
if (redisCatchStorage.isChannelSendingRTP(streamInfo.getChannelId())) {
ret.put("close", false);
} else {
if ("rtp".equals(app)){
JSONObject ret = new JSONObject();
ret.put("code", 0);
ret.put("close", true);
if (streamInfo != null) {
if (redisCatchStorage.isChannelSendingRTP(streamInfo.getChannelId())) {
ret.put("close", false);
} else {
cmder.streamByeCmd(streamId);
redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
}
}else{
cmder.streamByeCmd(streamId);
redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
redisCatchStorage.stopPlayback(streamInfo);
}
}else{
cmder.streamByeCmd(streamId);
streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
redisCatchStorage.stopPlayback(streamInfo);
return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
}else {
JSONObject ret = new JSONObject();
ret.put("code", 0);
ret.put("close", false);
return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
}
return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
}
/**
@@ -350,10 +360,21 @@ public class ZLMHttpHookListener {
// String data = json.getString("data");
// List<MediaServerConfig> mediaServerConfigs = JSON.parseArray(JSON.toJSONString(json), MediaServerConfig.class);
// MediaServerConfig mediaServerConfig = mediaServerConfigs.get(0);
List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_started);
if (subscribes != null && subscribes.size() > 0) {
for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(json);
}
}
MediaServerConfig mediaServerConfig = JSON.toJavaObject(json, MediaServerConfig.class);
mediaServerConfig.setWanIp(StringUtils.isEmpty(mediaWanIp)? mediaIp: mediaWanIp);
mediaServerConfig.setLocalIP(mediaIp);
redisCatchStorage.updateMediaInfo(mediaServerConfig);
// 重新发起代理
JSONObject ret = new JSONObject();
ret.put("code", 0);
ret.put("msg", "success");

View File

@@ -3,7 +3,9 @@ package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSONObject;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -67,4 +69,19 @@ public class ZLMHttpHookSubscribe {
}
return event;
}
public List<ZLMHttpHookSubscribe.Event> getSubscribes(HookType type) {
ZLMHttpHookSubscribe.Event event= null;
Map<JSONObject, Event> eventMap = allSubscribes.get(type);
if (eventMap == null) {
return null;
}
List<ZLMHttpHookSubscribe.Event> result = new ArrayList<>();
for (JSONObject key : eventMap.keySet()) {
result.add(eventMap.get(key));
}
return result;
}
}

View File

@@ -32,6 +32,7 @@ public class ZLMMediaListManager {
public void updateMediaList() {
JSONObject mediaList = zlmresTfulUtils.getMediaList();
if (mediaList == null) return;
String dataStr = mediaList.getString("data");
Integer code = mediaList.getInteger("code");

View File

@@ -131,4 +131,25 @@ public class ZLMRESTfulUtils {
public JSONObject stopSendRtp(Map<String, Object> param) {
return sendPost("stopSendRtp",param);
}
public JSONObject addStreamProxy(String app, String stream, String url, boolean enable_hls, boolean enable_mp4, String rtp_type) {
Map<String, Object> param = new HashMap<>();
param.put("vhost", "__defaultVhost__");
param.put("app", app);
param.put("stream", stream);
param.put("url", url);
param.put("enable_hls", enable_hls?1:0);
param.put("enable_mp4", enable_mp4?1:0);
param.put("rtp_type", rtp_type);
return sendPost("addStreamProxy",param);
}
public JSONObject closeStreams(String app, String stream) {
Map<String, Object> param = new HashMap<>();
param.put("vhost", "__defaultVhost__");
param.put("app", app);
param.put("stream", stream);
param.put("force", 1);
return sendPost("close_streams",param);
}
}

View File

@@ -4,8 +4,11 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyDto;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
//import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.service.IStreamProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -16,6 +19,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
@@ -24,8 +28,8 @@ public class ZLMRunner implements CommandLineRunner {
private final static Logger logger = LoggerFactory.getLogger(ZLMRunner.class);
// @Autowired
// private IVideoManagerStorager storager;
@Autowired
private IVideoManagerStorager storager;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@@ -63,18 +67,27 @@ public class ZLMRunner implements CommandLineRunner {
@Autowired
private ZLMMediaListManager zlmMediaListManager;
@Autowired
private ZLMHttpHookSubscribe hookSubscribe;
@Autowired
private IStreamProxyService streamProxyService;
@Override
public void run(String... strings) throws Exception {
JSONObject subscribeKey = new JSONObject();
// 订阅 zlm启动事件
hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,subscribeKey,(response)->{
MediaServerConfig mediaServerConfig = JSONObject.toJavaObject(response, MediaServerConfig.class);
zLmRunning(mediaServerConfig);
});
// 获取zlm信息
logger.info("等待zlm接入...");
MediaServerConfig mediaServerConfig = getMediaServerConfig();
if (mediaServerConfig != null) {
logger.info("zlm接入成功...");
if (autoConfig) saveZLMConfig();
mediaServerConfig = getMediaServerConfig();
redisCatchStorage.updateMediaInfo(mediaServerConfig);
// 更新流列表
zlmMediaListManager.updateMediaList();
zLmRunning(mediaServerConfig);
}
}
@@ -85,8 +98,7 @@ public class ZLMRunner implements CommandLineRunner {
JSONArray data = responseJSON.getJSONArray("data");
if (data != null && data.size() > 0) {
mediaServerConfig = JSON.parseObject(JSON.toJSONString(data.get(0)), MediaServerConfig.class);
mediaServerConfig.setLocalIP(mediaIp);
mediaServerConfig.setWanIp(StringUtils.isEmpty(mediaWanIp)? mediaIp: mediaWanIp);
}
} else {
logger.error("getMediaServerConfig失败, 1s后重试");
@@ -136,4 +148,27 @@ public class ZLMRunner implements CommandLineRunner {
}
}
/**
* zlm 连接成功或者zlm重启后
*/
private void zLmRunning(MediaServerConfig mediaServerConfig){
logger.info("zlm接入成功...");
if (autoConfig) saveZLMConfig();
MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
if (System.currentTimeMillis() - mediaInfo.getUpdateTime() < 50){
logger.info("zlm刚刚更新忽略这次更新");
return;
}
mediaServerConfig.setLocalIP(mediaIp);
mediaServerConfig.setWanIp(StringUtils.isEmpty(mediaWanIp)? mediaIp: mediaWanIp);
redisCatchStorage.updateMediaInfo(mediaServerConfig);
// 更新流列表
zlmMediaListManager.updateMediaList();
// 恢复流代理
List<StreamProxyDto> streamProxyListForEnable = storager.getStreamProxyListForEnable(true);
for (StreamProxyDto streamProxyDto : streamProxyListForEnable) {
logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream());
streamProxyService.addStreamProxyToZlm(streamProxyDto);
}
}
}

View File

@@ -0,0 +1,112 @@
package com.genersoft.iot.vmp.media.zlm.dto;
public class StreamProxyDto {
private String type;
private String app;
private String stream;
private String url;
private String src_url;
private String dst_url;
private int timeout_ms;
private String ffmpeg_cmd_key;
private String rtp_type;
private boolean enable;
private boolean enable_hls;
private boolean enable_mp4;
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getSrc_url() {
return src_url;
}
public void setSrc_url(String src_url) {
this.src_url = src_url;
}
public String getDst_url() {
return dst_url;
}
public void setDst_url(String dst_url) {
this.dst_url = dst_url;
}
public int getTimeout_ms() {
return timeout_ms;
}
public void setTimeout_ms(int timeout_ms) {
this.timeout_ms = timeout_ms;
}
public String getFfmpeg_cmd_key() {
return ffmpeg_cmd_key;
}
public void setFfmpeg_cmd_key(String ffmpeg_cmd_key) {
this.ffmpeg_cmd_key = ffmpeg_cmd_key;
}
public String getRtp_type() {
return rtp_type;
}
public void setRtp_type(String rtp_type) {
this.rtp_type = rtp_type;
}
public boolean isEnable() {
return enable;
}
public void setEnable(boolean enable) {
this.enable = enable;
}
public boolean isEnable_hls() {
return enable_hls;
}
public void setEnable_hls(boolean enable_hls) {
this.enable_hls = enable_hls;
}
public boolean isEnable_mp4() {
return enable_mp4;
}
public void setEnable_mp4(boolean enable_mp4) {
this.enable_mp4 = enable_mp4;
}
}