diff --git a/README.md b/README.md index e44a673c7..6e2348b26 100644 --- a/README.md +++ b/README.md @@ -16,11 +16,11 @@ WEB VIDEO PLATFORM是一个基于GB28181-2016标准实现的开箱即用的网 前端页面基于vue-admin-template构建 [https://github.com/PanJiaChen/vue-admin-template?tab=readme-ov-file](https://github.com/PanJiaChen/vue-admin-template?tab=readme-ov-file) # 应用场景: -支持浏览器无插件播放摄像头视频。 -支持国标设备(摄像机、平台、NVR等)设备接入 -支持rtsp, rtmp,直播设备设备接入,充分利旧。 -支持国标级联。多平台级联。跨网视频预览。 -支持跨网网闸平台互联。 +- 支持浏览器无插件播放摄像头视频。 +- 支持国标设备(摄像机、平台、NVR等)设备接入 +- 支持rtsp, rtmp,直播设备设备接入,充分利旧。 +- 支持国标级联。多平台级联。跨网视频预览。 +- 支持跨网网闸平台互联。 # 文档 @@ -132,6 +132,7 @@ https://gitee.com/pan648540858/wvp-GB28181-pro.git - [X] 支持ONVIF协议,设备检索,支持点播,云台控制,国标级联点播,自动点播等。 - [X] 支持部标1078+808协议,支持点播,云台控制,录像回放,位置上报,自动点播等。 - [X] 支持国标28181-2022协议,支持巡航轨迹查询,PTZ精准控制,存储卡格式化,设备软件升级,OSD配置,h265+aac,支持辅码流,录像倒放等。 +- [X] 支持国网B接口协议。支持注册,获取资源,预览等 # 授权协议 diff --git a/pom.xml b/pom.xml index dc929b5d2..2ba98012b 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ com.genersoft wvp-pro - 2.7.3 + 2.7.4 web video platform 国标28181视频平台 ${project.packaging} diff --git a/src/main/java/com/genersoft/iot/vmp/common/DeviceStatusCallback.java b/src/main/java/com/genersoft/iot/vmp/common/DeviceStatusCallback.java new file mode 100644 index 000000000..9ee9b9edb --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/common/DeviceStatusCallback.java @@ -0,0 +1,7 @@ +package com.genersoft.iot.vmp.common; + +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; + +public interface DeviceStatusCallback { + public void run(String deviceId, SipTransactionInfo transactionInfo); +} diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 2fa59b8ef..17bd502df 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -46,7 +46,6 @@ public class VideoManagerConstants { public static final String SYSTEM_INFO_DISK_PREFIX = "VMP_SYSTEM_INFO_DISK_"; public static final String BROADCAST_WAITE_INVITE = "task_broadcast_waite_invite_"; - public static final String REGISTER_EXPIRE_TASK_KEY_PREFIX = "VMP_device_register_expire_"; public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_"; public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:"; public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java index 6788a8eb3..5d5a02beb 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java @@ -223,7 +223,7 @@ public class DeviceQuery { if (exist) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备编号已存在"); } - deviceService.addDevice(device); + deviceService.addCustomDevice(device); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java index af84f8327..e51238587 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java @@ -230,6 +230,7 @@ public interface DeviceMapper { "on_line"+ " FROM wvp_device WHERE on_line = true") List getOnlineDevices(); + @Select("SELECT " + "id, " + "device_id, " + @@ -423,4 +424,12 @@ public interface DeviceMapper { " WHERE id=#{id}"+ " "}) void updateSubscribeMobilePosition(Device device); + + @Update(value = {" "}) + void offlineByList(List offlineDevices); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java index c448c04fa..28b7f61c5 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java @@ -128,7 +128,7 @@ public interface IDeviceService { * 添加设备 * @param device */ - void addDevice(Device device); + void addCustomDevice(Device device); /** * 页面表单更新设备信息 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 18114548d..00643f5b2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.gb28181.service.impl; import com.alibaba.fastjson2.JSON; import com.genersoft.iot.vmp.common.CommonCallback; -import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.common.enums.ChannelDataType; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; @@ -15,6 +14,9 @@ import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; +import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTask; +import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskInfo; +import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner; import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask; import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo; import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskRunner; @@ -50,8 +52,7 @@ import javax.sip.SipException; import javax.validation.constraints.NotNull; import java.text.ParseException; import java.time.Instant; -import java.util.List; -import java.util.Objects; +import java.util.*; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; @@ -111,13 +112,83 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { @Autowired private SubscribeTaskRunner subscribeTaskRunner; + @Autowired + private DeviceStatusTaskRunner deviceStatusTaskRunner; + private Device getDeviceByDeviceIdFromDb(String deviceId) { return deviceMapper.getDeviceByDeviceId(deviceId); } @Override public void run(String... args) throws Exception { - // TODO 处理设备离线 + + // 清理数据库不存在但是redis中存在的数据 + List devicesInDb = getAll(); + if (devicesInDb.isEmpty()) { + redisCatchStorage.removeAllDevice(); + }else { + List devicesInRedis = redisCatchStorage.getAllDevices(); + if (!devicesInRedis.isEmpty()) { + Map deviceMapInDb = new HashMap<>(); + devicesInDb.parallelStream().forEach(device -> { + deviceMapInDb.put(device.getDeviceId(), device); + }); + devicesInRedis.parallelStream().forEach(device -> { + if (deviceMapInDb.get(device.getDeviceId()) == null + && userSetting.getServerId().equals(device.getServerId())) { + redisCatchStorage.removeDevice(device.getDeviceId()); + } + }); + } + } + + // 重置cseq计数 + redisCatchStorage.resetAllCSEQ(); + // 处理设备状态 + List allTaskInfo = deviceStatusTaskRunner.getAllTaskInfo(); + List onlineDeviceIds = new ArrayList<>(); + if (!allTaskInfo.isEmpty()) { + for (DeviceStatusTaskInfo taskInfo : allTaskInfo) { + Device device = getDeviceByDeviceId(taskInfo.getDeviceId()); + if (device == null) { + deviceStatusTaskRunner.removeTask(taskInfo.getDeviceId()); + continue; + } + // 恢复定时任务, TCP因为连接已经断开必须等待设备重新连接 + DeviceStatusTask deviceStatusTask = DeviceStatusTask.getInstance(taskInfo.getDeviceId(), + taskInfo.getTransactionInfo(), taskInfo.getExpireTime(), this::deviceStatusExpire); + deviceStatusTaskRunner.addTask(deviceStatusTask); + onlineDeviceIds.add(taskInfo.getDeviceId()); + } + // 除了记录的设备以外, 其他设备全部离线 + List onlineDevice = getAllOnlineDevice(userSetting.getServerId()); + if (!onlineDevice.isEmpty()) { + List offlineDevices = new ArrayList<>(); + for (Device device : onlineDevice) { + if (!onlineDeviceIds.contains(device.getDeviceId())) { + // 此设备需要离线 + device.setOnLine(false); + // 清理离线设备的相关缓存 + cleanOfflineDevice(device); + // 更新数据库 + offlineDevices.add(device); + } + } + if (!offlineDevices.isEmpty()) { + offlineByIds(offlineDevices); + } + } + }else { + // 所有设备全部离线 + List onlineDevice = getAllOnlineDevice(userSetting.getServerId()); + for (Device device : onlineDevice) { + // 此设备需要离线 + device.setOnLine(false); + // 清理离线设备的相关缓存 + cleanOfflineDevice(device); + } + offlineByIds(onlineDevice); + } // 处理订阅任务 List taskInfoList = subscribeTaskRunner.getAllTaskInfo(); @@ -127,7 +198,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { continue; } Device device = getDeviceByDeviceId(taskInfo.getDeviceId()); - if (device == null || !device.isOnLine()) { + if (device == null || !device.isOnLine() || !onlineDeviceIds.contains(taskInfo.getDeviceId())) { subscribeTaskRunner.removeSubscribe(taskInfo.getKey()); continue; } @@ -148,6 +219,61 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } } + private void offlineByIds(List offlineDevices) { + if (offlineDevices.isEmpty()) { + log.info("[更新多个离线设备信息] 参数为空"); + return; + } + deviceMapper.offlineByList(offlineDevices); + for (Device device : offlineDevices) { + device.setOnLine(false); + redisCatchStorage.updateDevice(device); + } + } + + private void cleanOfflineDevice(Device device) { + if (subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) { + subscribeTaskRunner.removeSubscribe(SubscribeTaskForCatalog.getKey(device)); + } + if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) { + subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device)); + } + //进行通道离线 +// deviceChannelMapper.offlineByDeviceId(deviceId); + // 离线释放所有ssrc + List ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId()); + if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) { + for (SsrcTransaction ssrcTransaction : ssrcTransactions) { + mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); + mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream()); + sessionManager.removeByCallId(ssrcTransaction.getCallId()); + } + } + // 移除订阅 + removeCatalogSubscribe(device, null); + removeMobilePositionSubscribe(device, null); + + List audioBroadcastCatches = audioBroadcastManager.getByDeviceId(device.getDeviceId()); + if (!audioBroadcastCatches.isEmpty()) { + for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) { + + SendRtpInfo sendRtpItem = sendRtpServerService.queryByChannelId(audioBroadcastCatch.getChannelId(), device.getDeviceId()); + if (sendRtpItem != null) { + sendRtpServerService.delete(sendRtpItem); + MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null); + } + + audioBroadcastManager.del(audioBroadcastCatch.getChannelId()); + } + } + } + + private void deviceStatusExpire(String deviceId, SipTransactionInfo transactionInfo) { + log.info("[设备状态] 到期, 编号: {}", deviceId); + offline(deviceId, "保活到期"); + } + @Override public void online(Device device, SipTransactionInfo sipTransactionInfo) { log.info("[设备上线] deviceId:{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort()); @@ -182,6 +308,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { device.setCreateTime(now); device.setUpdateTime(now); log.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId()); + addCustomDevice(device); deviceMapper.add(device); redisCatchStorage.updateDevice(device); try { @@ -193,7 +320,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { sync(device); }else { device.setServerId(userSetting.getServerId()); - if(!device.isOnLine()){ + if(!deviceInDb.isOnLine()){ device.setOnLine(true); device.setCreateTime(now); deviceMapper.update(device); @@ -219,6 +346,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { if (device.getSubscribeCycleForMobilePosition() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) { addMobilePositionSubscribe(device, null); } + if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, true); @@ -233,12 +361,19 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { sync(device); } } - - // 刷新过期任务 - String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId(); - // 如果第一次注册那么必须在60 * 3时间内收到一个心跳,否则设备离线 - dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId(), "三次心跳超时"), - device.getHeartBeatInterval() * 1000 * device.getHeartBeatCount()); + long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L; + if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) { + if (sipTransactionInfo == null) { + deviceStatusTaskRunner.updateDelay(device.getDeviceId(), System.currentTimeMillis() + expiresTime); + }else { + deviceStatusTaskRunner.removeTask(device.getDeviceId()); + DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime, this::deviceStatusExpire); + deviceStatusTaskRunner.addTask(task); + } + }else { + DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime, this::deviceStatusExpire); + deviceStatusTaskRunner.addTask(task); + } } @@ -260,50 +395,14 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } log.info("[设备离线] {}, device:{}, 心跳间隔: {},心跳超时次数: {}, 上次心跳时间:{}, 上次注册时间: {}", reason, deviceId, device.getHeartBeatInterval(), device.getHeartBeatCount(), device.getKeepaliveTime(), device.getRegisterTime()); - String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + deviceId; - dynamicTask.stop(registerExpireTaskKey); - if (device.isOnLine()) { - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false); - } - } - device.setOnLine(false); + cleanOfflineDevice(device); redisCatchStorage.updateDevice(device); deviceMapper.update(device); if (!isPlatform(deviceId)) { // 进行通道离线 deviceChannelMapper.offlineByDeviceId(device.getId()); } - - // 离线释放所有ssrc - List ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(deviceId); - if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) { - for (SsrcTransaction ssrcTransaction : ssrcTransactions) { - mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); - mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream()); - sessionManager.removeByCallId(ssrcTransaction.getCallId()); - } - } - // 移除订阅 - removeCatalogSubscribe(device, null); - removeMobilePositionSubscribe(device, null); - - List audioBroadcastCatches = audioBroadcastManager.getByDeviceId(deviceId); - if (!audioBroadcastCatches.isEmpty()) { - for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) { - - SendRtpInfo sendRtpItem = sendRtpServerService.queryByChannelId(audioBroadcastCatch.getChannelId(), deviceId); - if (sendRtpItem != null) { - sendRtpServerService.delete(sendRtpItem); - MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null); - } - - audioBroadcastManager.del(audioBroadcastCatch.getChannelId()); - } - } } private boolean isPlatform(String deviceId) { @@ -345,7 +444,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { log.info("[目录订阅] 到期, 编号: {}, 设备不存在, 忽略", deviceId); return; } - if (device.getSubscribeCycleForCatalog() > 0) { + if (device.isOnLine() && device.getSubscribeCycleForCatalog() > 0) { addCatalogSubscribe(device, transactionInfo); } } @@ -357,7 +456,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { log.info("[移动位置订阅] 到期, 编号: {}, 设备不存在, 忽略", deviceId); return; } - if (device.getSubscribeCycleForMobilePosition() > 0) { + if (device.isOnLine() && device.getSubscribeCycleForMobilePosition() > 0) { addMobilePositionSubscribe(device, transactionInfo); } } @@ -397,9 +496,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { @Override public boolean removeCatalogSubscribe(@NotNull Device device, CommonCallback callback) { - log.info("[移除目录订阅]: {}", device.getDeviceId()); String key = SubscribeTaskForCatalog.getKey(device); if (subscribeTaskRunner.containsKey(key)) { + log.info("[移除目录订阅]: {}", device.getDeviceId()); SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key); if (transactionInfo == null) { log.warn("[移除目录订阅] 未找到事务信息,{}", device.getDeviceId()); @@ -457,9 +556,10 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { @Override public boolean removeMobilePositionSubscribe(Device device, CommonCallback callback) { - log.info("[移除移动位置订阅]: {}", device.getDeviceId()); + String key = SubscribeTaskForMobilPosition.getKey(device); if (subscribeTaskRunner.containsKey(key)) { + log.info("[移除移动位置订阅]: {}", device.getDeviceId()); SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key); if (transactionInfo == null) { log.warn("[移除移动位置订阅] 未找到事务信息,{}", device.getDeviceId()); @@ -596,7 +696,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } @Override - public void addDevice(Device device) { + public void addCustomDevice(Device device) { device.setOnLine(false); device.setCreateTime(DateUtil.getNow()); device.setUpdateTime(DateUtil.getNow()); @@ -641,9 +741,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) { removeMobilePositionSubscribe(device, null); } - // 停止状态检测 - String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId(); - dynamicTask.stop(registerExpireTaskKey); + if (deviceStatusTaskRunner.containsKey(deviceId)) { + deviceStatusTaskRunner.removeTask(deviceId); + } platformChannelMapper.delChannelForDeviceId(deviceId); deviceChannelMapper.cleanChannelsByDeviceId(device.getId()); deviceMapper.del(deviceId); @@ -695,7 +795,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { public void subscribeCatalog(int id, int cycle) { Device device = deviceMapper.query(id); Assert.notNull(device, "未找到设备"); - + Assert.isTrue(device.isOnLine(), "设备已离线"); if (device.getSubscribeCycleForCatalog() == cycle) { return; } @@ -726,6 +826,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { public void subscribeMobilePosition(int id, int cycle, int interval) { Device device = deviceMapper.query(id); Assert.notNull(device, "未找到设备"); + Assert.isTrue(device.isOnLine(), "设备已离线"); if (device.getSubscribeCycleForMobilePosition() == cycle) { return; @@ -763,15 +864,16 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } if (!Objects.equals(deviceInDb.getHeartBeatCount(), device.getHeartBeatCount()) || !Objects.equals(deviceInDb.getHeartBeatInterval(), device.getHeartBeatInterval())) { - // 刷新过期任务 - String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId(); - // 如果第一次注册那么必须在60 * 3时间内收到一个心跳,否则设备离线 - dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId(), "三次心跳超时"), - device.getHeartBeatInterval() * 1000 * device.getHeartBeatCount()); + deviceInDb.setHeartBeatCount(device.getHeartBeatCount()); deviceInDb.setHeartBeatInterval(device.getHeartBeatInterval()); deviceInDb.setPositionCapability(device.getPositionCapability()); updateDevice(deviceInDb); + + long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L; + if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) { + deviceStatusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis()); + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index c33247fe4..89cb637e8 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -113,6 +113,35 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner @Override public void run(String... args) throws Exception { + + // 查找国标推流 + List sendRtpItems = redisCatchStorage.queryAllSendRTPServer(); + if (!sendRtpItems.isEmpty()) { + for (SendRtpInfo sendRtpItem : sendRtpItems) { + MediaServer mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId()); + if (channel == null){ + continue; + } + sendRtpServerService.delete(sendRtpItem); + if (mediaServerItem != null) { + ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); + boolean stopResult = mediaServerService.initStopSendRtp(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); + if (stopResult) { + Platform platform = queryPlatformByServerGBId(sendRtpItem.getTargetId()); + + if (platform != null) { + try { + commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel); + } catch (InvalidArgumentException | ParseException | SipException e) { + log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); + } + } + } + } + } + } + // 启动时 如果存在未过期的注册平台,则发送注销 List registerTaskInfoList = statusTaskRunner.getAllRegisterTaskInfo(); if (registerTaskInfoList.isEmpty()) { @@ -415,7 +444,6 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner } private void keepaliveExpire(String platformServerId, int failCount) { - log.info("[国标级联] 心跳到期, 上级平台编号: {}", platformServerId); Platform platform = queryPlatformByServerGBId(platformServerId); if (platform == null || !platform.isEnable()) { log.info("[国标级联] 心跳到期, 上级平台编号: {}, 平台不存在或者未启用, 忽略", platformServerId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index fb56c689b..f959bfc5c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -654,11 +654,11 @@ public class PlayServiceImpl implements IPlayService { // 主动连接失败,结束流程, 清理数据 receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); - callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); + callback.run(InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getCode(), + InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getMsg(), null); inviteStreamService.call(InviteSessionType.BROADCAST, channel.getId(), null, - InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), - InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null); + InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getCode(), + InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getMsg(), null); } } catch (SdpException e) { log.error("[TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channel.getDeviceId(), e); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java deleted file mode 100755 index 56a649bc5..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java +++ /dev/null @@ -1,132 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.task; - -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; -import com.genersoft.iot.vmp.gb28181.service.IDeviceService; -import com.genersoft.iot.vmp.gb28181.service.IGbChannelService; -import com.genersoft.iot.vmp.gb28181.service.IPlatformService; -import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; -import com.genersoft.iot.vmp.media.bean.MediaServer; -import com.genersoft.iot.vmp.media.service.IMediaServerService; -import com.genersoft.iot.vmp.service.ISendRtpServerService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.CommandLineRunner; -import org.springframework.core.annotation.Order; -import org.springframework.stereotype.Component; - -import javax.sip.InvalidArgumentException; -import javax.sip.SipException; -import java.text.ParseException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - - -/** - * 系统启动时控制设备 - * @author lin - */ -@Slf4j -@Component -@Order(value=14) -public class SipRunner implements CommandLineRunner { - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - @Autowired - private SSRCFactory ssrcFactory; - - @Autowired - private IDeviceService deviceService; - - @Autowired - private IMediaServerService mediaServerService; - - @Autowired - private IPlatformService platformService; - - @Autowired - private IGbChannelService channelService; - - @Autowired - private ISIPCommanderForPlatform commanderForPlatform; - - @Autowired - private ISendRtpServerService sendRtpServerService; - - @Autowired - private UserSetting userSetting; - - @Override - public void run(String... args) throws Exception { - List deviceList = deviceService.getAllOnlineDevice(userSetting.getServerId()); - - for (Device device : deviceList) { - if (deviceService.expire(device)){ - deviceService.offline(device.getDeviceId(), "注册已过期"); - }else { - deviceService.online(device, null); - } - } - // 重置cseq计数 - redisCatchStorage.resetAllCSEQ(); - // 清理redis - // 清理数据库不存在但是redis中存在的数据 - List devicesInDb = deviceService.getAll(); - if (devicesInDb.isEmpty()) { - redisCatchStorage.removeAllDevice(); - }else { - List devicesInRedis = redisCatchStorage.getAllDevices(); - if (!devicesInRedis.isEmpty()) { - Map deviceMapInDb = new HashMap<>(); - devicesInDb.parallelStream().forEach(device -> { - deviceMapInDb.put(device.getDeviceId(), device); - }); - devicesInRedis.parallelStream().forEach(device -> { - if (deviceMapInDb.get(device.getDeviceId()) == null - && userSetting.getServerId().equals(device.getServerId())) { - redisCatchStorage.removeDevice(device.getDeviceId()); - } - }); - } - } - - - // 查找国标推流 - List sendRtpItems = redisCatchStorage.queryAllSendRTPServer(); - if (!sendRtpItems.isEmpty()) { - for (SendRtpInfo sendRtpItem : sendRtpItems) { - MediaServer mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId()); - if (channel == null){ - continue; - } - sendRtpServerService.delete(sendRtpItem); - if (mediaServerItem != null) { - ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); - boolean stopResult = mediaServerService.initStopSendRtp(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); - if (stopResult) { - Platform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getTargetId()); - - if (platform != null) { - try { - commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel); - } catch (InvalidArgumentException | ParseException | SipException e) { - log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage()); - } - } - } - } - } - } - } - - -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTask.java new file mode 100644 index 000000000..0844cbe5d --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTask.java @@ -0,0 +1,60 @@ +package com.genersoft.iot.vmp.gb28181.task.deviceStatus; + +import com.genersoft.iot.vmp.common.DeviceStatusCallback; +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Data +public class DeviceStatusTask implements Delayed { + + private String deviceId; + + private SipTransactionInfo transactionInfo; + + /** + * 超时时间(单位: 毫秒) + */ + private long delayTime; + + private DeviceStatusCallback callback; + + public static DeviceStatusTask getInstance(String deviceId, SipTransactionInfo transactionInfo, long delayTime, DeviceStatusCallback callback) { + DeviceStatusTask deviceStatusTask = new DeviceStatusTask(); + deviceStatusTask.setDeviceId(deviceId); + deviceStatusTask.setTransactionInfo(transactionInfo); + deviceStatusTask.setDelayTime((delayTime * 1000L - 500L) + System.currentTimeMillis()); + deviceStatusTask.setCallback(callback); + return deviceStatusTask; + } + + public void expired() { + if (callback == null) { + log.info("[设备离线] 未找到过期处理回调, {}", deviceId); + return; + } + callback.run(deviceId, transactionInfo); + } + + @Override + public long getDelay(@NotNull TimeUnit unit) { + return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(@NotNull Delayed o) { + return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); + } + + public DeviceStatusTaskInfo getInfo(){ + DeviceStatusTaskInfo taskInfo = new DeviceStatusTaskInfo(); + taskInfo.setTransactionInfo(transactionInfo); + taskInfo.setDeviceId(deviceId); + return taskInfo; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskInfo.java new file mode 100644 index 000000000..2e6380436 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskInfo.java @@ -0,0 +1,17 @@ +package com.genersoft.iot.vmp.gb28181.task.deviceStatus; + +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; +import lombok.Data; + +@Data +public class DeviceStatusTaskInfo{ + + private String deviceId; + + private SipTransactionInfo transactionInfo; + + /** + * 过期时间 + */ + private long expireTime; +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java new file mode 100644 index 000000000..a179879a2 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java @@ -0,0 +1,135 @@ +package com.genersoft.iot.vmp.gb28181.task.deviceStatus; + +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Component +public class DeviceStatusTaskRunner { + + private final Map subscribes = new ConcurrentHashMap<>(); + + private final DelayQueue delayQueue = new DelayQueue<>(); + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private UserSetting userSetting; + + private final String prefix = "VMP_DEVICE_STATUS"; + + // 状态过期检查 + @Scheduled(fixedDelay = 500, timeUnit = TimeUnit.MILLISECONDS) + public void expirationCheck(){ + while (!delayQueue.isEmpty()) { + DeviceStatusTask take = null; + try { + take = delayQueue.take(); + try { + removeTask(take.getDeviceId()); + take.expired(); + }catch (Exception e) { + log.error("[设备状态到期] 到期处理时出现异常, 设备编号: {} ", take.getDeviceId()); + } + } catch (InterruptedException e) { + log.error("[设备状态任务] ", e); + } + } + } + + public void addTask(DeviceStatusTask task) { + Duration duration = Duration.ofSeconds((task.getDelayTime() - System.currentTimeMillis())/1000); + if (duration.getSeconds() < 0) { + return; + } + subscribes.put(task.getDeviceId(), task); + String key = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getDeviceId()); + redisTemplate.opsForValue().set(key, task.getInfo(), duration); + delayQueue.offer(task); + } + + public boolean removeTask(String key) { + DeviceStatusTask task = subscribes.get(key); + if (task == null) { + return false; + } + String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getDeviceId()); + redisTemplate.delete(redisKey); + subscribes.remove(key); + if (delayQueue.contains(task)) { + boolean remove = delayQueue.remove(task); + if (!remove) { + log.info("[移除状态任务] 从延时队列内移除失败: {}", key); + } + } + return true; + } + + public SipTransactionInfo getTransactionInfo(String key) { + DeviceStatusTask task = subscribes.get(key); + if (task == null) { + return null; + } + return task.getTransactionInfo(); + } + + public boolean updateDelay(String key, long expirationTime) { + DeviceStatusTask task = subscribes.get(key); + if (task == null) { + return false; + } + log.info("[更新状态任务时间] 编号: {}", key); + if (delayQueue.contains(task)) { + boolean remove = delayQueue.remove(task); + if (!remove) { + log.info("[更新状态任务时间] 从延时队列内移除失败: {}", key); + } + } + task.setDelayTime(expirationTime); + delayQueue.offer(task); + String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getDeviceId()); + Duration duration = Duration.ofSeconds((expirationTime - System.currentTimeMillis())/1000); + redisTemplate.expire(redisKey, duration); + return true; + } + + public boolean containsKey(String key) { + return subscribes.containsKey(key); + } + + public List getAllTaskInfo(){ + String scanKey = String.format("%s_%s_*", prefix, userSetting.getServerId()); + List values = RedisUtil.scan(redisTemplate, scanKey); + if (values.isEmpty()) { + return new ArrayList<>(); + } + List result = new ArrayList<>(); + for (Object value : values) { + String redisKey = (String)value; + DeviceStatusTaskInfo taskInfo = (DeviceStatusTaskInfo)redisTemplate.opsForValue().get(redisKey); + if (taskInfo == null) { + continue; + } + Long expire = redisTemplate.getExpire(redisKey); + taskInfo.setExpireTime(expire); + result.add(taskInfo); + } + return result; + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java index 8ad0a6c2f..4159c8ca7 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java @@ -136,16 +136,17 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { @Override public String keepalive(Platform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws SipException, InvalidArgumentException, ParseException { - String characterSet = parentPlatform.getCharacterSet(); - StringBuffer keepaliveXml = new StringBuffer(200); - keepaliveXml.append("\r\n") - .append("\r\n") - .append("Keepalive\r\n") - .append("" + (int)((Math.random()*9+1)*100000) + "\r\n") - .append("" + parentPlatform.getDeviceGBId() + "\r\n") - .append("OK\r\n") - .append("\r\n"); + log.info("[国标级联] 发送心跳, 上级平台编号: {}", parentPlatform.getServerGBId()); + String characterSet = parentPlatform.getCharacterSet(); + StringBuffer keepaliveXml = new StringBuffer(200); + keepaliveXml.append("\r\n") + .append("\r\n") + .append("Keepalive\r\n") + .append("" + (int)((Math.random()*9+1)*100000) + "\r\n") + .append("" + parentPlatform.getDeviceGBId() + "\r\n") + .append("OK\r\n") + .append("\r\n"); CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index 8e367a2c8..fe5ce9134 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -113,8 +113,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen device.setTransport("TCP".equalsIgnoreCase(transport) ? "TCP" : "UDP"); sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), registerOkResponse); device.setRegisterTime(DateUtil.getNow()); - SipTransactionInfo sipTransactionInfo = new SipTransactionInfo((SIPResponse) registerOkResponse); - deviceService.online(device, sipTransactionInfo); + deviceService.online(device, null); } else { deviceService.offline(deviceId, "主动注销"); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index f59091ccd..e2793d5f2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -70,6 +70,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme return; } String cmd = XmlUtil.getText(rootElement, "CmdType"); + log.info("[收到订阅请求] 类型: {}", cmd); if (CmdType.MOBILE_POSITION.equals(cmd)) { processNotifyMobilePosition(request, rootElement); // } else if (CmdType.ALARM.equals(cmd)) { @@ -157,12 +158,14 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme private void processNotifyCatalogList(SIPRequest request, Element rootElement) throws SipException { if (request == null) { + log.info("[处理目录订阅] 发现request为NUll。已忽略"); return; } String platformId = SipUtils.getUserIdFromFromHeader(request); String deviceId = XmlUtil.getText(rootElement, "DeviceID"); Platform platform = platformService.queryPlatformByServerGBId(platformId); if (platform == null){ + log.info("[处理目录订阅] 未找到平台 {}。已忽略", platformId); return; } @@ -186,12 +189,6 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme SubscribeInfo subscribeInfo = SubscribeInfo.getInstance(response, platformId, expires, (EventHeader)request.getHeader(EventHeader.NAME)); - if (subscribeInfo.getExpires() > 0) { - subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo); - }else if (subscribeInfo.getExpires() == 0) { - subscribeHolder.removeCatalogSubscribe(platformId); - } - if (subscribeInfo.getExpires() == 0) { subscribeHolder.removeCatalogSubscribe(platformId); }else { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index cdaea441d..dee3b821f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -1,13 +1,13 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd; -import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.common.RemoteAddressInfo; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.common.RemoteAddressInfo; import com.genersoft.iot.vmp.gb28181.bean.SipMsgInfo; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; +import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; @@ -48,6 +48,9 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp @Autowired private IDeviceService deviceService; + @Autowired + private DeviceStatusTaskRunner statusTaskRunner; + @Autowired private UserSetting userSetting; @@ -111,20 +114,16 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp if (device.isOnLine()) { deviceService.updateDevice(device); + long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L; + if (statusTaskRunner.containsKey(device.getDeviceId())) { + statusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis()); + } } else { if (userSetting.getGbDeviceOnline() == 1) { // 对于已经离线的设备判断他的注册是否已经过期 - device.setOnLine(true); - device.setRegisterTime(DateUtil.getNow()); deviceService.online(device, null); } } - // 刷新过期任务 - String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId(); - // 如果三次心跳失败,则设置设备离线 - dynamicTask.startDelay(registerExpireTaskKey, () -> deviceService.offline(device.getDeviceId(), "三次心跳超时"), - device.getHeartBeatInterval() * 1000 * device.getHeartBeatCount()); - } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 7a766eb56..9808cf65e 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -12,6 +12,7 @@ import org.springframework.stereotype.Component; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.math.BigDecimal; import java.net.ConnectException; import java.net.SocketTimeoutException; import java.util.HashMap; @@ -442,8 +443,10 @@ public class ZLMRESTfulUtils { param.put("vhost", "__defaultVhost__"); param.put("app", app); param.put("stream", stream); - param.put("stamp", stamp); + BigDecimal bigDecimal = new BigDecimal(stamp); + param.put("stamp", bigDecimal); param.put("schema", schema); + System.out.println(bigDecimal); return sendPost(mediaServer, "seekRecordStamp",param, null); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/InviteErrorCode.java b/src/main/java/com/genersoft/iot/vmp/service/bean/InviteErrorCode.java index d43792e79..faf8323f8 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/InviteErrorCode.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/InviteErrorCode.java @@ -17,7 +17,8 @@ public enum InviteErrorCode { ERROR_FOR_RESET_SSRC(-9, "重新设置收流信息失败"), ERROR_FOR_SIP_SENDING_FAILED(-10, "命令发送失败"), ERROR_FOR_ASSIST_NOT_READY(-11, "没有可用的assist服务"), - ERROR_FOR_PARAMETER_ERROR(-13, "参数异常"); + ERROR_FOR_PARAMETER_ERROR(-13, "参数异常"), + ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR(-14, "TCP主动连接失败"); private final int code; private final String msg; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 2164909c6..98f1ff2e0 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,4 +2,4 @@ spring: application: name: wvp profiles: - active: 273 + active: 274 diff --git a/web/src/styles/variables.scss b/web/src/styles/variables.scss index be5577263..bac37464b 100644 --- a/web/src/styles/variables.scss +++ b/web/src/styles/variables.scss @@ -1,9 +1,9 @@ // sidebar $menuText:#bfcbd9; $menuActiveText:#409EFF; -$subMenuActiveText:#f4f4f5; //https://github.com/ElemeFE/element/issues/12951 +$subMenuActiveText: #f4f4f5; //https://github.com/ElemeFE/element/issues/12951 -$menuBg:#304156; +$menuBg: #304156; $menuHover:#263445; $subMenuBg:#1f2d3d;