Merge remote-tracking branch 'origin/wvp-28181-2.0' into map

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java
This commit is contained in:
648540858
2022-04-15 09:37:33 +08:00
39 changed files with 1024 additions and 340 deletions

View File

@@ -8,6 +8,12 @@ public class CatalogData {
private List<DeviceChannel> channelList;
private Date lastTime;
private Device device;
private String errorMsg;
public enum CatalogDataStatus{
ready, runIng, end
}
private CatalogDataStatus status;
public int getTotal() {
return total;
@@ -40,4 +46,20 @@ public class CatalogData {
public void setDevice(Device device) {
this.device = device;
}
public String getErrorMsg() {
return errorMsg;
}
public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
public CatalogDataStatus getStatus() {
return status;
}
public void setStatus(CatalogDataStatus status) {
this.status = status;
}
}

View File

@@ -1,5 +1,12 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
@@ -9,12 +16,32 @@ import java.util.concurrent.ConcurrentHashMap;
@Component
public class SubscribeHolder {
@Autowired
private DynamicTask dynamicTask;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private ISIPCommanderForPlatform sipCommanderForPlatform;
@Autowired
private IVideoManagerStorage storager;
private final String taskOverduePrefix = "subscribe_overdue_";
private static ConcurrentHashMap<String, SubscribeInfo> catalogMap = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, SubscribeInfo> mobilePositionMap = new ConcurrentHashMap<>();
public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) {
catalogMap.put(platformId, subscribeInfo);
// 添加订阅到期
String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId;
dynamicTask.stop(taskOverdueKey);
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()),
subscribeInfo.getExpires() * 1000);
}
public SubscribeInfo getCatalogSubscribe(String platformId) {
@@ -23,10 +50,24 @@ public class SubscribeHolder {
public void removeCatalogSubscribe(String platformId) {
catalogMap.remove(platformId);
String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId;
// 添加任务处理订阅过期
dynamicTask.stop(taskOverdueKey);
}
public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo) {
mobilePositionMap.put(platformId, subscribeInfo);
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + "MobilePosition_" + platformId;
// 添加任务处理GPS定时推送
dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, subscribeInfo.getSn(), key, this), subscribeInfo.getGpsInterval());
String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId;
dynamicTask.stop(taskOverdueKey);
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> {
System.out.println("订阅过期");
removeMobilePositionSubscribe(subscribeInfo.getId());
},
subscribeInfo.getExpires() * 1000);
}
public SubscribeInfo getMobilePositionSubscribe(String platformId) {
@@ -35,6 +76,12 @@ public class SubscribeHolder {
public void removeMobilePositionSubscribe(String platformId) {
mobilePositionMap.remove(platformId);
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + "MobilePosition_" + platformId;
// 结束任务处理GPS定时推送
dynamicTask.stop(key);
String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId;
// 添加任务处理订阅过期
dynamicTask.stop(taskOverdueKey);
}
public List<String> getAllCatalogSubscribePlatform() {
@@ -48,7 +95,7 @@ public class SubscribeHolder {
}
public void removeAllSubscribe(String platformId) {
mobilePositionMap.remove(platformId);
catalogMap.remove(platformId);
removeMobilePositionSubscribe(platformId);
removeCatalogSubscribe(platformId);
}
}

View File

@@ -33,6 +33,14 @@ public class SubscribeInfo {
private ServerTransaction transaction;
private Dialog dialog;
/**
* 以下为可选字段
* @return
*/
private String sn;
private int gpsInterval;
public String getId() {
return id;
}
@@ -88,4 +96,20 @@ public class SubscribeInfo {
public void setDialog(Dialog dialog) {
this.dialog = dialog;
}
public String getSn() {
return sn;
}
public void setSn(String sn) {
this.sn = sn;
}
public int getGpsInterval() {
return gpsInterval;
}
public void setGpsInterval(int gpsInterval) {
this.gpsInterval = gpsInterval;
}
}

View File

@@ -0,0 +1,34 @@
package com.genersoft.iot.vmp.gb28181.bean;
/**
* 摄像机同步状态
*/
public class SyncStatus {
private int total;
private int current;
private String errorMsg;
public int getTotal() {
return total;
}
public void setTotal(int total) {
this.total = total;
}
public int getCurrent() {
return current;
}
public void setCurrent(int current) {
this.current = current;
}
public String getErrorMsg() {
return errorMsg;
}
public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
}

View File

@@ -95,11 +95,12 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
}
// 处理上线监听
storager.updateDevice(device);
List<DeviceChannel> deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId());
eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON);
// 上线添加订阅
if (device.getSubscribeCycleForCatalog() > 0) {
// 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
deviceService.addCatalogSubscribe(device);
}
if (device.getSubscribeCycleForMobilePosition() > 0) {
deviceService.addMobilePositionSubscribe(device);
}
}

