Merge branch 'wvp-28181-2.0'

# Conflicts:
#	sql/update.sql
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
#	src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
#	src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
This commit is contained in:
648540858
2022-08-09 14:41:21 +08:00
204 changed files with 4646 additions and 1896 deletions

View File

@@ -136,4 +136,12 @@ public class AssistRESTfulUtils {
return sendGet(mediaServerItem, "api/record/file/duration",param, callback);
}
public JSONObject addStreamCallInfo(MediaServerItem mediaServerItem, String app, String stream, String callId, RequestCallback callback){
Map<String, Object> param = new HashMap<>();
param.put("app",app);
param.put("stream",stream);
param.put("callId",callId);
return sendGet(mediaServerItem, "api/record/addStreamCallInfo",param, callback);
}
}

View File

@@ -87,6 +87,9 @@ public class ZLMHttpHookListener {
@Autowired
private VideoStreamSessionManager sessionManager;
@Autowired
private AssistRESTfulUtils assistRESTfulUtils;
/**
* 服务器定时上报时间上报间隔可配置默认10s上报一次
*
@@ -99,12 +102,13 @@ public class ZLMHttpHookListener {
logger.debug("[ ZLM HOOK ] on_server_keepalive API调用参数" + json.toString());
}
String mediaServerId = json.getString("mediaServerId");
List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_keepalive);
List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
if (subscribes != null && subscribes.size() > 0) {
for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, json);
}
}
mediaServerService.updateMediaServerKeepalive(mediaServerId, json.getJSONObject("data"));
JSONObject ret = new JSONObject();
ret.put("code", 0);
@@ -164,7 +168,7 @@ public class ZLMHttpHookListener {
logger.debug("[ ZLM HOOK ]on_play API调用参数" + JSON.toJSONString(param));
}
String mediaServerId = param.getMediaServerId();
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json);
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json);
if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) {
@@ -200,7 +204,9 @@ public class ZLMHttpHookListener {
logger.info("[ ZLM HOOK ]on_publish API调用参数" + json.toString());
JSONObject ret = new JSONObject();
if (!"rtp".equals(param.getApp()) && !"broadcast".equals(param.getApp())) {
String mediaServerId = json.getString("mediaServerId");
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (!"rtp".equals(param.getApp())) {
// 推流鉴权
if (param.getParams() == null) {
logger.info("推流鉴权失败: 缺少不要参数sign=md5(user表的pushKey)");
@@ -231,6 +237,12 @@ public class ZLMHttpHookListener {
streamAuthorityInfo.setSign(sign);
// 鉴权通过
redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
// 通知assist新的callId
if (mediaInfo != null) {
assistRESTfulUtils.addStreamCallInfo(mediaInfo, param.getApp(), param.getStream(), callId, null);
}
}else {
zlmMediaListManager.sendStreamEvent(param.getApp(),param.getStream(), param.getMediaServerId());
}
ret.put("code", 0);
@@ -240,10 +252,9 @@ public class ZLMHttpHookListener {
ret.put("enable_audio", true);
}
String mediaServerId = json.getString("mediaServerId");
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json);
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
if (subscribe != null) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) {
subscribe.response(mediaInfo, json);
}else {
@@ -270,10 +281,12 @@ public class ZLMHttpHookListener {
ret.put("mp4_max_second", 10);
ret.put("enable_mp4", true);
ret.put("enable_audio", true);
}
}
return new ResponseEntity<String>(ret.toString(), HttpStatus.OK);
}
@@ -364,7 +377,7 @@ public class ZLMHttpHookListener {
logger.debug("[ ZLM HOOK ]on_shell_login API调用参数" + json.toString());
}
String mediaServerId = json.getString("mediaServerId");
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_shell_login, json);
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json);
if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) {
@@ -390,7 +403,7 @@ public class ZLMHttpHookListener {
logger.info("[ ZLM HOOK ]on_stream_changed API调用参数" + JSONObject.toJSONString(item));
String mediaServerId = item.getMediaServerId();
JSONObject json = (JSONObject) JSON.toJSON(item);
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json);
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
if (subscribe != null ) {
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
if (mediaInfo != null) {
@@ -438,7 +451,6 @@ public class ZLMHttpHookListener {
redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(),
streamInfo.getStream(), null);
}
}
}else {
if (!"rtp".equals(app)){
@@ -451,7 +463,6 @@ public class ZLMHttpHookListener {
StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaServerItem,
app, stream, tracks, streamAuthorityInfo.getCallId());
item.setStreamInfo(streamInfoByAppAndStream);
redisCatchStorage.addStream(mediaServerItem, type, app, stream, item);
if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|| item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
@@ -459,20 +470,6 @@ public class ZLMHttpHookListener {
item.setSeverId(userSetting.getServerId());
zlmMediaListManager.addPush(item);
}
// List<GbStream> gbStreams = new ArrayList<>();
// if (streamPushItem == null || streamPushItem.getGbId() == null) {
// GbStream gbStream = storager.getGbStream(app, streamId);
// gbStreams.add(gbStream);
// }else {
// if (streamPushItem.getGbId() != null) {
// gbStreams.add(streamPushItem);
// }
// }
// if (gbStreams.size() > 0) {
// eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON);
// }
}else {
// 兼容流注销时类型从redis记录获取
MediaItem mediaItem = redisCatchStorage.getStreamInfo(app, stream, mediaServerId);
@@ -616,16 +613,21 @@ public class ZLMHttpHookListener {
}
String remoteAddr = request.getRemoteAddr();
jsonObject.put("ip", remoteAddr);
List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_started);
List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
if (subscribes != null && subscribes.size() > 0) {
for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
subscribe.response(null, jsonObject);
}
}
ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(jsonObject, ZLMServerConfig.class);
if (zlmServerConfig !=null ) {
mediaServerService.zlmServerOnline(zlmServerConfig);
}
JSONObject ret = new JSONObject();
ret.put("code", 0);
ret.put("msg", "success");
return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
return new ResponseEntity<>(ret.toString(),HttpStatus.OK);
}
private Map<String, String> urlParamToMap(String params) {

View File

@@ -1,12 +1,16 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @description:针对 ZLMediaServer的hook事件订阅
@@ -16,51 +20,39 @@ import java.util.concurrent.ConcurrentHashMap;
@Component
public class ZLMHttpHookSubscribe {
public enum HookType{
on_flow_report,
on_http_access,
on_play,
on_publish,
on_record_mp4,
on_rtsp_auth,
on_rtsp_realm,
on_shell_login,
on_stream_changed,
on_stream_none_reader,
on_stream_not_found,
on_server_started,
on_server_keepalive
}
@FunctionalInterface
public interface Event{
void response(MediaServerItem mediaServerItem, JSONObject response);
}
private Map<HookType, Map<JSONObject, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
private Map<HookType, Map<IHookSubscribe, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
public void addSubscribe(HookType type, JSONObject hookResponse, ZLMHttpHookSubscribe.Event event) {
allSubscribes.computeIfAbsent(type, k -> new ConcurrentHashMap<>()).put(hookResponse, event);
public void addSubscribe(IHookSubscribe hookSubscribe, ZLMHttpHookSubscribe.Event event) {
if (hookSubscribe.getExpires() == null) {
// 默认5分钟过期
Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5));
hookSubscribe.setExpires(expiresInstant);
}
allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
}
public ZLMHttpHookSubscribe.Event getSubscribe(HookType type, JSONObject hookResponse) {
public ZLMHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
ZLMHttpHookSubscribe.Event event= null;
Map<JSONObject, Event> eventMap = allSubscribes.get(type);
Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
if (eventMap == null) {
return null;
}
for (JSONObject key : eventMap.keySet()) {
for (IHookSubscribe key : eventMap.keySet()) {
Boolean result = null;
for (String s : key.keySet()) {
for (String s : key.getContent().keySet()) {
if (result == null) {
result = key.getString(s).equals(hookResponse.getString(s));
result = key.getContent().getString(s).equals(hookResponse.getString(s));
}else {
if (key.getString(s) == null) {
if (key.getContent().getString(s) == null) {
continue;
}
result = result && key.getString(s).equals(hookResponse.getString(s));
result = result && key.getContent().getString(s).equals(hookResponse.getString(s));
}
}
if (null != result && result) {
event = eventMap.get(key);
@@ -69,26 +61,30 @@ public class ZLMHttpHookSubscribe {
return event;
}
public void removeSubscribe(HookType type, JSONObject hookResponse) {
Map<JSONObject, Event> eventMap = allSubscribes.get(type);
public void removeSubscribe(IHookSubscribe hookSubscribe) {
Map<IHookSubscribe, Event> eventMap = allSubscribes.get(hookSubscribe.getHookType());
if (eventMap == null) {
return;
}
Set<Map.Entry<JSONObject, Event>> entries = eventMap.entrySet();
Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet();
if (entries.size() > 0) {
List<Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entries) {
JSONObject key = entry.getKey();
List<Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entries) {
JSONObject content = entry.getKey().getContent();
if (content == null || content.size() == 0) {
entriesToRemove.add(entry);
continue;
}
Boolean result = null;
for (String s : key.keySet()) {
for (String s : content.keySet()) {
if (result == null) {
result = key.getString(s).equals(hookResponse.getString(s));
result = content.getString(s).equals(hookSubscribe.getContent().getString(s));
}else {
if (key.getString(s) == null) {
if (content.getString(s) == null) {
continue;
}
result = result && key.getString(s).equals(hookResponse.getString(s));
result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s));
}
}
if (null != result && result){
@@ -97,7 +93,7 @@ public class ZLMHttpHookSubscribe {
}
if (!CollectionUtils.isEmpty(entriesToRemove)) {
for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) {
for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) {
entries.remove(entry);
}
}
@@ -111,17 +107,25 @@ public class ZLMHttpHookSubscribe {
* @return
*/
public List<ZLMHttpHookSubscribe.Event> getSubscribes(HookType type) {
// ZLMHttpHookSubscribe.Event event= null;
Map<JSONObject, Event> eventMap = allSubscribes.get(type);
Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
if (eventMap == null) {
return null;
}
List<ZLMHttpHookSubscribe.Event> result = new ArrayList<>();
for (JSONObject key : eventMap.keySet()) {
for (IHookSubscribe key : eventMap.keySet()) {
result.add(eventMap.get(key));
}
return result;
}
public List<IHookSubscribe> getAll(){
ArrayList<IHookSubscribe> result = new ArrayList<>();
Collection<Map<IHookSubscribe, Event>> values = allSubscribes.values();
for (Map<IHookSubscribe, Event> value : values) {
result.addAll(value.keySet());
}
return result;
}
}

View File

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.dto.*;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
@@ -63,125 +64,49 @@ public class ZLMMediaListManager {
@Autowired
private UserSetting userSetting;
private Map<String, ChannelOnlineEvent> channelOnlineEvents = new ConcurrentHashMap<>();
@Autowired
private ZLMRTPServerFactory zlmrtpServerFactory;
@Autowired
private IMediaServerService mediaServerService;
public void updateMediaList(MediaServerItem mediaServerItem) {
storager.clearMediaList();
// 使用异步的当时更新媒体流列表
zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
if (mediaList == null) {
return;
}
String dataStr = mediaList.getString("data");
Integer code = mediaList.getInteger("code");
Map<String, StreamPushItem> result = new HashMap<>();
List<StreamPushItem> streamPushItems = null;
// 获取所有的国标关联
// List<GbStream> gbStreams = gbStreamMapper.selectAllByMediaServerId(mediaServerItem.getId());
if (code == 0 ) {
if (dataStr != null) {
streamPushItems = streamPushService.handleJSON(dataStr, mediaServerItem);
}
}else {
logger.warn("更新视频流失败错误code " + code);
}
if (streamPushItems != null) {
storager.updateMediaList(streamPushItems);
for (StreamPushItem streamPushItem : streamPushItems) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("app", streamPushItem.getApp());
jsonObject.put("stream", streamPushItem.getStream());
jsonObject.put("mediaServerId", mediaServerItem.getId());
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_play,jsonObject,
(MediaServerItem mediaServerItemInuse, JSONObject response)->{
updateMedia(mediaServerItem, response.getString("app"), response.getString("stream"));
}
);
}
}
}));
}
public void addMedia(MediaServerItem mediaServerItem, String app, String streamId) {
//使用异步更新推流
updateMedia(mediaServerItem, app, streamId);
}
private Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>();
public StreamPushItem addPush(MediaItem mediaItem) {
// 查找此直播流是否存在redis预设gbId
StreamPushItem transform = streamPushService.transform(mediaItem);
StreamPushItem pushInDb = streamPushService.getPush(mediaItem.getApp(), mediaItem.getStream());
transform.setPushIng(mediaItem.isRegist());
transform.setUpdateTime(DateUtil.getNow());
transform.setPushTime(DateUtil.getNow());
transform.setSelf(userSetting.getServerId().equals(mediaItem.getSeverId()));
if (pushInDb == null) {
transform.setCreateTime(DateUtil.getNow());
streamPushMapper.add(transform);
}else {
streamPushMapper.update(transform);
// if (!StringUtils.isEmpty(pushInDb.getGbId())) {
// List<GbStream> gbStreamList = gbStreamMapper.selectByGBId(transform.getGbId());
// if (gbStreamList != null && gbStreamList.size() == 1) {
// transform.setGbStreamId(gbStreamList.get(0).getGbStreamId());
// transform.setPlatformId(gbStreamList.get(0).getPlatformId());
// transform.setCatalogId(gbStreamList.get(0).getCatalogId());
// transform.setGbId(gbStreamList.get(0).getGbId());
// gbStreamMapper.update(transform);
// streamPushMapper.del(gbStreamList.get(0).getApp(), gbStreamList.get(0).getStream());
// }else {
// transform.setCreateTime(DateUtil.getNow());
// transform.setUpdateTime(DateUtil.getNow());
// gbStreamMapper.add(transform);
// }
// 通知通道上线
// if (transform != null) {
// if (channelOnlineEvents.get(transform.getGbId()) != null) {
// channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream(), transform.getServerId());
// channelOnlineEvents.remove(transform.getGbId());
// }
// }
// }
gbStreamMapper.updateMediaServer(mediaItem.getApp(), mediaItem.getStream(), mediaItem.getMediaServerId());
}
if (transform != null) {
if (getChannelOnlineEventLister(transform.getApp(), transform.getStream()) != null) {
getChannelOnlineEventLister(transform.getApp(), transform.getStream()).run(transform.getApp(), transform.getStream(), transform.getServerId());
removedChannelOnlineEventLister(transform.getApp(), transform.getStream());
}
}
return transform;
}
public void updateMedia(MediaServerItem mediaServerItem, String app, String streamId) {
//使用异步更新推流
zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId, "rtmp", json->{
if (json == null) {
return;
public void sendStreamEvent(String app, String stream, String mediaServerId) {
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
// 查看推流状态
if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) {
if (getChannelOnlineEventLister(app, stream) != null) {
getChannelOnlineEventLister(app, stream).run(app, stream, mediaServerId);
removedChannelOnlineEventLister(app, stream);
}
String dataStr = json.getString("data");
Integer code = json.getInteger("code");
Map<String, StreamPushItem> result = new HashMap<>();
List<StreamPushItem> streamPushItems = null;
if (code == 0 ) {
if (dataStr != null) {
streamPushItems = streamPushService.handleJSON(dataStr, mediaServerItem);
}
}else {
logger.warn("更新视频流失败错误code " + code);
}
if (streamPushItems != null && streamPushItems.size() == 1) {
storager.updateMedia(streamPushItems.get(0));
}
});
}
}
public int removeMedia(String app, String streamId) {
// 查找是否关联了国标, 关联了不删除, 置为离线
GbStream gbStream = gbStreamMapper.selectOne(app, streamId);
@@ -189,48 +114,21 @@ public class ZLMMediaListManager {
if (gbStream == null) {
result = storager.removeMedia(app, streamId);
}else {
// TODO 暂不设置为离线
result =storager.mediaOffline(app, streamId);
}
return result;
}
public void addChannelOnlineEventLister(String key, ChannelOnlineEvent callback) {
this.channelOnlineEvents.put(key,callback);
public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) {
this.channelOnPublishEvents.put(app + "_" + stream, callback);
}
public void removedChannelOnlineEventLister(String key) {
this.channelOnlineEvents.remove(key);
public void removedChannelOnlineEventLister(String app, String stream) {
this.channelOnPublishEvents.remove(app + "_" + stream);
}
public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) {
return this.channelOnPublishEvents.get(app + "_" + stream);
}
// public void clearAllSessions() {
// logger.info("清空所有国标相关的session");
// JSONObject allSessionJSON = zlmresTfulUtils.getAllSession();
// ZLMServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
// HashSet<String> allLocalPorts = new HashSet();
// if (allSessionJSON.getInteger("code") == 0) {
// JSONArray data = allSessionJSON.getJSONArray("data");
// if (data.size() > 0) {
// for (int i = 0; i < data.size(); i++) {
// JSONObject sessionJOSN = data.getJSONObject(i);
// Integer local_port = sessionJOSN.getInteger("local_port");
// if (!local_port.equals(Integer.valueOf(mediaInfo.getHttpPort())) &&
// !local_port.equals(Integer.valueOf(mediaInfo.getHttpSSLport())) &&
// !local_port.equals(Integer.valueOf(mediaInfo.getRtmpPort())) &&
// !local_port.equals(Integer.valueOf(mediaInfo.getRtspPort())) &&
// !local_port.equals(Integer.valueOf(mediaInfo.getRtspSSlport())) &&
// !local_port.equals(Integer.valueOf(mediaInfo.getHookOnFlowReport()))){
// allLocalPorts.add(sessionJOSN.getInteger("local_port") + "");
// }
// }
// }
// }
// if (allLocalPorts.size() > 0) {
// List<String> result = new ArrayList<>(allLocalPorts);
// String localPortSStr = String.join(",", result);
// zlmresTfulUtils.kickSessions(localPortSStr);
// }
// }
}

