增加推流转发到国标,尚不完善

This commit is contained in:
panlinlin
2021-04-02 19:04:01 +08:00
parent f83a192bfd
commit b17280522b
42 changed files with 720 additions and 509 deletions

View File

@@ -9,7 +9,7 @@ import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.service.IPlayService;
import com.genersoft.iot.vmp.service.IPlayService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -240,8 +240,9 @@ public class ZLMHttpHookListener {
String streamId = json.getString("stream");
String schema = json.getString("schema");
boolean regist = json.getBoolean("regist");
StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
if ("rtp".equals(app) && !regist ) {
StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
if (streamInfo!=null){
redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
@@ -250,8 +251,12 @@ public class ZLMHttpHookListener {
redisCatchStorage.stopPlayback(streamInfo);
}
}else {
if (!"rtp".equals(app) && "rtsp".equals(schema)){
zlmMediaListManager.updateMediaList();
if (!"rtp".equals(app) && "rtmp".equals(schema)){
if (regist) {
zlmMediaListManager.addMedia(app, streamId);
}else {
zlmMediaListManager.removeMedia(app, streamId);
}
}
}
JSONObject ret = new JSONObject();

View File

@@ -70,6 +70,11 @@ public class ZLMHttpHookSubscribe {
return event;
}
/**
* 获取某个类型的所有的订阅
* @param type
* @return
*/
public List<ZLMHttpHookSubscribe.Event> getSubscribes(HookType type) {
ZLMHttpHookSubscribe.Event event= null;
Map<JSONObject, Event> eventMap = allSubscribes.get(type);

View File

@@ -1,19 +1,20 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.genersoft.iot.vmp.common.RealVideo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SsrcUtil;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.impl.RedisCatchStorageImpl;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.*;
@@ -29,60 +30,79 @@ public class ZLMMediaListManager {
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IVideoManagerStorager storager;
@Autowired
private GbStreamMapper gbStreamMapper;
@Autowired
private PlatformGbStreamMapper platformGbStreamMapper;
@Autowired
private IStreamPushService streamPushService;
public void updateMediaList() {
JSONObject mediaList = zlmresTfulUtils.getMediaList();
if (mediaList == null) return;
String dataStr = mediaList.getString("data");
storager.clearMediaList();
Integer code = mediaList.getInteger("code");
Map<String, RealVideo> result = new HashMap<>();
if (code == 0 ) {
if (dataStr != null) {
List<MediaItem> mediaItems = JSON.parseObject(dataStr, new TypeReference<List<MediaItem>>() {});
for (MediaItem item : mediaItems) {
if ("rtp".equals(item.getApp())) {
continue;
}
String key = item.getApp() + "_" + item.getStream();
RealVideo realVideo = result.get(key);
if (realVideo == null) {
realVideo = new RealVideo();
realVideo.setApp(item.getApp());
realVideo.setStream(item.getStream());
realVideo.setAliveSecond(item.getAliveSecond());
realVideo.setCreateStamp(item.getCreateStamp());
realVideo.setOriginSock(item.getOriginSock());
realVideo.setTotalReaderCount(item.getTotalReaderCount());
realVideo.setOriginType(item.getOriginType());
realVideo.setOriginTypeStr(item.getOriginTypeStr());
realVideo.setOriginUrl(item.getOriginUrl());
realVideo.setCreateStamp(item.getCreateStamp());
realVideo.setAliveSecond(item.getAliveSecond());
// 使用异步的当时更新媒体流列表
zlmresTfulUtils.getMediaList((mediaList ->{
if (mediaList == null) return;
String dataStr = mediaList.getString("data");
ArrayList<RealVideo.MediaSchema> mediaSchemas = new ArrayList<>();
realVideo.setSchemas(mediaSchemas);
realVideo.setTracks(item.getTracks());
realVideo.setVhost(item.getVhost());
result.put(key, realVideo);
}
RealVideo.MediaSchema mediaSchema = new RealVideo.MediaSchema();
mediaSchema.setSchema(item.getSchema());
mediaSchema.setBytesSpeed(item.getBytesSpeed());
realVideo.getSchemas().add(mediaSchema);
Integer code = mediaList.getInteger("code");
Map<String, StreamPushItem> result = new HashMap<>();
List<StreamPushItem> streamPushItems = null;
// 获取所有的国标关联
List<GbStream> gbStreams = gbStreamMapper.selectAll();
if (code == 0 ) {
if (dataStr != null) {
streamPushItems = streamPushService.handleJSON(dataStr);
}
}else {
logger.warn("更新视频流失败错误code " + code);
}
}else {
logger.warn("更新视频流失败错误code " + code);
}
List<RealVideo> realVideos = new ArrayList<>(result.values());
Collections.sort(realVideos);
redisCatchStorage.updateMediaList(realVideos);
if (streamPushItems != null) {
storager.updateMediaList(streamPushItems);
}
}));
}
public void addMedia(String app, String streamId) {
//使用异步更新推流
zlmresTfulUtils.getMediaList(app, streamId, "rtmp", json->{
if (json == null) return;
String dataStr = json.getString("data");
Integer code = json.getInteger("code");
Map<String, StreamPushItem> result = new HashMap<>();
List<StreamPushItem> streamPushItems = null;
if (code == 0 ) {
if (dataStr != null) {
streamPushItems = streamPushService.handleJSON(dataStr);
}
}else {
logger.warn("更新视频流失败错误code " + code);
}
if (streamPushItems != null && streamPushItems.size() == 1) {
storager.updateMedia(streamPushItems.get(0));
}
});
}
public void removeMedia(String app, String streamId) {
// 查找是否关联了国标, 关联了不删除, 置为离线
StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(app, streamId);
if (streamProxyItem == null) {
storager.removeMedia(app, streamId);
}else {
storager.mediaOutline(app, streamId);
}
}
}

View File

@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import okhttp3.*;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
@@ -27,7 +28,11 @@ public class ZLMRESTfulUtils {
@Value("${media.secret}")
private String mediaSecret;
public JSONObject sendPost(String api, Map<String, Object> param) {
public interface RequestCallback{
void run(JSONObject response);
}
public JSONObject sendPost(String api, Map<String, Object> param, RequestCallback callback) {
OkHttpClient client = new OkHttpClient();
String url = String.format("http://%s:%s/index/api/%s", mediaIp, mediaPort, api);
JSONObject responseJSON = null;
@@ -47,34 +52,68 @@ public class ZLMRESTfulUtils {
.post(body)
.url(url)
.build();
try {
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
String responseStr = response.body().string();
if (responseStr != null) {
responseJSON = JSON.parseObject(responseStr);
if (callback == null) {
try {
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
String responseStr = response.body().string();
if (responseStr != null) {
responseJSON = JSON.parseObject(responseStr);
}
}
} catch (ConnectException e) {
logger.error(String.format("连接ZLM失败: %s, %s", e.getCause().getMessage(), e.getMessage()));
logger.info("请检查media配置并确认ZLM已启动...");
}catch (IOException e) {
e.printStackTrace();
}
}else {
client.newCall(request).enqueue(new Callback(){
@Override
public void onResponse(@NotNull Call call, @NotNull Response response){
if (response.isSuccessful()) {
try {
String responseStr = response.body().string();
if (responseStr != null) {
callback.run(JSON.parseObject(responseStr));
}else {
callback.run(null);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
logger.error(String.format("连接ZLM失败: %s, %s", e.getCause().getMessage(), e.getMessage()));
logger.info("请检查media配置并确认ZLM已启动...");
}
});
}
} catch (ConnectException e) {
logger.error(String.format("连接ZLM失败: %s, %s", e.getCause().getMessage(), e.getMessage()));
logger.info("请检查media配置并确认ZLM已启动...");
}catch (IOException e) {
e.printStackTrace();
}
return responseJSON;
}
public JSONObject getMediaList(String app, String schema){
public JSONObject getMediaList(String app, String stream, String schema, RequestCallback callback){
Map<String, Object> param = new HashMap<>();
param.put("app",app);
param.put("schema",schema);
if (app != null) param.put("app",app);
if (stream != null) param.put("stream",stream);
if (schema != null) param.put("schema",schema);
param.put("vhost","__defaultVhost__");
return sendPost("getMediaList",param);
return sendPost("getMediaList",param, callback);
}
public JSONObject getMediaList(){
return sendPost("getMediaList",null);
public JSONObject getMediaList(String app, String stream){
return getMediaList(app, stream,null, null);
}
public JSONObject getMediaList(RequestCallback callback){
return sendPost("getMediaList",null, callback);
}
public JSONObject getMediaInfo(String app, String schema, String stream){
@@ -83,13 +122,13 @@ public class ZLMRESTfulUtils {
param.put("schema",schema);
param.put("stream",stream);
param.put("vhost","__defaultVhost__");
return sendPost("getMediaInfo",param);
return sendPost("getMediaInfo",param, null);
}
public JSONObject getRtpInfo(String stream_id){
Map<String, Object> param = new HashMap<>();
param.put("stream_id",stream_id);
return sendPost("getRtpInfo",param);
return sendPost("getRtpInfo",param, null);
}
public JSONObject addFFmpegSource(String src_url, String dst_url, String timeout_ms){
@@ -99,37 +138,37 @@ public class ZLMRESTfulUtils {
param.put("src_url", src_url);
param.put("dst_url", dst_url);
param.put("timeout_ms", timeout_ms);
return sendPost("addFFmpegSource",param);
return sendPost("addFFmpegSource",param, null);
}
public JSONObject delFFmpegSource(String key){
Map<String, Object> param = new HashMap<>();
param.put("key", key);
return sendPost("delFFmpegSource",param);
return sendPost("delFFmpegSource",param, null);
}
public JSONObject getMediaServerConfig(){
return sendPost("getServerConfig",null);
return sendPost("getServerConfig",null, null);
}
public JSONObject setServerConfig(Map<String, Object> param){
return sendPost("setServerConfig",param);
return sendPost("setServerConfig",param, null);
}
public JSONObject openRtpServer(Map<String, Object> param){
return sendPost("openRtpServer",param);
return sendPost("openRtpServer",param, null);
}
public JSONObject closeRtpServer(Map<String, Object> param) {
return sendPost("closeRtpServer",param);
return sendPost("closeRtpServer",param, null);
}
public JSONObject startSendRtp(Map<String, Object> param) {
return sendPost("startSendRtp",param);
return sendPost("startSendRtp",param, null);
}
public JSONObject stopSendRtp(Map<String, Object> param) {
return sendPost("stopSendRtp",param);
return sendPost("stopSendRtp",param, null);
}
public JSONObject addStreamProxy(String app, String stream, String url, boolean enable_hls, boolean enable_mp4, String rtp_type) {
@@ -141,7 +180,7 @@ public class ZLMRESTfulUtils {
param.put("enable_hls", enable_hls?1:0);
param.put("enable_mp4", enable_mp4?1:0);
param.put("rtp_type", rtp_type);
return sendPost("addStreamProxy",param);
return sendPost("addStreamProxy",param, null);
}
public JSONObject closeStreams(String app, String stream) {
@@ -150,6 +189,6 @@ public class ZLMRESTfulUtils {
param.put("app", app);
param.put("stream", stream);
param.put("force", 1);
return sendPost("close_streams",param);
return sendPost("close_streams",param, null);
}
}

View File

@@ -8,7 +8,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
//import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -75,9 +75,8 @@ public class ZLMRunner implements CommandLineRunner {
@Override
public void run(String... strings) throws Exception {
JSONObject subscribeKey = new JSONObject();
// 订阅 zlm启动事件
hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,subscribeKey,(response)->{
hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,null,(response)->{
MediaServerConfig mediaServerConfig = JSONObject.toJavaObject(response, MediaServerConfig.class);
zLmRunning(mediaServerConfig);
});
@@ -155,7 +154,7 @@ public class ZLMRunner implements CommandLineRunner {
logger.info("zlm接入成功...");
if (autoConfig) saveZLMConfig();
MediaServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
if (System.currentTimeMillis() - mediaInfo.getUpdateTime() < 50){
if (mediaInfo != null && System.currentTimeMillis() - mediaInfo.getUpdateTime() < 50){
logger.info("zlm刚刚更新忽略这次更新");
return;
}

View File

@@ -0,0 +1,206 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import org.jetbrains.annotations.NotNull;
import java.util.List;
public class StreamPushItem extends GbStream implements Comparable<StreamPushItem>{
/**
* 应用名
*/
private String app;
/**
* 流id
*/
private String stream;
/**
* 观看总人数包括hls/rtsp/rtmp/http-flv/ws-flv
*/
private String totalReaderCount;
/**
* 协议 包括hls/rtsp/rtmp/http-flv/ws-flv
*/
private List<MediaSchema> schemas;
/**
* 产生源类型,
* unknown = 0,
* rtmp_push=1,
* rtsp_push=2,
* rtp_push=3,
* pull=4,
* ffmpeg_pull=5,
* mp4_vod=6,
* device_chn=7
*/
private int originType;
/**
* 客户端和服务器网络信息可能为null类型
*/
private MediaItem.OriginSock originSock;
/**
* 产生源类型的字符串描述
*/
private String originTypeStr;
/**
* 产生源的url
*/
private String originUrl;
/**
* GMT unix系统时间戳单位秒
*/
private Long createStamp;
/**
* 存活时间,单位秒
*/
private Long aliveSecond;
/**
* 音视频轨道
*/
private List<MediaItem.MediaTrack> tracks;
/**
* 音视频轨道
*/
private String vhost;
public String getVhost() {
return vhost;
}
public void setVhost(String vhost) {
this.vhost = vhost;
}
@Override
public int compareTo(@NotNull StreamPushItem streamPushItem) {
return new Long(this.createStamp - streamPushItem.getCreateStamp().intValue()).intValue();
}
public static class MediaSchema {
private String schema;
private Long bytesSpeed;
public String getSchema() {
return schema;
}
public void setSchema(String schema) {
this.schema = schema;
}
public Long getBytesSpeed() {
return bytesSpeed;
}
public void setBytesSpeed(Long bytesSpeed) {
this.bytesSpeed = bytesSpeed;
}
}
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getStream() {
return stream;
}
public void setStream(String stream) {
this.stream = stream;
}
public String getTotalReaderCount() {
return totalReaderCount;
}
public void setTotalReaderCount(String totalReaderCount) {
this.totalReaderCount = totalReaderCount;
}
public List<MediaSchema> getSchemas() {
return schemas;
}
public void setSchemas(List<MediaSchema> schemas) {
this.schemas = schemas;
}
public int getOriginType() {
return originType;
}
public void setOriginType(int originType) {
this.originType = originType;
}
public MediaItem.OriginSock getOriginSock() {
return originSock;
}
public void setOriginSock(MediaItem.OriginSock originSock) {
this.originSock = originSock;
}
public String getOriginTypeStr() {
return originTypeStr;
}
public void setOriginTypeStr(String originTypeStr) {
this.originTypeStr = originTypeStr;
}
public String getOriginUrl() {
return originUrl;
}
public void setOriginUrl(String originUrl) {
this.originUrl = originUrl;
}
public Long getCreateStamp() {
return createStamp;
}
public void setCreateStamp(Long createStamp) {
this.createStamp = createStamp;
}
public Long getAliveSecond() {
return aliveSecond;
}
public void setAliveSecond(Long aliveSecond) {
this.aliveSecond = aliveSecond;
}
public List<MediaItem.MediaTrack> getTracks() {
return tracks;
}
public void setTracks(List<MediaItem.MediaTrack> tracks) {
this.tracks = tracks;
}
}