Merge branch 'wvp-28181-2.0'

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/BroadcastResponseMessageHandler.java
This commit is contained in:
648540858
2022-09-24 21:04:58 +08:00
80 changed files with 1772 additions and 1459 deletions

View File

@@ -103,7 +103,7 @@ public class ZLMHttpHookListener {
@PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8")
public JSONObject onServerKeepalive(@RequestBody JSONObject json){
logger.info("[ ZLM HOOK ] on_server_keepalive API调用参数" + json.toString());
logger.info("[ ZLM HOOK ]on_server_keepalive API调用参数" + json.toString());
String mediaServerId = json.getString("mediaServerId");
List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
if (subscribes != null && subscribes.size() > 0) {
@@ -417,10 +417,11 @@ public class ZLMHttpHookListener {
String schema = item.getSchema();
List<MediaItem.MediaTrack> tracks = item.getTracks();
boolean regist = item.isRegist();
if (item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|| item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|| item.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
if (regist) {
if (regist) {
if (item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|| item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|| item.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
if (streamAuthorityInfo == null) {
streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(item);
@@ -429,9 +430,9 @@ public class ZLMHttpHookListener {
streamAuthorityInfo.setOriginTypeStr(item.getOriginTypeStr());
}
redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo);
}else {
redisCatchStorage.removeStreamAuthorityInfo(app, stream);
}
}else {
redisCatchStorage.removeStreamAuthorityInfo(app, stream);
}
if ("rtsp".equals(schema)){
@@ -451,15 +452,12 @@ public class ZLMHttpHookListener {
if (streamInfo!=null){
redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
// 如果正在给上级推送则发送bye
}else{
streamInfo = redisCatchStorage.queryPlayback(null, null, stream, null);
if (streamInfo != null) {
redisCatchStorage.stopPlayback(streamInfo.getDeviceID(), streamInfo.getChannelId(),
streamInfo.getStream(), null);
}
// 如果正在给上级推送则发送bye
}
}else {
if (!"rtp".equals(app)){
@@ -509,6 +507,19 @@ public class ZLMHttpHookListener {
}
}
}
if (!regist) {
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByStream(stream);
if (sendRtpItems.size() > 0) {
for (SendRtpItem sendRtpItem : sendRtpItems) {
if (sendRtpItem.getApp().equals(app)) {
String platformId = sendRtpItem.getPlatformId();
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
commanderFroPlatform.streamByeCmd(platform, sendRtpItem);
}
}
}
}
}
JSONObject ret = new JSONObject();

View File

@@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -83,7 +84,11 @@ public class ZLMMediaListManager {
}
if (transform != null) {
if (getChannelOnlineEventLister(transform.getApp(), transform.getStream()) != null) {
getChannelOnlineEventLister(transform.getApp(), transform.getStream()).run(transform.getApp(), transform.getStream(), transform.getServerId());
try {
getChannelOnlineEventLister(transform.getApp(), transform.getStream()).run(transform.getApp(), transform.getStream(), transform.getServerId());
} catch (ParseException e) {
throw new RuntimeException(e);
}
removedChannelOnlineEventLister(transform.getApp(), transform.getStream());
}
}
@@ -95,7 +100,11 @@ public class ZLMMediaListManager {
// 查看推流状态
if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) {
if (getChannelOnlineEventLister(app, stream) != null) {
getChannelOnlineEventLister(app, stream).run(app, stream, mediaServerId);
try {
getChannelOnlineEventLister(app, stream).run(app, stream, mediaServerId);
} catch (ParseException e) {
throw new RuntimeException(e);
}
removedChannelOnlineEventLister(app, stream);
}
}

View File

@@ -98,7 +98,18 @@ public class ZLMRTPServerFactory {
result = rtpInfo.getInteger("local_port");
if (result == 0) {
// 此时说明rtpServer已经创建但是流还没有推上来
// 此时重新打开rtpServer
Map<String, Object> param = new HashMap<>();
param.put("stream_id", streamId);
JSONObject jsonObject = zlmresTfulUtils.closeRtpServer(mediaServerItem, param);
if (jsonObject != null ) {
System.out.println(jsonObject);
if (jsonObject.getInteger("code") == 0) {
return createRTPServer(mediaServerItem, streamId, ssrc, port);
}else {
logger.warn("[开启rtpServer], 重启RtpServer错误");
}
}
}
return result;
}
@@ -326,12 +337,12 @@ public class ZLMRTPServerFactory {
Boolean result = false;
JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(mediaServerItem, param);
if (jsonObject == null) {
logger.error("停止RTP推流失败: 请检查ZLM服务");
logger.error("[停止RTP推流] 失败: 请检查ZLM服务");
} else if (jsonObject.getInteger("code") == 0) {
result= true;
logger.info("停止RTP推流成功");
logger.info("[停止RTP推流] 成功");
} else {
logger.error("停止RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param));
logger.error("[停止RTP推流] 失败: {}, 参数:{}->\r\n{}",jsonObject.getString("msg"),JSONObject.toJSON(param), jsonObject);
}
return result;
}

View File

@@ -104,7 +104,7 @@ public class ZLMRunner implements CommandLineRunner {
}, 60 * 1000 );
}
@Async
@Async("taskExecutor")
public void connectZlmServer(MediaServerItem mediaServerItem){
String connectZlmServerTaskKey = "connect-zlm-" + mediaServerItem.getId();
ZLMServerConfig zlmServerConfigFirst = getMediaServerConfig(mediaServerItem);

View File

@@ -137,8 +137,6 @@ public class ZlmHttpHookSubscribe {
@Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次
public void execute(){
logger.info("[hook订阅] 清理");
Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
int total = 0;
for (HookType hookType : allSubscribes.keySet()) {
@@ -153,6 +151,5 @@ public class ZlmHttpHookSubscribe {
}
}
}
logger.info("[hook订阅] 清理结束,共清理{}条过期数据", total);
}
}

View File

@@ -1,9 +1,11 @@
package com.genersoft.iot.vmp.media.zlm.dto;
import java.text.ParseException;
/**
* @author lin
*/
public interface ChannelOnlineEvent {
void run(String app, String stream, String serverId);
void run(String app, String stream, String serverId) throws ParseException;
}

View File

@@ -36,7 +36,7 @@ public class ZLMStatusEventListener {
@Autowired
private IPlayService playService;
@Async
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(ZLMOnlineEvent event) {
logger.info("[ZLM] 上线 ID" + event.getMediaServerId());
@@ -45,7 +45,7 @@ public class ZLMStatusEventListener {
playService.zlmServerOnline(event.getMediaServerId());
}
@Async
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(ZLMOfflineEvent event) {