From a40446e42be2cb6fd988fc2f7a8de9e3a78f3057 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 27 May 2025 23:19:10 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/DeviceServiceImpl.java | 45 ++++++++++++++++++- .../gb28181/service/impl/PlayServiceImpl.java | 8 ++-- .../task/deviceStatus/DeviceStatusTask.java | 25 +++++++++-- .../deviceStatus/DeviceStatusTaskRunner.java | 10 ++--- .../cmd/KeepaliveNotifyMessageHandler.java | 2 - .../iot/vmp/service/bean/InviteErrorCode.java | 3 +- web/src/styles/variables.scss | 4 +- 7 files changed, 76 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 47dbfc931..6ef295c7e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -15,6 +15,9 @@ import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; +import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTask; +import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskInfo; +import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner; import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask; import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo; import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskRunner; @@ -50,6 +53,7 @@ import javax.sip.SipException; import javax.validation.constraints.NotNull; import java.text.ParseException; import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.SynchronousQueue; @@ -111,13 +115,45 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { @Autowired private SubscribeTaskRunner subscribeTaskRunner; + @Autowired + private DeviceStatusTaskRunner deviceStatusTaskRunner; + private Device getDeviceByDeviceIdFromDb(String deviceId) { return deviceMapper.getDeviceByDeviceId(deviceId); } @Override public void run(String... args) throws Exception { - // TODO 处理设备离线 + List allTaskInfo = deviceStatusTaskRunner.getAllTaskInfo(); + List onlineDeviceIds = new ArrayList<>(); + if (!allTaskInfo.isEmpty()) { + for (DeviceStatusTaskInfo taskInfo : allTaskInfo) { + Device device = getDeviceByDeviceId(taskInfo.getDeviceId()); + if (device == null) { + deviceStatusTaskRunner.removeTask(taskInfo.getDeviceId()); + continue; + } + // 恢复定时任务, TCP因为连接已经断开必须等待设备重新连接 + DeviceStatusTask deviceStatusTask = DeviceStatusTask.getInstance(taskInfo.getDeviceId(), + taskInfo.getTransactionInfo(), taskInfo.getExpireTime(), this::deviceStatusExpire); + deviceStatusTaskRunner.addTask(deviceStatusTask); + onlineDeviceIds.add(taskInfo.getDeviceId()); + } + // 除了记录的设备以外, 其他设备全部离线 + List onlineDevice = getAllOnlineDevice(userSetting.getServerId()); + if (!onlineDevice.isEmpty()) { + for (Device device : onlineDevice) { + if (!onlineDeviceIds.contains(device.getDeviceId())) { + // 此设备需要离线 + // 清理订阅 + // 更新数据库 + + } + } + } + + } + // 处理订阅任务 List taskInfoList = subscribeTaskRunner.getAllTaskInfo(); @@ -127,7 +163,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { continue; } Device device = getDeviceByDeviceId(taskInfo.getDeviceId()); - if (device == null || !device.isOnLine()) { + if (device == null || !device.isOnLine() || !onlineDeviceIds.contains(taskInfo.getDeviceId())) { subscribeTaskRunner.removeSubscribe(taskInfo.getKey()); continue; } @@ -148,6 +184,11 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } } + private void deviceStatusExpire(String deviceId, SipTransactionInfo transactionInfo) { + log.info("[设备状态] 到期, 编号: {}", deviceId); + offline(deviceId, ""); + } + @Override public void online(Device device, SipTransactionInfo sipTransactionInfo) { log.info("[设备上线] deviceId:{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index fb56c689b..f959bfc5c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -654,11 +654,11 @@ public class PlayServiceImpl implements IPlayService { // 主动连接失败,结束流程, 清理数据 receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); - callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); + callback.run(InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getCode(), + InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getMsg(), null); inviteStreamService.call(InviteSessionType.BROADCAST, channel.getId(), null, - InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); + InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getCode(), + InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getMsg(), null); } } catch (SdpException e) { log.error("[TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channel.getDeviceId(), e); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTask.java index f08150899..0844cbe5d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTask.java @@ -3,18 +3,18 @@ package com.genersoft.iot.vmp.gb28181.task.deviceStatus; import com.genersoft.iot.vmp.common.DeviceStatusCallback; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; import lombok.Data; +import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; +@Slf4j @Data -public abstract class DeviceStatusTask implements Delayed { +public class DeviceStatusTask implements Delayed { private String deviceId; - private DeviceStatusCallback callback; - private SipTransactionInfo transactionInfo; /** @@ -22,7 +22,24 @@ public abstract class DeviceStatusTask implements Delayed { */ private long delayTime; - public abstract void expired(); + private DeviceStatusCallback callback; + + public static DeviceStatusTask getInstance(String deviceId, SipTransactionInfo transactionInfo, long delayTime, DeviceStatusCallback callback) { + DeviceStatusTask deviceStatusTask = new DeviceStatusTask(); + deviceStatusTask.setDeviceId(deviceId); + deviceStatusTask.setTransactionInfo(transactionInfo); + deviceStatusTask.setDelayTime((delayTime * 1000L - 500L) + System.currentTimeMillis()); + deviceStatusTask.setCallback(callback); + return deviceStatusTask; + } + + public void expired() { + if (callback == null) { + log.info("[设备离线] 未找到过期处理回调, {}", deviceId); + return; + } + callback.run(deviceId, transactionInfo); + } @Override public long getDelay(@NotNull TimeUnit unit) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java index 604383a58..a179879a2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java @@ -2,8 +2,6 @@ package com.genersoft.iot.vmp.gb28181.task.deviceStatus; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; -import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask; -import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -43,7 +41,7 @@ public class DeviceStatusTaskRunner { try { take = delayQueue.take(); try { - removeSubscribe(take.getDeviceId()); + removeTask(take.getDeviceId()); take.expired(); }catch (Exception e) { log.error("[设备状态到期] 到期处理时出现异常, 设备编号: {} ", take.getDeviceId()); @@ -54,7 +52,7 @@ public class DeviceStatusTaskRunner { } } - public void addSubscribe(DeviceStatusTask task) { + public void addTask(DeviceStatusTask task) { Duration duration = Duration.ofSeconds((task.getDelayTime() - System.currentTimeMillis())/1000); if (duration.getSeconds() < 0) { return; @@ -65,7 +63,7 @@ public class DeviceStatusTaskRunner { delayQueue.offer(task); } - public boolean removeSubscribe(String key) { + public boolean removeTask(String key) { DeviceStatusTask task = subscribes.get(key); if (task == null) { return false; @@ -95,7 +93,7 @@ public class DeviceStatusTaskRunner { if (task == null) { return false; } - log.info("[更新状态任务时间] {}, 编号: {}", task.getName(), key); + log.info("[更新状态任务时间] 编号: {}", key); if (delayQueue.contains(task)) { boolean remove = delayQueue.remove(task); if (!remove) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index cdaea441d..7b9b68ae5 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -114,8 +114,6 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp } else { if (userSetting.getGbDeviceOnline() == 1) { // 对于已经离线的设备判断他的注册是否已经过期 - device.setOnLine(true); - device.setRegisterTime(DateUtil.getNow()); deviceService.online(device, null); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/InviteErrorCode.java b/src/main/java/com/genersoft/iot/vmp/service/bean/InviteErrorCode.java index d43792e79..faf8323f8 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/InviteErrorCode.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/InviteErrorCode.java @@ -17,7 +17,8 @@ public enum InviteErrorCode { ERROR_FOR_RESET_SSRC(-9, "重新设置收流信息失败"), ERROR_FOR_SIP_SENDING_FAILED(-10, "命令发送失败"), ERROR_FOR_ASSIST_NOT_READY(-11, "没有可用的assist服务"), - ERROR_FOR_PARAMETER_ERROR(-13, "参数异常"); + ERROR_FOR_PARAMETER_ERROR(-13, "参数异常"), + ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR(-14, "TCP主动连接失败"); private final int code; private final String msg; diff --git a/web/src/styles/variables.scss b/web/src/styles/variables.scss index be5577263..40eafa2f0 100644 --- a/web/src/styles/variables.scss +++ b/web/src/styles/variables.scss @@ -1,9 +1,9 @@ // sidebar $menuText:#bfcbd9; $menuActiveText:#409EFF; -$subMenuActiveText:#f4f4f5; //https://github.com/ElemeFE/element/issues/12951 +$subMenuActiveText: #f7ff17; //https://github.com/ElemeFE/element/issues/12951 -$menuBg:#304156; +$menuBg: #1f61cd; $menuHover:#263445; $subMenuBg:#1f2d3d;