优化通道变化消息发送以及增加设备状态丢失检测

This commit is contained in:
lin
2025-06-03 16:52:42 +08:00
parent cc0c73a64d
commit f4bbca78e5
18 changed files with 93 additions and 47 deletions

View File

@@ -212,7 +212,4 @@ public class Device {
@Schema(description = "所属服务Id") @Schema(description = "所属服务Id")
private String serverId; private String serverId;
@Schema(description = "所属服务Id")
private Integer deviceType;
} }

View File

@@ -123,7 +123,12 @@ public class DeviceQuery {
log.debug("设备通道信息同步API调用deviceId" + deviceId); log.debug("设备通道信息同步API调用deviceId" + deviceId);
} }
Device device = deviceService.getDeviceByDeviceId(deviceId); Device device = deviceService.getDeviceByDeviceId(deviceId);
if (device.getRegisterTime() == null) {
WVPResult<SyncStatus> wvpResult = new WVPResult<>();
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("设备尚未注册过");
return wvpResult;
}
return deviceService.devicesSync(device); return deviceService.devicesSync(device);
} }

View File

@@ -578,4 +578,8 @@ public interface CommonGBChannelMapper {
" <foreach collection='channelIdsForClear' item='item' open='(' separator=',' close=')' > #{item}</foreach>" + " <foreach collection='channelIdsForClear' item='item' open='(' separator=',' close=')' > #{item}</foreach>" +
" </script>"}) " </script>"})
void removeParentIdByChannelIds(List<Integer> channelIdsForClear); void removeParentIdByChannelIds(List<Integer> channelIdsForClear);
@SelectProvider(type = ChannelProvider.class, method = "queryOnlineListsByGbDeviceId")
List<CommonGBChannel> queryOnlineListsByGbDeviceId(@Param("deviceId") int deviceId);
} }

View File