View File

@@ -1,49 +0,0 @@
package com.genersoft.iot.vmp.gb28181.event.subscribe;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
import com.genersoft.iot.vmp.conf.UserSetting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/**
* 平台订阅到期事件
*/
@Component
public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessageListener {
private Logger logger = LoggerFactory.getLogger(SubscribeListenerForPlatform.class);
@Autowired
private UserSetting userSetting;
@Autowired
private DynamicTask dynamicTask;
public SubscribeListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetting userSetting) {
super(listenerContainer, userSetting);
}
/**
* 监听失效的key
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
// 获取失效的key
String expiredKey = message.toString();
// 订阅到期
String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_";
if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
// 取消定时任务
dynamicTask.stop(expiredKey);
}
}
}

View File

@@ -61,8 +61,6 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
if (event.getPlatformId() != null) {
parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId());
if (parentPlatform != null && !parentPlatform.isStatus())return;
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_Catalog_" + event.getPlatformId();
// subscribe = redisCatchStorage.getSubscribe(key);
subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
if (subscribe == null) {

View File

@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.gb28181.bean.CatalogData;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
@@ -25,6 +26,17 @@ public class CatalogDataCatch {
@Autowired
private IVideoManagerStorage storager;
public void addReady(String key) {
CatalogData catalogData = data.get(key);
if (catalogData == null || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
catalogData = new CatalogData();
catalogData.setChannelList(new ArrayList<>());
catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
catalogData.setLastTime(new Date(System.currentTimeMillis()));
data.put(key, catalogData);
}
}
public void put(String key, int total, Device device, List<DeviceChannel> deviceChannelList) {
CatalogData catalogData = data.get(key);
if (catalogData == null) {
@@ -32,10 +44,16 @@ public class CatalogDataCatch {
catalogData.setTotal(total);
catalogData.setDevice(device);
catalogData.setChannelList(new ArrayList<>());
catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
catalogData.setLastTime(new Date(System.currentTimeMillis()));
data.put(key, catalogData);
}else {
catalogData.setTotal(total);
catalogData.setDevice(device);
catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
catalogData.getChannelList().addAll(deviceChannelList);
catalogData.setLastTime(new Date(System.currentTimeMillis()));
}
catalogData.getChannelList().addAll(deviceChannelList);
catalogData.setLastTime(new Date(System.currentTimeMillis()));
}
public List<DeviceChannel> get(String key) {
@@ -44,6 +62,22 @@ public class CatalogDataCatch {
return catalogData.getChannelList();
}
public int getTotal(String key) {
CatalogData catalogData = data.get(key);
if (catalogData == null) return 0;
return catalogData.getTotal();
}
public SyncStatus getSyncStatus(String key) {
CatalogData catalogData = data.get(key);
if (catalogData == null) return null;
SyncStatus syncStatus = new SyncStatus();
syncStatus.setCurrent(catalogData.getChannelList().size());
syncStatus.setTotal(catalogData.getTotal());
syncStatus.setErrorMsg(catalogData.getErrorMsg());
return syncStatus;
}
public void del(String key) {
data.remove(key);
}
@@ -51,24 +85,32 @@ public class CatalogDataCatch {
@Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
private void timerTask(){
Set<String> keys = data.keySet();
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date());
calendar.set(Calendar.SECOND, calendar.get(Calendar.SECOND) - 5);
Calendar calendarBefore5S = Calendar.getInstance();
calendarBefore5S.setTime(new Date());
calendarBefore5S.set(Calendar.SECOND, calendarBefore5S.get(Calendar.SECOND) - 5);
Calendar calendarBefore30S = Calendar.getInstance();
calendarBefore30S.setTime(new Date());
calendarBefore30S.set(Calendar.SECOND, calendarBefore30S.get(Calendar.SECOND) - 30);
for (String key : keys) {
CatalogData catalogData = data.get(key);
if (catalogData.getLastTime().before(calendar.getTime())) {
if (catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据
storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
RequestMessage msg = new RequestMessage();
msg.setKey(key);
WVPResult<Object> result = new WVPResult<>();
result.setCode(0);
result.setMsg("更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "");
result.setData(catalogData.getDevice());
msg.setData(result);
deferredResultHolder.invokeAllResult(msg);
String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "";
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
catalogData.setErrorMsg(errorMsg);
}
if (catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒如果标记为end则删除
data.remove(key);
}
}
}
public void setChannelSyncEnd(String key, String errorMsg) {
CatalogData catalogData = data.get(key);
if (catalogData == null)return;
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
catalogData.setErrorMsg(errorMsg);
}
}

View File

@@ -1,5 +1,9 @@
package com.genersoft.iot.vmp.gb28181.task;
import javax.sip.DialogState;
public interface ISubscribeTask extends Runnable{
void stop();
DialogState getDialogState();
}

View File

@@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import javax.sip.Dialog;
import javax.sip.DialogState;
@@ -72,4 +73,10 @@ public class CatalogSubscribeTask implements ISubscribeTask {
});
}
}
@Override
public DialogState getDialogState() {
if (dialog == null) return null;
return dialog.getState();
}
}

View File

@@ -1,16 +1,16 @@
package com.genersoft.iot.vmp.gb28181.task.impl;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import java.text.SimpleDateFormat;
import javax.sip.DialogState;
import java.util.List;
/**
@@ -18,20 +18,21 @@ import java.util.List;
*/
public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
private Logger logger = LoggerFactory.getLogger(MobilePositionSubscribeHandlerTask.class);
private IRedisCatchStorage redisCatchStorage;
private IVideoManagerStorage storager;
private ISIPCommanderForPlatform sipCommanderForPlatform;
private SubscribeHolder subscribeHolder;
private String platformId;
private ParentPlatform platform;
private String sn;
private String key;
private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public MobilePositionSubscribeHandlerTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorage storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) {
System.out.println("MobilePositionSubscribeHandlerTask 初始化");
this.redisCatchStorage = redisCatchStorage;
this.storager = storager;
this.platformId = platformId;
this.platform = storager.queryParentPlatByServerGBId(platformId);
this.sn = sn;
this.key = key;
this.sipCommanderForPlatform = sipCommanderForPlatform;
@@ -41,37 +42,45 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
@Override
public void run() {
SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platformId);
logger.info("执行MobilePositionSubscribeHandlerTask");
if (platform == null) return;
SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId());
if (subscribe != null) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
if (parentPlatform == null || parentPlatform.isStatus()) {
// TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platformId);
if (gbStreams.size() > 0) {
for (GbStream gbStream : gbStreams) {
String gbId = gbStream.getGbId();
GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
if (gpsMsgInfo != null) {
// 发送GPS消息
sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
}else {
// 没有在redis找到新的消息就使用数据库的消息
gpsMsgInfo = new GPSMsgInfo();
gpsMsgInfo.setId(gbId);
gpsMsgInfo.setLat(gbStream.getLongitude());
gpsMsgInfo.setLng(gbStream.getLongitude());
// 发送GPS消息
sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
}
// if (!parentPlatform.isStatus()) {
// logger.info("发送订阅时发现平台已经离线:{}", platformId);
// return;
// }
// TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platform.getServerGBId());
if (gbStreams.size() == 0) {
logger.info("发送订阅时发现平台已经没有关联的直播流:{}", platform.getServerGBId());
return;
}
for (GbStream gbStream : gbStreams) {
String gbId = gbStream.getGbId();
GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
if (gpsMsgInfo != null) { // 无最新位置不发送
logger.info("无最新位置不发送");
// 经纬度都为0不发送
if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) {
continue;
}
// 发送GPS消息
sipCommanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, subscribe);
}
}
}
logger.info("结束执行MobilePositionSubscribeHandlerTask");
}
@Override
public void stop() {
}
@Override
public DialogState getDialogState() {
return null;
}
}

