diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index cee1993fa..2fa59b8ef 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -1,13 +1,13 @@ package com.genersoft.iot.vmp.common; -/** - * @description: 定义常量 +/** + * @description: 定义常量 * @author: swwheihei - * @date: 2019年5月30日 下午3:04:04 - * + * @date: 2019年5月30日 下午3:04:04 + * */ public class VideoManagerConstants { - + public static final String WVP_SERVER_PREFIX = "VMP_SIGNALLING_SERVER_INFO_"; public static final String WVP_SERVER_LIST = "VMP_SERVER_LIST"; @@ -22,10 +22,6 @@ public class VideoManagerConstants { public static final String INVITE_PREFIX = "VMP_GB_INVITE_INFO"; - public static final String PLATFORM_CATCH_PREFIX = "VMP_PLATFORM_CATCH_"; - - public static final String PLATFORM_REGISTER_INFO_PREFIX = "VMP_PLATFORM_REGISTER_INFO_"; - public static final String SEND_RTP_PORT = "VM_SEND_RTP_PORT:"; public static final String SEND_RTP_INFO_CALLID = "VMP_SEND_RTP_INFO:CALL_ID:"; public static final String SEND_RTP_INFO_STREAM = "VMP_SEND_RTP_INFO:STREAM:"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipTransactionInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipTransactionInfo.java index 2edb71d5e..74c63e112 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipTransactionInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipTransactionInfo.java @@ -10,6 +10,8 @@ public class SipTransactionInfo { private String fromTag; private String toTag; private String viaBranch; + private int expires; + private String user; // 自己是否媒体流发送者 private boolean asSender; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java index b52c27968..7b6846ea2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java @@ -101,7 +101,7 @@ public interface PlatformMapper { @Select("SELECT * FROM wvp_platform ") List queryAll(); - @Select("SELECT * FROM wvp_platform WHERE enable=true and server_id == #{serverId} group by server_id") + @Select("SELECT * FROM wvp_platform WHERE enable=true and server_id = #{serverId}") List queryServerIdsWithEnableAndServer(@Param("serverId") String serverId); @Update("UPDATE wvp_platform SET status=false" ) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java index 91eb00777..7ae44996f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java @@ -53,7 +53,7 @@ public interface IPlatformService { * 平台离线 * @param parentPlatform 平台信息 */ - void offline(Platform parentPlatform, boolean stopRegisterTask); + void offline(Platform parentPlatform); /** * 向上级平台发送位置订阅 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 05881e9a3..38f4e004f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -38,6 +38,7 @@ import gov.nist.javax.sip.message.SIPResponse; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; +import org.springframework.core.annotation.Order; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -59,6 +60,7 @@ import java.util.concurrent.TimeUnit; */ @Slf4j @Service +@Order(value=16) public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { @Autowired @@ -707,6 +709,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { public void subscribeMobilePosition(int id, int cycle, int interval) { Device device = deviceMapper.query(id); Assert.notNull(device, "未找到设备"); + if (device.getSubscribeCycleForMobilePosition() == cycle) { return; } @@ -729,6 +732,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { // 订阅未开启 device.setSubscribeCycleForMobilePosition(cycle); device.setMobilePositionSubmissionInterval(interval); + updateDevice(device); // 开启订阅 addMobilePositionSubscribe(device, null); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index b40d8d8c6..2c58259b3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -38,6 +38,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.context.event.EventListener; +import org.springframework.core.annotation.Order; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -59,6 +60,7 @@ import java.util.concurrent.TimeUnit; */ @Slf4j @Service +@Order(value=15) public class PlatformServiceImpl implements IPlatformService, CommandLineRunner { private final static String REGISTER_KEY_PREFIX = "platform_register_"; @@ -142,14 +144,25 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner return; } for (Platform platform : platformList) { - sendRegister(platform, null); + if (statusTaskRunner.containsRegister(platform.getServerGBId()) && statusTaskRunner.containsKeepAlive(platform.getServerGBId())) { + continue; + } + if (statusTaskRunner.containsRegister(platform.getServerGBId())) { + SipTransactionInfo transactionInfo = statusTaskRunner.getRegisterTransactionInfo(platform.getServerGBId()); + // 注销后出发平台离线, 如果是启用的平台,那么下次丢失检测会检测到并重新注册上线 + sendUnRegister(platform, transactionInfo); + }else { + statusTaskRunner.removeKeepAliveTask(platform.getServerGBId()); + sendRegister(platform, null); + } } } private void sendRegister(Platform platform, SipTransactionInfo sipTransactionInfo) { try { commanderForPlatform.register(platform, sipTransactionInfo, eventResult -> { - log.info("[国标级联] {}({}),添加向上级注册失败,请确定上级平台可用时重新保存", platform.getName(), platform.getServerGBId()); + log.info("[国标级联] {}({}),注册失败", platform.getName(), platform.getServerGBId()); + offline(platform); }, null); } catch (InvalidArgumentException | ParseException | SipException e) { log.error("[命令发送失败] 国标级联: {}", e.getMessage()); @@ -339,6 +352,8 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner return result > 0; } + + @Override public boolean update(Platform platform) { Assert.isTrue(platform.getId() > 0, "ID必须存在"); @@ -366,13 +381,13 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner @Override public void online(Platform platform, SipTransactionInfo sipTransactionInfo) { log.info("[国标级联]:{}, 平台上线", platform.getServerGBId()); - PlatformRegisterTask registerTask = new PlatformRegisterTask(platform.getServerId(), platform.getExpires() * 1000L - 500L, + PlatformRegisterTask registerTask = new PlatformRegisterTask(platform.getServerGBId(), platform.getExpires() * 1000L - 500L, sipTransactionInfo, (platformServerGbId) -> { this.registerExpire(platformServerGbId, sipTransactionInfo); }); statusTaskRunner.addRegisterTask(registerTask); - PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerId(), platform.getKeepTimeout() * 1000L, + PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerGBId(), platform.getKeepTimeout() * 1000L, this::keepaliveExpire); statusTaskRunner.addKeepAliveTask(keepaliveTask); @@ -421,31 +436,31 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner // 心跳超时失败 if (failCount < 2) { log.info("[国标级联] 心跳发送超时, 平台服务编号: {}", platformServerId); - PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerId(), platform.getKeepTimeout() * 1000L, + PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerGBId(), platform.getKeepTimeout() * 1000L, this::keepaliveExpire); keepaliveTask.setFailCount(failCount + 1); statusTaskRunner.addKeepAliveTask(keepaliveTask); }else { // 心跳超时三次, 不再发送心跳, 平台离线 log.info("[国标级联] 心跳发送超时三次,平台离线, 平台服务编号: {}", platformServerId); - offline(platform, false); + offline(platform); } }, eventResult -> { - PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerId(), platform.getKeepTimeout() * 1000L, + PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerGBId(), platform.getKeepTimeout() * 1000L, this::keepaliveExpire); statusTaskRunner.addKeepAliveTask(keepaliveTask); }); } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] 国标级联 发送心跳: {}", e.getMessage()); if (failCount < 2) { - PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerId(), platform.getKeepTimeout() * 1000L, + PlatformKeepaliveTask keepaliveTask = new PlatformKeepaliveTask(platform.getServerGBId(), platform.getKeepTimeout() * 1000L, this::keepaliveExpire); keepaliveTask.setFailCount(failCount + 1); statusTaskRunner.addKeepAliveTask(keepaliveTask); }else { // 心跳超时三次, 不再发送心跳, 平台离线 log.info("[国标级联] 心跳发送失败三次,平台离线, 平台服务编号: {}", platformServerId); - offline(platform, false); + offline(platform); } } } @@ -458,7 +473,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner } @Override - public void offline(Platform platform, boolean stopRegister) { + public void offline(Platform platform) { log.info("[平台离线]:{}({})", platform.getName(), platform.getServerGBId()); statusTaskRunner.removeRegisterTask(platform.getServerGBId()); statusTaskRunner.removeKeepAliveTask(platform.getServerGBId()); @@ -475,7 +490,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner private void stopAllPush(String platformId) { List sendRtpItems = sendRtpServerService.queryForPlatform(platformId); - if (sendRtpItems != null && sendRtpItems.size() > 0) { + if (sendRtpItems != null && !sendRtpItems.isEmpty()) { for (SendRtpInfo sendRtpItem : sendRtpItems) { ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); sendRtpServerService.delete(sendRtpItem); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformRegisterTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformRegisterTask.java index 325cfee92..781dc96f7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformRegisterTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformRegisterTask.java @@ -38,7 +38,7 @@ public class PlatformRegisterTask implements Delayed { public PlatformRegisterTask(String platformServerId, long delayTime, SipTransactionInfo sipTransactionInfo, CommonCallback callback) { this.platformServerId = platformServerId; - this.delayTime = System.currentTimeMillis() + delayTime; + this.delayTime = System.currentTimeMillis() + delayTime; this.callback = callback; this.sipTransactionInfo = sipTransactionInfo; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformStatusTaskRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformStatusTaskRunner.java index d19434aa9..eaa96aedf 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformStatusTaskRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformStatusTaskRunner.java @@ -147,10 +147,9 @@ public class PlatformStatusTaskRunner { public boolean removeKeepAliveTask(String platformServerId) { PlatformKeepaliveTask task = keepaliveSubscribes.get(platformServerId); - if (task == null) { - return false; + if (task != null) { + keepaliveSubscribes.remove(platformServerId); } - keepaliveSubscribes.remove(platformServerId); if (keepaliveTaskDelayQueue.contains(task)) { boolean remove = keepaliveTaskDelayQueue.remove(task); if (!remove) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index 0fb336397..3ac4be1bf 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -84,6 +84,11 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { // Success if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) { + ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(response.getCSeqHeader().getMethod()); + if (sipRequestProcessor != null) { + sipRequestProcessor.process(responseEvent); + } + CallIdHeader callIdHeader = response.getCallIdHeader(); CSeqHeader cSeqHeader = response.getCSeqHeader(); if (callIdHeader != null) { @@ -96,10 +101,6 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { sipSubscribe.removeSubscribe(callIdHeader.getCallId() + cSeqHeader.getSeqNumber()); } } - ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(response.getCSeqHeader().getMethod()); - if (sipRequestProcessor != null) { - sipRequestProcessor.process(responseEvent); - } } else if ((status >= Response.TRYING) && (status < Response.OK)) { // 增加其它无需回复的响应,如101、180等 // 更新sip订阅的时间 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java index a6640ea51..8c6ebf4e2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPSender.java @@ -8,6 +8,9 @@ import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.utils.GitUtil; import gov.nist.javax.sip.SipProviderImpl; +import gov.nist.javax.sip.address.SipUri; +import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.message.SIPResponse; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -37,6 +40,7 @@ public class SIPSender { @Autowired private SipSubscribe sipSubscribe; + @Autowired private SipConfig sipConfig; @@ -86,15 +90,25 @@ public class SIPSender { }), timeout == null ? sipConfig.getTimeout() : timeout); SipTransactionInfo sipTransactionInfo = new SipTransactionInfo(); sipTransactionInfo.setFromTag(fromHeader.getTag()); - sipTransactionInfo.setFromTag(fromHeader.getTag()); + sipTransactionInfo.setCallId(callIdHeader.getCallId()); - - if (message instanceof Response) { - ToHeader toHeader = (ToHeader) message.getHeader(ToHeader.NAME); - sipTransactionInfo.setToTag(toHeader.getTag()); + if (message instanceof SIPResponse) { + SIPResponse response = (SIPResponse) message; + sipTransactionInfo.setToTag(response.getToHeader().getTag()); + sipTransactionInfo.setViaBranch(response.getTopmostViaHeader().getBranch()); + }else if (message instanceof SIPRequest) { + SIPRequest request = (SIPRequest) message; + sipTransactionInfo.setViaBranch(request.getTopmostViaHeader().getBranch()); + SipUri sipUri = (SipUri)request.getRequestLine().getUri(); + sipTransactionInfo.setUser(sipUri.getUser()); } + + ExpiresHeader expiresHeader = (ExpiresHeader) message.getHeader(ExpiresHeader.NAME); + if (expiresHeader != null) { + sipTransactionInfo.setExpires(expiresHeader.getExpires()); + } sipEvent.setSipTransactionInfo(sipTransactionInfo); sipSubscribe.addSubscribe(key, sipEvent); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java index bce60a796..8ad0a6c2f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java @@ -21,7 +21,6 @@ import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.GitUtil; import gov.nist.javax.sip.message.MessageFactoryImpl; @@ -121,9 +120,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, toTag, callIdHeader, isRegister? parentPlatform.getExpires() : 0); - // 将 callid 写入缓存, 等注册成功可以更新状态 - String callIdFromHeader = callIdHeader.getCallId(); - redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, PlatformRegisterInfo.getInstance(parentPlatform.getServerGBId(), isRegister)); }else { request = headerProviderPlatformProvider.createRegisterRequest(parentPlatform, fromTag, toTag, www, callIdHeader, isRegister? parentPlatform.getExpires() : 0); } @@ -132,7 +128,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { if (event != null) { log.info("[国标级联]:{}, 注册失败: {} ", parentPlatform.getServerGBId(), event.msg); } - redisCatchStorage.delPlatformRegisterInfo(callIdHeader.getCallId()); if (errorEvent != null ) { errorEvent.response(event); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java index 428152907..1f73370b1 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java @@ -1,14 +1,14 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl; import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.bean.PlatformCatch; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.event.sip.SipEvent; +import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; -import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; import gov.nist.javax.sip.message.SIPResponse; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -44,6 +44,9 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { @Autowired private IPlatformService platformService; + @Autowired + private SipSubscribe sipSubscribe; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -59,23 +62,19 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { public void process(ResponseEvent evt) { SIPResponse response = (SIPResponse)evt.getResponse(); String callId = response.getCallIdHeader().getCallId(); - PlatformRegisterInfo platformRegisterInfo = redisCatchStorage.queryPlatformRegisterInfo(callId); - if (platformRegisterInfo == null) { - log.info(String.format("[国标级联]未找到callId: %s 的注册/注销平台id", callId )); + long seqNumber = response.getCSeqHeader().getSeqNumber(); + SipEvent subscribe = sipSubscribe.getSubscribe(callId + seqNumber); + if (subscribe == null || subscribe.getSipTransactionInfo() == null || subscribe.getSipTransactionInfo().getUser() == null) { return; } - PlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformRegisterInfo.getPlatformId()); - if (parentPlatformCatch == null) { - log.warn(String.format("[国标级联]收到注册/注销%S请求,平台:%s,但是平台缓存信息未查询到!!!", response.getStatusCode(),platformRegisterInfo.getPlatformId())); - return; - } + String action = subscribe.getSipTransactionInfo().getExpires() > 0 ? "注册" : "注销"; + String platFormServerGbId = subscribe.getSipTransactionInfo().getUser(); - String action = platformRegisterInfo.isRegister() ? "注册" : "注销"; - log.info(String.format("[国标级联]%s %S响应,%s ", action, response.getStatusCode(), platformRegisterInfo.getPlatformId() )); - Platform parentPlatform = parentPlatformCatch.getPlatform(); - if (parentPlatform == null) { - log.warn(String.format("[国标级联]收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformRegisterInfo.getPlatformId(), action, response.getStatusCode())); + log.info("[国标级联]{} {}响应 {} ", action, response.getStatusCode(), platFormServerGbId); + Platform platform = platformService.queryPlatformByServerGBId(platFormServerGbId); + if (platform == null) { + log.warn("[国标级联]收到 来自{}的 {} 回复 {}, 但是平台信息未查询到!!!", platFormServerGbId, action, response.getStatusCode()); return; } @@ -83,20 +82,17 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME); SipTransactionInfo sipTransactionInfo = new SipTransactionInfo(response); try { - sipCommanderForPlatform.register(parentPlatform, sipTransactionInfo, www, null, null, platformRegisterInfo.isRegister()); + sipCommanderForPlatform.register(platform, sipTransactionInfo, www, null, null, subscribe.getSipTransactionInfo().getExpires() > 0); } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] 国标级联 再次注册: {}", e.getMessage()); } }else if (response.getStatusCode() == Response.OK){ - if (platformRegisterInfo.isRegister()) { + if (subscribe.getSipTransactionInfo().getExpires() > 0) { SipTransactionInfo sipTransactionInfo = new SipTransactionInfo(response); - platformService.online(parentPlatform, sipTransactionInfo); + platformService.online(platform, sipTransactionInfo); }else { - platformService.offline(parentPlatform, true); + platformService.offline(platform); } - - // 注册/注销成功移除缓存的信息 - redisCatchStorage.delPlatformRegisterInfo(callId); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index b2d65963c..531ef6aaf 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -231,6 +231,9 @@ public class RedisRpcServiceImpl implements IRedisRpcService { RedisRpcRequest request = buildRequest("platform/update", platform); request.setToId(serverId); RedisRpcResponse response = redisRpcConfig.request(request, 40, TimeUnit.MILLISECONDS); + if(response == null) { + return false; + } return Boolean.parseBoolean(response.getBody().toString()); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 5a91f388c..d409acbfb 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; -import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; import java.util.List; import java.util.Map; @@ -23,16 +22,6 @@ public interface IRedisCatchStorage { */ Long getCSEQ(); - PlatformCatch queryPlatformCatchInfo(String platformGbId); - - void delPlatformCatchInfo(String platformGbId); - - void updatePlatformRegisterInfo(String callId, PlatformRegisterInfo platformRegisterInfo); - - PlatformRegisterInfo queryPlatformRegisterInfo(String callId); - - void delPlatformRegisterInfo(String callId); - /** * 在redis添加wvp的信息 */ diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index d86c7ad32..25dcf4b76 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -15,7 +15,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.JsonUtil; import com.genersoft.iot.vmp.utils.SystemInfoUtils; @@ -73,35 +72,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { redisTemplate.opsForValue().set(key, 1); } - @Override - public PlatformCatch queryPlatformCatchInfo(String platformGbId) { - return (PlatformCatch)redisTemplate.opsForValue().get(VideoManagerConstants.PLATFORM_CATCH_PREFIX + userSetting.getServerId() + "_" + platformGbId); - } - - @Override - public void delPlatformCatchInfo(String platformGbId) { - redisTemplate.delete(VideoManagerConstants.PLATFORM_CATCH_PREFIX + userSetting.getServerId() + "_" + platformGbId); - } - - @Override - public void updatePlatformRegisterInfo(String callId, PlatformRegisterInfo platformRegisterInfo) { - String key = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_" + callId; - Duration duration = Duration.ofSeconds(30L); - redisTemplate.opsForValue().set(key, platformRegisterInfo, duration); - } - - - @Override - public PlatformRegisterInfo queryPlatformRegisterInfo(String callId) { - return (PlatformRegisterInfo)redisTemplate.opsForValue().get(VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_" + callId); - } - - @Override - public void delPlatformRegisterInfo(String callId) { - redisTemplate.delete(VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_" + callId); - } - - @Override public void updateWVPInfo(ServerInfo serverInfo, int time) {