diff --git a/src/main/java/com/genersoft/iot/vmp/common/SubscribeCallback.java b/src/main/java/com/genersoft/iot/vmp/common/SubscribeCallback.java new file mode 100644 index 000000000..5443b50e0 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/common/SubscribeCallback.java @@ -0,0 +1,7 @@ +package com.genersoft.iot.vmp.common; + +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; + +public interface SubscribeCallback{ + public void run(String deviceId, SipTransactionInfo transactionInfo); +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AlarmChannelMessage.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AlarmChannelMessage.java index 7f50f4d37..4b3bb132a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AlarmChannelMessage.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/AlarmChannelMessage.java @@ -1,18 +1,23 @@ package com.genersoft.iot.vmp.gb28181.bean; +import lombok.Data; + /** * 通过redis分发报警消息 */ +@Data public class AlarmChannelMessage { /** - * 国标编号 + * 通道国标编号 */ private String gbId; + /** * 报警编号 */ private int alarmSn; + /** * 告警类型 */ @@ -22,36 +27,4 @@ public class AlarmChannelMessage { * 报警描述 */ private String alarmDescription; - - public String getGbId() { - return gbId; - } - - public void setGbId(String gbId) { - this.gbId = gbId; - } - - public int getAlarmSn() { - return alarmSn; - } - - public void setAlarmSn(int alarmSn) { - this.alarmSn = alarmSn; - } - - public int getAlarmType() { - return alarmType; - } - - public void setAlarmType(int alarmType) { - this.alarmType = alarmType; - } - - public String getAlarmDescription() { - return alarmDescription; - } - - public void setAlarmDescription(String alarmDescription) { - this.alarmDescription = alarmDescription; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipTransactionInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipTransactionInfo.java index 885843980..2edb71d5e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipTransactionInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SipTransactionInfo.java @@ -1,7 +1,9 @@ package com.genersoft.iot.vmp.gb28181.bean; import gov.nist.javax.sip.message.SIPResponse; +import lombok.Data; +@Data public class SipTransactionInfo { private String callId; @@ -31,43 +33,4 @@ public class SipTransactionInfo { public SipTransactionInfo() { } - public String getCallId() { - return callId; - } - - public void setCallId(String callId) { - this.callId = callId; - } - - public String getFromTag() { - return fromTag; - } - - public void setFromTag(String fromTag) { - this.fromTag = fromTag; - } - - public String getToTag() { - return toTag; - } - - public void setToTag(String toTag) { - this.toTag = toTag; - } - - public String getViaBranch() { - return viaBranch; - } - - public void setViaBranch(String viaBranch) { - this.viaBranch = viaBranch; - } - - public boolean isAsSender() { - return asSender; - } - - public void setAsSender(boolean asSender) { - this.asSender = asSender; - } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index a15de224a..a03d5a706 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -1,19 +1,20 @@ package com.genersoft.iot.vmp.gb28181.bean; -import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; +import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; /** * @author lin */ +@Slf4j @Component public class SubscribeHolder { @@ -23,44 +24,40 @@ public class SubscribeHolder { @Autowired private UserSetting userSetting; - private final String taskOverduePrefix = "subscribe_overdue_"; + @Autowired + private RedisTemplate redisTemplate; - private static ConcurrentHashMap catalogMap = new ConcurrentHashMap<>(); - private static ConcurrentHashMap mobilePositionMap = new ConcurrentHashMap<>(); + private final String prefix = "VMP_SUBSCRIBE_OVERDUE"; public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) { - catalogMap.put(platformId, subscribeInfo); - if (subscribeInfo.getExpires() > 0) { - // 添加订阅到期 - String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId; - // 添加任务处理订阅过期 - dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()), - subscribeInfo.getExpires() * 1000); + log.info("[国标级联] 添加目录订阅,平台: {}, 有效期: {}", platformId, subscribeInfo.getExpires()); + if (subscribeInfo.getExpires() < 0) { + return; } + String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId); + Duration duration = Duration.ofSeconds(subscribeInfo.getExpires()); + redisTemplate.opsForValue().set(key, subscribeInfo, duration); } public SubscribeInfo getCatalogSubscribe(String platformId) { - return catalogMap.get(platformId); + String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId); + return (SubscribeInfo)redisTemplate.opsForValue().get(key); } public void removeCatalogSubscribe(String platformId) { - - catalogMap.remove(platformId); - String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId; - Runnable runnable = dynamicTask.get(taskOverdueKey); - if (runnable instanceof ISubscribeTask) { - ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(null); - } - // 添加任务处理订阅过期 - dynamicTask.stop(taskOverdueKey); + String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platformId); + redisTemplate.delete(key); } public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo, Runnable gpsTask) { - mobilePositionMap.put(platformId, subscribeInfo); - String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "MobilePosition_" + platformId; - // 添加任务处理GPS定时推送 + log.info("[国标级联] 添加移动位置订阅,平台: {}, 有效期: {}", platformId, subscribeInfo.getExpires()); + if (subscribeInfo.getExpires() < 0) { + return; + } + String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId); + Duration duration = Duration.ofSeconds(subscribeInfo.getExpires()); + redisTemplate.opsForValue().set(key, subscribeInfo, duration); int cycleForCatalog; if (subscribeInfo.getGpsInterval() <= 0) { @@ -68,59 +65,55 @@ public class SubscribeHolder { }else { cycleForCatalog = subscribeInfo.getGpsInterval(); } - dynamicTask.startCron(key, gpsTask, + dynamicTask.startCron( + key, + () -> { + SubscribeInfo subscribe = getMobilePositionSubscribe(platformId); + if (subscribe != null) { + gpsTask.run(); + }else { + dynamicTask.stop(key); + } + }, cycleForCatalog * 1000); - String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId; - if (subscribeInfo.getExpires() > 0) { - // 添加任务处理订阅过期 - dynamicTask.startDelay(taskOverdueKey, () -> { - removeMobilePositionSubscribe(subscribeInfo.getId()); - }, - subscribeInfo.getExpires() * 1000); - } + } public SubscribeInfo getMobilePositionSubscribe(String platformId) { - return mobilePositionMap.get(platformId); + String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId); + return (SubscribeInfo)redisTemplate.opsForValue().get(key); } public void removeMobilePositionSubscribe(String platformId) { - mobilePositionMap.remove(platformId); - String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "MobilePosition_" + platformId; - // 结束任务处理GPS定时推送 - dynamicTask.stop(key); - String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId; - Runnable runnable = dynamicTask.get(taskOverdueKey); - if (runnable instanceof ISubscribeTask) { - ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(null); - } - // 添加任务处理订阅过期 - dynamicTask.stop(taskOverdueKey); + String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platformId); + redisTemplate.delete(key); } - public List getAllCatalogSubscribePlatform() { - List platforms = new ArrayList<>(); - if(catalogMap.size() > 0) { - for (String key : catalogMap.keySet()) { - platforms.add(catalogMap.get(key).getId()); + public List getAllCatalogSubscribePlatform(List platformList) { + if (platformList == null || platformList.isEmpty()) { + return new ArrayList<>(); + } + List result = new ArrayList<>(); + for (Platform platform : platformList) { + String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "catalog", platform.getServerGBId()); + if (redisTemplate.hasKey(key)) { + result.add(platform.getServerId()); } } - return platforms; + return result; } - public List getAllMobilePositionSubscribePlatform() { - List platforms = new ArrayList<>(); - if(!mobilePositionMap.isEmpty()) { - for (String key : mobilePositionMap.keySet()) { - platforms.add(mobilePositionMap.get(key).getId()); + public List getAllMobilePositionSubscribePlatform(List platformList) { + if (platformList == null || platformList.isEmpty()) { + return new ArrayList<>(); + } + List result = new ArrayList<>(); + for (Platform platform : platformList) { + String key = String.format("%s_%s_%s_%s", prefix, userSetting.getServerId(), "mobilePosition", platform.getServerGBId()); + if (redisTemplate.hasKey(key)) { + result.add(platform.getServerId()); } } - return platforms; - } - - public void removeAllSubscribe(String platformId) { - removeMobilePositionSubscribe(platformId); - removeCatalogSubscribe(platformId); + return result; } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java index c18df8f59..e98a93a3f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/DeviceQuery.java @@ -11,9 +11,6 @@ import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; -import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; -import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; -import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; @@ -40,9 +37,6 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; @Tag(name = "国标设备查询", description = "国标设备查询") @SuppressWarnings("rawtypes") @@ -79,7 +73,7 @@ public class DeviceQuery { @Parameter(name = "deviceId", description = "设备国标编号", required = true) @GetMapping("/devices/{deviceId}") public Device devices(@PathVariable String deviceId){ - + return deviceService.getDeviceByDeviceId(deviceId); } @@ -144,28 +138,10 @@ public class DeviceQuery { } // 清除redis记录 - boolean isSuccess = deviceService.delete(deviceId); - if (isSuccess) { - inviteStreamService.clearInviteInfo(deviceId); - // 停止此设备的订阅更新 - Set allKeys = dynamicTask.getAllKeys(); - for (String key : allKeys) { - if (key.startsWith(deviceId)) { - Runnable runnable = dynamicTask.get(key); - if (runnable instanceof ISubscribeTask) { - ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(null); - } - dynamicTask.stop(key); - } - } - JSONObject json = new JSONObject(); - json.put("deviceId", deviceId); - return json.toString(); - } else { - log.warn("设备信息删除API调用失败!"); - throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备信息删除API调用失败!"); - } + deviceService.delete(deviceId); + JSONObject json = new JSONObject(); + json.put("deviceId", deviceId); + return json.toString(); } @Operation(summary = "分页查询子目录通道", security = @SecurityRequirement(name = JwtUtils.HEADER)) @@ -355,28 +331,6 @@ public class DeviceQuery { return wvpResult; } - @GetMapping("/{deviceId}/subscribe_info") - @Operation(summary = "获取设备的订阅状态", security = @SecurityRequirement(name = JwtUtils.HEADER)) - @Parameter(name = "deviceId", description = "设备国标编号", required = true) - public WVPResult> getSubscribeInfo(@PathVariable String deviceId) { - Set allKeys = dynamicTask.getAllKeys(); - Map dialogStateMap = new HashMap<>(); - for (String key : allKeys) { - if (key.startsWith(deviceId)) { - ISubscribeTask subscribeTask = (ISubscribeTask)dynamicTask.get(key); - if (subscribeTask instanceof CatalogSubscribeTask) { - dialogStateMap.put("catalog", 1); - }else if (subscribeTask instanceof MobilePositionSubscribeTask) { - dialogStateMap.put("mobilePosition", 1); - } - } - } - WVPResult> wvpResult = new WVPResult<>(); - wvpResult.setCode(0); - wvpResult.setData(dialogStateMap); - return wvpResult; - } - @GetMapping("/snap/{deviceId}/{channelId}") @Operation(summary = "请求截图") @Parameter(name = "deviceId", description = "设备国标编号", required = true) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MobilePositionController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MobilePositionController.java index 1c3406f85..7ef963509 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MobilePositionController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MobilePositionController.java @@ -7,7 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.service.IMobilePositionService; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.github.pagehelper.util.StringUtil; @@ -37,10 +37,10 @@ public class MobilePositionController { @Autowired private IMobilePositionService mobilePositionService; - + @Autowired - private SIPCommander cmder; - + private ISIPCommander cmder; + @Autowired private DeferredResultHolder resultHolder; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java index 5218537f9..182b4cd91 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformMapper.java @@ -97,4 +97,7 @@ public interface PlatformMapper { @Select("SELECT * FROM wvp_platform WHERE server_id = #{serverId}") List queryByServerId(@Param("serverId") String serverId); + + @Select("SELECT * FROM wvp_platform ") + List queryAll(); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index beb1c6917..9226a1ee2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog; -import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; @@ -8,7 +7,6 @@ import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; -import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; @@ -17,7 +15,10 @@ import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * catalog事件 @@ -26,6 +27,9 @@ import java.util.*; @Component public class CatalogEventLister implements ApplicationListener { + @Autowired + private IPlatformService platformService; + @Autowired private IPlatformChannelService platformChannelService; @@ -53,8 +57,9 @@ public class CatalogEventLister implements ApplicationListener { } }else { + List allPlatform = platformService.queryAll(); // 获取所用订阅 - List platforms = subscribeHolder.getAllCatalogSubscribePlatform(); + List platforms = subscribeHolder.getAllCatalogSubscribePlatform(allPlatform); if (event.getChannels() != null) { if (!platforms.isEmpty()) { for (CommonGBChannel deviceChannel : event.getChannels()) { @@ -159,4 +164,4 @@ public class CatalogEventLister implements ApplicationListener { } } } - + diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java index 40dc6c853..3ce1c14e0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; +import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderForPlatform; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import lombok.extern.slf4j.Slf4j; @@ -24,6 +25,9 @@ import java.util.List; @Component public class MobilePositionEventLister implements ApplicationListener { + @Autowired + private IPlatformService platformService; + @Autowired private IPlatformChannelService platformChannelService; @@ -38,9 +42,9 @@ public class MobilePositionEventLister implements ApplicationListener allPlatforms = platformService.queryAll(); // 获取所用订阅 - List platforms = subscribeHolder.getAllMobilePositionSubscribePlatform(); + List platforms = subscribeHolder.getAllMobilePositionSubscribePlatform(allPlatforms); if (platforms.isEmpty()) { return; } @@ -65,4 +69,3 @@ public class MobilePositionEventLister implements ApplicationListener channelList); void checkRegionRemove(List channelList, List regionList); + + List queryByPlatformBySharChannelId(String gbId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java index a6c2b2466..499c4a2a2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformService.java @@ -85,4 +85,7 @@ public interface IPlatformService { List queryEnablePlatformList(String serverId); void delete(Integer platformId, CommonCallback callback); + + List queryAll(); + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index c36d94800..46702ad65 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -15,9 +15,11 @@ import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; -import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; -import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; -import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; +import com.genersoft.iot.vmp.gb28181.task.SubscribeTask; +import com.genersoft.iot.vmp.gb28181.task.SubscribeTaskInfo; +import com.genersoft.iot.vmp.gb28181.task.SubscribeTaskRunner; +import com.genersoft.iot.vmp.gb28181.task.impl.SubscribeTaskForCatalog; +import com.genersoft.iot.vmp.gb28181.task.impl.SubscribeTaskForMobilPosition; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; import com.genersoft.iot.vmp.media.bean.MediaServer; @@ -32,13 +34,17 @@ import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; +import gov.nist.javax.sip.message.SIPResponse; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; import javax.sip.SipException; import javax.validation.constraints.NotNull; import java.text.ParseException; @@ -53,7 +59,7 @@ import java.util.concurrent.TimeUnit; */ @Slf4j @Service -public class DeviceServiceImpl implements IDeviceService { +public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { @Autowired private DynamicTask dynamicTask; @@ -100,10 +106,46 @@ public class DeviceServiceImpl implements IDeviceService { @Autowired private IRedisRpcService redisRpcService; + @Autowired + private SubscribeTaskRunner subscribeTaskRunner; + private Device getDeviceByDeviceIdFromDb(String deviceId) { return deviceMapper.getDeviceByDeviceId(deviceId); } + @Override + public void run(String... args) throws Exception { + // TODO 处理设备离线 + + // 处理订阅任务 + List taskInfoList = subscribeTaskRunner.getAllTaskInfo(); + if (!taskInfoList.isEmpty()) { + for (SubscribeTaskInfo taskInfo : taskInfoList) { + if (taskInfo == null) { + continue; + } + Device device = getDeviceByDeviceId(taskInfo.getDeviceId()); + if (device == null || !device.isOnLine()) { + subscribeTaskRunner.removeSubscribe(taskInfo.getKey()); + continue; + } + if (SubscribeTaskForCatalog.name.equals(taskInfo.getName())) { + device.setSubscribeCycleForCatalog((int)taskInfo.getExpireTime()); + SubscribeTask subscribeTask = SubscribeTaskForCatalog.getInstance(device, this::catalogSubscribeExpire, taskInfo.getTransactionInfo()); + if (subscribeTask != null) { + subscribeTaskRunner.addSubscribe(subscribeTask); + } + }else if (SubscribeTaskForMobilPosition.name.equals(taskInfo.getName())) { + device.setSubscribeCycleForMobilePosition((int)taskInfo.getExpireTime()); + SubscribeTask subscribeTask = SubscribeTaskForMobilPosition.getInstance(device, this::mobilPositionSubscribeExpire, taskInfo.getTransactionInfo()); + if (subscribeTask != null) { + subscribeTaskRunner.addSubscribe(subscribeTask); + } + } + } + } + } + @Override public void online(Device device, SipTransactionInfo sipTransactionInfo) { log.info("[设备上线] deviceId:{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort()); @@ -164,12 +206,12 @@ public class DeviceServiceImpl implements IDeviceService { // TODO 如果设备下的通道级联到了其他平台,那么需要发送事件或者notify给上级平台 } // 上线添加订阅 - if (device.getSubscribeCycleForCatalog() > 0) { + if (device.getSubscribeCycleForCatalog() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) { // 查询在线设备那些开启了订阅,为设备开启定时的目录订阅 - addCatalogSubscribe(device); + addCatalogSubscribe(device, null); } - if (device.getSubscribeCycleForMobilePosition() > 0) { - addMobilePositionSubscribe(device); + if (device.getSubscribeCycleForMobilePosition() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) { + addMobilePositionSubscribe(device, null); } if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 @@ -254,20 +296,87 @@ public class DeviceServiceImpl implements IDeviceService { } } + // 订阅丢失检查 + @Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS) + public void lostCheck(){ + // 获取所有设备 + List deviceList = redisCatchStorage.getAllDevices(); + if (deviceList.isEmpty()) { + return; + } + for (Device device : deviceList) { + if (device == null || !device.isOnLine()) { + continue; + } + if (device.getSubscribeCycleForCatalog() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) { + log.info("[订阅丢失] 目录订阅, 编号: {}, 重新发起订阅", device.getDeviceId()); + addCatalogSubscribe(device, null); + } + if (device.getSubscribeCycleForMobilePosition() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) { + log.info("[订阅丢失] 移动位置订阅, 编号: {}, 重新发起订阅", device.getDeviceId()); + addMobilePositionSubscribe(device, null); + } + } + } + + private void catalogSubscribeExpire(String deviceId, SipTransactionInfo transactionInfo) { + log.info("[目录订阅] 到期, 编号: {}", deviceId); + Device device = getDeviceByDeviceId(deviceId); + if (device == null) { + log.info("[目录订阅] 到期, 编号: {}, 设备不存在, 忽略", deviceId); + return; + } + if (device.getSubscribeCycleForCatalog() > 0) { + addCatalogSubscribe(device, transactionInfo); + } + } + + private void mobilPositionSubscribeExpire(String deviceId, SipTransactionInfo transactionInfo) { + log.info("[移动位置订阅] 到期, 编号: {}", deviceId); + Device device = getDeviceByDeviceId(deviceId); + if (device == null) { + log.info("[移动位置订阅] 到期, 编号: {}, 设备不存在, 忽略", deviceId); + return; + } + if (device.getSubscribeCycleForMobilePosition() > 0) { + addMobilePositionSubscribe(device, transactionInfo); + } + } + @Override - public boolean addCatalogSubscribe(Device device) { + public boolean addCatalogSubscribe(Device device, SipTransactionInfo transactionInfo) { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } - log.info("[添加目录订阅] 设备{}", device.getDeviceId()); - // 添加目录订阅 - CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander, dynamicTask); - // 刷新订阅 - int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30); - // 设置最小值为30 - dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, (subscribeCycleForCatalog -1) * 1000); + log.info("[添加目录订阅] 设备 {}", device.getDeviceId()); + try { + sipCommander.catalogSubscribe(device, transactionInfo, eventResult -> { + ResponseEvent event = (ResponseEvent) eventResult.event; + // 成功 + log.info("[目录订阅]成功: {}", device.getDeviceId()); + if (!subscribeTaskRunner.containsKey(SubscribeTaskForCatalog.getKey(device))) { + SIPResponse response = (SIPResponse) event.getResponse(); + SipTransactionInfo transactionInfoForResonse = new SipTransactionInfo(response); + SubscribeTask subscribeTask = SubscribeTaskForCatalog.getInstance(device, this::catalogSubscribeExpire, transactionInfoForResonse); + if (subscribeTask != null) { + subscribeTaskRunner.addSubscribe(subscribeTask); + } + }else { + subscribeTaskRunner.updateDelay(SubscribeTaskForCatalog.getKey(device), (device.getSubscribeCycleForCatalog() * 1000L - 500L) + System.currentTimeMillis()); + } - catalogSubscribeTask.run(); + },eventResult -> { + // 失败 + log.warn("[目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); + }); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 目录订阅: {}", e.getMessage()); + + } finally { + // 无论是否发起成功,都保存起来,如果失败后续任务会继续订阅 + deviceMapper.updateSubscribeCatalog(device); + redisCatchStorage.updateDevice(device); + } return true; } @@ -280,72 +389,110 @@ public class DeviceServiceImpl implements IDeviceService { return false; } log.info("[移除目录订阅]: {}", device.getDeviceId()); - String taskKey = device.getDeviceId() + "catalog"; - if (device.isOnLine()) { - Runnable runnable = dynamicTask.get(taskKey); - if (runnable instanceof ISubscribeTask) { - ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(callback); - }else { - log.info("[移除目录订阅]失败,未找到订阅任务 : {}", device.getDeviceId()); - if (callback != null) { - callback.run(false); - } + device.setSubscribeCycleForCatalog(0); + String key = SubscribeTaskForCatalog.getKey(device); + if (subscribeTaskRunner.containsKey(key)) { + SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key); + if (transactionInfo == null) { + log.warn("[移除目录订阅] 未找到事务信息,{}", device.getDeviceId()); } - }else { - log.info("[移除移动位置订阅]失败,设备已经离线 : {}", device.getDeviceId()); - if (callback != null) { - callback.run(false); + try { + sipCommander.catalogSubscribe(device, transactionInfo, eventResult -> { + // 成功 + log.info("[取消目录订阅]成功: {}", device.getDeviceId()); + subscribeTaskRunner.removeSubscribe(SubscribeTaskForCatalog.getKey(device)); + if (callback != null) { + callback.run(true); + } + },eventResult -> { + // 失败 + log.warn("[取消目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); + }); + }catch (Exception e) { + // 失败 + log.warn("[取消目录订阅]失败: {}-{} ", device.getDeviceId(), e.getMessage()); + }finally { + // 无论是否发起成功,都保存起来,如果失败,到期后将不再发起 + deviceMapper.updateSubscribeCatalog(device); + redisCatchStorage.updateDevice(device); } } - dynamicTask.stop(taskKey); return true; } @Override - public boolean addMobilePositionSubscribe(Device device) { + public boolean addMobilePositionSubscribe(Device device, SipTransactionInfo transactionInfo) { if (device == null || device.getSubscribeCycleForMobilePosition() < 0) { return false; } - log.info("[添加移动位置订阅] 设备{}", device.getDeviceId()); - // 添加目录订阅 - MobilePositionSubscribeTask mobilePositionSubscribeTask = new MobilePositionSubscribeTask(device, sipCommander, dynamicTask); - // 设置最小值为30 - int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30); - // 刷新订阅 - dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, subscribeCycleForCatalog * 1000); - mobilePositionSubscribeTask.run(); + log.info("[添加移动位置订阅] 设备 {}", device.getDeviceId()); + try { + sipCommander.mobilePositionSubscribe(device, transactionInfo, eventResult -> { + ResponseEvent event = (ResponseEvent) eventResult.event; + // 成功 + log.info("[移动位置订阅]成功: {}", device.getDeviceId()); + if (!subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) { + SIPResponse response = (SIPResponse) event.getResponse(); + SipTransactionInfo transactionInfoForResonse = new SipTransactionInfo(response); + SubscribeTask subscribeTask = SubscribeTaskForMobilPosition.getInstance(device, this::catalogSubscribeExpire, transactionInfoForResonse); + if (subscribeTask != null) { + subscribeTaskRunner.addSubscribe(subscribeTask); + } + }else { + subscribeTaskRunner.updateDelay(SubscribeTaskForMobilPosition.getKey(device), (device.getSubscribeCycleForCatalog() * 1000L - 500L) + System.currentTimeMillis()); + } + + },eventResult -> { + // 失败 + log.warn("[移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); + }); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 移动位置订阅: {}", e.getMessage()); + }finally { + // 无论是否发起成功,都保存起来,如果失败后续任务会继续订阅 + deviceMapper.updateSubscribeMobilePosition(device); + redisCatchStorage.updateDevice(device); + } return true; } @Override public boolean removeMobilePositionSubscribe(Device device, CommonCallback callback) { - if (device == null || device.getSubscribeCycleForCatalog() < 0) { + if (device == null || device.getSubscribeCycleForMobilePosition() < 0) { if (callback != null) { callback.run(false); } return false; } log.info("[移除移动位置订阅]: {}", device.getDeviceId()); - String taskKey = device.getDeviceId() + "mobile_position"; - if (device.isOnLine()) { - Runnable runnable = dynamicTask.get(taskKey); - if (runnable instanceof ISubscribeTask) { - ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(callback); - }else { - log.info("[移除移动位置订阅]失败,未找到订阅任务 : {}", device.getDeviceId()); - if (callback != null) { - callback.run(false); - } + device.setSubscribeCycleForMobilePosition(0); + String key = SubscribeTaskForMobilPosition.getKey(device); + if (subscribeTaskRunner.containsKey(key)) { + SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key); + if (transactionInfo == null) { + log.warn("[移除移动位置订阅] 未找到事务信息,{}", device.getDeviceId()); } - }else { - log.info("[移除移动位置订阅]失败,设备已经离线 : {}", device.getDeviceId()); - if (callback != null) { - callback.run(false); + try { + sipCommander.mobilePositionSubscribe(device, transactionInfo, eventResult -> { + // 成功 + log.info("[取消移动位置订阅]成功: {}", device.getDeviceId()); + subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device)); + if (callback != null) { + callback.run(true); + } + },eventResult -> { + // 失败 + log.warn("[取消移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); + }); + }catch (Exception e) { + // 失败 + log.warn("[取消移动位置订阅]失败: {}-{} ", device.getDeviceId(), e.getMessage()); + }finally { + // 无论是否发起成功,都保存起来,如果失败,到期后将不再发起 + deviceMapper.updateSubscribeMobilePosition(device); + redisCatchStorage.updateDevice(device); } } - dynamicTask.stop(taskKey); return true; } @@ -499,10 +646,20 @@ public class DeviceServiceImpl implements IDeviceService { public boolean delete(String deviceId) { Device device = getDeviceByDeviceIdFromDb(deviceId); Assert.notNull(device, "未找到设备"); + if (device.getSubscribeCycleForCatalog() > 0) { + removeCatalogSubscribe(device, null); + } + if (device.getSubscribeCycleForMobilePosition() > 0) { + removeMobilePositionSubscribe(device, null); + } + // 停止状态检测 + String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId(); + dynamicTask.stop(registerExpireTaskKey); platformChannelMapper.delChannelForDeviceId(deviceId); deviceChannelMapper.cleanChannelsByDeviceId(device.getId()); deviceMapper.del(deviceId); redisCatchStorage.removeDevice(deviceId); + inviteStreamService.clearInviteInfo(deviceId); return true; } @@ -564,16 +721,13 @@ public class DeviceServiceImpl implements IDeviceService { device.setSubscribeCycleForCatalog(cycle); if (cycle > 0) { // 开启订阅 - addCatalogSubscribe(device); + addCatalogSubscribe(device, null); } - // 因为是异步执行,需要在这里更新下数据 - deviceMapper.updateSubscribeCatalog(device); - redisCatchStorage.updateDevice(device); }); }else { // 开启订阅 device.setSubscribeCycleForCatalog(cycle); - addCatalogSubscribe(device); + addCatalogSubscribe(device, null); deviceMapper.updateSubscribeCatalog(device); redisCatchStorage.updateDevice(device); } @@ -598,21 +752,15 @@ public class DeviceServiceImpl implements IDeviceService { device.setSubscribeCycleForMobilePosition(cycle); device.setMobilePositionSubmissionInterval(interval); if (cycle > 0) { - addMobilePositionSubscribe(device); + addMobilePositionSubscribe(device, null); } - // 因为是异步执行,需要在这里更新下数据 - deviceMapper.updateSubscribeMobilePosition(device); - redisCatchStorage.updateDevice(device); }); }else { // 订阅未开启 device.setSubscribeCycleForMobilePosition(cycle); device.setMobilePositionSubmissionInterval(interval); // 开启订阅 - addMobilePositionSubscribe(device); - // 因为是异步执行,需要在这里更新下数据 - deviceMapper.updateSubscribeMobilePosition(device); - redisCatchStorage.updateDevice(device); + addMobilePositionSubscribe(device, null); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java index ff11f2ef7..f4bd244a2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java @@ -601,4 +601,12 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { public List queryChannelByPlatformIdAndChannelIds(Integer platformId, List channelIds) { return platformChannelMapper.queryShare(platformId, channelIds); } + + @Override + public List queryByPlatformBySharChannelId(String channelDeviceId) { + CommonGBChannel commonGBChannel = commonGBChannelMapper.queryByDeviceId(channelDeviceId); + ArrayList ids = new ArrayList<>(); + ids.add(commonGBChannel.getGbId()); + return platformChannelMapper.queryPlatFormListByChannelList(ids); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index 8f5f6223f..ac6a4cf93 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -1,6 +1,5 @@ package com.genersoft.iot.vmp.gb28181.service.impl; -import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.*; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; @@ -950,4 +949,9 @@ public class PlatformServiceImpl implements IPlatformService { // 删除平台信息 platformMapper.delete(platform.getId()); } + + @Override + public List queryAll() { + return platformMapper.queryAll(); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java deleted file mode 100755 index 8d1c7d2ee..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.task; - -import com.genersoft.iot.vmp.common.CommonCallback; - -/** - * @author lin - */ -public interface ISubscribeTask extends Runnable{ - void stop(CommonCallback callback); -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTask.java new file mode 100644 index 000000000..6af18a451 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTask.java @@ -0,0 +1,49 @@ +package com.genersoft.iot.vmp.gb28181.task; + +import com.genersoft.iot.vmp.common.SubscribeCallback; +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; +import lombok.Data; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +@Data +public abstract class SubscribeTask implements Delayed { + + private String deviceId; + + private SubscribeCallback callback; + + private SipTransactionInfo transactionInfo; + + /** + * 超时时间(单位: 毫秒) + */ + private long delayTime; + + public abstract void expired(); + + public abstract String getKey(); + + public abstract String getName(); + + @Override + public long getDelay(@NotNull TimeUnit unit) { + return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(@NotNull Delayed o) { + return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); + } + + public SubscribeTaskInfo getInfo(){ + SubscribeTaskInfo subscribeTaskInfo = new SubscribeTaskInfo(); + subscribeTaskInfo.setName(getName()); + subscribeTaskInfo.setTransactionInfo(transactionInfo); + subscribeTaskInfo.setDeviceId(deviceId); + subscribeTaskInfo.setKey(getKey()); + return subscribeTaskInfo; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTaskInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTaskInfo.java new file mode 100644 index 000000000..8b93e8528 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTaskInfo.java @@ -0,0 +1,22 @@ +package com.genersoft.iot.vmp.gb28181.task; + +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; +import lombok.Data; + +@Data +public class SubscribeTaskInfo { + + private String deviceId; + + private SipTransactionInfo transactionInfo; + + private String name; + + private String key; + + /** + * 过期时间 + */ + private long expireTime; + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTaskRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTaskRunner.java new file mode 100644 index 000000000..bd53dc49b --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SubscribeTaskRunner.java @@ -0,0 +1,135 @@ +package com.genersoft.iot.vmp.gb28181.task; + +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Component +public class SubscribeTaskRunner{ + + private final Map subscribes = new ConcurrentHashMap<>(); + + private final DelayQueue delayQueue = new DelayQueue<>(); + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private UserSetting userSetting; + + private final String prefix = "VMP_DEVICE_SUBSCRIBE"; + + // 订阅过期检查 + @Scheduled(fixedDelay = 500, timeUnit = TimeUnit.MILLISECONDS) + public void expirationCheck(){ + while (!delayQueue.isEmpty()) { + SubscribeTask take = null; + try { + take = delayQueue.take(); + try { + removeSubscribe(take.getKey()); + take.expired(); + }catch (Exception e) { + log.error("[设备订阅到期] {} 到期处理时出现异常, 设备编号: {} ", take.getName(), take.getDeviceId()); + } + } catch (InterruptedException e) { + log.error("[设备订阅任务] ", e); + } + } + } + + public void addSubscribe(SubscribeTask task) { + Duration duration = Duration.ofSeconds((task.getDelayTime() - System.currentTimeMillis())/1000); + if (duration.getSeconds() < 0) { + return; + } + subscribes.put(task.getKey(), task); + String key = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getKey()); + redisTemplate.opsForValue().set(key, task.getInfo(), duration); + delayQueue.offer(task); + } + + public boolean removeSubscribe(String key) { + SubscribeTask task = subscribes.get(key); + if (task == null) { + return false; + } + String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getKey()); + redisTemplate.delete(redisKey); + subscribes.remove(key); + if (delayQueue.contains(task)) { + boolean remove = delayQueue.remove(task); + if (!remove) { + log.info("[移除订阅任务] 从延时队列内移除失败: {}", key); + } + } + return true; + } + + public SipTransactionInfo getTransactionInfo(String key) { + SubscribeTask task = subscribes.get(key); + if (task == null) { + return null; + } + return task.getTransactionInfo(); + } + + public boolean updateDelay(String key, long expirationTime) { + SubscribeTask task = subscribes.get(key); + if (task == null) { + return false; + } + log.info("[更新订阅任务时间] {}, 编号: {}", task.getName(), key); + if (delayQueue.contains(task)) { + boolean remove = delayQueue.remove(task); + if (!remove) { + log.info("[更新订阅任务时间] 从延时队列内移除失败: {}", key); + } + } + task.setDelayTime(expirationTime); + delayQueue.offer(task); + String redisKey = String.format("%s_%s_%s", prefix, userSetting.getServerId(), task.getKey()); + Duration duration = Duration.ofSeconds((expirationTime - System.currentTimeMillis())/1000); + redisTemplate.expire(redisKey, duration); + return true; + } + + public boolean containsKey(String key) { + return subscribes.containsKey(key); + } + + public List getAllTaskInfo(){ + String scanKey = String.format("%s_%s_*", prefix, userSetting.getServerId()); + List values = RedisUtil.scan(redisTemplate, scanKey); + if (values.isEmpty()) { + return new ArrayList<>(); + } + List result = new ArrayList<>(); + for (Object value : values) { + String redisKey = (String)value; + SubscribeTaskInfo taskInfo = (SubscribeTaskInfo)redisTemplate.opsForValue().get(redisKey); + if (taskInfo == null) { + continue; + } + Long expire = redisTemplate.getExpire(redisKey); + taskInfo.setExpireTime(expire); + result.add(taskInfo); + } + return result; + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java deleted file mode 100755 index e3f191242..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java +++ /dev/null @@ -1,107 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.task.impl; - -import com.genersoft.iot.vmp.common.CommonCallback; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import gov.nist.javax.sip.message.SIPRequest; -import lombok.extern.slf4j.Slf4j; - -import javax.sip.DialogState; -import javax.sip.InvalidArgumentException; -import javax.sip.ResponseEvent; -import javax.sip.SipException; -import javax.sip.header.ToHeader; -import java.text.ParseException; - -/** - * 目录订阅任务 - * @author lin - */ -@Slf4j -public class CatalogSubscribeTask implements ISubscribeTask { - private final Device device; - private final ISIPCommander sipCommander; - private SIPRequest request; - - private final DynamicTask dynamicTask; - - private final String taskKey = "catalog-subscribe-timeout"; - - - public CatalogSubscribeTask(Device device, ISIPCommander sipCommander, DynamicTask dynamicTask) { - this.device = device; - this.sipCommander = sipCommander; - this.dynamicTask = dynamicTask; - } - - @Override - public void run() { - if (dynamicTask.get(taskKey) != null) { - dynamicTask.stop(taskKey); - } - SIPRequest sipRequest = null; - try { - sipRequest = sipCommander.catalogSubscribe(device, request, eventResult -> { - ResponseEvent event = (ResponseEvent) eventResult.event; - // 成功 - log.info("[目录订阅]成功: {}", device.getDeviceId()); - ToHeader toHeader = (ToHeader)event.getResponse().getHeader(ToHeader.NAME); - try { - this.request.getToHeader().setTag(toHeader.getTag()); - } catch (ParseException e) { - log.info("[目录订阅]成功: 但为request设置ToTag失败"); - this.request = null; - } - },eventResult -> { - this.request = null; - // 失败 - log.warn("[目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); - dynamicTask.startDelay(taskKey, CatalogSubscribeTask.this, 2000); - }); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 目录订阅: {}", e.getMessage()); - - } - if (sipRequest != null) { - this.request = sipRequest; - } - } - - @Override - public void stop(CommonCallback callback) { - /** - * dialog 的各个状态 - * EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息 - * CONFIRMED-> Confirmed Dialog状态-已确认 - * COMPLETED-> Completed Dialog状态-已完成 - * TERMINATED-> Terminated Dialog状态-终止 - */ - log.info("取消目录订阅时dialog状态为{}", DialogState.CONFIRMED); - if (dynamicTask.get(taskKey) != null) { - dynamicTask.stop(taskKey); - } - device.setSubscribeCycleForCatalog(0); - try { - sipCommander.catalogSubscribe(device, request, eventResult -> { - ResponseEvent event = (ResponseEvent) eventResult.event; - if (event.getResponse().getRawContent() != null) { - // 成功 - log.info("[取消目录订阅]成功: {}", device.getDeviceId()); - }else { - // 成功 - log.info("[取消目录订阅]成功: {}", device.getDeviceId()); - } - if (callback != null) { - callback.run(event.getResponse().getRawContent() != null); - } - },eventResult -> { - // 失败 - log.warn("[取消目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); - }); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 取消目录订阅: {}", e.getMessage()); - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java deleted file mode 100755 index 646b31eac..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java +++ /dev/null @@ -1,103 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.task.impl; - -import com.genersoft.iot.vmp.common.CommonCallback; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; -import gov.nist.javax.sip.message.SIPRequest; -import lombok.extern.slf4j.Slf4j; - -import javax.sip.InvalidArgumentException; -import javax.sip.ResponseEvent; -import javax.sip.SipException; -import javax.sip.header.ToHeader; -import java.text.ParseException; - -/** - * 移动位置订阅的定时更新 - * @author lin - */ -@Slf4j -public class MobilePositionSubscribeTask implements ISubscribeTask { - private final Device device; - private final ISIPCommander sipCommander; - - private SIPRequest request; - private final DynamicTask dynamicTask; - private final String taskKey = "mobile-position-subscribe-timeout"; - - public MobilePositionSubscribeTask(Device device, ISIPCommander sipCommander, DynamicTask dynamicTask) { - this.device = device; - this.sipCommander = sipCommander; - this.dynamicTask = dynamicTask; - } - - @Override - public void run() { - if (dynamicTask.get(taskKey) != null) { - dynamicTask.stop(taskKey); - } - SIPRequest sipRequest = null; - try { - sipRequest = sipCommander.mobilePositionSubscribe(device, request, eventResult -> { - // 成功 - log.info("[移动位置订阅]成功: {}", device.getDeviceId()); - ResponseEvent event = (ResponseEvent) eventResult.event; - ToHeader toHeader = (ToHeader)event.getResponse().getHeader(ToHeader.NAME); - try { - this.request.getToHeader().setTag(toHeader.getTag()); - } catch (ParseException e) { - log.info("[移动位置订阅]成功: 为request设置ToTag失败"); - this.request = null; - } - },eventResult -> { - this.request = null; - // 失败 - log.warn("[移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); - dynamicTask.startDelay(taskKey, MobilePositionSubscribeTask.this, 2000); - }); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 移动位置订阅: {}", e.getMessage()); - } - if (sipRequest != null) { - this.request = sipRequest; - } - - } - - @Override - public void stop(CommonCallback callback) { - /** - * dialog 的各个状态 - * EARLY-> Early state状态-初始请求发送以后,收到了一个临时响应消息 - * CONFIRMED-> Confirmed Dialog状态-已确认 - * COMPLETED-> Completed Dialog状态-已完成 - * TERMINATED-> Terminated Dialog状态-终止 - */ - if (dynamicTask.get(taskKey) != null) { - dynamicTask.stop(taskKey); - } - device.setSubscribeCycleForMobilePosition(0); - try { - sipCommander.mobilePositionSubscribe(device, request, eventResult -> { - ResponseEvent event = (ResponseEvent) eventResult.event; - if (event.getResponse().getRawContent() != null) { - // 成功 - log.info("[取消移动位置订阅]成功: {}", device.getDeviceId()); - }else { - // 成功 - log.info("[取消移动位置订阅]成功: {}", device.getDeviceId()); - } - if (callback != null) { - callback.run(event.getResponse().getRawContent() != null); - } - },eventResult -> { - // 失败 - log.warn("[取消移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg); - }); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 取消移动位置订阅: {}", e.getMessage()); - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/SubscribeTaskForCatalog.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/SubscribeTaskForCatalog.java new file mode 100644 index 000000000..a0f1b2b1c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/SubscribeTaskForCatalog.java @@ -0,0 +1,48 @@ +package com.genersoft.iot.vmp.gb28181.task.impl; + +import com.genersoft.iot.vmp.common.SubscribeCallback; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; +import com.genersoft.iot.vmp.gb28181.task.SubscribeTask; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class SubscribeTaskForCatalog extends SubscribeTask { + + public static final String name = "catalog"; + + public static SubscribeTask getInstance(Device device, SubscribeCallback callback, SipTransactionInfo transactionInfo) { + if (device.getSubscribeCycleForCatalog() <= 0) { + return null; + } + SubscribeTaskForCatalog subscribeTaskForCatalog = new SubscribeTaskForCatalog(); + subscribeTaskForCatalog.setDelayTime((device.getSubscribeCycleForCatalog() * 1000L - 500L) + System.currentTimeMillis()); + subscribeTaskForCatalog.setDeviceId(device.getDeviceId()); + subscribeTaskForCatalog.setCallback(callback); + subscribeTaskForCatalog.setTransactionInfo(transactionInfo); + return subscribeTaskForCatalog; + } + + @Override + public void expired() { + if (super.getCallback() == null) { + log.info("[设备订阅到期] 目录订阅 未找到到期处理回调, 编号: {}", getDeviceId()); + return; + } + getCallback().run(getDeviceId(), getTransactionInfo()); + } + + @Override + public String getKey() { + return String.format("%s_%s", name, getDeviceId()); + } + + @Override + public String getName() { + return name; + } + + public static String getKey(Device device) { + return String.format("%s_%s", SubscribeTaskForCatalog.name, device.getDeviceId()); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/SubscribeTaskForMobilPosition.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/SubscribeTaskForMobilPosition.java new file mode 100644 index 000000000..26a3afc53 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/SubscribeTaskForMobilPosition.java @@ -0,0 +1,48 @@ +package com.genersoft.iot.vmp.gb28181.task.impl; + +import com.genersoft.iot.vmp.common.SubscribeCallback; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; +import com.genersoft.iot.vmp.gb28181.task.SubscribeTask; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class SubscribeTaskForMobilPosition extends SubscribeTask { + + public static final String name = "mobilPosition"; + + public static SubscribeTask getInstance(Device device, SubscribeCallback callback, SipTransactionInfo transactionInfo) { + if (device.getSubscribeCycleForCatalog() <= 0) { + return null; + } + SubscribeTaskForMobilPosition subscribeTaskForMobilPosition = new SubscribeTaskForMobilPosition(); + subscribeTaskForMobilPosition.setDelayTime((device.getSubscribeCycleForMobilePosition() * 1000L - 500L) + System.currentTimeMillis()); + subscribeTaskForMobilPosition.setDeviceId(device.getDeviceId()); + subscribeTaskForMobilPosition.setCallback(callback); + subscribeTaskForMobilPosition.setTransactionInfo(transactionInfo); + return subscribeTaskForMobilPosition; + } + + @Override + public void expired() { + if (super.getCallback() == null) { + log.info("[设备订阅到期] 移动位置订阅 未找到到期处理回调, 编号: {}", getDeviceId()); + return; + } + getCallback().run(getDeviceId(), getTransactionInfo()); + } + + @Override + public String getKey() { + return String.format("%s_%s", name, getDeviceId()); + } + + @Override + public String getName() { + return name; + } + + public static String getKey(Device device) { + return String.format("%s_%s", SubscribeTaskForMobilPosition.name, device.getDeviceId()); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 784a73c64..4d252d5d2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -17,16 +17,16 @@ import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; -/** - * @description:设备能力接口,用于定义设备的控制、查询能力 +/** + * @description:设备能力接口,用于定义设备的控制、查询能力 * @author: swwheihei - * @date: 2020年5月3日 下午9:16:34 + * @date: 2020年5月3日 下午9:16:34 */ public interface ISIPCommander { /** * 云台控制,支持方向与缩放控制 - * + * * @param device 控制设备 * @param channelId 预览通道 * @param leftRight 镜头左移右移 0:停止 1:左移 2:右移 @@ -36,10 +36,10 @@ public interface ISIPCommander { * @param zoomSpeed 镜头缩放速度 */ void ptzCmd(Device device,String channelId,int leftRight, int upDown, int inOut, int moveSpeed, int zoomSpeed) throws InvalidArgumentException, SipException, ParseException; - + /** * 前端控制,包括PTZ指令、FI指令、预置位指令、巡航指令、扫描指令和辅助开关指令 - * + * * @param device 控制设备 * @param channelId 预览通道 * @param cmdCode 指令码 @@ -48,7 +48,7 @@ public interface ISIPCommander { * @param combineCode2 组合码2 */ void frontEndCmd(Device device, String channelId, int cmdCode, int parameter1, int parameter2, int combineCode2) throws SipException, InvalidArgumentException, ParseException; - + /** * 前端控制指令(用于转发上级指令) * @param device 控制设备 @@ -66,7 +66,7 @@ public interface ISIPCommander { /** * 请求回放视频流 - * + * * @param device 视频设备 * @param channel 预览通道 * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss @@ -76,13 +76,13 @@ public interface ISIPCommander { /** * 请求历史媒体下载 - * + * * @param device 视频设备 * @param channel 预览通道 * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss * @param downloadSpeed 下载倍速参数 - */ + */ void downloadStreamCmd(MediaServer mediaServerItem, SSRCInfo ssrcInfo, Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException; @@ -116,7 +116,7 @@ public interface ISIPCommander { * 回放倍速播放 */ void playSpeedCmd(Device device, DeviceChannel channel, StreamInfo streamInfo, Double speed) throws InvalidArgumentException, ParseException, SipException; - + /** * 回放控制 * @param device @@ -138,39 +138,39 @@ public interface ISIPCommander { /** * 音视频录像控制 - * + * * @param device 视频设备 * @param channelId 预览通道 * @param recordCmdStr 录像命令:Record / StopRecord */ void recordCmd(Device device, String channelId, String recordCmdStr, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException; - + /** * 远程启动控制命令 - * + * * @param device 视频设备 */ void teleBootCmd(Device device) throws InvalidArgumentException, SipException, ParseException; /** * 报警布防/撤防命令 - * + * * @param device 视频设备 */ void guardCmd(Device device, String guardCmdStr, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException; - + /** * 报警复位命令 - * + * * @param device 视频设备 * @param alarmMethod 报警方式(可选) * @param alarmType 报警类型(可选) */ void alarmResetCmd(Device device, String alarmMethod, String alarmType, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException; - + /** * 强制关键帧命令,设备收到此命令应立刻发送一个IDR帧 - * + * * @param device 视频设备 * @param channelId 预览通道 */ @@ -184,11 +184,11 @@ public interface ISIPCommander { /** * 设备配置命令 - * + * * @param device 视频设备 */ void deviceConfigCmd(Device device); - + /** * 设备配置命令:basicParam */ @@ -196,11 +196,11 @@ public interface ISIPCommander { /** * 查询设备状态 - * + * * @param device 视频设备 */ void deviceStatusQuery(Device device, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException; - + /** * 查询设备信息 * @@ -209,27 +209,27 @@ public interface ISIPCommander { * @return */ void deviceInfoQuery(Device device, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException; - + /** * 查询目录列表 - * + * * @param device 视频设备 */ void catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent) throws SipException, InvalidArgumentException, ParseException; - + /** * 查询录像信息 - * + * * @param device 视频设备 * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss * @param sn */ void recordInfoQuery(Device device, String channelId, String startTime, String endTime, int sn, Integer Secrecy, String type, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; - + /** * 查询报警信息 - * + * * @param device 视频设备 * @param startPriority 报警起始级别(可选) * @param endPriority 报警终止级别(可选) @@ -241,37 +241,37 @@ public interface ISIPCommander { */ void alarmInfoQuery(Device device, String startPriority, String endPriority, String alarmMethod, String alarmType, String startTime, String endTime, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException; - + /** * 查询设备配置 - * + * * @param device 视频设备 * @param channelId 通道编码(可选) * @param configType 配置类型: */ void deviceConfigQuery(Device device, String channelId, String configType, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException; - + /** * 查询设备预置位置 - * + * * @param device 视频设备 */ void presetQuery(Device device, String channelId, ErrorCallback callback) throws InvalidArgumentException, SipException, ParseException; - + /** * 查询移动设备位置数据 - * + * * @param device 视频设备 */ void mobilePostitionQuery(Device device, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; /** * 订阅、取消订阅移动位置 - * + * * @param device 视频设备 * @return true = 命令发送成功 */ - SIPRequest mobilePositionSubscribe(Device device, SIPRequest request, SipSubscribe.Event okEvent , SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + SIPRequest mobilePositionSubscribe(Device device, SipTransactionInfo transactionInfo, SipSubscribe.Event okEvent , SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; /** * 订阅、取消订阅报警信息 @@ -290,7 +290,7 @@ public interface ISIPCommander { * @param device 视频设备 * @return true = 命令发送成功 */ - SIPRequest catalogSubscribe(Device device, SIPRequest request, SipSubscribe.Event okEvent ,SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; + SIPRequest catalogSubscribe(Device device, SipTransactionInfo transactionInfo, SipSubscribe.Event okEvent ,SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException; /** * 拉框控制命令 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java index 7bfd91bfa..3c084f7ce 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java @@ -33,7 +33,7 @@ public class SIPRequestHeaderProvider { @Autowired private SipConfig sipConfig; - + @Autowired private SipLayer sipLayer; @@ -43,7 +43,7 @@ public class SIPRequestHeaderProvider { @Autowired private IRedisCatchStorage redisCatchStorage; - + public Request createMessageRequest(Device device, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; // sipuri @@ -76,7 +76,7 @@ public class SIPRequestHeaderProvider { request.setContent(content, contentTypeHeader); return request; } - + public Request createInviteRequest(Device device, String channelId, String content, String viaTag, String fromTag, String toTag, String ssrc, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; //请求行 @@ -96,10 +96,10 @@ public class SIPRequestHeaderProvider { SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(channelId, device.getHostAddress()); Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI); ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress,null); - + //Forwards MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70); - + //ceq CSeqHeader cSeqHeader = SipFactory.getInstance().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.INVITE); request = SipFactory.getInstance().createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards); @@ -116,7 +116,7 @@ public class SIPRequestHeaderProvider { request.setContent(content, contentTypeHeader); return request; } - + public Request createPlaybackInviteRequest(Device device, String channelId, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader, String ssrc) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; //请求行 @@ -134,14 +134,14 @@ public class SIPRequestHeaderProvider { SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(channelId, device.getHostAddress()); Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI); ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress,null); - + //Forwards MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70); - + //ceq CSeqHeader cSeqHeader = SipFactory.getInstance().createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.INVITE); request = SipFactory.getInstance().createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards); - + Address concatAddress = SipFactory.getInstance().createAddressFactory().createAddress(SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(), sipLayer.getLocalIp(device.getLocalIp())+":"+sipConfig.getPort())); // Address concatAddress = SipFactory.getInstance().createAddressFactory().createAddress(SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(), device.getHost().getIp()+":"+device.getHost().getPort())); request.addHeader(SipFactory.getInstance().createHeaderFactory().createContactHeader(concatAddress)); @@ -231,7 +231,7 @@ public class SIPRequestHeaderProvider { return request; } - public Request createSubscribeRequest(Device device, String content, SIPRequest requestOld, Integer expires, String event, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException { + public Request createSubscribeRequest(Device device, String content, SipTransactionInfo sipTransactionInfo, Integer expires, String event, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; // sipuri SipURI requestURI = SipFactory.getInstance().createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress()); @@ -244,11 +244,11 @@ public class SIPRequestHeaderProvider { // from SipURI fromSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getDomain()); Address fromAddress = SipFactory.getInstance().createAddressFactory().createAddress(fromSipURI); - FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, requestOld == null ? SipUtils.getNewFromTag() :requestOld.getFromTag()); + FromHeader fromHeader = SipFactory.getInstance().createHeaderFactory().createFromHeader(fromAddress, sipTransactionInfo == null ? SipUtils.getNewFromTag() :sipTransactionInfo.getFromTag()); // to SipURI toSipURI = SipFactory.getInstance().createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress()); Address toAddress = SipFactory.getInstance().createAddressFactory().createAddress(toSipURI); - ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress, requestOld == null ? null :requestOld.getToTag()); + ToHeader toHeader = SipFactory.getInstance().createHeaderFactory().createToHeader(toAddress, sipTransactionInfo == null ? null :sipTransactionInfo.getToTag()); // Forwards MaxForwardsHeader maxForwards = SipFactory.getInstance().createHeaderFactory().createMaxForwardsHeader(70); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 8e607ecc2..fd53e18e9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1179,7 +1179,7 @@ public class SIPCommander implements ISIPCommander { * @return true = 命令发送成功 */ @Override - public SIPRequest mobilePositionSubscribe(Device device, SIPRequest requestOld, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { + public SIPRequest mobilePositionSubscribe(Device device, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { StringBuffer subscribePostitionXml = new StringBuffer(200); String charset = device.getCharset(); @@ -1197,12 +1197,12 @@ public class SIPCommander implements ISIPCommander { CallIdHeader callIdHeader; - if (requestOld != null) { - callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(requestOld.getCallIdHeader().getCallId()); + if (sipTransactionInfo != null) { + callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(sipTransactionInfo.getCallId()); } else { callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()); } - SIPRequest request = (SIPRequest) headerProvider.createSubscribeRequest(device, subscribePostitionXml.toString(), requestOld, device.getSubscribeCycleForMobilePosition(), "presence",callIdHeader); //Position;id=" + tm.substring(tm.length() - 4)); + SIPRequest request = (SIPRequest) headerProvider.createSubscribeRequest(device, subscribePostitionXml.toString(), sipTransactionInfo, device.getSubscribeCycleForMobilePosition(), "presence",callIdHeader); //Position;id=" + tm.substring(tm.length() - 4)); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, okEvent); return request; @@ -1255,7 +1255,7 @@ public class SIPCommander implements ISIPCommander { } @Override - public SIPRequest catalogSubscribe(Device device, SIPRequest requestOld, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { + public SIPRequest catalogSubscribe(Device device, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException { StringBuffer cmdXml = new StringBuffer(200); String charset = device.getCharset(); @@ -1268,14 +1268,14 @@ public class SIPCommander implements ISIPCommander { CallIdHeader callIdHeader; - if (requestOld != null) { - callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(requestOld.getCallIdHeader().getCallId()); + if (sipTransactionInfo != null) { + callIdHeader = SipFactory.getInstance().createHeaderFactory().createCallIdHeader(sipTransactionInfo.getCallId()); } else { callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()); } // 有效时间默认为60秒以上 - SIPRequest request = (SIPRequest) headerProvider.createSubscribeRequest(device, cmdXml.toString(), requestOld, device.getSubscribeCycleForCatalog(), "Catalog", + SIPRequest request = (SIPRequest) headerProvider.createSubscribeRequest(device, cmdXml.toString(), sipTransactionInfo, device.getSubscribeCycleForCatalog(), "Catalog", callIdHeader); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, okEvent); return request; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 462751775..cdaea441d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -97,10 +97,6 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp } Device device = sipMsgInfo.getDevice(); SIPRequest request = (SIPRequest) evt.getRequest(); -// if (!ObjectUtils.isEmpty(device.getKeepaliveTime()) && DateUtil.getDifferenceForNow(device.getKeepaliveTime()) <= 3000L) { -// log.info("[收到心跳] 心跳发送过于频繁,已忽略 device: {}, callId: {}", device.getDeviceId(), request.getCallIdHeader().getCallId()); -// return; -// } RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress()); if (!device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) { @@ -109,12 +105,6 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp device.setHostAddress(remoteAddressInfo.getIp().concat(":").concat(String.valueOf(remoteAddressInfo.getPort()))); device.setIp(remoteAddressInfo.getIp()); device.setLocalIp(request.getLocalAddress().getHostAddress()); - // 设备地址变化会引起目录订阅任务失效,需要重新添加 - if (device.getSubscribeCycleForCatalog() > 0) { - deviceService.removeCatalogSubscribe(device, result -> { - deviceService.addCatalogSubscribe(device); - }); - } } device.setKeepaliveTime(DateUtil.getNow()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java index 886cd88eb..dc20f64bd 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; +import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; import com.genersoft.iot.vmp.gb28181.service.IPlatformService; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; @@ -55,6 +56,9 @@ public class RedisAlarmMsgListener implements MessageListener { @Autowired private IPlatformService platformService; + @Autowired + private IPlatformChannelService platformChannelService; + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Autowired @@ -145,22 +149,25 @@ public class RedisAlarmMsgListener implements MessageListener { } } else { - Device device = deviceService.getDeviceByDeviceId(gbId); - Platform platform = platformService.queryPlatformByServerGBId(gbId); - if (device != null && platform == null) { + // 获取该通道ID是属于设备还是对应的上级平台 + Device device = deviceService.getDeviceBySourceChannelDeviceId(gbId); + List platforms = platformChannelService.queryByPlatformBySharChannelId(gbId); + if (device != null && (platforms == null || platforms.isEmpty())) { try { commander.sendAlarmMessage(device, deviceAlarm); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 发送报警: {}", e.getMessage()); } - } else if (device == null && platform != null) { - try { - commanderForPlatform.sendAlarmMessage(platform, deviceAlarm); - } catch (InvalidArgumentException | SipException | ParseException e) { - log.error("[命令发送失败] 发送报警: {}", e.getMessage()); + } else if (device == null && (platforms != null && !platforms.isEmpty())) { + for (Platform platform : platforms) { + try { + commanderForPlatform.sendAlarmMessage(platform, deviceAlarm); + } catch (InvalidArgumentException | SipException | ParseException e) { + log.error("[命令发送失败] 发送报警: {}", e.getMessage()); + } } } else { - log.warn("无法确定" + gbId + "是平台还是设备"); + log.warn("[REDIS的ALARM通知] 未查询到" + gbId + "所属的平台或设备"); } } } catch (Exception e) {