View File

@@ -87,7 +87,7 @@ public class ZLMRTPServerFactory {
return result;
}
public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc) {
public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc, Integer port) {
int result = -1;
// 查询此rtp server 是否已经存在
JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, streamId);
@@ -105,7 +105,11 @@ public class ZLMRTPServerFactory {
param.put("enable_tcp", 1);
param.put("stream_id", streamId);
// 推流端口设置0则使用随机端口
param.put("port", 0);
if (port == null) {
param.put("port", 0);
}else {
param.put("port", port);
}
param.put("ssrc", ssrc);
JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
@@ -280,8 +284,10 @@ public class ZLMRTPServerFactory {
* 查询待转推的流是否就绪
*/
public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) {
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtsp", streamId);
return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online") != null && mediaInfo.getBoolean("online"));
JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId);
return (mediaInfo.getInteger("code") == 0
&& mediaInfo.getJSONArray("data") != null
&& mediaInfo.getJSONArray("data").size() > 0);
}
/**

View File

@@ -6,22 +6,22 @@ import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Component
@Order(value=1)
@@ -37,18 +37,12 @@ public class ZLMRunner implements CommandLineRunner {
@Autowired
private ZLMHttpHookSubscribe hookSubscribe;
@Autowired
private IStreamProxyService streamProxyService;
@Autowired
private EventPublisher publisher;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private MediaConfig mediaConfig;
@@ -67,26 +61,24 @@ public class ZLMRunner implements CommandLineRunner {
mediaServerService.updateToDatabase(mediaSerItem);
}
mediaServerService.syncCatchFromDatabase();
HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started();
// Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.SECONDS.toSeconds(60));
// hookSubscribeForStreamChange.setExpires(expiresInstant);
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,new JSONObject(),
hookSubscribe.addSubscribe(hookSubscribeForServerStarted,
(MediaServerItem mediaServerItem, JSONObject response)->{
ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(response, ZLMServerConfig.class);
if (zlmServerConfig !=null ) {
if (startGetMedia != null) {
startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId());
if (startGetMedia.size() == 0) {
hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
}
}
mediaServerService.zlmServerOnline(zlmServerConfig);
}
});
// 订阅 zlm保活事件, 当zlm离线时做业务的处理
hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_keepalive,new JSONObject(),
(MediaServerItem mediaServerItem, JSONObject response)->{
String mediaServerId = response.getString("mediaServerId");
if (mediaServerId !=null ) {
mediaServerService.updateMediaServerKeepalive(mediaServerId, response.getJSONObject("data"));
}
});
// 获取zlm信息
logger.info("[zlm] 等待默认zlm中...");
@@ -125,6 +117,9 @@ public class ZLMRunner implements CommandLineRunner {
zlmServerConfigFirst.setIp(mediaServerItem.getIp());
zlmServerConfigFirst.setHttpPort(mediaServerItem.getHttpPort());
startGetMedia.remove(mediaServerItem.getId());
if (startGetMedia.size() == 0) {
hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
}
mediaServerService.zlmServerOnline(zlmServerConfigFirst);
}else {
logger.info("[ {} ]-[ {}:{} ]主动连接失败, 清理相关资源, 开始尝试重试连接",
@@ -139,6 +134,9 @@ public class ZLMRunner implements CommandLineRunner {
zlmServerConfig.setIp(mediaServerItem.getIp());
zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort());
startGetMedia.remove(mediaServerItem.getId());
if (startGetMedia.size() == 0) {
hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
}
mediaServerService.zlmServerOnline(zlmServerConfig);
}
}, 2000);

View File

@@ -0,0 +1,33 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.alibaba.fastjson.JSONObject;
/**
* hook 订阅工厂
* @author lin
*/
public class HookSubscribeFactory {
public static HookSubscribeForStreamChange on_stream_changed(String app, String stream, boolean regist, String scheam, String mediaServerId) {
HookSubscribeForStreamChange hookSubscribe = new HookSubscribeForStreamChange();
JSONObject subscribeKey = new com.alibaba.fastjson.JSONObject();
subscribeKey.put("app", app);
subscribeKey.put("stream", stream);
subscribeKey.put("regist", regist);
if (scheam != null) {
subscribeKey.put("schema", scheam);
}
subscribeKey.put("mediaServerId", mediaServerId);
hookSubscribe.setContent(subscribeKey);
return hookSubscribe;
}
public static HookSubscribeForServerStarted on_server_started() {
HookSubscribeForServerStarted hookSubscribe = new HookSubscribeForServerStarted();
hookSubscribe.setContent(new JSONObject());
return hookSubscribe;
}
}

