支持级联移动位置订阅通知转发
This commit is contained in:
@@ -103,6 +103,16 @@ public class SubscribeHolder {
|
||||
return platforms;
|
||||
}
|
||||
|
||||
public List<String> getAllMobilePositionSubscribePlatform() {
|
||||
List<String> platforms = new ArrayList<>();
|
||||
if(!mobilePositionMap.isEmpty()) {
|
||||
for (String key : mobilePositionMap.keySet()) {
|
||||
platforms.add(mobilePositionMap.get(key).getId());
|
||||
}
|
||||
}
|
||||
return platforms;
|
||||
}
|
||||
|
||||
public void removeAllSubscribe(String platformId) {
|
||||
removeMobilePositionSubscribe(platformId);
|
||||
removeCatalogSubscribe(platformId);
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||
import com.genersoft.iot.vmp.gb28181.event.device.RequestTimeoutEvent;
|
||||
import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEvent;
|
||||
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
|
||||
import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent;
|
||||
import com.genersoft.iot.vmp.media.zlm.event.ZLMOfflineEvent;
|
||||
import com.genersoft.iot.vmp.media.zlm.event.ZLMOnlineEvent;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -94,6 +95,13 @@ public class EventPublisher {
|
||||
}
|
||||
|
||||
|
||||
public void mobilePositionEventPublish(MobilePosition mobilePosition) {
|
||||
MobilePositionEvent event = new MobilePositionEvent(this);
|
||||
event.setMobilePosition(mobilePosition);
|
||||
applicationEventPublisher.publishEvent(event);
|
||||
}
|
||||
|
||||
|
||||
public void catalogEventPublishForStream(String platformId, List<GbStream> gbStreams, String type) {
|
||||
CatalogEvent outEvent = new CatalogEvent(this);
|
||||
outEvent.setGbStreams(gbStreams);
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
package com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
|
||||
public class MobilePositionEvent extends ApplicationEvent {
|
||||
public MobilePositionEvent(Object source) {
|
||||
super(source);
|
||||
}
|
||||
|
||||
private MobilePosition mobilePosition;
|
||||
|
||||
public MobilePosition getMobilePosition() {
|
||||
return mobilePosition;
|
||||
}
|
||||
|
||||
public void setMobilePosition(MobilePosition mobilePosition) {
|
||||
this.mobilePosition = mobilePosition;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
package com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
|
||||
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
|
||||
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.sip.InvalidArgumentException;
|
||||
import javax.sip.SipException;
|
||||
import java.text.ParseException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 移动位置通知消息转发
|
||||
*/
|
||||
@Component
|
||||
public class MobilePositionEventLister implements ApplicationListener<MobilePositionEvent> {
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(MobilePositionEventLister.class);
|
||||
|
||||
@Autowired
|
||||
private IVideoManagerStorage storager;
|
||||
|
||||
@Autowired
|
||||
private SIPCommanderFroPlatform sipCommanderFroPlatform;
|
||||
|
||||
@Autowired
|
||||
private SubscribeHolder subscribeHolder;
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(MobilePositionEvent event) {
|
||||
// 获取所用订阅
|
||||
List<String> platforms = subscribeHolder.getAllMobilePositionSubscribePlatform();
|
||||
if (platforms.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
List<ParentPlatform> parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(event.getMobilePosition().getChannelId(), platforms);
|
||||
|
||||
for (ParentPlatform platform : parentPlatformsForGB) {
|
||||
logger.info("[向上级发送MobilePosition] 通道:{},平台:{}, 位置: {}:{}", event.getMobilePosition().getChannelId(),
|
||||
platform.getServerGBId(), event.getMobilePosition().getLongitude(), event.getMobilePosition().getLatitude());
|
||||
SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId());
|
||||
try {
|
||||
sipCommanderFroPlatform.sendNotifyMobilePosition(platform, GPSMsgInfo.getInstance(event.getMobilePosition()),
|
||||
subscribe);
|
||||
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
|
||||
IllegalAccessException e) {
|
||||
logger.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.genersoft.iot.vmp.conf.CivilCodeFileConf;
|
||||
import com.genersoft.iot.vmp.conf.SipConfig;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||
@@ -78,9 +76,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
|
||||
@Autowired
|
||||
private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
|
||||
|
||||
@Autowired
|
||||
private CivilCodeFileConf civilCodeFileConf;
|
||||
|
||||
private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@Qualifier("taskExecutor")
|
||||
@@ -98,7 +93,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
|
||||
@Override
|
||||
public void process(RequestEvent evt) {
|
||||
try {
|
||||
|
||||
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
|
||||
responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null);
|
||||
logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue());
|
||||
@@ -234,25 +228,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
|
||||
mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
|
||||
mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
|
||||
|
||||
if (userSetting.getSavePositionHistory()) {
|
||||
storager.insertMobilePosition(mobilePosition);
|
||||
}
|
||||
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
|
||||
|
||||
storager.updateChannelPosition(deviceChannel);
|
||||
// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
|
||||
|
||||
|
||||
// 发送redis消息。 通知位置信息的变化
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
|
||||
jsonObject.put("serial", deviceId);
|
||||
jsonObject.put("code", channelId);
|
||||
jsonObject.put("longitude", mobilePosition.getLongitude());
|
||||
jsonObject.put("latitude", mobilePosition.getLatitude());
|
||||
jsonObject.put("altitude", mobilePosition.getAltitude());
|
||||
jsonObject.put("direction", mobilePosition.getDirection());
|
||||
jsonObject.put("speed", mobilePosition.getSpeed());
|
||||
redisCatchStorage.sendMobilePositionMsg(jsonObject);
|
||||
} catch (DocumentException e) {
|
||||
logger.error("未处理的异常 ", e);
|
||||
}
|
||||
@@ -340,25 +317,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
|
||||
mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
|
||||
mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
|
||||
|
||||
if (userSetting.getSavePositionHistory()) {
|
||||
storager.insertMobilePosition(mobilePosition);
|
||||
}
|
||||
|
||||
storager.updateChannelPosition(deviceChannel);
|
||||
// 发送redis消息。 通知位置信息的变化
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
|
||||
jsonObject.put("serial", deviceChannel.getDeviceId());
|
||||
jsonObject.put("code", deviceChannel.getChannelId());
|
||||
jsonObject.put("longitude", mobilePosition.getLongitude());
|
||||
jsonObject.put("latitude", mobilePosition.getLatitude());
|
||||
jsonObject.put("altitude", mobilePosition.getAltitude());
|
||||
jsonObject.put("direction", mobilePosition.getDirection());
|
||||
jsonObject.put("speed", mobilePosition.getSpeed());
|
||||
redisCatchStorage.sendMobilePositionMsg(jsonObject);
|
||||
|
||||
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
|
||||
}
|
||||
// TODO: 需要实现存储报警信息、报警分类
|
||||
|
||||
// 回复200 OK
|
||||
if (redisCatchStorage.deviceIsOnline(deviceId)) {
|
||||
|
||||
@@ -75,6 +75,9 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
|
||||
@Autowired
|
||||
private ThreadPoolTaskExecutor taskExecutor;
|
||||
|
||||
@Autowired
|
||||
private EventPublisher eventPublisher;
|
||||
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
@@ -158,22 +161,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
|
||||
mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
|
||||
mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
|
||||
|
||||
if (userSetting.getSavePositionHistory()) {
|
||||
storager.insertMobilePosition(mobilePosition);
|
||||
}
|
||||
storager.updateChannelPosition(deviceChannel);
|
||||
|
||||
// 发送redis消息。 通知位置信息的变化
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
|
||||
jsonObject.put("serial", deviceChannel.getDeviceId());
|
||||
jsonObject.put("code", deviceChannel.getChannelId());
|
||||
jsonObject.put("longitude", mobilePosition.getLongitude());
|
||||
jsonObject.put("latitude", mobilePosition.getLatitude());
|
||||
jsonObject.put("altitude", mobilePosition.getAltitude());
|
||||
jsonObject.put("direction", mobilePosition.getDirection());
|
||||
jsonObject.put("speed", mobilePosition.getSpeed());
|
||||
redisCatchStorage.sendMobilePositionMsg(jsonObject);
|
||||
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
|
||||
}
|
||||
}
|
||||
if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) {
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
|
||||
@@ -57,6 +57,9 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
|
||||
@Autowired
|
||||
private IDeviceChannelService deviceChannelService;
|
||||
|
||||
@Autowired
|
||||
private EventPublisher eventPublisher;
|
||||
|
||||
private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@Qualifier("taskExecutor")
|
||||
@@ -137,22 +140,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
|
||||
mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
|
||||
mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
|
||||
|
||||
if (userSetting.getSavePositionHistory()) {
|
||||
storager.insertMobilePosition(mobilePosition);
|
||||
}
|
||||
storager.updateChannelPosition(deviceChannel);
|
||||
|
||||
// 发送redis消息。 通知位置信息的变化
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
|
||||
jsonObject.put("serial", deviceChannel.getDeviceId());
|
||||
jsonObject.put("code", deviceChannel.getChannelId());
|
||||
jsonObject.put("longitude", mobilePosition.getLongitude());
|
||||
jsonObject.put("latitude", mobilePosition.getLatitude());
|
||||
jsonObject.put("altitude", mobilePosition.getAltitude());
|
||||
jsonObject.put("direction", mobilePosition.getDirection());
|
||||
jsonObject.put("speed", mobilePosition.getSpeed());
|
||||
redisCatchStorage.sendMobilePositionMsg(jsonObject);
|
||||
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
|
||||
|
||||
} catch (DocumentException e) {
|
||||
logger.error("未处理的异常 ", e);
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
|
||||
@@ -131,11 +130,7 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar
|
||||
mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02());
|
||||
mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02());
|
||||
|
||||
if (userSetting.getSavePositionHistory()) {
|
||||
storager.insertMobilePosition(mobilePosition);
|
||||
}
|
||||
|
||||
storager.updateChannelPosition(deviceChannel);
|
||||
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
|
||||
|
||||
String key = DeferredResultHolder.CALLBACK_CMD_MOBILE_POSITION + device.getDeviceId();
|
||||
RequestMessage msg = new RequestMessage();
|
||||
@@ -143,17 +138,6 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar
|
||||
msg.setData(mobilePosition);
|
||||
resultHolder.invokeAllResult(msg);
|
||||
|
||||
// 发送redis消息。 通知位置信息的变化
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
|
||||
jsonObject.put("serial", deviceChannel.getDeviceId());
|
||||
jsonObject.put("code", deviceChannel.getChannelId());
|
||||
jsonObject.put("longitude", mobilePosition.getLongitude());
|
||||
jsonObject.put("latitude", mobilePosition.getLatitude());
|
||||
jsonObject.put("altitude", mobilePosition.getAltitude());
|
||||
jsonObject.put("direction", mobilePosition.getDirection());
|
||||
jsonObject.put("speed", mobilePosition.getSpeed());
|
||||
redisCatchStorage.sendMobilePositionMsg(jsonObject);
|
||||
//回复 200 OK
|
||||
try {
|
||||
responseAck(request, Response.OK);
|
||||
|
||||
Reference in New Issue
Block a user