View File

@@ -6,10 +6,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.ResponseEvent;
import java.util.Timer;
import java.util.TimerTask;
/**
* 移动位置订阅的定时更新
@@ -20,6 +23,8 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
private ISIPCommander sipCommander;
private Dialog dialog;
private Timer timer ;
public MobilePositionSubscribeTask(Device device, ISIPCommander sipCommander) {
this.device = device;
this.sipCommander = sipCommander;
@@ -27,10 +32,14 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
@Override
public void run() {
if (timer != null ) {
timer.cancel();
timer = null;
}
sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> {
if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
dialog = eventResult.dialog;
}
// if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
// dialog = eventResult.dialog;
// }
ResponseEvent event = (ResponseEvent) eventResult.event;
if (event.getResponse().getRawContent() != null) {
// 成功
@@ -43,6 +52,13 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
dialog = null;
// 失败
logger.warn("[移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
MobilePositionSubscribeTask.this.run();
}
}, 2000);
});
}
@@ -56,8 +72,12 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
* COMPLETED-> Completed Dialog状态-已完成
* TERMINATED-> Terminated Dialog状态-终止
*/
logger.info("取消移动订阅时dialog状态为{}", dialog.getState());
if (timer != null ) {
timer.cancel();
timer = null;
}
if (dialog != null && dialog.getState().equals(DialogState.CONFIRMED)) {
logger.info("取消移动订阅时dialog状态为{}", dialog.getState());
device.setSubscribeCycleForMobilePosition(0);
sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
@@ -74,4 +94,9 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
});
}
}
@Override
public DialogState getDialogState() {
if (dialog == null) return null;
return dialog.getState();
}
}

