diff --git a/README.md b/README.md
index e44a673c7..6e2348b26 100644
--- a/README.md
+++ b/README.md
@@ -16,11 +16,11 @@ WEB VIDEO PLATFORM是一个基于GB28181-2016标准实现的开箱即用的网
前端页面基于vue-admin-template构建 [https://github.com/PanJiaChen/vue-admin-template?tab=readme-ov-file](https://github.com/PanJiaChen/vue-admin-template?tab=readme-ov-file)
# 应用场景:
-支持浏览器无插件播放摄像头视频。
-支持国标设备(摄像机、平台、NVR等)设备接入
-支持rtsp, rtmp,直播设备设备接入,充分利旧。
-支持国标级联。多平台级联。跨网视频预览。
-支持跨网网闸平台互联。
+- 支持浏览器无插件播放摄像头视频。
+- 支持国标设备(摄像机、平台、NVR等)设备接入
+- 支持rtsp, rtmp,直播设备设备接入,充分利旧。
+- 支持国标级联。多平台级联。跨网视频预览。
+- 支持跨网网闸平台互联。
# 文档
@@ -132,6 +132,7 @@ https://gitee.com/pan648540858/wvp-GB28181-pro.git
- [X] 支持ONVIF协议,设备检索,支持点播,云台控制,国标级联点播,自动点播等。
- [X] 支持部标1078+808协议,支持点播,云台控制,录像回放,位置上报,自动点播等。
- [X] 支持国标28181-2022协议,支持巡航轨迹查询,PTZ精准控制,存储卡格式化,设备软件升级,OSD配置,h265+aac,支持辅码流,录像倒放等。
+- [X] 支持国网B接口协议。支持注册,获取资源,预览等
# 授权协议
diff --git a/pom.xml b/pom.xml
index 8ab69582a..a2385b921 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,7 +11,7 @@
com.genersoft
wvp-pro
- 2.7.3
+ 2.7.4
web video platform
国标28181视频平台
${project.packaging}
diff --git a/src/main/java/com/genersoft/iot/vmp/common/DeviceStatusCallback.java b/src/main/java/com/genersoft/iot/vmp/common/DeviceStatusCallback.java
new file mode 100644
index 000000000..9ee9b9edb
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/common/DeviceStatusCallback.java
@@ -0,0 +1,7 @@
+package com.genersoft.iot.vmp.common;
+
+import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
+
+public interface DeviceStatusCallback {
+ public void run(String deviceId, SipTransactionInfo transactionInfo);
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
index 096f89d86..ca0be92e3 100644
--- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
+++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
@@ -46,7 +46,6 @@ public class VideoManagerConstants {
public static final String SYSTEM_INFO_DISK_PREFIX = "VMP_SYSTEM_INFO_DISK_";
public static final String BROADCAST_WAITE_INVITE = "task_broadcast_waite_invite_";
- public static final String REGISTER_EXPIRE_TASK_KEY_PREFIX = "VMP_device_register_expire_";
public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_";
public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:";
public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:";
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java
index 6788a8eb3..5d5a02beb 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java
@@ -223,7 +223,7 @@ public class DeviceQuery {
if (exist) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备编号已存在");
}
- deviceService.addDevice(device);
+ deviceService.addCustomDevice(device);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java
index b999fecd1..ff04459f3 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java
@@ -225,6 +225,7 @@ public interface DeviceMapper {
"on_line"+
" FROM wvp_device WHERE on_line = true")
List getOnlineDevices();
+
@Select("SELECT " +
"id, " +
"device_id, " +
@@ -414,4 +415,12 @@ public interface DeviceMapper {
" WHERE id=#{id}"+
" "})
void updateSubscribeMobilePosition(Device device);
+
+ @Update(value = {" "})
+ void offlineByList(List offlineDevices);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java
index c448c04fa..28b7f61c5 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java
@@ -128,7 +128,7 @@ public interface IDeviceService {
* 添加设备
* @param device
*/
- void addDevice(Device device);
+ void addCustomDevice(Device device);
/**
* 页面表单更新设备信息
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java
index 47dbfc931..b04ad9230 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java
@@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.gb28181.service.impl;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.CommonCallback;
-import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
@@ -15,6 +14,9 @@ import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
+import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTask;
+import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskInfo;
+import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskRunner;
@@ -50,8 +52,7 @@ import javax.sip.SipException;
import javax.validation.constraints.NotNull;
import java.text.ParseException;
import java.time.Instant;
-import java.util.List;
-import java.util.Objects;
+import java.util.*;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
@@ -111,13 +112,83 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Autowired
private SubscribeTaskRunner subscribeTaskRunner;
+ @Autowired
+ private DeviceStatusTaskRunner deviceStatusTaskRunner;
+
private Device getDeviceByDeviceIdFromDb(String deviceId) {
return deviceMapper.getDeviceByDeviceId(deviceId);
}
@Override
public void run(String... args) throws Exception {
- // TODO 处理设备离线
+
+ // 清理数据库不存在但是redis中存在的数据
+ List devicesInDb = getAll();
+ if (devicesInDb.isEmpty()) {
+ redisCatchStorage.removeAllDevice();
+ }else {
+ List devicesInRedis = redisCatchStorage.getAllDevices();
+ if (!devicesInRedis.isEmpty()) {
+ Map deviceMapInDb = new HashMap<>();
+ devicesInDb.parallelStream().forEach(device -> {
+ deviceMapInDb.put(device.getDeviceId(), device);
+ });
+ devicesInRedis.parallelStream().forEach(device -> {
+ if (deviceMapInDb.get(device.getDeviceId()) == null
+ && userSetting.getServerId().equals(device.getServerId())) {
+ redisCatchStorage.removeDevice(device.getDeviceId());
+ }
+ });
+ }
+ }
+
+ // 重置cseq计数
+ redisCatchStorage.resetAllCSEQ();
+ // 处理设备状态
+ List allTaskInfo = deviceStatusTaskRunner.getAllTaskInfo();
+ List onlineDeviceIds = new ArrayList<>();
+ if (!allTaskInfo.isEmpty()) {
+ for (DeviceStatusTaskInfo taskInfo : allTaskInfo) {
+ Device device = getDeviceByDeviceId(taskInfo.getDeviceId());
+ if (device == null) {
+ deviceStatusTaskRunner.removeTask(taskInfo.getDeviceId());
+ continue;
+ }
+ // 恢复定时任务, TCP因为连接已经断开必须等待设备重新连接
+ DeviceStatusTask deviceStatusTask = DeviceStatusTask.getInstance(taskInfo.getDeviceId(),
+ taskInfo.getTransactionInfo(), taskInfo.getExpireTime(), this::deviceStatusExpire);
+ deviceStatusTaskRunner.addTask(deviceStatusTask);
+ onlineDeviceIds.add(taskInfo.getDeviceId());
+ }
+ // 除了记录的设备以外, 其他设备全部离线
+ List onlineDevice = getAllOnlineDevice(userSetting.getServerId());
+ if (!onlineDevice.isEmpty()) {
+ List offlineDevices = new ArrayList<>();
+ for (Device device : onlineDevice) {
+ if (!onlineDeviceIds.contains(device.getDeviceId())) {
+ // 此设备需要离线
+ device.setOnLine(false);
+ // 清理离线设备的相关缓存
+ cleanOfflineDevice(device);
+ // 更新数据库
+ offlineDevices.add(device);
+ }
+ }
+ if (!offlineDevices.isEmpty()) {
+ offlineByIds(offlineDevices);
+ }
+ }
+ }else {
+ // 所有设备全部离线
+ List onlineDevice = getAllOnlineDevice(userSetting.getServerId());
+ for (Device device : onlineDevice) {
+ // 此设备需要离线
+ device.setOnLine(false);
+ // 清理离线设备的相关缓存
+ cleanOfflineDevice(device);
+ }
+ offlineByIds(onlineDevice);
+ }
// 处理订阅任务
List taskInfoList = subscribeTaskRunner.getAllTaskInfo();
@@ -127,7 +198,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
continue;
}
Device device = getDeviceByDeviceId(taskInfo.getDeviceId());
- if (device == null || !device.isOnLine()) {
+ if (device == null || !device.isOnLine() || !onlineDeviceIds.contains(taskInfo.getDeviceId())) {
subscribeTaskRunner.removeSubscribe(taskInfo.getKey());
continue;
}
@@ -148,6 +219,61 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
+ private void offlineByIds(List offlineDevices) {
+ if (offlineDevices.isEmpty()) {
+ log.info("[更新多个离线设备信息] 参数为空");
+ return;
+ }
+ deviceMapper.offlineByList(offlineDevices);
+ for (Device device : offlineDevices) {
+ device.setOnLine(false);
+ redisCatchStorage.updateDevice(device);
+ }
+ }
+
+ private void cleanOfflineDevice(Device device) {
+ if (subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) {
+ subscribeTaskRunner.removeSubscribe(SubscribeTaskForCatalog.getKey(device));
+ }
+ if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
+ subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device));
+ }
+ //进行通道离线
+// deviceChannelMapper.offlineByDeviceId(deviceId);
+ // 离线释放所有ssrc
+ List ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId());
+ if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
+ for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
+ mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
+ mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
+ sessionManager.removeByCallId(ssrcTransaction.getCallId());
+ }
+ }
+ // 移除订阅
+ removeCatalogSubscribe(device, null);
+ removeMobilePositionSubscribe(device, null);
+
+ List audioBroadcastCatches = audioBroadcastManager.getByDeviceId(device.getDeviceId());
+ if (!audioBroadcastCatches.isEmpty()) {
+ for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) {
+
+ SendRtpInfo sendRtpItem = sendRtpServerService.queryByChannelId(audioBroadcastCatch.getChannelId(), device.getDeviceId());
+ if (sendRtpItem != null) {
+ sendRtpServerService.delete(sendRtpItem);
+ MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+ mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
+ }
+
+ audioBroadcastManager.del(audioBroadcastCatch.getChannelId());
+ }
+ }
+ }
+
+ private void deviceStatusExpire(String deviceId, SipTransactionInfo transactionInfo) {
+ log.info("[设备状态] 到期, 编号: {}", deviceId);
+ offline(deviceId, "保活到期");
+ }
+
@Override
public void online(Device device, SipTransactionInfo sipTransactionInfo) {
log.info("[设备上线] deviceId:{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort());
@@ -182,6 +308,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
device.setCreateTime(now);
device.setUpdateTime(now);
log.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId());
+ addCustomDevice(device);
deviceMapper.add(device);
redisCatchStorage.updateDevice(device);
try {
@@ -193,7 +320,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
sync(device);
}else {
device.setServerId(userSetting.getServerId());
- if(!device.isOnLine()){
+ if(!deviceInDb.isOnLine()){
device.setOnLine(true);
device.setCreateTime(now);
deviceMapper.update(device);
@@ -216,6 +343,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (device.getSubscribeCycleForMobilePosition() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
addMobilePositionSubscribe(device, null);
}
+
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, true);
@@ -230,12 +358,19 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
sync(device);
}
}
-
- // 刷新过期任务
- String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId();
- // 如果第一次注册那么必须在60 * 3时间内收到一个心跳,否则设备离线
- dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId(), "三次心跳超时"),
- device.getHeartBeatInterval() * 1000 * device.getHeartBeatCount());
+ long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
+ if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
+ if (sipTransactionInfo == null) {
+ deviceStatusTaskRunner.updateDelay(device.getDeviceId(), System.currentTimeMillis() + expiresTime);
+ }else {
+ deviceStatusTaskRunner.removeTask(device.getDeviceId());
+ DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime, this::deviceStatusExpire);
+ deviceStatusTaskRunner.addTask(task);
+ }
+ }else {
+ DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime, this::deviceStatusExpire);
+ deviceStatusTaskRunner.addTask(task);
+ }
}
@@ -256,47 +391,10 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
log.info("[设备离线] {}, device:{}, 心跳间隔: {},心跳超时次数: {}, 上次心跳时间:{}, 上次注册时间: {}", reason, deviceId,
device.getHeartBeatInterval(), device.getHeartBeatCount(), device.getKeepaliveTime(), device.getRegisterTime());
- String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + deviceId;
- dynamicTask.stop(registerExpireTaskKey);
- if (device.isOnLine()) {
- if (userSetting.getDeviceStatusNotify()) {
- // 发送redis消息
- redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false);
- }
- }
-
device.setOnLine(false);
+ cleanOfflineDevice(device);
redisCatchStorage.updateDevice(device);
deviceMapper.update(device);
- //进行通道离线
-// deviceChannelMapper.offlineByDeviceId(deviceId);
- // 离线释放所有ssrc
- List ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(deviceId);
- if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
- for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
- mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
- mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
- sessionManager.removeByCallId(ssrcTransaction.getCallId());
- }
- }
- // 移除订阅
- removeCatalogSubscribe(device, null);
- removeMobilePositionSubscribe(device, null);
-
- List audioBroadcastCatches = audioBroadcastManager.getByDeviceId(deviceId);
- if (!audioBroadcastCatches.isEmpty()) {
- for (AudioBroadcastCatch audioBroadcastCatch : audioBroadcastCatches) {
-
- SendRtpInfo sendRtpItem = sendRtpServerService.queryByChannelId(audioBroadcastCatch.getChannelId(), deviceId);
- if (sendRtpItem != null) {
- sendRtpServerService.delete(sendRtpItem);
- MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
- mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
- }
-
- audioBroadcastManager.del(audioBroadcastCatch.getChannelId());
- }
- }
}
// 订阅丢失检查
@@ -329,7 +427,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
log.info("[目录订阅] 到期, 编号: {}, 设备不存在, 忽略", deviceId);
return;
}
- if (device.getSubscribeCycleForCatalog() > 0) {
+ if (device.isOnLine() && device.getSubscribeCycleForCatalog() > 0) {
addCatalogSubscribe(device, transactionInfo);
}
}
@@ -341,7 +439,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
log.info("[移动位置订阅] 到期, 编号: {}, 设备不存在, 忽略", deviceId);
return;
}
- if (device.getSubscribeCycleForMobilePosition() > 0) {
+ if (device.isOnLine() && device.getSubscribeCycleForMobilePosition() > 0) {
addMobilePositionSubscribe(device, transactionInfo);
}
}
@@ -381,9 +479,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Override
public boolean removeCatalogSubscribe(@NotNull Device device, CommonCallback callback) {
- log.info("[移除目录订阅]: {}", device.getDeviceId());
String key = SubscribeTaskForCatalog.getKey(device);
if (subscribeTaskRunner.containsKey(key)) {
+ log.info("[移除目录订阅]: {}", device.getDeviceId());
SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key);
if (transactionInfo == null) {
log.warn("[移除目录订阅] 未找到事务信息,{}", device.getDeviceId());
@@ -441,9 +539,10 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Override
public boolean removeMobilePositionSubscribe(Device device, CommonCallback callback) {
- log.info("[移除移动位置订阅]: {}", device.getDeviceId());
+
String key = SubscribeTaskForMobilPosition.getKey(device);
if (subscribeTaskRunner.containsKey(key)) {
+ log.info("[移除移动位置订阅]: {}", device.getDeviceId());
SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key);
if (transactionInfo == null) {
log.warn("[移除移动位置订阅] 未找到事务信息,{}", device.getDeviceId());
@@ -580,7 +679,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
@Override
- public void addDevice(Device device) {
+ public void addCustomDevice(Device device) {
device.setOnLine(false);
device.setCreateTime(DateUtil.getNow());
device.setUpdateTime(DateUtil.getNow());
@@ -625,9 +724,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
removeMobilePositionSubscribe(device, null);
}
- // 停止状态检测
- String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId();
- dynamicTask.stop(registerExpireTaskKey);
+ if (deviceStatusTaskRunner.containsKey(deviceId)) {
+ deviceStatusTaskRunner.removeTask(deviceId);
+ }
platformChannelMapper.delChannelForDeviceId(deviceId);
deviceChannelMapper.cleanChannelsByDeviceId(device.getId());
deviceMapper.del(deviceId);
@@ -679,7 +778,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
public void subscribeCatalog(int id, int cycle) {
Device device = deviceMapper.query(id);
Assert.notNull(device, "未找到设备");
-
+ Assert.isTrue(device.isOnLine(), "设备已离线");
if (device.getSubscribeCycleForCatalog() == cycle) {
return;
}
@@ -710,6 +809,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
public void subscribeMobilePosition(int id, int cycle, int interval) {
Device device = deviceMapper.query(id);
Assert.notNull(device, "未找到设备");
+ Assert.isTrue(device.isOnLine(), "设备已离线");
if (device.getSubscribeCycleForMobilePosition() == cycle) {
return;
@@ -747,15 +847,16 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
if (!Objects.equals(deviceInDb.getHeartBeatCount(), device.getHeartBeatCount())
|| !Objects.equals(deviceInDb.getHeartBeatInterval(), device.getHeartBeatInterval())) {
- // 刷新过期任务
- String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId();
- // 如果第一次注册那么必须在60 * 3时间内收到一个心跳,否则设备离线
- dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId(), "三次心跳超时"),
- device.getHeartBeatInterval() * 1000 * device.getHeartBeatCount());
+
deviceInDb.setHeartBeatCount(device.getHeartBeatCount());
deviceInDb.setHeartBeatInterval(device.getHeartBeatInterval());
deviceInDb.setPositionCapability(device.getPositionCapability());
updateDevice(deviceInDb);
+
+ long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
+ if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
+ deviceStatusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
+ }
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java
index c33247fe4..89cb637e8 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java
@@ -113,6 +113,35 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
@Override
public void run(String... args) throws Exception {
+
+ // 查找国标推流
+ List sendRtpItems = redisCatchStorage.queryAllSendRTPServer();
+ if (!sendRtpItems.isEmpty()) {
+ for (SendRtpInfo sendRtpItem : sendRtpItems) {
+ MediaServer mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+ CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId());
+ if (channel == null){
+ continue;
+ }
+ sendRtpServerService.delete(sendRtpItem);
+ if (mediaServerItem != null) {
+ ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
+ boolean stopResult = mediaServerService.initStopSendRtp(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
+ if (stopResult) {
+ Platform platform = queryPlatformByServerGBId(sendRtpItem.getTargetId());
+
+ if (platform != null) {
+ try {
+ commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel);
+ } catch (InvalidArgumentException | ParseException | SipException e) {
+ log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
+ }
+ }
+ }
+ }
+ }
+ }
+
// 启动时 如果存在未过期的注册平台,则发送注销
List registerTaskInfoList = statusTaskRunner.getAllRegisterTaskInfo();
if (registerTaskInfoList.isEmpty()) {
@@ -415,7 +444,6 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
}
private void keepaliveExpire(String platformServerId, int failCount) {
- log.info("[国标级联] 心跳到期, 上级平台编号: {}", platformServerId);
Platform platform = queryPlatformByServerGBId(platformServerId);
if (platform == null || !platform.isEnable()) {
log.info("[国标级联] 心跳到期, 上级平台编号: {}, 平台不存在或者未启用, 忽略", platformServerId);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java
index fb56c689b..46a4fb32d 100755
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java
@@ -510,6 +510,7 @@ public class PlayServiceImpl implements IPlayService {
try {
sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, playSsrc, device.getDeviceId(), "talk", stream,
channel.getId(), true, false);
+ sendRtpInfo.setPlayType(InviteStreamType.TALK);
}catch (PlayException e) {
log.info("[语音对讲]开始 获取发流端口失败 deviceId: {}, channelId: {},", device.getDeviceId(), channel.getDeviceId());
return;
@@ -582,7 +583,7 @@ public class PlayServiceImpl implements IPlayService {
sendRtpInfo.setCallId(response.getCallIdHeader().getCallId());
sendRtpServerService.update(sendRtpInfo);
- SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), sendRtpInfo.getChannelId(), "talk", sendRtpInfo.getApp(),
+ SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), sendRtpInfo.getChannelId(), response.getCallIdHeader().getCallId(), sendRtpInfo.getApp(),
sendRtpInfo.getStream(), sendRtpInfo.getSsrc(), sendRtpInfo.getMediaServerId(),
response, InviteSessionType.TALK);
@@ -654,11 +655,11 @@ public class PlayServiceImpl implements IPlayService {
// 主动连接失败,结束流程, 清理数据
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
- callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
- InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
+ callback.run(InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getCode(),
+ InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getMsg(), null);
inviteStreamService.call(InviteSessionType.BROADCAST, channel.getId(), null,
- InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
- InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getMsg(), null);
+ InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getCode(),
+ InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getMsg(), null);
}
} catch (SdpException e) {
log.error("[TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channel.getDeviceId(), e);
@@ -724,7 +725,6 @@ public class PlayServiceImpl implements IPlayService {
inviteInfo.setStreamInfo(streamInfo);
inviteStreamService.updateInviteInfo(inviteInfo);
}
-
}
return streamInfo;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java
deleted file mode 100755
index 56a649bc5..000000000
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package com.genersoft.iot.vmp.gb28181.task;
-
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
-import com.genersoft.iot.vmp.gb28181.bean.Device;
-import com.genersoft.iot.vmp.gb28181.bean.Platform;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
-import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
-import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
-import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
-import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
-import com.genersoft.iot.vmp.media.bean.MediaServer;
-import com.genersoft.iot.vmp.media.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.ISendRtpServerService;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.core.annotation.Order;
-import org.springframework.stereotype.Component;
-
-import javax.sip.InvalidArgumentException;
-import javax.sip.SipException;
-import java.text.ParseException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * 系统启动时控制设备
- * @author lin
- */
-@Slf4j
-@Component
-@Order(value=14)
-public class SipRunner implements CommandLineRunner {
-
- @Autowired
- private IRedisCatchStorage redisCatchStorage;
-
- @Autowired
- private SSRCFactory ssrcFactory;
-
- @Autowired
- private IDeviceService deviceService;
-
- @Autowired
- private IMediaServerService mediaServerService;
-
- @Autowired
- private IPlatformService platformService;
-
- @Autowired
- private IGbChannelService channelService;
-
- @Autowired
- private ISIPCommanderForPlatform commanderForPlatform;
-
- @Autowired
- private ISendRtpServerService sendRtpServerService;
-
- @Autowired
- private UserSetting userSetting;
-
- @Override
- public void run(String... args) throws Exception {
- List deviceList = deviceService.getAllOnlineDevice(userSetting.getServerId());
-
- for (Device device : deviceList) {
- if (deviceService.expire(device)){
- deviceService.offline(device.getDeviceId(), "注册已过期");
- }else {
- deviceService.online(device, null);
- }
- }
- // 重置cseq计数
- redisCatchStorage.resetAllCSEQ();
- // 清理redis
- // 清理数据库不存在但是redis中存在的数据
- List devicesInDb = deviceService.getAll();
- if (devicesInDb.isEmpty()) {
- redisCatchStorage.removeAllDevice();
- }else {
- List devicesInRedis = redisCatchStorage.getAllDevices();
- if (!devicesInRedis.isEmpty()) {
- Map deviceMapInDb = new HashMap<>();
- devicesInDb.parallelStream().forEach(device -> {
- deviceMapInDb.put(device.getDeviceId(), device);
- });
- devicesInRedis.parallelStream().forEach(device -> {
- if (deviceMapInDb.get(device.getDeviceId()) == null
- && userSetting.getServerId().equals(device.getServerId())) {
- redisCatchStorage.removeDevice(device.getDeviceId());
- }
- });
- }
- }
-
-
- // 查找国标推流
- List sendRtpItems = redisCatchStorage.queryAllSendRTPServer();
- if (!sendRtpItems.isEmpty()) {
- for (SendRtpInfo sendRtpItem : sendRtpItems) {
- MediaServer mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
- CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId());
- if (channel == null){
- continue;
- }
- sendRtpServerService.delete(sendRtpItem);
- if (mediaServerItem != null) {
- ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
- boolean stopResult = mediaServerService.initStopSendRtp(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
- if (stopResult) {
- Platform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getTargetId());
-
- if (platform != null) {
- try {
- commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel);
- } catch (InvalidArgumentException | ParseException | SipException e) {
- log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
- }
- }
- }
- }
- }
- }
- }
-
-
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTask.java
new file mode 100644
index 000000000..0844cbe5d
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTask.java
@@ -0,0 +1,60 @@
+package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
+
+import com.genersoft.iot.vmp.common.DeviceStatusCallback;
+import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@Data
+public class DeviceStatusTask implements Delayed {
+
+ private String deviceId;
+
+ private SipTransactionInfo transactionInfo;
+
+ /**
+ * 超时时间(单位: 毫秒)
+ */
+ private long delayTime;
+
+ private DeviceStatusCallback callback;
+
+ public static DeviceStatusTask getInstance(String deviceId, SipTransactionInfo transactionInfo, long delayTime, DeviceStatusCallback callback) {
+ DeviceStatusTask deviceStatusTask = new DeviceStatusTask();
+ deviceStatusTask.setDeviceId(deviceId);
+ deviceStatusTask.setTransactionInfo(transactionInfo);
+ deviceStatusTask.setDelayTime((delayTime * 1000L - 500L) + System.currentTimeMillis());
+ deviceStatusTask.setCallback(callback);
+ return deviceStatusTask;
+ }
+
+ public void expired() {
+ if (callback == null) {
+ log.info("[设备离线] 未找到过期处理回调, {}", deviceId);
+ return;
+ }
+ callback.run(deviceId, transactionInfo);
+ }
+
+ @Override
+ public long getDelay(@NotNull TimeUnit unit) {
+ return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public int compareTo(@NotNull Delayed o) {
+ return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
+ }
+
+ public DeviceStatusTaskInfo getInfo(){
+ DeviceStatusTaskInfo taskInfo = new DeviceStatusTaskInfo();
+ taskInfo.setTransactionInfo(transactionInfo);
+ taskInfo.setDeviceId(deviceId);
+ return taskInfo;
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskInfo.java
new file mode 100644
index 000000000..2e6380436
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskInfo.java
@@ -0,0 +1,17 @@
+package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
+
+import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
+import lombok.Data;
+
+@Data
+public class DeviceStatusTaskInfo{
+
+ private String deviceId;
+
+ private SipTransactionInfo transactionInfo;
+
+ /**
+ * 过期时间
+ */
+ private long expireTime;
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java
new file mode 100644
index 000000000..a179879a2
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java
@@ -0,0 +1,135 @@
+package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
+
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
+import com.genersoft.iot.vmp.utils.redis.RedisUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@Component
+public class DeviceStatusTaskRunner {
+
+ private final Map subscribes = new ConcurrentHashMap<>();
+
+ private final DelayQueue delayQueue = new DelayQueue<>();
+
+ @Autowired
+ private RedisTemplate