优化级联时的异常处理

This commit is contained in:
panlinlin
2021-04-26 18:38:57 +08:00
parent bc2b288547
commit 39078225f1
25 changed files with 226 additions and 31 deletions

View File

@@ -39,8 +39,8 @@ public class KeepaliveTimeoutListenerForPlatform extends KeyExpirationEventMessa
// 获取失效的key
String expiredKey = message.toString();
logger.info(expiredKey);
if(!expiredKey.startsWith(VideoManagerConstants.PLATFORM_PREFIX)){
logger.info("收到redis过期监听但开头不是"+VideoManagerConstants.PLATFORM_PREFIX+",忽略");
if(!expiredKey.startsWith(VideoManagerConstants.PLATFORM_KEEPLIVEKEY_PREFIX)){
logger.debug("收到redis过期监听但开头不是"+VideoManagerConstants.PLATFORM_KEEPLIVEKEY_PREFIX+",忽略");
return;
}
// 平台心跳到期,需要重发, 判断是否已经多次未收到心跳回复, 多次未收到,则重新发起注册, 注册尝试多次未得到回复,则认为平台离线
@@ -49,7 +49,6 @@ public class KeepaliveTimeoutListenerForPlatform extends KeyExpirationEventMessa
publisher.platformKeepaliveExpireEventPublish(platformGBId);
}else if (expiredKey.startsWith(VideoManagerConstants.PLATFORM_REGISTER_PREFIX)) {
logger.info("11111111111111");
String platformGBId = expiredKey.substring(VideoManagerConstants.PLATFORM_REGISTER_PREFIX.length(),expiredKey.length());
publisher.platformNotRegisterEventPublish(platformGBId);

View File

@@ -38,7 +38,7 @@ public class KeepliveTimeoutListener extends KeyExpirationEventMessageListener {
// 获取失效的key
String expiredKey = message.toString();
if(!expiredKey.startsWith(VideoManagerConstants.KEEPLIVEKEY_PREFIX)){
logger.info("收到redis过期监听但开头不是"+VideoManagerConstants.KEEPLIVEKEY_PREFIX+",忽略");
logger.debug("收到redis过期监听但开头不是"+VideoManagerConstants.KEEPLIVEKEY_PREFIX+",忽略");
return;
}

View File

@@ -63,6 +63,7 @@ public class PlatformKeepaliveExpireEventLister implements ApplicationListener<P
if (parentPlatformCatch.getKeepAliveReply() >= 3) {
// 有3次未收到心跳回复, 设置平台状态为离线, 开始重新注册
logger.warn("有3次未收到心跳回复,标记设置平台状态为离线, 并重新注册 平台国标ID" + event.getPlatformGbID());
storager.updateParentPlatformStatus(event.getPlatformGbID(), false);
publisher.platformNotRegisterEventPublish(event.getPlatformGbID());
parentPlatformCatch.setKeepAliveReply(0);
}else {

View File

@@ -1,7 +1,10 @@
package com.genersoft.iot.vmp.gb28181.event.platformNotRegister;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -9,6 +12,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @Description: 平台未注册事件,来源有二:
* 1、平台新添加
@@ -23,23 +30,53 @@ public class PlatformNotRegisterEventLister implements ApplicationListener<Platf
@Autowired
private IVideoManagerStorager storager;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private SIPCommanderFroPlatform sipCommanderFroPlatform;
@Autowired
private ZLMRTPServerFactory zlmrtpServerFactory;
// @Autowired
// private RedisUtil redis;
@Override
public void onApplicationEvent(PlatformNotRegisterEvent event) {
logger.debug("平台未注册事件触发平台国标ID" + event.getPlatformGbID());
logger.info("平台未注册事件触发平台国标ID" + event.getPlatformGbID());
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformGbID());
if (parentPlatform == null) {
logger.debug("平台未注册事件触发,但平台已经删除!!! 平台国标ID" + event.getPlatformGbID());
logger.info("平台未注册事件触发,但平台已经删除!!! 平台国标ID" + event.getPlatformGbID());
return;
}
// 查询是否有推流, 如果有则都停止
List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(event.getPlatformGbID());
logger.info("停止[ {} ]的所有推流size", sendRtpItems.size());
if (sendRtpItems != null && sendRtpItems.size() > 0) {
logger.info("停止[ {} ]的所有推流", event.getPlatformGbID());
StringBuilder app = new StringBuilder();
StringBuilder stream = new StringBuilder();
for (int i = 0; i < sendRtpItems.size(); i++) {
if (app.length() != 0) {
app.append(",");
}
app.append(sendRtpItems.get(i).getApp());
if (stream.length() != 0) {
stream.append(",");
}
stream.append(sendRtpItems.get(i).getStreamId());
redisCatchStorage.deleteSendRTPServer(event.getPlatformGbID(), sendRtpItems.get(i).getChannelId());
}
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app", app.toString());
param.put("stream", stream.toString());
zlmrtpServerFactory.stopSendRtpStream(param);
}
sipCommanderFroPlatform.register(parentPlatform);
}
}

View File

@@ -43,7 +43,7 @@ public class CheckForAllRecordsThread extends Thread {
if (totalRecordList.size() < this.recordInfo.getSumNum()) {
logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + this.recordInfo.getSumNum() + "");
} else {
logger.info("录像数据已全部获取,共" + this.recordInfo.getSumNum() + "");
logger.info("录像数据已全部获取,共 {} 项", this.recordInfo.getSumNum());
this.recordInfo.setRecordList(totalRecordList);
for (int i = 0; i < cacheKeys.size(); i++) {
redis.del(cacheKeys.get(i).toString());

View File

@@ -58,6 +58,7 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",streamId);
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("停止向上级推流:" + streamId);
zlmrtpServerFactory.stopSendRtpStream(param);
redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);

View File

@@ -93,6 +93,11 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
// 不是通道可能是直播流
if (channel != null || gbStream != null ) {
if (channel.getStatus() == 0) {
logger.info("通道离线返回400");
responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline");
return;
}
responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在发181呼叫转接中
}else {
logger.info("通道不存在返回404");
@@ -367,6 +372,12 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
getServerTransaction(evt).sendResponse(response);
}
private void responseAck(RequestEvent evt, int statusCode, String msg) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
response.setReasonPhrase(msg);
getServerTransaction(evt).sendResponse(response);
}
/**
* 回复带sdp的200
* @param evt

View File

@@ -770,14 +770,17 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
try {
Element rootElement = getRootElement(evt);
String deviceId = XmlUtil.getText(rootElement, "DeviceID");
// 检查设备是否存在, 不存在则不回复
if (storager.exists(deviceId)) {
Device device = storager.queryVideoDevice(deviceId);
// 检查设备是否存在并在线, 不存在则不回复
if (device != null && device.getOnline() == 1) {
// 回复200 OK
responseAck(evt);
if (offLineDetector.isOnline(deviceId)) {
publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE);
} else {
}
}else {
logger.warn("收到[ "+deviceId+" ]心跳信息, 但是设备" + (device == null? "不存在":"离线") + ", 心跳信息不予以回复");
}
} catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
e.printStackTrace();

View File

@@ -146,7 +146,7 @@ public class RegisterRequestProcessor extends SIPRequestAbstractProcessor {
// 注册成功
// 保存到redis
// 下发catelog查询目录
if (registerFlag == 1 && device != null) {
if (registerFlag == 1 ) {
logger.info("注册成功! deviceId:" + device.getDeviceId());
// boolean exists = storager.exists(device.getDeviceId());
device.setRegisterTimeMillis(System.currentTimeMillis());

View File

@@ -80,12 +80,13 @@ public class RegisterResponseProcessor implements ISIPResponseProcessor {
// 注册/注销成功
logger.info(String.format("%s %s成功", platformGBId, action));
redisCatchStorage.delPlatformRegisterInfo(callId);
parentPlatform.setStatus(true);
parentPlatform.setStatus("注册".equals(action));
// 取回Expires设置避免注销过程中被置为0
ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
String expires = parentPlatformTmp.getExpires();
parentPlatform.setExpires(expires);
storager.updateParentPlatform(parentPlatform);
parentPlatform.setId(parentPlatformTmp.getId());
storager.updateParentPlatformStatus(platformGBId, "注册".equals(action));
redisCatchStorage.updatePlatformRegister(parentPlatform);