View File

@@ -0,0 +1,44 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.annotation.JSONField;
import java.time.Instant;
/**
* hook订阅-流变化
* @author lin
*/
public class HookSubscribeForServerStarted implements IHookSubscribe{
private HookType hookType = HookType.on_server_started;
private JSONObject content;
@JSONField(format="yyyy-MM-dd HH:mm:ss")
private Instant expires;
@Override
public HookType getHookType() {
return hookType;
}
@Override
public JSONObject getContent() {
return content;
}
public void setContent(JSONObject content) {
this.content = content;
}
@Override
public Instant getExpires() {
return expires;
}
@Override
public void setExpires(Instant expires) {
this.expires = expires;
}
}

View File

@@ -0,0 +1,43 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.annotation.JSONField;
import java.time.Instant;
/**
* hook订阅-流变化
* @author lin
*/
public class HookSubscribeForStreamChange implements IHookSubscribe{
private HookType hookType = HookType.on_stream_changed;
private JSONObject content;
private Instant expires;
@Override
public HookType getHookType() {
return hookType;
}
@Override
public JSONObject getContent() {
return content;
}
public void setContent(JSONObject content) {
this.content = content;
}
@Override
public Instant getExpires() {
return expires;
}
@Override
public void setExpires(Instant expires) {
this.expires = expires;
}
}

