Merge branch 'wvp-28181-2.0'
# Conflicts: # src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java # src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java # src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java # src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java # src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
This commit is contained in:
@@ -50,7 +50,7 @@ public class AssistRESTfulUtils {
|
||||
if (mediaServerItem == null) {
|
||||
return null;
|
||||
}
|
||||
if (ObjectUtils.isEmpty(mediaServerItem.getRecordAssistPort())) {
|
||||
if (mediaServerItem.getRecordAssistPort() > 0) {
|
||||
logger.warn("未启用Assist服务");
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -7,12 +7,10 @@ import java.util.Map;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.genersoft.iot.vmp.common.StreamInfo;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.*;
|
||||
import com.genersoft.iot.vmp.service.*;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
@@ -20,10 +18,11 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
@@ -49,6 +48,9 @@ public class ZLMHttpHookListener {
|
||||
@Autowired
|
||||
private SIPCommander cmder;
|
||||
|
||||
@Autowired
|
||||
private SIPCommanderFroPlatform commanderFroPlatform;
|
||||
|
||||
@Autowired
|
||||
private IPlayService playService;
|
||||
|
||||
@@ -77,7 +79,7 @@ public class ZLMHttpHookListener {
|
||||
private ZLMMediaListManager zlmMediaListManager;
|
||||
|
||||
@Autowired
|
||||
private ZLMHttpHookSubscribe subscribe;
|
||||
private ZlmHttpHookSubscribe subscribe;
|
||||
|
||||
@Autowired
|
||||
private UserSetting userSetting;
|
||||
@@ -91,6 +93,10 @@ public class ZLMHttpHookListener {
|
||||
@Autowired
|
||||
private AssistRESTfulUtils assistRESTfulUtils;
|
||||
|
||||
@Qualifier("taskExecutor")
|
||||
@Autowired
|
||||
private ThreadPoolTaskExecutor taskExecutor;
|
||||
|
||||
/**
|
||||
* 服务器定时上报时间,上报间隔可配置,默认10s上报一次
|
||||
*
|
||||
@@ -101,9 +107,9 @@ public class ZLMHttpHookListener {
|
||||
|
||||
logger.info("[ ZLM HOOK ] on_server_keepalive API调用,参数:" + json.toString());
|
||||
String mediaServerId = json.getString("mediaServerId");
|
||||
List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
|
||||
List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
|
||||
if (subscribes != null && subscribes.size() > 0) {
|
||||
for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
|
||||
for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
|
||||
subscribe.response(null, json);
|
||||
}
|
||||
}
|
||||
@@ -167,7 +173,7 @@ public class ZLMHttpHookListener {
|
||||
logger.debug("[ ZLM HOOK ]on_play API调用,参数:" + JSON.toJSONString(param));
|
||||
}
|
||||
String mediaServerId = param.getMediaServerId();
|
||||
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json);
|
||||
ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json);
|
||||
if (subscribe != null ) {
|
||||
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
|
||||
if (mediaInfo != null) {
|
||||
@@ -237,9 +243,11 @@ public class ZLMHttpHookListener {
|
||||
// 鉴权通过
|
||||
redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
|
||||
// 通知assist新的callId
|
||||
if (mediaInfo != null && mediaInfo.getRecordAssistPort() > 0) {
|
||||
assistRESTfulUtils.addStreamCallInfo(mediaInfo, param.getApp(), param.getStream(), callId, null);
|
||||
}
|
||||
taskExecutor.execute(()->{
|
||||
if (mediaInfo != null && mediaInfo.getRecordAssistPort() > 0) {
|
||||
assistRESTfulUtils.addStreamCallInfo(mediaInfo, param.getApp(), param.getStream(), callId, null);
|
||||
}
|
||||
});
|
||||
}else {
|
||||
zlmMediaListManager.sendStreamEvent(param.getApp(),param.getStream(), param.getMediaServerId());
|
||||
}
|
||||
@@ -252,7 +260,7 @@ public class ZLMHttpHookListener {
|
||||
}
|
||||
|
||||
|
||||
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
|
||||
ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
|
||||
if (subscribe != null) {
|
||||
if (mediaInfo != null) {
|
||||
subscribe.response(mediaInfo, json);
|
||||
@@ -376,7 +384,7 @@ public class ZLMHttpHookListener {
|
||||
logger.debug("[ ZLM HOOK ]on_shell_login API调用,参数:" + json.toString());
|
||||
}
|
||||
String mediaServerId = json.getString("mediaServerId");
|
||||
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json);
|
||||
ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json);
|
||||
if (subscribe != null ) {
|
||||
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
|
||||
if (mediaInfo != null) {
|
||||
@@ -402,7 +410,7 @@ public class ZLMHttpHookListener {
|
||||
logger.info("[ ZLM HOOK ]on_stream_changed API调用,参数:" + JSONObject.toJSONString(item));
|
||||
String mediaServerId = item.getMediaServerId();
|
||||
JSONObject json = (JSONObject) JSON.toJSON(item);
|
||||
ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
|
||||
ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
|
||||
if (subscribe != null ) {
|
||||
MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
|
||||
if (mediaInfo != null) {
|
||||
@@ -415,19 +423,24 @@ public class ZLMHttpHookListener {
|
||||
String schema = item.getSchema();
|
||||
List<MediaItem.MediaTrack> tracks = item.getTracks();
|
||||
boolean regist = item.isRegist();
|
||||
if (regist) {
|
||||
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
|
||||
if (streamAuthorityInfo == null) {
|
||||
streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(item);
|
||||
if (item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
|
||||
|| item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
|
||||
|| item.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
|
||||
if (regist) {
|
||||
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
|
||||
if (streamAuthorityInfo == null) {
|
||||
streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(item);
|
||||
}else {
|
||||
streamAuthorityInfo.setOriginType(item.getOriginType());
|
||||
streamAuthorityInfo.setOriginTypeStr(item.getOriginTypeStr());
|
||||
}
|
||||
redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo);
|
||||
}else {
|
||||
streamAuthorityInfo.setOriginType(item.getOriginType());
|
||||
streamAuthorityInfo.setOriginTypeStr(item.getOriginTypeStr());
|
||||
redisCatchStorage.removeStreamAuthorityInfo(app, stream);
|
||||
}
|
||||
redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo);
|
||||
}else {
|
||||
redisCatchStorage.removeStreamAuthorityInfo(app, stream);
|
||||
}
|
||||
if ("rtmp".equals(schema)){
|
||||
|
||||
if ("rtsp".equals(schema)){
|
||||
logger.info("on_stream_changed:注册->{}, app->{}, stream->{}", regist, app, stream);
|
||||
if (regist) {
|
||||
mediaServerService.addCount(mediaServerId);
|
||||
@@ -523,17 +536,21 @@ public class ZLMHttpHookListener {
|
||||
if ("rtp".equals(app)){
|
||||
ret.put("close", true);
|
||||
StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(streamId);
|
||||
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransaction(null, null, null, streamId);
|
||||
if (streamInfoForPlayCatch != null) {
|
||||
// 如果在给上级推流,也不停止。
|
||||
// 收到无人观看说明流也没有在往上级推送
|
||||
if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) {
|
||||
ret.put("close", false);
|
||||
} else {
|
||||
cmder.streamByeCmd(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId(),
|
||||
streamInfoForPlayCatch.getStream(), null);
|
||||
redisCatchStorage.stopPlay(streamInfoForPlayCatch);
|
||||
storager.stopPlay(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId());
|
||||
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId(streamInfoForPlayCatch.getChannelId());
|
||||
if (sendRtpItems.size() > 0) {
|
||||
for (SendRtpItem sendRtpItem : sendRtpItems) {
|
||||
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
|
||||
commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
|
||||
}
|
||||
}
|
||||
}
|
||||
cmder.streamByeCmd(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId(),
|
||||
streamInfoForPlayCatch.getStream(), null);
|
||||
redisCatchStorage.stopPlay(streamInfoForPlayCatch);
|
||||
storager.stopPlay(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId());
|
||||
}else{
|
||||
StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlayback(null, null, streamId, null);
|
||||
if (streamInfoForPlayBackCatch != null) {
|
||||
@@ -615,9 +632,9 @@ public class ZLMHttpHookListener {
|
||||
}
|
||||
String remoteAddr = request.getRemoteAddr();
|
||||
jsonObject.put("ip", remoteAddr);
|
||||
List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
|
||||
List<ZlmHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
|
||||
if (subscribes != null && subscribes.size() > 0) {
|
||||
for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
|
||||
for (ZlmHttpHookSubscribe.Event subscribe : subscribes) {
|
||||
subscribe.response(null, jsonObject);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,30 +1,24 @@
|
||||
package com.genersoft.iot.vmp.media.zlm;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.*;
|
||||
import com.genersoft.iot.vmp.service.IMediaServerService;
|
||||
import com.genersoft.iot.vmp.service.IStreamProxyService;
|
||||
import com.genersoft.iot.vmp.service.IStreamPushService;
|
||||
import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
||||
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
|
||||
import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
|
||||
import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import org.checkerframework.checker.units.qual.C;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* @author lin
|
||||
@@ -59,7 +53,7 @@ public class ZLMMediaListManager {
|
||||
private StreamPushMapper streamPushMapper;
|
||||
|
||||
@Autowired
|
||||
private ZLMHttpHookSubscribe subscribe;
|
||||
private ZlmHttpHookSubscribe subscribe;
|
||||
|
||||
@Autowired
|
||||
private UserSetting userSetting;
|
||||
|
||||
@@ -92,6 +92,7 @@ public class ZLMRTPServerFactory {
|
||||
int result = -1;
|
||||
// 查询此rtp server 是否已经存在
|
||||
JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, streamId);
|
||||
logger.info(JSONObject.toJSONString(rtpInfo));
|
||||
if(rtpInfo.getInteger("code") == 0){
|
||||
if (rtpInfo.getBoolean("exist")) {
|
||||
result = rtpInfo.getInteger("local_port");
|
||||
@@ -113,7 +114,7 @@ public class ZLMRTPServerFactory {
|
||||
}
|
||||
param.put("ssrc", ssrc);
|
||||
JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
|
||||
|
||||
logger.info(JSONObject.toJSONString(openRtpServerResultJson));
|
||||
if (openRtpServerResultJson != null) {
|
||||
if (openRtpServerResultJson.getInteger("code") == 0) {
|
||||
result= openRtpServerResultJson.getInteger("port");
|
||||
@@ -277,7 +278,7 @@ public class ZLMRTPServerFactory {
|
||||
* 查询待转推的流是否就绪
|
||||
*/
|
||||
public Boolean isRtpReady(MediaServerItem mediaServerItem, String streamId) {
|
||||
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem,"rtp", "rtmp", streamId);
|
||||
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem,"rtp", "rtsp", streamId);
|
||||
return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"));
|
||||
}
|
||||
|
||||
@@ -297,7 +298,7 @@ public class ZLMRTPServerFactory {
|
||||
* @return
|
||||
*/
|
||||
public int totalReaderCount(MediaServerItem mediaServerItem, String app, String streamId) {
|
||||
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtmp", streamId);
|
||||
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtsp", streamId);
|
||||
if (mediaInfo == null) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import com.genersoft.iot.vmp.conf.MediaConfig;
|
||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||
import com.genersoft.iot.vmp.service.IMediaServerService;
|
||||
import org.slf4j.Logger;
|
||||
@@ -19,9 +18,7 @@ import org.springframework.core.annotation.Order;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Component
|
||||
@Order(value=1)
|
||||
@@ -35,7 +32,7 @@ public class ZLMRunner implements CommandLineRunner {
|
||||
private ZLMRESTfulUtils zlmresTfulUtils;
|
||||
|
||||
@Autowired
|
||||
private ZLMHttpHookSubscribe hookSubscribe;
|
||||
private ZlmHttpHookSubscribe hookSubscribe;
|
||||
|
||||
@Autowired
|
||||
private EventPublisher publisher;
|
||||
@@ -62,8 +59,6 @@ public class ZLMRunner implements CommandLineRunner {
|
||||
}
|
||||
mediaServerService.syncCatchFromDatabase();
|
||||
HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started();
|
||||
// Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.SECONDS.toSeconds(60));
|
||||
// hookSubscribeForStreamChange.setExpires(expiresInstant);
|
||||
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
|
||||
hookSubscribe.addSubscribe(hookSubscribeForServerStarted,
|
||||
(MediaServerItem mediaServerItem, JSONObject response)->{
|
||||
|
||||
@@ -4,6 +4,9 @@ import com.alibaba.fastjson.JSONObject;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.HookType;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
@@ -13,21 +16,22 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @description:针对 ZLMediaServer的hook事件订阅
|
||||
* @author: pan
|
||||
* @date: 2020年12月2日 21:17:32
|
||||
* ZLMediaServer的hook事件订阅
|
||||
* @author lin
|
||||
*/
|
||||
@Component
|
||||
public class ZLMHttpHookSubscribe {
|
||||
public class ZlmHttpHookSubscribe {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(ZlmHttpHookSubscribe.class);
|
||||
|
||||
@FunctionalInterface
|
||||
public interface Event{
|
||||
void response(MediaServerItem mediaServerItem, JSONObject response);
|
||||
}
|
||||
|
||||
private Map<HookType, Map<IHookSubscribe, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
|
||||
private Map<HookType, Map<IHookSubscribe, ZlmHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
|
||||
|
||||
public void addSubscribe(IHookSubscribe hookSubscribe, ZLMHttpHookSubscribe.Event event) {
|
||||
public void addSubscribe(IHookSubscribe hookSubscribe, ZlmHttpHookSubscribe.Event event) {
|
||||
if (hookSubscribe.getExpires() == null) {
|
||||
// 默认5分钟过期
|
||||
Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5));
|
||||
@@ -36,8 +40,8 @@ public class ZLMHttpHookSubscribe {
|
||||
allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
|
||||
}
|
||||
|
||||
public ZLMHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
|
||||
ZLMHttpHookSubscribe.Event event= null;
|
||||
public ZlmHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
|
||||
ZlmHttpHookSubscribe.Event event= null;
|
||||
Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
|
||||
if (eventMap == null) {
|
||||
return null;
|
||||
@@ -69,8 +73,8 @@ public class ZLMHttpHookSubscribe {
|
||||
|
||||
Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet();
|
||||
if (entries.size() > 0) {
|
||||
List<Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
|
||||
for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entries) {
|
||||
List<Map.Entry<IHookSubscribe, ZlmHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
|
||||
for (Map.Entry<IHookSubscribe, ZlmHttpHookSubscribe.Event> entry : entries) {
|
||||
JSONObject content = entry.getKey().getContent();
|
||||
if (content == null || content.size() == 0) {
|
||||
entriesToRemove.add(entry);
|
||||
@@ -87,13 +91,13 @@ public class ZLMHttpHookSubscribe {
|
||||
result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s));
|
||||
}
|
||||
}
|
||||
if (null != result && result){
|
||||
if (result){
|
||||
entriesToRemove.add(entry);
|
||||
}
|
||||
}
|
||||
|
||||
if (!CollectionUtils.isEmpty(entriesToRemove)) {
|
||||
for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) {
|
||||
for (Map.Entry<IHookSubscribe, ZlmHttpHookSubscribe.Event> entry : entriesToRemove) {
|
||||
entries.remove(entry);
|
||||
}
|
||||
}
|
||||
@@ -106,12 +110,12 @@ public class ZLMHttpHookSubscribe {
|
||||
* @param type
|
||||
* @return
|
||||
*/
|
||||
public List<ZLMHttpHookSubscribe.Event> getSubscribes(HookType type) {
|
||||
public List<ZlmHttpHookSubscribe.Event> getSubscribes(HookType type) {
|
||||
Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
|
||||
if (eventMap == null) {
|
||||
return null;
|
||||
}
|
||||
List<ZLMHttpHookSubscribe.Event> result = new ArrayList<>();
|
||||
List<ZlmHttpHookSubscribe.Event> result = new ArrayList<>();
|
||||
for (IHookSubscribe key : eventMap.keySet()) {
|
||||
result.add(eventMap.get(key));
|
||||
}
|
||||
@@ -127,5 +131,28 @@ public class ZLMHttpHookSubscribe {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 对订阅数据进行过期清理
|
||||
*/
|
||||
@Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次
|
||||
public void execute(){
|
||||
|
||||
logger.info("[hook订阅] 清理");
|
||||
|
||||
Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
|
||||
int total = 0;
|
||||
for (HookType hookType : allSubscribes.keySet()) {
|
||||
Map<IHookSubscribe, Event> hookSubscribeEventMap = allSubscribes.get(hookType);
|
||||
if (hookSubscribeEventMap.size() > 0) {
|
||||
for (IHookSubscribe hookSubscribe : hookSubscribeEventMap.keySet()) {
|
||||
if (hookSubscribe.getExpires().isBefore(instant)) {
|
||||
// 过期的
|
||||
hookSubscribeEventMap.remove(hookSubscribe);
|
||||
total ++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.info("[hook订阅] 清理结束,共清理{}条过期数据", total);
|
||||
}
|
||||
}
|
||||
@@ -1,72 +0,0 @@
|
||||
package com.genersoft.iot.vmp.media.zlm.event;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
||||
import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
|
||||
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
|
||||
import com.genersoft.iot.vmp.service.IMediaServerService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @description:设备心跳超时监听,借助redis过期特性,进行监听,监听到说明设备心跳超时,发送离线事件
|
||||
* @author: swwheihei
|
||||
* @date: 2020年5月6日 上午11:35:46
|
||||
*/
|
||||
@Component
|
||||
public class ZLMKeepliveTimeoutListener extends RedisKeyExpirationEventMessageListener {
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(ZLMKeepliveTimeoutListener.class);
|
||||
|
||||
@Autowired
|
||||
private EventPublisher publisher;
|
||||
|
||||
@Autowired
|
||||
private ZLMRESTfulUtils zlmresTfulUtils;
|
||||
|
||||
@Autowired
|
||||
private UserSetting userSetting;
|
||||
|
||||
@Autowired
|
||||
private IMediaServerService mediaServerService;
|
||||
|
||||
public ZLMKeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer, UserSetting userSetting) {
|
||||
super(listenerContainer, userSetting);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 监听失效的key,key格式为keeplive_deviceId
|
||||
* @param message
|
||||
* @param pattern
|
||||
*/
|
||||
@Override
|
||||
public void onMessage(Message message, byte[] pattern) {
|
||||
// 获取失效的key
|
||||
String expiredKey = message.toString();
|
||||
String KEEPLIVEKEY_PREFIX = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_";
|
||||
if(!expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){
|
||||
return;
|
||||
}
|
||||
|
||||
String mediaServerId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length());
|
||||
logger.info("[zlm心跳到期]:" + mediaServerId);
|
||||
// 发起http请求验证zlm是否确实无法连接,如果确实无法连接则发送离线事件,否则不作处理
|
||||
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
|
||||
JSONObject mediaServerConfig = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
|
||||
if (mediaServerConfig != null && mediaServerConfig.getInteger("code") == 0) {
|
||||
logger.info("[zlm心跳到期]:{}验证后zlm仍在线,恢复心跳信息", mediaServerId);
|
||||
// 添加zlm信息
|
||||
mediaServerService.updateMediaServerKeepalive(mediaServerId, mediaServerConfig);
|
||||
}else {
|
||||
publisher.zlmOfflineEventPublish(mediaServerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user