View File

@@ -46,6 +46,7 @@ public interface ISIPCommanderForPlatform {
* @return
*/
boolean catalogQuery(DeviceChannel channel, ParentPlatform parentPlatform, String sn, String fromTag, int size);
boolean catalogQuery(List<DeviceChannel> channels, ParentPlatform parentPlatform, String sn, String fromTag);
/**
* 向上级回复DeviceInfo查询信息

View File

@@ -1566,17 +1566,28 @@ public class SIPCommander implements ISIPCommander {
cmdXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
cmdXml.append("</Query>\r\n");
String tm = Long.toString(System.currentTimeMillis());
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
Request request;
if (dialog != null) {
logger.info("发送目录订阅消息时 dialog的状态为 {}", dialog.getState());
request = dialog.createRequest(Request.SUBSCRIBE);
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
request.setContent(cmdXml.toString(), contentTypeHeader);
ExpiresHeader expireHeader = sipFactory.createHeaderFactory().createExpiresHeader(device.getSubscribeCycleForMobilePosition());
request.addHeader(expireHeader);
}else {
String tm = Long.toString(System.currentTimeMillis());
// 有效时间默认为60秒以上
Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm,
"fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" ,
callIdHeader);
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
// 有效时间默认为60秒以上
request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm,
"fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" ,
callIdHeader);
}
transmitRequest(device, request, errorEvent, okEvent);
return true;
} catch ( NumberFormatException | ParseException | InvalidArgumentException | SipException e) {

View File

@@ -215,44 +215,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
return false;
}
try {
String characterSet = parentPlatform.getCharacterSet();
StringBuffer catalogXml = new StringBuffer(600);
catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet +"\"?>\r\n");
catalogXml.append("<Response>\r\n");
catalogXml.append("<CmdType>Catalog</CmdType>\r\n");
catalogXml.append("<SN>" +sn + "</SN>\r\n");
catalogXml.append("<DeviceID>" + parentPlatform.getDeviceGBId() + "</DeviceID>\r\n");
catalogXml.append("<SumNum>" + size + "</SumNum>\r\n");
catalogXml.append("<DeviceList Num=\"1\">\r\n");
catalogXml.append("<Item>\r\n");
if (channel != null) {
catalogXml.append("<DeviceID>" + channel.getChannelId() + "</DeviceID>\r\n");
catalogXml.append("<Name>" + channel.getName() + "</Name>\r\n");
catalogXml.append("<Manufacturer>" + channel.getManufacture() + "</Manufacturer>\r\n");
catalogXml.append("<Model>" + channel.getModel() + "</Model>\r\n");
catalogXml.append("<Owner>" + channel.getOwner() + "</Owner>\r\n");
catalogXml.append("<CivilCode>" + channel.getCivilCode() + "</CivilCode>\r\n");
catalogXml.append("<Address>" + channel.getAddress() + "</Address>\r\n");
catalogXml.append("<Parental>" + channel.getParental() + "</Parental>\r\n");
if (channel.getParentId() != null) {
catalogXml.append("<ParentID>" + channel.getParentId() + "</ParentID>\r\n");
}
catalogXml.append("<Secrecy>" + channel.getSecrecy() + "</Secrecy>\r\n");
catalogXml.append("<RegisterWay>" + channel.getRegisterWay() + "</RegisterWay>\r\n");
catalogXml.append("<Status>" + (channel.getStatus() == 0?"OFF":"ON") + "</Status>\r\n");
catalogXml.append("<Longitude>" + channel.getLongitude() + "</Longitude>\r\n");
catalogXml.append("<Latitude>" + channel.getLatitude() + "</Latitude>\r\n");
catalogXml.append("<IPAddress>" + channel.getIpAddress() + "</IPAddress>\r\n");
catalogXml.append("<Port>" + channel.getPort() + "</Port>\r\n");
catalogXml.append("<Info>\r\n");
catalogXml.append("<PTZType>" + channel.getPTZType() + "</PTZType>\r\n");
catalogXml.append("</Info>\r\n");
}
catalogXml.append("</Item>\r\n");
catalogXml.append("</DeviceList>\r\n");
catalogXml.append("</Response>\r\n");
String catalogXml = getCatalogXml(channel, sn, parentPlatform, size);
// callid
CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
@@ -268,6 +231,77 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
return true;
}
@Override
public boolean catalogQuery(List<DeviceChannel> channels, ParentPlatform parentPlatform, String sn, String fromTag) {
if ( parentPlatform ==null) {
return false;
}
sendCatalogResponse(channels, parentPlatform, sn, fromTag, 0);
return true;
}
private String getCatalogXml(DeviceChannel channel, String sn, ParentPlatform parentPlatform, int size) {
String characterSet = parentPlatform.getCharacterSet();
StringBuffer catalogXml = new StringBuffer(600);
catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet +"\"?>\r\n");
catalogXml.append("<Response>\r\n");
catalogXml.append("<CmdType>Catalog</CmdType>\r\n");
catalogXml.append("<SN>" +sn + "</SN>\r\n");
catalogXml.append("<DeviceID>" + parentPlatform.getDeviceGBId() + "</DeviceID>\r\n");
catalogXml.append("<SumNum>" + size + "</SumNum>\r\n");
catalogXml.append("<DeviceList Num=\"1\">\r\n");
catalogXml.append("<Item>\r\n");
if (channel != null) {
catalogXml.append("<DeviceID>" + channel.getChannelId() + "</DeviceID>\r\n");
catalogXml.append("<Name>" + channel.getName() + "</Name>\r\n");
catalogXml.append("<Manufacturer>" + channel.getManufacture() + "</Manufacturer>\r\n");
catalogXml.append("<Model>" + channel.getModel() + "</Model>\r\n");
catalogXml.append("<Owner>" + channel.getOwner() + "</Owner>\r\n");
catalogXml.append("<CivilCode>" + channel.getCivilCode() + "</CivilCode>\r\n");
catalogXml.append("<Address>" + channel.getAddress() + "</Address>\r\n");
catalogXml.append("<Parental>" + channel.getParental() + "</Parental>\r\n");
if (channel.getParentId() != null) {
catalogXml.append("<ParentID>" + channel.getParentId() + "</ParentID>\r\n");
}
catalogXml.append("<Secrecy>" + channel.getSecrecy() + "</Secrecy>\r\n");
catalogXml.append("<RegisterWay>" + channel.getRegisterWay() + "</RegisterWay>\r\n");
catalogXml.append("<Status>" + (channel.getStatus() == 0?"OFF":"ON") + "</Status>\r\n");
catalogXml.append("<Longitude>" + channel.getLongitude() + "</Longitude>\r\n");
catalogXml.append("<Latitude>" + channel.getLatitude() + "</Latitude>\r\n");
catalogXml.append("<IPAddress>" + channel.getIpAddress() + "</IPAddress>\r\n");
catalogXml.append("<Port>" + channel.getPort() + "</Port>\r\n");
catalogXml.append("<Info>\r\n");
catalogXml.append("<PTZType>" + channel.getPTZType() + "</PTZType>\r\n");
catalogXml.append("</Info>\r\n");
}
catalogXml.append("</Item>\r\n");
catalogXml.append("</DeviceList>\r\n");
catalogXml.append("</Response>\r\n");
return catalogXml.toString();
}
private void sendCatalogResponse(List<DeviceChannel> channels, ParentPlatform parentPlatform, String sn, String fromTag, int index) {
if (index >= channels.size()) {
return;
}
try {
DeviceChannel deviceChannel = channels.get(index);
String catalogXml = getCatalogXml(deviceChannel, sn, parentPlatform, channels.size());
// callid
CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, catalogXml, fromTag, callIdHeader);
transmitRequest(parentPlatform, request, null, eventResult -> {
int indexNext = index + 1;
sendCatalogResponse(channels, parentPlatform, sn, fromTag, indexNext);
});
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
}
}
/**
* 向上级回复DeviceInfo查询信息
* @param parentPlatform 平台信息
@@ -351,7 +385,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
if (parentPlatform == null) {
return false;
}
logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
try {
String characterSet = parentPlatform.getCharacterSet();
StringBuffer deviceStatusXml = new StringBuffer(600);
@@ -371,7 +405,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
callIdHeader.setCallId(subscribeInfo.getCallId());
logger.info("[发送Notify-MobilePosition] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> {
logger.error("发送NOTIFY通知消息失败。错误{} {}", eventResult.statusCode, eventResult.msg);
}, null);
@@ -425,7 +459,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
// 设置编码, 防止中文乱码
messageFactory.setDefaultContentEncodingCharset(characterSet);
Dialog dialog = subscribeInfo.getDialog();
if (dialog == null) return;
if (dialog == null || !dialog.getState().equals(DialogState.CONFIRMED)) return;
SIPRequest notifyRequest = (SIPRequest)dialog.createRequest(Request.NOTIFY);
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
notifyRequest.setContent(catalogXmlContent, contentTypeHeader);

View File

@@ -147,7 +147,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
} else {
mobilePosition.setAltitude(0.0);
}
logger.info("[收到Notify-MobilePosition]{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
logger.info("[收到 移动位置订阅]{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
mobilePosition.getLongitude(), mobilePosition.getLatitude());
mobilePosition.setReportSource("Mobile Position");
// 默认来源坐标系为WGS-84处理
@@ -283,7 +283,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
Element eventElement = itemDevice.element("Event");
DeviceChannel channel = XmlUtil.channelContentHander(itemDevice);
channel.setDeviceId(device.getDeviceId());
logger.info("[收到Notify-Catalog]{}/{}", device.getDeviceId(), channel.getChannelId());
logger.info("[收到 目录订阅]{}/{}", device.getDeviceId(), channel.getChannelId());
switch (eventElement.getText().toUpperCase()) {
case CatalogEvent.ON: // 上线
logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId());

View File

@@ -137,6 +137,9 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
String deviceID = XmlUtil.getText(rootElement, "DeviceID");
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
if (platform == null) {
return;
}
if (evt.getServerTransaction() == null) {
ServerTransaction serverTransaction = platform.getTransport().equals("TCP") ? tcpSipProvider.getNewServerTransaction(evt.getRequest())
: udpSipProvider.getNewServerTransaction(evt.getRequest());
@@ -146,8 +149,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
subscribeInfo.setDialog(dialog);
}
String sn = XmlUtil.getText(rootElement, "SN");
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_MobilePosition_" + platformId;
logger.info("[notify-MobilePosition]: {}", platformId);
logger.info("[回复 移动位置订阅]: {}", platformId);
StringBuilder resultXml = new StringBuilder(200);
resultXml.append("<?xml version=\"1.0\" ?>\r\n")
.append("<Response>\r\n")
@@ -158,14 +160,25 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
.append("</Response>\r\n");
if (subscribeInfo.getExpires() > 0) {
if (subscribeHolder.getMobilePositionSubscribe(platformId) != null) {
dynamicTask.stop(key);
}
String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval) -1 );
if (interval == null) {
subscribeInfo.setGpsInterval(5);
}else {
subscribeInfo.setGpsInterval(Integer.parseInt(interval));
}
subscribeInfo.setSn(sn);
subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
// if (subscribeHolder.getMobilePositionSubscribe(platformId) == null ) {
// subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
// }else {
// if (subscribeHolder.getMobilePositionSubscribe(platformId).getDialog() != null
// && subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState() != null
// && !subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState().equals(DialogState.CONFIRMED)) {
// subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
// }
// }
}else if (subscribeInfo.getExpires() == 0) {
dynamicTask.stop(key);
subscribeHolder.removeMobilePositionSubscribe(platformId);
}
@@ -199,8 +212,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
subscribeInfo.setDialog(dialog);
}
String sn = XmlUtil.getText(rootElement, "SN");
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_Catalog_" + platformId;
logger.info("[notify-Catalog]: {}", platformId);
logger.info("[回复 目录订阅]: {}/{}", platformId, deviceID);
StringBuilder resultXml = new StringBuilder(200);
resultXml.append("<?xml version=\"1.0\" ?>\r\n")
.append("<Response>\r\n")

View File

@@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import gov.nist.javax.sip.message.SIPRequest;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
@@ -23,6 +24,7 @@ import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
@@ -81,6 +83,17 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
// 查询上级平台是否存在
ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(deviceId);
try {
if (device != null && parentPlatform != null) {
logger.warn("[重复]平台与设备编号重复:{}", deviceId);
SIPRequest request = (SIPRequest) evt.getRequest();
String hostAddress = request.getRemoteAddress().getHostAddress();
int remotePort = request.getRemotePort();
if (device.getHostAddress().equals(hostAddress + ":" + remotePort)) {
parentPlatform = null;
}else {
device = null;
}
}
if (device == null && parentPlatform == null) {
// 不存在则回复404
responseAck(evt, Response.NOT_FOUND, "device "+ deviceId +" not found");

View File

@@ -23,6 +23,7 @@ import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.Iterator;
@@ -103,6 +104,18 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent
if (!StringUtils.isEmpty(getText(rootElement,"PTZCmd")) && !parentPlatform.getServerGBId().equals(targetGBId)) {
String cmdString = getText(rootElement,"PTZCmd");
Device deviceForPlatform = storager.queryVideoDeviceByPlatformIdAndChannelId(parentPlatform.getServerGBId(), channelId);
if (deviceForPlatform == null) {
try {
responseAck(evt, Response.NOT_FOUND);
return;
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
}
cmder.fronEndCmd(deviceForPlatform, channelId, cmdString, eventResult -> {
// 失败的回复
try {

View File

@@ -18,6 +18,7 @@ import javax.sip.SipException;
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
@Component
@@ -58,7 +59,8 @@ public class CatalogNotifyMessageHandler extends SIPRequestProcessorParent imple
List<DeviceChannelInPlatform> deviceChannels = storage.queryChannelListInParentPlatform(parentPlatform.getServerGBId());
// 查询关联的直播通道
List<GbStream> gbStreams = storage.queryGbStreamListInPlatform(parentPlatform.getServerGBId());
int size = deviceChannels.size() + gbStreams.size();
List<DeviceChannel> allChannels = new ArrayList<>();
// 回复目录信息
List<PlatformCatalog> catalogs = storage.queryCatalogInPlatform(parentPlatform.getServerGBId());
if (catalogs.size() > 0) {
@@ -81,9 +83,7 @@ public class CatalogNotifyMessageHandler extends SIPRequestProcessorParent imple
deviceChannel.setModel("live");
deviceChannel.setOwner("wvp-pro");
deviceChannel.setSecrecy("0");
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
// 防止发送过快
Thread.sleep(100);
allChannels.add(deviceChannel);
}
}
// 回复级联的通道
@@ -96,9 +96,7 @@ public class CatalogNotifyMessageHandler extends SIPRequestProcessorParent imple
deviceChannel.setParental(0);
deviceChannel.setParentId(channel.getCatalogId());
deviceChannel.setCivilCode(parentPlatform.getDeviceGBId().substring(0, 6));
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
// 防止发送过快
Thread.sleep(100);
allChannels.add(deviceChannel);
}
}
// 回复直播的通道
@@ -114,7 +112,8 @@ public class CatalogNotifyMessageHandler extends SIPRequestProcessorParent imple
deviceChannel.setLatitude(gbStream.getLatitude());
deviceChannel.setDeviceId(parentPlatform.getDeviceGBId());
deviceChannel.setManufacture("wvp-pro");
deviceChannel.setStatus(gbStream.isStatus()?1:0);
// deviceChannel.setStatus(gbStream.isStatus()?1:0);
deviceChannel.setStatus(1);
deviceChannel.setParentId(gbStream.getCatalogId());
deviceChannel.setRegisterWay(1);
deviceChannel.setCivilCode(parentPlatform.getDeviceGBId().substring(0,6));
@@ -122,16 +121,16 @@ public class CatalogNotifyMessageHandler extends SIPRequestProcessorParent imple
deviceChannel.setOwner("wvp-pro");
deviceChannel.setParental(0);
deviceChannel.setSecrecy("0");
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
// 防止发送过快
Thread.sleep(100);
allChannels.add(deviceChannel);
}
}
if (size == 0) {
if (allChannels.size() > 0) {
cmderFroPlatform.catalogQuery(allChannels, parentPlatform, sn, fromHeader.getTag());
}else {
// 回复无通道
cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), size);
cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), 0);
}
} catch (SipException | InvalidArgumentException | ParseException | InterruptedException e) {
} catch (SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace();
}

View File

@@ -22,6 +22,7 @@ import javax.sip.SipException;
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
@Component
@@ -45,6 +46,9 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
@Autowired
private EventPublisher publisher;
@Autowired
private IVideoManagerStorage storage;
@Override
public void afterPropertiesSet() throws Exception {
queryMessageHandler.addHandler(cmdType, this);
@@ -71,10 +75,11 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId());
// 回复目录信息
List<PlatformCatalog> catalogs = storager.queryCatalogInPlatform(parentPlatform.getServerGBId());
int size = catalogs.size() + deviceChannelInPlatforms.size() + gbStreams.size();
List<DeviceChannel> allChannels = new ArrayList<>();
if (catalogs.size() > 0) {
for (PlatformCatalog catalog : catalogs) {
if (catalog.getParentId().equals(parentPlatform.getServerGBId())) {
if (catalog.getParentId().equals(catalog.getPlatformId())) {
catalog.setParentId(parentPlatform.getDeviceGBId());
}
DeviceChannel deviceChannel = new DeviceChannel();
@@ -92,9 +97,7 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
deviceChannel.setModel("live");
deviceChannel.setOwner("wvp-pro");
deviceChannel.setSecrecy("0");
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
// 防止发送过快
Thread.sleep(100);
allChannels.add(deviceChannel);
}
}
// 回复级联的通道
@@ -103,20 +106,18 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
if (channel.getCatalogId().equals(parentPlatform.getServerGBId())) {
channel.setCatalogId(parentPlatform.getDeviceGBId());
}
DeviceChannel deviceChannel = storager.queryChannel(channel.getDeviceId(), channel.getChannelId());
DeviceChannel deviceChannel = storage.queryChannel(channel.getDeviceId(), channel.getChannelId());
deviceChannel.setParental(0);
deviceChannel.setParentId(channel.getCatalogId());
deviceChannel.setCivilCode(parentPlatform.getDeviceGBId().substring(0, 6));
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
// 防止发送过快
Thread.sleep(100);
allChannels.add(deviceChannel);
}
}
// 回复直播的通道
if (gbStreams.size() > 0) {
for (GbStream gbStream : gbStreams) {
if (gbStream.getCatalogId().equals(parentPlatform.getServerGBId())) {
gbStream.setCatalogId(parentPlatform.getDeviceGBId());
gbStream.setCatalogId(null);
}
DeviceChannel deviceChannel = new DeviceChannel();
deviceChannel.setChannelId(gbStream.getGbId());
@@ -125,7 +126,8 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
deviceChannel.setLatitude(gbStream.getLatitude());
deviceChannel.setDeviceId(parentPlatform.getDeviceGBId());
deviceChannel.setManufacture("wvp-pro");
deviceChannel.setStatus(gbStream.isStatus()?1:0);
// deviceChannel.setStatus(gbStream.isStatus()?1:0);
deviceChannel.setStatus(1);
deviceChannel.setParentId(gbStream.getCatalogId());
deviceChannel.setRegisterWay(1);
deviceChannel.setCivilCode(parentPlatform.getDeviceGBId().substring(0,6));
@@ -133,15 +135,14 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
deviceChannel.setOwner("wvp-pro");
deviceChannel.setParental(0);
deviceChannel.setSecrecy("0");
cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size);
// 防止发送过快
Thread.sleep(100);
allChannels.add(deviceChannel);
}
}
if (size == 0) {
if (allChannels.size() > 0) {
cmderFroPlatform.catalogQuery(allChannels, parentPlatform, sn, fromHeader.getTag());
}else {
// 回复无通道
cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), size);
cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), 0);
}
} catch (SipException e) {
e.printStackTrace();
@@ -149,8 +150,6 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

View File

@@ -116,16 +116,15 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
continue;
}
//by brewswang
if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Longitude"))) {//如果包含位置信息,就更新一下位置
processNotifyMobilePosition(evt, itemDevice);
}
// if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Longitude"))) {//如果包含位置信息,就更新一下位置
// processNotifyMobilePosition(evt, itemDevice);
// }
DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice);
deviceChannel.setDeviceId(device.getDeviceId());
logger.debug("收到来自设备【{}】的通道: {}【{}】", device.getDeviceId(), deviceChannel.getName(), deviceChannel.getChannelId());
channelList.add(deviceChannel);
}
logger.info("收到来自设备【{}】的通道: {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(key) == null ? 0 :catalogDataCatch.get(key).size(), sumNum);
catalogDataCatch.put(key, sumNum, device, channelList);
if (catalogDataCatch.get(key).size() == sumNum) {
// 数据已经完整接收
@@ -147,9 +146,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
}
// 回复200 OK
responseAck(evt, Response.OK);
if (offLineDetector.isOnline(device.getDeviceId())) {
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
}
}
} catch (DocumentException e) {
e.printStackTrace();
@@ -231,4 +227,23 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
e.printStackTrace();
}
}
public SyncStatus getChannelSyncProgress(String deviceId) {
String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
if (catalogDataCatch.get(key) == null) {
return null;
}else {
return catalogDataCatch.getSyncStatus(key);
}
}
public void setChannelSyncReady(String deviceId) {
String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
catalogDataCatch.addReady(key);
}
public void setChannelSyncEnd(String deviceId, String errorMsg) {
String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
catalogDataCatch.setChannelSyncEnd(key, errorMsg);
}
}