View File

@@ -0,0 +1,23 @@
package com.genersoft.iot.vmp.media.zlm.dto;
/**
* hook类型
* @author lin
*/
public enum HookType {
on_flow_report,
on_http_access,
on_play,
on_publish,
on_record_mp4,
on_rtsp_auth,
on_rtsp_realm,
on_shell_login,
on_stream_changed,
on_stream_none_reader,
on_stream_not_found,
on_server_started,
on_server_keepalive
}

View File

@@ -0,0 +1,36 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import com.alibaba.fastjson.JSONObject;
import java.time.Instant;
/**
* zlm hook事件的参数
* @author lin
*/
public interface IHookSubscribe {
/**
* 获取hook类型
* @return hook类型
*/
HookType getHookType();
/**
* 获取hook的具体内容
* @return hook的具体内容
*/
JSONObject getContent();
/**
* 设置过期时间
* @param instant 过期时间
*/
void setExpires(Instant instant);
/**
* 获取过期时间
* @return 过期时间
*/
Instant getExpires();
}

View File

@@ -4,6 +4,9 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import java.util.List;
/**
* @author lin
*/
public class MediaItem {
/**
@@ -21,6 +24,11 @@ public class MediaItem {
*/
private String stream;
/**
* 推流鉴权Id
*/
private String callId;
/**
* 观看总人数包括hls/rtsp/rtmp/http-flv/ws-flv
*/
@@ -427,4 +435,12 @@ public class MediaItem {
public void setSeverId(String severId) {
this.severId = severId;
}
public String getCallId() {
return callId;
}
public void setCallId(String callId) {
this.callId = callId;
}
}

View File

@@ -103,6 +103,18 @@ public class StreamPushItem extends GbStream implements Comparable<StreamPushIte
*/
private String createTime;
/**
* 是否正在推流
*/
private boolean pushIng;
/**
* 是否自己平台的推流
*/
private boolean self;
public String getVhost() {
return vhost;
}
@@ -277,5 +289,21 @@ public class StreamPushItem extends GbStream implements Comparable<StreamPushIte
public void setCreateTime(String createTime) {
this.createTime = createTime;
}
public boolean isPushIng() {
return pushIng;
}
public void setPushIng(boolean pushIng) {
this.pushIng = pushIng;
}
public boolean isSelf() {
return self;
}
public void setSelf(boolean self) {
this.self = self;
}
}