@@ -46,7 +46,6 @@ public interface DeviceMapper {
"server_id,"+ "server_id,"+
"media_server_id," + "media_server_id," +
"broadcast_push_after_ack," + "broadcast_push_after_ack," +
"device_type," +
"(SELECT count(0) FROM wvp_device_channel dc WHERE dc.data_type = 1 and dc.data_device_id= de.id) as channel_count "+ "(SELECT count(0) FROM wvp_device_channel dc WHERE dc.data_type = 1 and dc.data_device_id= de.id) as channel_count "+
" FROM wvp_device de WHERE de.device_id = #{deviceId}") " FROM wvp_device de WHERE de.device_id = #{deviceId}")
Device getDeviceByDeviceId( @Param("deviceId") String deviceId); Device getDeviceByDeviceId( @Param("deviceId") String deviceId);
@@ -81,7 +80,6 @@ public interface DeviceMapper {
"ssrc_check,"+ "ssrc_check,"+
"as_message_channel,"+ "as_message_channel,"+
"broadcast_push_after_ack,"+ "broadcast_push_after_ack,"+
"device_type,"+
"geo_coord_sys,"+ "geo_coord_sys,"+
"server_id,"+ "server_id,"+
"on_line"+ "on_line"+
@@ -115,7 +113,6 @@ public interface DeviceMapper {
"#{ssrcCheck}," + "#{ssrcCheck}," +
"#{asMessageChannel}," + "#{asMessageChannel}," +
"#{broadcastPushAfterAck}," + "#{broadcastPushAfterAck}," +
"#{deviceType}," +
"#{geoCoordSys}," + "#{geoCoordSys}," +
"#{serverId}," + "#{serverId}," +
"#{onLine}" + "#{onLine}" +
@@ -181,7 +178,6 @@ public interface DeviceMapper {
"ssrc_check,"+ "ssrc_check,"+
"as_message_channel,"+ "as_message_channel,"+
"broadcast_push_after_ack,"+ "broadcast_push_after_ack,"+
"device_type,"+
"geo_coord_sys,"+ "geo_coord_sys,"+
"on_line,"+ "on_line,"+
"media_server_id,"+ "media_server_id,"+
@@ -224,7 +220,6 @@ public interface DeviceMapper {
"ssrc_check,"+ "ssrc_check,"+
"as_message_channel,"+ "as_message_channel,"+
"broadcast_push_after_ack,"+ "broadcast_push_after_ack,"+
"device_type,"+
"geo_coord_sys,"+ "geo_coord_sys,"+
"server_id,"+ "server_id,"+
"on_line"+ "on_line"+
@@ -259,7 +254,6 @@ public interface DeviceMapper {
"ssrc_check,"+ "ssrc_check,"+
"as_message_channel,"+ "as_message_channel,"+
"broadcast_push_after_ack,"+ "broadcast_push_after_ack,"+
"device_type,"+
"geo_coord_sys,"+ "geo_coord_sys,"+
"server_id,"+ "server_id,"+
"on_line"+ "on_line"+
@@ -294,7 +288,6 @@ public interface DeviceMapper {
"ssrc_check,"+ "ssrc_check,"+
"as_message_channel,"+ "as_message_channel,"+
"broadcast_push_after_ack,"+ "broadcast_push_after_ack,"+
"device_type,"+
"geo_coord_sys,"+ "geo_coord_sys,"+
"on_line"+ "on_line"+
" FROM wvp_device WHERE ip = #{host} AND port=#{port}") " FROM wvp_device WHERE ip = #{host} AND port=#{port}")
@@ -305,7 +298,7 @@ public interface DeviceMapper {
"SET update_time=#{updateTime}, custom_name=#{name} , password=#{password}, stream_mode=#{streamMode}" + "SET update_time=#{updateTime}, custom_name=#{name} , password=#{password}, stream_mode=#{streamMode}" +
", ip=#{ip}, sdp_ip=#{sdpIp}, port=#{port}, charset=#{charset}" + ", ip=#{ip}, sdp_ip=#{sdpIp}, port=#{port}, charset=#{charset}" +
", ssrc_check=#{ssrcCheck}, as_message_channel=#{asMessageChannel}" + ", ssrc_check=#{ssrcCheck}, as_message_channel=#{asMessageChannel}" +
", broadcast_push_after_ack=#{broadcastPushAfterAck}, device_type=#{deviceType}, geo_coord_sys=#{geoCoordSys}, media_server_id=#{mediaServerId}" + ", broadcast_push_after_ack=#{broadcastPushAfterAck}, geo_coord_sys=#{geoCoordSys}, media_server_id=#{mediaServerId}" +
" WHERE id=#{id}"+ " WHERE id=#{id}"+
" </script>"}) " </script>"})
void updateCustom(Device device); void updateCustom(Device device);
@@ -321,7 +314,6 @@ public interface DeviceMapper {
"ssrc_check,"+ "ssrc_check,"+
"as_message_channel,"+ "as_message_channel,"+
"broadcast_push_after_ack,"+ "broadcast_push_after_ack,"+
"device_type,"+
"geo_coord_sys,"+ "geo_coord_sys,"+
"on_line,"+ "on_line,"+
"stream_mode," + "stream_mode," +
@@ -338,7 +330,6 @@ public interface DeviceMapper {
"#{ssrcCheck}," + "#{ssrcCheck}," +
"#{asMessageChannel}," + "#{asMessageChannel}," +
"#{broadcastPushAfterAck}," + "#{broadcastPushAfterAck}," +
"#{deviceType}," +
"#{geoCoordSys}," + "#{geoCoordSys}," +
"#{onLine}," + "#{onLine}," +
"#{streamMode}," + "#{streamMode}," +

View File

@@ -97,7 +97,7 @@ public class ChannelProvider {
" coalesce(wdc.gb_svc_time_support_mode, wdc.svc_time_support_mode) as gb_svc_time_support_mode\n" + " coalesce(wdc.gb_svc_time_support_mode, wdc.svc_time_support_mode) as gb_svc_time_support_mode\n" +
" from wvp_device_channel wdc\n" " from wvp_device_channel wdc\n"
; ;
private final static String BASE_SQL_FOR_PLATFORM = private final static String BASE_SQL_FOR_PLATFORM =
"select\n" + "select\n" +
" wdc.id as gb_id,\n" + " wdc.id as gb_id,\n" +
@@ -455,6 +455,13 @@ public class ChannelProvider {
return sqlBuild.toString(); return sqlBuild.toString();
} }
public String queryOnlineListsByGbDeviceId(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append(BASE_SQL_TABLE_NAME);
sqlBuild.append(" where wdc.channel_type = 0 AND coalesce(wdc.gb_status, wdc.status) = 'ON' AND wdc.data_type = 1 AND data_device_id = #{deviceId}");
return sqlBuild.toString();
}
public String queryAllForUnusualCivilCode(Map<String, Object> params ){ public String queryAllForUnusualCivilCode(Map<String, Object> params ){
StringBuilder sqlBuild = new StringBuilder(); StringBuilder sqlBuild = new StringBuilder();
sqlBuild.append("select wdc.id from wvp_device_channel wdc "); sqlBuild.append("select wdc.id from wvp_device_channel wdc ");

View File

@@ -68,11 +68,7 @@ public class EventPublisher {
deviceChannelList.add(deviceChannel); deviceChannelList.add(deviceChannel);
catalogEventPublish(platform, deviceChannelList, type); catalogEventPublish(platform, deviceChannelList, type);
} }
public void catalogEventPublish(Platform platform, List<CommonGBChannel> deviceChannels, String type) { public void catalogEventPublish(Platform platform, List<CommonGBChannel> deviceChannels, String type) {
catalogEventPublish(platform, deviceChannels, type, true);
}
public void catalogEventPublish(Platform platform, List<CommonGBChannel> deviceChannels, String type, boolean share) {
if (platform != null && !userSetting.getServerId().equals(platform.getServerId())) { if (platform != null && !userSetting.getServerId().equals(platform.getServerId())) {
log.info("[国标级联] 目录状态推送, 此上级平台由其他服务处理,消息已经忽略"); log.info("[国标级联] 目录状态推送, 此上级平台由其他服务处理,消息已经忽略");
return; return;
@@ -91,6 +87,7 @@ public class EventPublisher {
}else { }else {
channels = deviceChannels; channels = deviceChannels;
} }
System.out.println(5);
outEvent.setChannels(channels); outEvent.setChannels(channels);
outEvent.setType(type); outEvent.setType(type);
if (platform != null) { if (platform != null) {

View File

@@ -47,7 +47,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
public void onApplicationEvent(CatalogEvent event) { public void onApplicationEvent(CatalogEvent event) {
SubscribeInfo subscribe = null; SubscribeInfo subscribe = null;
Platform parentPlatform = null; Platform parentPlatform = null;
log.info("[Catalog事件: {}] 通道数量: {}", event.getType(), event.getChannels().size()); log.info("[Catalog事件: {}]通道数量: {}", event.getType(), event.getChannels().size());
Map<String, List<Platform>> platformMap = new HashMap<>(); Map<String, List<Platform>> platformMap = new HashMap<>();
Map<String, CommonGBChannel> channelMap = new HashMap<>(); Map<String, CommonGBChannel> channelMap = new HashMap<>();
if (event.getPlatform() != null) { if (event.getPlatform() != null) {

View File

@@ -224,7 +224,6 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
@Override @Override
public List<Device> getDeviceByChannelId(String channelId) { public List<Device> getDeviceByChannelId(String channelId) {
return channelMapper.getDeviceByChannelDeviceId(channelId); return channelMapper.getDeviceByChannelDeviceId(channelId);
} }
@@ -571,13 +570,17 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
for (DeviceChannel deviceChannel : deviceChannelList) { for (DeviceChannel deviceChannel : deviceChannelList) {
DeviceChannel channelInDb = allChannelMap.get(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId()); DeviceChannel channelInDb = allChannelMap.get(deviceChannel.getDataDeviceId() + deviceChannel.getDeviceId());
if (channelInDb != null) { if (channelInDb != null) {
System.out.println(1);
deviceChannel.setStreamId(channelInDb.getStreamId()); deviceChannel.setStreamId(channelInDb.getStreamId());
deviceChannel.setHasAudio(channelInDb.isHasAudio()); deviceChannel.setHasAudio(channelInDb.isHasAudio());
deviceChannel.setId(channelInDb.getId()); deviceChannel.setId(channelInDb.getId());
if (channelInDb.getStatus() != null && channelInDb.getStatus().equalsIgnoreCase(deviceChannel.getStatus())){ if (channelInDb.getStatus() != null && !channelInDb.getStatus().equalsIgnoreCase(deviceChannel.getStatus())){
System.out.println(2);
List<Platform> platformList = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getDeviceId()); List<Platform> platformList = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getDeviceId());
if (!CollectionUtils.isEmpty(platformList)){ if (!CollectionUtils.isEmpty(platformList)){
System.out.println(3);
platformList.forEach(platform->{ platformList.forEach(platform->{
System.out.println(4);
eventPublisher.catalogEventPublish(platform, deviceChannel.buildCommonGBChannelForStatus(), deviceChannel.getStatus().equals("ON")? CatalogEvent.ON:CatalogEvent.OFF); eventPublisher.catalogEventPublish(platform, deviceChannel.buildCommonGBChannelForStatus(), deviceChannel.getStatus().equals("ON")? CatalogEvent.ON:CatalogEvent.OFF);
}); });
} }

View File

@@ -3,13 +3,15 @@ package com.genersoft.iot.vmp.gb28181.service.impl;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.enums.ChannelDataType; import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.CommonGBChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
@@ -64,9 +66,6 @@ import java.util.concurrent.TimeUnit;
@Order(value=16) @Order(value=16)
public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Autowired
private DynamicTask dynamicTask;
@Autowired @Autowired
private ISIPCommander sipCommander; private ISIPCommander sipCommander;
@@ -88,6 +87,12 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Autowired @Autowired
private DeviceChannelMapper deviceChannelMapper; private DeviceChannelMapper deviceChannelMapper;
@Autowired
private CommonGBChannelMapper commonGBChannelMapper;
@Autowired
private EventPublisher eventPublisher;
@Autowired @Autowired
private ISendRtpServerService sendRtpServerService; private ISendRtpServerService sendRtpServerService;
@@ -156,7 +161,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
} }
// 恢复定时任务, TCP因为连接已经断开必须等待设备重新连接 // 恢复定时任务, TCP因为连接已经断开必须等待设备重新连接
DeviceStatusTask deviceStatusTask = DeviceStatusTask.getInstance(taskInfo.getDeviceId(), DeviceStatusTask deviceStatusTask = DeviceStatusTask.getInstance(taskInfo.getDeviceId(),
taskInfo.getTransactionInfo(), taskInfo.getExpireTime(), this::deviceStatusExpire); taskInfo.getTransactionInfo(), taskInfo.getExpireTime() + 1000 + System.currentTimeMillis(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(deviceStatusTask); deviceStatusTaskRunner.addTask(deviceStatusTask);
onlineDeviceIds.add(taskInfo.getDeviceId()); onlineDeviceIds.add(taskInfo.getDeviceId());
} }
@@ -238,8 +243,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) { if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device)); subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device));
} }
//进行通道离线
// deviceChannelMapper.offlineByDeviceId(deviceId);
// 离线释放所有ssrc // 离线释放所有ssrc
List<SsrcTransaction> ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId()); List<SsrcTransaction> ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId());
if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) { if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
@@ -308,7 +311,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
device.setCreateTime(now); device.setCreateTime(now);
device.setUpdateTime(now); device.setUpdateTime(now);
log.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId()); log.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId());
addCustomDevice(device); if(device.getStreamMode() == null) {
device.setStreamMode("TCP-PASSIVE");
}
deviceMapper.add(device); deviceMapper.add(device);
redisCatchStorage.updateDevice(device); redisCatchStorage.updateDevice(device);
try { try {
@@ -334,7 +339,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
} }
sync(device); sync(device);
}else { }else {
if (!isPlatform(device.getDeviceId())) { if (isDevice(device.getDeviceId())) {
sync(device); sync(device);
} }
} }
@@ -364,14 +369,14 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L; long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) { if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
if (sipTransactionInfo == null) { if (sipTransactionInfo == null) {
deviceStatusTaskRunner.updateDelay(device.getDeviceId(), System.currentTimeMillis() + expiresTime); deviceStatusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
}else { }else {
deviceStatusTaskRunner.removeTask(device.getDeviceId()); deviceStatusTaskRunner.removeTask(device.getDeviceId());
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime, this::deviceStatusExpire); DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime + System.currentTimeMillis(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(task); deviceStatusTaskRunner.addTask(task);
} }
}else { }else {
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime, this::deviceStatusExpire); DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime + System.currentTimeMillis(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(task); deviceStatusTaskRunner.addTask(task);
} }
@@ -401,13 +406,27 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
cleanOfflineDevice(device); cleanOfflineDevice(device);
redisCatchStorage.updateDevice(device); redisCatchStorage.updateDevice(device);
deviceMapper.update(device); deviceMapper.update(device);
if (!isPlatform(deviceId)) { if (userSetting.getDeviceStatusNotify()) {
// 进行通道离线 // 发送redis消息
deviceChannelMapper.offlineByDeviceId(device.getId()); redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false);
}
if (isDevice(deviceId)) {
channelOfflineByDevice(device);
} }
} }
private boolean isPlatform(String deviceId) { private void channelOfflineByDevice(Device device) {
// 进行通道离线
List<CommonGBChannel> channelList = commonGBChannelMapper.queryOnlineListsByGbDeviceId(device.getId());
if (channelList.isEmpty()) {
return;
}
deviceChannelMapper.offlineByDeviceId(device.getId());
// 发送通道离线通知
eventPublisher.catalogEventPublish(null, channelList, CatalogEvent.OFF);
}
private boolean isDevice(String deviceId) {
GbCode decode = GbCode.decode(deviceId); GbCode decode = GbCode.decode(deviceId);
if (decode == null) { if (decode == null) {
return true; return true;
@@ -418,7 +437,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
// 订阅丢失检查 // 订阅丢失检查
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS) @Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS)
public void lostCheck(){ public void lostCheckForSubscribe(){
// 获取所有设备 // 获取所有设备
List<Device> deviceList = redisCatchStorage.getAllDevices(); List<Device> deviceList = redisCatchStorage.getAllDevices();
if (deviceList.isEmpty()) { if (deviceList.isEmpty()) {
@@ -439,6 +458,25 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
} }
} }
// 设备状态丢失检查
@Scheduled(fixedDelay = 30, timeUnit = TimeUnit.SECONDS)
public void lostCheckForStatus(){
// 获取所有设备
List<Device> deviceList = redisCatchStorage.getAllDevices();
if (deviceList.isEmpty()) {
return;
}
for (Device device : deviceList) {
if (device == null || !device.isOnLine() || !userSetting.getServerId().equals(device.getServerId())) {
continue;
}
if (!deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
log.debug("[状态丢失] 执行设备离线, 编号: {},", device.getDeviceId());
offline(device.getDeviceId(), "");
}
}
}
private void catalogSubscribeExpire(String deviceId, SipTransactionInfo transactionInfo) { private void catalogSubscribeExpire(String deviceId, SipTransactionInfo transactionInfo) {
log.info("[目录订阅] 到期, 编号: {}", deviceId); log.info("[目录订阅] 到期, 编号: {}", deviceId);
Device device = getDeviceByDeviceId(deviceId); Device device = getDeviceByDeviceId(deviceId);

View File

@@ -28,7 +28,7 @@ public class DeviceStatusTask implements Delayed {
DeviceStatusTask deviceStatusTask = new DeviceStatusTask(); DeviceStatusTask deviceStatusTask = new DeviceStatusTask();
deviceStatusTask.setDeviceId(deviceId); deviceStatusTask.setDeviceId(deviceId);
deviceStatusTask.setTransactionInfo(transactionInfo); deviceStatusTask.setTransactionInfo(transactionInfo);
deviceStatusTask.setDelayTime((delayTime * 1000L - 500L) + System.currentTimeMillis()); deviceStatusTask.setDelayTime(delayTime);
deviceStatusTask.setCallback(callback); deviceStatusTask.setCallback(callback);
return deviceStatusTask; return deviceStatusTask;
} }

View File

@@ -93,7 +93,7 @@ public class DeviceStatusTaskRunner {
if (task == null) { if (task == null) {
return false; return false;
} }
log.info("[更新状态任务时间] 编号: {}", key); log.debug("[更新状态任务时间] 编号: {}", key);
if (delayQueue.contains(task)) { if (delayQueue.contains(task)) {
boolean remove = delayQueue.remove(task); boolean remove = delayQueue.remove(task);
if (!remove) { if (!remove) {

View File

@@ -49,7 +49,7 @@ public class SIPRequestHeaderProvider {
// sipuri // sipuri
SipURI requestURI = SipFactory.getInstance().createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress()); SipURI requestURI = SipFactory.getInstance().createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress());
// via // via
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>(); ArrayList<ViaHeader> viaHeaders = new ArrayList<>();
ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(sipLayer.getLocalIp(device.getLocalIp()), sipConfig.getPort(), device.getTransport(), viaTag); ViaHeader viaHeader = SipFactory.getInstance().createHeaderFactory().createViaHeader(sipLayer.getLocalIp(device.getLocalIp()), sipConfig.getPort(), device.getTransport(), viaTag);
viaHeader.setRPort(); viaHeader.setRPort();
viaHeaders.add(viaHeader); viaHeaders.add(viaHeader);

View File

@@ -69,8 +69,9 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
responseAck(request, Response.BAD_REQUEST); responseAck(request, Response.BAD_REQUEST);
return; return;
} }
String platformId = SipUtils.getUserIdFromFromHeader(request);
String cmd = XmlUtil.getText(rootElement, "CmdType"); String cmd = XmlUtil.getText(rootElement, "CmdType");
log.info("[收到订阅请求] 类型: {}", cmd); log.info("[收到订阅请求] 类型: {}, 来自: {}", cmd, platformId);
if (CmdType.MOBILE_POSITION.equals(cmd)) { if (CmdType.MOBILE_POSITION.equals(cmd)) {
processNotifyMobilePosition(request, rootElement); processNotifyMobilePosition(request, rootElement);
// } else if (CmdType.ALARM.equals(cmd)) { // } else if (CmdType.ALARM.equals(cmd)) {

View File

@@ -62,6 +62,11 @@ public class RedisRpcDeviceController extends RpcController {
response.setBody("param error"); response.setBody("param error");
return response; return response;
} }
if (device.getRegisterTime() == null) {
response.setStatusCode(ErrorCode.ERROR400.getCode());
response.setBody("设备尚未注册过");
return response;
}
WVPResult<SyncStatus> result = deviceService.devicesSync(device); WVPResult<SyncStatus> result = deviceService.devicesSync(device);
response.setStatusCode(ErrorCode.SUCCESS.getCode()); response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(JSONObject.toJSONString(result)); response.setBody(JSONObject.toJSONString(result));

View File

@@ -74,7 +74,7 @@ public class RedisRpcPlatformController extends RpcController {
List<CommonGBChannel> channels = jsonObject.getJSONArray("channels").toJavaList(CommonGBChannel.class); List<CommonGBChannel> channels = jsonObject.getJSONArray("channels").toJavaList(CommonGBChannel.class);
String type = jsonObject.getString("type"); String type = jsonObject.getString("type");
eventPublisher.catalogEventPublish(platform, channels, type, false); eventPublisher.catalogEventPublish(platform, channels, type);
RedisRpcResponse response = request.getResponse(); RedisRpcResponse response = request.getResponse();
response.setStatusCode(ErrorCode.SUCCESS.getCode()); response.setStatusCode(ErrorCode.SUCCESS.getCode());
return response; return response;

View File

@@ -2,4 +2,4 @@ spring:
application: application:
name: wvp name: wvp
profiles: profiles:
active: 274 active: 274-dev

View File

@@ -37,7 +37,6 @@ create table IF NOT EXISTS wvp_device
position_capability integer, position_capability integer,
broadcast_push_after_ack bool default false, broadcast_push_after_ack bool default false,
server_id character varying(50), server_id character varying(50),
device_type integer default 200,
constraint uk_device_device unique (device_id) constraint uk_device_device unique (device_id)
); );

View File

@@ -37,7 +37,6 @@ create table IF NOT EXISTS wvp_device
position_capability integer, position_capability integer,
broadcast_push_after_ack bool default false, broadcast_push_after_ack bool default false,
server_id character varying(50), server_id character varying(50),
device_type integer,
constraint uk_device_device unique (device_id) constraint uk_device_device unique (device_id)
); );