diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index 85004db3a..58e0b81c5 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -29,7 +29,6 @@ public class SubscribeHolder { private final String prefix = "VMP_SUBSCRIBE_OVERDUE"; - public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) { log.info("[国标级联] 添加目录订阅,平台: {}, 有效期: {}", platformId, subscribeInfo.getExpires()); @@ -53,7 +52,7 @@ public class SubscribeHolder { } public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo, Runnable gpsTask) { - log.info("[国标级联] 添加移动位置订阅,平台: {}, 有效期: {}", platformId, subscribeInfo.getExpires()); + log.info("[国标级联] 添加移动位置订阅,平台: {}, 有效期: {}s", platformId, subscribeInfo.getExpires()); String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId); if (subscribeInfo.getExpires() > 0) { Duration duration = Duration.ofSeconds(subscribeInfo.getExpires()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 2b5bdacf5..a7035eba2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -15,11 +15,11 @@ import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; -import com.genersoft.iot.vmp.gb28181.task.SubscribeTask; -import com.genersoft.iot.vmp.gb28181.task.SubscribeTaskInfo; -import com.genersoft.iot.vmp.gb28181.task.SubscribeTaskRunner; -import com.genersoft.iot.vmp.gb28181.task.impl.SubscribeTaskForCatalog; -import com.genersoft.iot.vmp.gb28181.task.impl.SubscribeTaskForMobilPosition; +import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask; +import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo; +import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskRunner; +import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.impl.SubscribeTaskForCatalog; +import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.impl.SubscribeTaskForMobilPosition; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; import com.genersoft.iot.vmp.media.bean.MediaServer; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTask.java similarity index 95% rename from src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTask.java rename to src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTask.java index 6af18a451..e33ac3813 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTask.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.gb28181.task; +package com.genersoft.iot.vmp.gb28181.task.deviceSubscribe; import com.genersoft.iot.vmp.common.SubscribeCallback; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTaskInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTaskInfo.java similarity index 84% rename from src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTaskInfo.java rename to src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTaskInfo.java index 8b93e8528..3a8741130 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTaskInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTaskInfo.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.gb28181.task; +package com.genersoft.iot.vmp.gb28181.task.deviceSubscribe; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; import lombok.Data; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTaskRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTaskRunner.java similarity index 98% rename from src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTaskRunner.java rename to src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTaskRunner.java index bd53dc49b..7019121d3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTaskRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTaskRunner.java @@ -1,4 +1,4 @@ -package com.genersoft.iot.vmp.gb28181.task; +package com.genersoft.iot.vmp.gb28181.task.deviceSubscribe; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/SubscribeTaskForCatalog.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/impl/SubscribeTaskForCatalog.java similarity index 92% rename from src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/SubscribeTaskForCatalog.java rename to src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/impl/SubscribeTaskForCatalog.java index a0f1b2b1c..8ca7b390e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/SubscribeTaskForCatalog.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/impl/SubscribeTaskForCatalog.java @@ -1,9 +1,9 @@ -package com.genersoft.iot.vmp.gb28181.task.impl; +package com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.impl; import com.genersoft.iot.vmp.common.SubscribeCallback; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; -import com.genersoft.iot.vmp.gb28181.task.SubscribeTask; +import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask; import lombok.extern.slf4j.Slf4j; @Slf4j diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/SubscribeTaskForMobilPosition.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/impl/SubscribeTaskForMobilPosition.java similarity index 92% rename from src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/SubscribeTaskForMobilPosition.java rename to src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/impl/SubscribeTaskForMobilPosition.java index 26a3afc53..5ff3c6198 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/SubscribeTaskForMobilPosition.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/impl/SubscribeTaskForMobilPosition.java @@ -1,9 +1,9 @@ -package com.genersoft.iot.vmp.gb28181.task.impl; +package com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.impl; import com.genersoft.iot.vmp.common.SubscribeCallback; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; -import com.genersoft.iot.vmp.gb28181.task.SubscribeTask; +import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask; import lombok.extern.slf4j.Slf4j; @Slf4j diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformKeepaliveTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformKeepaliveTask.java new file mode 100644 index 000000000..9d58db640 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformKeepaliveTask.java @@ -0,0 +1,57 @@ +package com.genersoft.iot.vmp.gb28181.task.platformStatus; + +import com.genersoft.iot.vmp.common.CommonCallback; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +/** + * 平台心跳任务 + */ +@Slf4j +public class PlatformKeepaliveTask implements Delayed { + + @Getter + private String platformServerId; + + /** + * 超时时间(单位: 毫秒) + */ + @Getter + @Setter + private long delayTime; + + /** + * 到期回调 + */ + @Getter + private CommonCallback callback; + + public PlatformKeepaliveTask(String platformServerId, long delayTime, CommonCallback callback) { + this.platformServerId = platformServerId; + this.delayTime = delayTime; + this.callback = callback; + } + + public void expired() { + if (callback == null) { + log.info("[平台心跳到期] 未找到到期处理回调, 平台上级编号: {}", platformServerId); + return; + } + getCallback().run(platformServerId); + } + + @Override + public long getDelay(@NotNull TimeUnit unit) { + return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(@NotNull Delayed o) { + return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformRegisterTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformRegisterTask.java new file mode 100644 index 000000000..690c3505f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformRegisterTask.java @@ -0,0 +1,77 @@ +package com.genersoft.iot.vmp.gb28181.task.platformStatus; + +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +/** + * 平台注册任务 + */ +@Slf4j +public class PlatformRegisterTask implements Delayed { + + @Getter + private String platformServerId; + + /** + * 超时时间(单位: 毫秒) + */ + @Getter + @Setter + private long delayTime; + + @Getter + private SipTransactionInfo sipTransactionInfo; + + /** + * 过期时间 + */ + @Getter + @Setter + private long expireTime; + + /** + * 到期回调 + */ + @Getter + private CommonCallback callback; + + + public PlatformRegisterTask(String platformServerId, long delayTime, SipTransactionInfo sipTransactionInfo, CommonCallback callback) { + this.platformServerId = platformServerId; + this.delayTime = delayTime; + this.callback = callback; + this.sipTransactionInfo = sipTransactionInfo; + } + + public void expired() { + if (callback == null) { + log.info("[平台注册到期] 未找到到期处理回调, 平台上级编号: {}", platformServerId); + return; + } + getCallback().run(platformServerId); + } + + @Override + public long getDelay(@NotNull TimeUnit unit) { + return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(@NotNull Delayed o) { + return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); + } + + public PlatformRegisterTaskInfo getInfo() { + PlatformRegisterTaskInfo taskInfo = new PlatformRegisterTaskInfo(); + taskInfo.setPlatformServerId(platformServerId); + taskInfo.setSipTransactionInfo(sipTransactionInfo); + return taskInfo; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformRegisterTaskInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformRegisterTaskInfo.java new file mode 100644 index 000000000..c6ba92b87 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformRegisterTaskInfo.java @@ -0,0 +1,25 @@ +package com.genersoft.iot.vmp.gb28181.task.platformStatus; + +import com.genersoft.iot.vmp.common.CommonCallback; +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; +import lombok.Data; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +/** + * 平台注册任务可序列化的信息 + */ +@Slf4j +@Data +public class PlatformRegisterTaskInfo{ + + private String platformServerId; + + private SipTransactionInfo sipTransactionInfo; + + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformStatusTaskRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformStatusTaskRunner.java new file mode 100644 index 000000000..9c295b4cc --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformStatusTaskRunner.java @@ -0,0 +1,203 @@ +package com.genersoft.iot.vmp.gb28181.task.platformStatus; + +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; +import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask; +import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Component +public class PlatformStatusTaskRunner { + + private final Map registerSubscribes = new ConcurrentHashMap<>(); + + private final DelayQueue registerDelayQueue = new DelayQueue<>(); + + private final Map keepaliveSubscribes = new ConcurrentHashMap<>(); + + private final DelayQueue keepaliveTaskDelayQueue = new DelayQueue<>(); + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private UserSetting userSetting; + + private final String prefix = "VMP_PLATFORM_STATUS"; + + // 订阅过期检查 + @Scheduled(fixedDelay = 500, timeUnit = TimeUnit.MILLISECONDS) + public void expirationCheckForRegister(){ + while (!registerDelayQueue.isEmpty()) { + PlatformRegisterTask take = null; + try { + take = registerDelayQueue.take(); + try { + removeRegisterTask(take.getPlatformServerId()); + take.expired(); + }catch (Exception e) { + log.error("[平台注册到期] 到期处理时出现异常, 平台上级编号: {} ", take.getPlatformServerId()); + } + } catch (InterruptedException e) { + log.error("[平台注册到期] ", e); + } + } + } + @Scheduled(fixedDelay = 500, timeUnit = TimeUnit.MILLISECONDS) + public void expirationCheckForKeepalive(){ + while (!keepaliveTaskDelayQueue.isEmpty()) { + PlatformKeepaliveTask take = null; + try { + take = keepaliveTaskDelayQueue.take(); + try { + removeKeepAliveTask(take.getPlatformServerId()); + take.expired(); + }catch (Exception e) { + log.error("[平台心跳到期] 到期处理时出现异常, 平台上级编号: {} ", take.getPlatformServerId()); + } + } catch (InterruptedException e) { + log.error("[平台心跳到期] ", e); + } + } + } + + public void addRegisterTask(PlatformRegisterTask task) { + Duration duration = Duration.ofSeconds((task.getDelayTime() - System.currentTimeMillis())/1000); + if (duration.getSeconds() < 0) { + return; + } + registerSubscribes.put(task.getPlatformServerId(), task); + String key = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getPlatformServerId()); + redisTemplate.opsForValue().set(key, task.getInfo(), duration); + registerDelayQueue.offer(task); + } + + public boolean removeRegisterTask(String platformServerId) { + PlatformRegisterTask task = registerSubscribes.get(platformServerId); + if (task == null) { + return false; + } + String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), platformServerId); + redisTemplate.delete(redisKey); + registerSubscribes.remove(platformServerId); + if (registerDelayQueue.contains(task)) { + boolean remove = registerDelayQueue.remove(task); + if (!remove) { + log.info("[移除平台注册任务] 从延时队列内移除失败: {}", platformServerId); + } + } + return true; + } + + public SipTransactionInfo getRegisterTransactionInfo(String platformServerId) { + PlatformRegisterTask task = registerSubscribes.get(platformServerId); + if (task == null) { + return null; + } + return task.getSipTransactionInfo(); + } + + public boolean updateRegisterDelay(String platformServerId, long expirationTime) { + PlatformRegisterTask task = registerSubscribes.get(platformServerId); + if (task == null) { + return false; + } + log.info("[更新平台注册任务时间] 平台上级编号: {}", platformServerId); + if (registerDelayQueue.contains(task)) { + boolean remove = registerDelayQueue.remove(task); + if (!remove) { + log.info("[更新平台注册任务时间] 从延时队列内移除失败: {}", platformServerId); + } + } + task.setDelayTime(expirationTime); + registerDelayQueue.offer(task); + String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), platformServerId); + Duration duration = Duration.ofSeconds((expirationTime - System.currentTimeMillis())/1000); + redisTemplate.expire(redisKey, duration); + return true; + } + + public boolean containsRegister(String platformServerId) { + return registerSubscribes.containsKey(platformServerId); + } + + public List getAllRegisterTaskInfo(){ + String scanKey = String.format("%s_%s_*", prefix, userSetting.getServerId()); + List values = RedisUtil.scan(redisTemplate, scanKey); + if (values.isEmpty()) { + return new ArrayList<>(); + } + List result = new ArrayList<>(); + for (Object value : values) { + String redisKey = (String)value; + PlatformRegisterTask taskInfo = (PlatformRegisterTask)redisTemplate.opsForValue().get(redisKey); + if (taskInfo == null) { + continue; + } + Long expire = redisTemplate.getExpire(redisKey); + taskInfo.setExpireTime(expire); + result.add(taskInfo); + } + return result; + + } + + public void addKeepAliveTask(PlatformKeepaliveTask task) { + Duration duration = Duration.ofSeconds((task.getDelayTime() - System.currentTimeMillis())/1000); + if (duration.getSeconds() < 0) { + return; + } + keepaliveSubscribes.put(task.getPlatformServerId(), task); + keepaliveTaskDelayQueue.offer(task); + } + + public boolean removeKeepAliveTask(String platformServerId) { + PlatformKeepaliveTask task = keepaliveSubscribes.get(platformServerId); + if (task == null) { + return false; + } + keepaliveSubscribes.remove(platformServerId); + if (keepaliveTaskDelayQueue.contains(task)) { + boolean remove = keepaliveTaskDelayQueue.remove(task); + if (!remove) { + log.info("[移除平台心跳任务] 从延时队列内移除失败: {}", platformServerId); + } + } + return true; + } + + public boolean updateKeepAliveDelay(String platformServerId, long expirationTime) { + PlatformKeepaliveTask task = keepaliveSubscribes.get(platformServerId); + if (task == null) { + return false; + } + log.info("[更新平台心跳任务时间] 平台上级编号: {}", platformServerId); + if (keepaliveTaskDelayQueue.contains(task)) { + boolean remove = keepaliveTaskDelayQueue.remove(task); + if (!remove) { + log.info("[更新平台心跳任务时间] 从延时队列内移除失败: {}", platformServerId); + } + } + task.setDelayTime(expirationTime); + keepaliveTaskDelayQueue.offer(task); + return true; + } + + public boolean containsKeepAlive(String platformServerId) { + return keepaliveSubscribes.containsKey(platformServerId); + } +}