From 8cba63642fcff122907bd7d7a8d7d822555d34ca Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 22 Apr 2024 20:29:36 +0800 Subject: [PATCH 1/8] =?UTF-8?q?=E4=BC=98=E5=8C=96notify=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/conf/UserSetting.java | 2 +- .../NotifyRequestForCatalogProcessor.java | 26 ++- ...tifyRequestForMobilePositionProcessor.java | 163 ++++++++++-------- .../request/impl/NotifyRequestProcessor.java | 24 ++- .../impl/DeviceChannelServiceImpl.java | 4 +- .../vmp/storager/dao/DeviceChannelMapper.java | 4 +- .../dao/DeviceMobilePositionMapper.java | 15 ++ 7 files changed, 143 insertions(+), 95 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index a9f5c88a5..a9b17aef2 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -66,7 +66,7 @@ public class UserSetting { private List allowedOrigins = new ArrayList<>(); - private int maxNotifyCountQueue = 10000; + private int maxNotifyCountQueue = 100000; private int registerAgainAfterTime = 60; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index cde70eb4d..c80cc88f0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -96,10 +96,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent // 遍历DeviceList while (deviceListIterator.hasNext()) { Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; - } Element eventElement = itemDevice.element("Event"); String event; if (eventElement == null) { @@ -264,22 +260,13 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent } } + // TODO 同一个通道如果先发送更新再发送离线可能无法正常离线 private void executeSave(){ try { executeSaveForAdd(); } catch (Exception e) { logger.error("[存储收到的增加通道] 异常: ", e ); } - try { - executeSaveForUpdate(); - } catch (Exception e) { - logger.error("[存储收到的更新通道] 异常: ", e ); - } - try { - executeSaveForDelete(); - } catch (Exception e) { - logger.error("[存储收到的删除通道] 异常: ", e ); - } try { executeSaveForOnline(); } catch (Exception e) { @@ -290,6 +277,17 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent } catch (Exception e) { logger.error("[存储收到的通道离线] 异常: ", e ); } + try { + executeSaveForUpdate(); + } catch (Exception e) { + logger.error("[存储收到的更新通道] 异常: ", e ); + } + try { + executeSaveForDelete(); + } catch (Exception e) { + logger.error("[存储收到的删除通道] 异常: ", e ); + } + dynamicTask.stop(talkKey); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 89f57c2a2..460a50789 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -11,7 +11,6 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -20,12 +19,12 @@ import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -68,78 +67,100 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor private final static String talkKey = "notify-request-for-mobile-position-task"; +// @Async("taskExecutor") public void process(RequestEvent evt) { try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - + long startTime = System.currentTimeMillis(); // 回复 200 OK Element rootElement = getRootElement(evt); if (rootElement == null) { logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest()); return; } - + Device device = redisCatchStorage.getDevice(deviceId); MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); + List elements = rootElement.elements(); + String channelId = null; + for (Element element : elements) { + switch (element.getName()){ + case "DeviceID": + channelId = element.getStringValue(); + if (device == null) { + device = redisCatchStorage.getDevice(channelId); + if (device == null) { + // 根据通道id查询设备Id + List deviceList = deviceChannelService.getDeviceByChannelId(channelId); + if (!deviceList.isEmpty()) { + device = deviceList.get(0); + } + } + } + if (device == null) { + logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId); + return; + } + mobilePosition.setDeviceId(device.getDeviceId()); + mobilePosition.setChannelId(channelId); + // 兼容设备部分设备上报是通道编号与设备编号一致的情况 + if (deviceId.equals(channelId)) { + List deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); + if (deviceChannels.size() == 1) { + channelId = deviceChannels.get(0).getChannelId(); + } + } + if (!ObjectUtils.isEmpty(device.getName())) { + mobilePosition.setDeviceName(device.getName()); + } + mobilePosition.setDeviceId(device.getDeviceId()); + mobilePosition.setChannelId(channelId); + continue; + case "Time": + String timeVal = element.getStringValue(); + if (ObjectUtils.isEmpty(timeVal)) { + mobilePosition.setTime(DateUtil.getNow()); + } else { + mobilePosition.setTime(SipUtils.parseTime(timeVal)); + } + continue; + case "Longitude": + mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); + continue; + case "Latitude": + mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); + continue; + case "Speed": + String speedVal = element.getStringValue(); + if (NumericUtil.isDouble(speedVal)) { + mobilePosition.setSpeed(Double.parseDouble(speedVal)); + } else { + mobilePosition.setSpeed(0.0); + } + continue; + case "Direction": + String directionVal = element.getStringValue(); + if (NumericUtil.isDouble(directionVal)) { + mobilePosition.setDirection(Double.parseDouble(directionVal)); + } else { + mobilePosition.setDirection(0.0); + } + continue; + case "Altitude": + String altitudeVal = element.getStringValue(); + if (NumericUtil.isDouble(altitudeVal)) { + mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); + } else { + mobilePosition.setAltitude(0.0); + } + continue; - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getTextTrim().toString(); - Device device = redisCatchStorage.getDevice(deviceId); - - if (device == null) { - device = redisCatchStorage.getDevice(channelId); - if (device == null) { - // 根据通道id查询设备Id - List deviceList = deviceChannelService.getDeviceByChannelId(channelId); - if (deviceList.size() > 0) { - device = deviceList.get(0); - } } } - if (device == null) { - logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId); - return; - } - // 兼容设备部分设备上报是通道编号与设备编号一致的情况 - if (deviceId.equals(channelId)) { - List deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); - if (deviceChannels.size() == 1) { - channelId = deviceChannels.get(0).getChannelId(); - } - } - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setChannelId(channelId); - String time = XmlUtil.getText(rootElement, "Time"); - if (ObjectUtils.isEmpty(time)) { - mobilePosition.setTime(DateUtil.getNow()); - } else { - mobilePosition.setTime(SipUtils.parseTime(time)); - } - - mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); - mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) { - mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed"))); - } else { - mobilePosition.setSpeed(0.0); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) { - mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction"))); - } else { - mobilePosition.setDirection(0.0); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) { - mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude"))); - } else { - mobilePosition.setAltitude(0.0); - } - logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), - mobilePosition.getLongitude(), mobilePosition.getLatitude()); +// logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), +// mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); mobilePosition.setReportSource("Mobile Position"); // 更新device channel 的经纬度 @@ -149,13 +170,13 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); deviceChannel.setGpsTime(mobilePosition.getTime()); - updateChannelMap.put(channelId, deviceChannel); + updateChannelMap.put(deviceId + channelId, deviceChannel); addMobilePositionList.add(mobilePosition); - if(updateChannelMap.size() > 300) { + if(updateChannelMap.size() > 100) { executeSaveChannel(); } if (userSetting.isSavePositionHistory()) { - if(addMobilePositionList.size() > 300) { + if(addMobilePositionList.size() > 100) { executeSaveMobilePosition(); } } @@ -170,7 +191,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor // deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); if (!dynamicTask.contains(talkKey)) { - dynamicTask.startDelay(talkKey, this::executeSave, 1000); + dynamicTask.startDelay(talkKey, this::executeSave, 3000); } } catch (DocumentException e) { @@ -186,22 +207,24 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor dynamicTask.stop(talkKey); } - private void executeSaveChannel(){ + @Async("taskExecutor") + public void executeSaveChannel(){ + dynamicTask.execute(); try { logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size()); - ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); - deviceChannelService.batchUpdateChannelGPS(deviceChannels); +// ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); +// deviceChannelService.batchUpdateChannelGPS(deviceChannels); updateChannelMap.clear(); }catch (Exception e) { } } - - private void executeSaveMobilePosition(){ + @Async("taskExecutor") + public void executeSaveMobilePosition(){ if (userSetting.isSavePositionHistory()) { try { - logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); - deviceChannelService.batchAddMobilePosition(addMobilePositionList); +// logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); +// deviceChannelService.batchAddMobilePosition(addMobilePositionList); addMobilePositionList.clear(); }catch (Exception e) { logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size()); @@ -209,6 +232,4 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor } } - - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 84f44b583..2dd107a77 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -25,6 +25,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -76,6 +78,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @Autowired private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor; + @Autowired + private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor; + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @@ -105,10 +110,10 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements logger.error("未处理的异常 ", e); } boolean runed = !taskQueue.isEmpty(); - logger.info("[notify] 待处理消息数量: {}", taskQueue.size()); taskQueue.offer(new HandlerCatchData(evt, null, null)); if (!runed) { taskExecutor.execute(()-> { +// logger.warn("开始处理"); while (!taskQueue.isEmpty()) { try { HandlerCatchData take = taskQueue.poll(); @@ -129,8 +134,12 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements logger.info("接收到Alarm通知"); processNotifyAlarm(take.getEvt()); } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - logger.info("接收到MobilePosition通知"); - processNotifyMobilePosition(take.getEvt()); +// logger.info("接收到MobilePosition通知"); +// processNotifyMobilePosition(take.getEvt()); + taskExecutor.execute(() -> { + notifyRequestForMobilePositionProcessor.process(take.getEvt()); + }); + } else { logger.info("接收到消息:" + cmd); } @@ -147,11 +156,11 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements * * @param evt */ - private void processNotifyMobilePosition(RequestEvent evt) { + @Async("taskExecutor") + public void processNotifyMobilePosition(RequestEvent evt) { try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - // 回复 200 OK Element rootElement = getRootElement(evt); if (rootElement == null) { @@ -360,4 +369,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { this.redisCatchStorage = redisCatchStorage; } + + @Scheduled(fixedRate = 1000) //每1秒执行一次 + public void execute(){ + System.out.println("待处理消息数量: " + taskQueue.size()); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index 376038123..f7666409b 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -356,11 +356,11 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @Override public void batchUpdateChannelGPS(List channelList) { - + channelMapper.batchUpdate(channelList); } @Override public void batchAddMobilePosition(List mobilePositions) { - + deviceMobilePositionMapper.batchadd(mobilePositions); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 4aa985302..ae2233672 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -347,8 +347,8 @@ public interface DeviceChannelMapper { ", has_audio=#{item.hasAudio}" + ", longitude=#{item.longitude}" + ", latitude=#{item.latitude}" + - ", custom_longitude=#{item.customLongitude}" + - ", custom_latitude=#{item.customLatitude}" + + ", custom_longitude=#{item.customLongitude}" + + ", custom_latitude=#{item.customLatitude}" + ", longitude_gcj02=#{item.longitudeGcj02}" + ", latitude_gcj02=#{item.latitudeGcj02}" + ", longitude_wgs84=#{item.longitudeWgs84}" + diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java index 7bf243cae..e3d898260 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java @@ -33,4 +33,19 @@ public interface DeviceMobilePositionMapper { @Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}") int clearMobilePositionsByDeviceId(String deviceId); + + @Insert("") + void batchadd(List mobilePositions); + } From 901dee2bf4c91fa92306b5d8aa66b3148658186c Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 23 Apr 2024 10:12:00 +0800 Subject: [PATCH 2/8] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=BD=AC=E5=8F=91?= =?UTF-8?q?=E5=9B=BD=E6=A0=87notify-update=E6=97=B6=E4=BF=A1=E6=81=AF?= =?UTF-8?q?=E9=94=99=E8=AF=AF=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../subscribe/catalog/CatalogEventLister.java | 4 +++- .../cmd/impl/SIPCommanderFroPlatform.java | 2 +- .../event/request/SIPRequestProcessorParent.java | 2 +- .../NotifyRequestForMobilePositionProcessor.java | 15 ++++++++------- .../request/impl/NotifyRequestProcessor.java | 12 ++++++------ .../iot/vmp/service/impl/PlayServiceImpl.java | 3 ++- 6 files changed, 21 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 557563ee9..18ad2b011 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -49,6 +49,7 @@ public class CatalogEventLister implements ApplicationListener { ParentPlatform parentPlatform = null; Map> parentPlatformMap = new HashMap<>(); + Map channelMap = new HashMap<>(); if (!ObjectUtils.isEmpty(event.getPlatformId())) { subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId()); if (subscribe == null) { @@ -67,6 +68,7 @@ public class CatalogEventLister implements ApplicationListener { for (DeviceChannel deviceChannel : event.getDeviceChannels()) { List parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(deviceChannel.getChannelId(), platforms); parentPlatformMap.put(deviceChannel.getChannelId(), parentPlatformsForGB); + channelMap.put(deviceChannel.getChannelId(), deviceChannel); } } }else if (event.getGbStreams() != null) { @@ -174,7 +176,7 @@ public class CatalogEventLister implements ApplicationListener { } logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId); List deviceChannelList = new ArrayList<>(); - DeviceChannel deviceChannel = storager.queryChannelInParentPlatform(platform.getServerGBId(), gbId); + DeviceChannel deviceChannel = channelMap.get(gbId); deviceChannelList.add(deviceChannel); GbStream gbStream = storager.queryStreamInParentPlatform(platform.getServerGBId(), gbId); if(gbStream != null){ diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 2082d39e6..a9ce759fd 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -597,6 +597,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { Integer finalIndex = index; String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels, deviceChannels.size(), type, subscribeInfo); + System.out.println(catalogXmlContent); logger.info("[发送NOTIFY通知]类型: {},发送数量: {}", type, channels.size()); sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> { logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg); @@ -626,7 +627,6 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { private String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, List channels, int sumNum, String type, SubscribeInfo subscribeInfo) { StringBuffer catalogXml = new StringBuffer(600); - String characterSet = parentPlatform.getCharacterSet(); catalogXml.append("\r\n") .append("\r\n") diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java index 7cbfe70e4..f3f743172 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.google.common.primitives.Bytes; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; -import org.apache.commons.lang3.ArrayUtils; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -172,6 +171,7 @@ public abstract class SIPRequestProcessorParent { return getRootElement(evt, "gb2312"); } public Element getRootElement(RequestEvent evt, String charset) throws DocumentException { + if (charset == null) { charset = "gb2312"; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 460a50789..1b7daf02b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -25,6 +25,7 @@ import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -67,7 +68,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor private final static String talkKey = "notify-request-for-mobile-position-task"; -// @Async("taskExecutor") + @Async("taskExecutor") public void process(RequestEvent evt) { try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); @@ -172,11 +173,11 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor deviceChannel.setGpsTime(mobilePosition.getTime()); updateChannelMap.put(deviceId + channelId, deviceChannel); addMobilePositionList.add(mobilePosition); - if(updateChannelMap.size() > 100) { + if(updateChannelMap.size() > 2000) { executeSaveChannel(); } if (userSetting.isSavePositionHistory()) { - if(addMobilePositionList.size() > 100) { + if(addMobilePositionList.size() > 2000) { executeSaveMobilePosition(); } } @@ -212,8 +213,8 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor dynamicTask.execute(); try { logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size()); -// ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); -// deviceChannelService.batchUpdateChannelGPS(deviceChannels); + ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); + deviceChannelService.batchUpdateChannelGPS(deviceChannels); updateChannelMap.clear(); }catch (Exception e) { @@ -223,8 +224,8 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor public void executeSaveMobilePosition(){ if (userSetting.isSavePositionHistory()) { try { -// logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); -// deviceChannelService.batchAddMobilePosition(addMobilePositionList); + logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); + deviceChannelService.batchAddMobilePosition(addMobilePositionList); addMobilePositionList.clear(); }catch (Exception e) { logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 2dd107a77..da348b785 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -136,9 +136,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements } else if (CmdType.MOBILE_POSITION.equals(cmd)) { // logger.info("接收到MobilePosition通知"); // processNotifyMobilePosition(take.getEvt()); - taskExecutor.execute(() -> { +// taskExecutor.execute(() -> { notifyRequestForMobilePositionProcessor.process(take.getEvt()); - }); +// }); } else { logger.info("接收到消息:" + cmd); @@ -226,8 +226,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements } else { mobilePosition.setAltitude(0.0); } - logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), - mobilePosition.getLongitude(), mobilePosition.getLatitude()); +// logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), +// mobilePosition.getLongitude(), mobilePosition.getLatitude()); mobilePosition.setReportSource("Mobile Position"); // 更新device channel 的经纬度 @@ -370,8 +370,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements this.redisCatchStorage = redisCatchStorage; } - @Scheduled(fixedRate = 1000) //每1秒执行一次 + @Scheduled(fixedRate = 10000) //每1秒执行一次 public void execute(){ - System.out.println("待处理消息数量: " + taskQueue.size()); + logger.info("[待处理Notify消息数量]: {}", taskQueue.size()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index bc0527674..26867eedf 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -149,7 +149,6 @@ public class PlayServiceImpl implements IPlayService { logger.warn("[点播] 未找到可用的zlm deviceId: {},channelId:{}", deviceId, channelId); throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到可用的zlm"); } - Device device = redisCatchStorage.getDevice(deviceId); if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE") && !mediaServerItem.isRtpEnable()) { logger.warn("[点播] 单端口收流时不支持TCP主动方式收流 deviceId: {},channelId:{}", deviceId, channelId); @@ -163,6 +162,8 @@ public class PlayServiceImpl implements IPlayService { InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); if (inviteInfo != null ) { if (inviteInfo.getStreamInfo() == null) { + // 释放生成的ssrc,使用上一次申请的 + ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc); // 点播发起了但是尚未成功, 仅注册回调等待结果即可 inviteStreamService.once(InviteSessionType.PLAY, deviceId, channelId, null, callback); logger.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId: {}", device.getDeviceId(), channelId); From 66a681d67969ae64c5cfe7140cb60a7885edef86 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 23 Apr 2024 20:45:12 +0800 Subject: [PATCH 3/8] =?UTF-8?q?=E4=BC=98=E5=8C=96notify=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E4=B8=AD=E7=9B=AE=E5=BD=95=E5=92=8C=E7=A7=BB=E5=8A=A8=E4=BD=8D?= =?UTF-8?q?=E7=BD=AE=E7=9A=84=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../NotifyRequestForCatalogProcessor.java | 362 +++++------------- ...tifyRequestForMobilePositionProcessor.java | 297 +++++++------- .../request/impl/NotifyRequestProcessor.java | 243 ++++++------ .../vmp/service/IDeviceChannelService.java | 5 + .../impl/DeviceChannelServiceImpl.java | 52 ++- .../vmp/storager/dao/DeviceChannelMapper.java | 17 + .../dao/DeviceMobilePositionMapper.java | 14 + .../storager/impl/RedisCatchStorageImpl.java | 2 +- 8 files changed, 442 insertions(+), 550 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index c80cc88f0..de93804c3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -1,7 +1,5 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.conf.CivilCodeFileConf; -import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -20,10 +18,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -39,8 +37,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); - private final List updateChannelOnlineList = new CopyOnWriteArrayList<>(); - private final List updateChannelOfflineList = new CopyOnWriteArrayList<>(); private final Map updateChannelMap = new ConcurrentHashMap<>(); private final Map addChannelMap = new ConcurrentHashMap<>(); @@ -59,275 +55,111 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent @Autowired private IDeviceChannelService deviceChannelService; - @Autowired - private DynamicTask dynamicTask; - - @Autowired - private CivilCodeFileConf civilCodeFileConf; - @Autowired private SipConfig sipConfig; - private final static String talkKey = "notify-request-for-catalog-task"; + @Transactional + public void process(List evtList) { + if (evtList.isEmpty()) { + return; + } + for (RequestEvent evt : evtList) { + try { + long start = System.currentTimeMillis(); + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - public void process(RequestEvent evt) { - try { - long start = System.currentTimeMillis(); - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null || !device.isOnLine()) { + logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" )); + return; + } + Element rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest()); + return; + } + Element deviceListElement = rootElement.element("DeviceList"); + if (deviceListElement == null) { + return; + } + Iterator deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null || !device.isOnLine()) { - logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" )); - return; - } - Element rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest()); - return; - } - Element deviceListElement = rootElement.element("DeviceList"); - if (deviceListElement == null) { - return; - } - Iterator deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - - // 遍历DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element eventElement = itemDevice.element("Event"); - String event; - if (eventElement == null) { - logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" )); - event = CatalogEvent.ADD; - }else { - event = eventElement.getText().toUpperCase(); - } - DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); - if (channel == null) { - logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); - continue; - } - if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { - channel.setParentId(null); - } - channel.setDeviceId(device.getDeviceId()); - logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); - switch (event) { - case CatalogEvent.ON: - // 上线 - logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - updateChannelOnlineList.add(channel); - if (updateChannelOnlineList.size() > 300) { - executeSaveForOnline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); - } - - break; - case CatalogEvent.OFF : - // 离线 - logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - }else { - updateChannelOfflineList.add(channel); - if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.VLOST: - // 视频丢失 - logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - }else { - updateChannelOfflineList.add(channel); - if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.DEFECT: - // 故障 - logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - }else { - updateChannelOfflineList.add(channel); - if (updateChannelOfflineList.size() > 300) { - executeSaveForOffline(); - } - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); - } - } - break; - case CatalogEvent.ADD: - // 增加 - logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - // 判断此通道是否存在 - DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId()); - if (deviceChannel != null) { - logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - channel.setId(deviceChannel.getId()); - updateChannelMap.put(channel.getChannelId(), channel); - if (updateChannelMap.keySet().size() > 300) { - executeSaveForUpdate(); - } - }else { - addChannelMap.put(channel.getChannelId(), channel); - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); - } - - if (addChannelMap.keySet().size() > 300) { - executeSaveForAdd(); - } - } - - break; - case CatalogEvent.DEL: - // 删除 - logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - deleteChannelList.add(channel); - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); - } - if (deleteChannelList.size() > 300) { - executeSaveForDelete(); - } - break; - case CatalogEvent.UPDATE: - // 更新 - logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - // 判断此通道是否存在 - DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); - if (deviceChannelForUpdate != null) { - channel.setId(deviceChannelForUpdate.getId()); - channel.setUpdateTime(DateUtil.getNow()); - updateChannelMap.put(channel.getChannelId(), channel); - if (updateChannelMap.keySet().size() > 300) { - executeSaveForUpdate(); - } - }else { - addChannelMap.put(channel.getChannelId(), channel); - if (addChannelMap.keySet().size() > 300) { - executeSaveForAdd(); - } - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); - } - } - break; - default: - logger.warn("[ NotifyCatalog ] event not found : {}", event ); - - } - // 转发变化信息 - eventPublisher.catalogEventPublish(null, channel, event); - - if (!updateChannelMap.keySet().isEmpty() - || !addChannelMap.keySet().isEmpty() - || !updateChannelOnlineList.isEmpty() - || !updateChannelOfflineList.isEmpty() - || !deleteChannelList.isEmpty()) { - - if (!dynamicTask.contains(talkKey)) { - dynamicTask.startDelay(talkKey, this::executeSave, 1000); + // 遍历DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element eventElement = itemDevice.element("Event"); + String event; + if (eventElement == null) { + logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" )); + event = CatalogEvent.ADD; + }else { + event = eventElement.getText().toUpperCase(); } + DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); + if (channel == null) { + logger.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); + continue; + } + if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { + channel.setParentId(null); + } + channel.setDeviceId(device.getDeviceId()); + logger.info("[收到目录订阅]:{}, {}/{}",event, device.getDeviceId(), channel.getChannelId()); + switch (event) { + case CatalogEvent.ON: + // 上线 + deviceChannelService.online(channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); + } + + break; + case CatalogEvent.OFF : + case CatalogEvent.VLOST: + case CatalogEvent.DEFECT: + // 离线 + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[目录订阅] 离线 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + }else { + deviceChannelService.offline(channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.DEL: + // 删除 + deviceChannelService.delete(channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); + } + break; + case CatalogEvent.ADD: + case CatalogEvent.UPDATE: + // 更新 + channel.setUpdateTime(DateUtil.getNow()); + deviceChannelService.updateChannel(deviceId,channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + } + break; + default: + logger.warn("[ NotifyCatalog ] event not found : {}", event ); + + } + // 转发变化信息 + eventPublisher.catalogEventPublish(null, channel, event); } } + } catch (DocumentException e) { + logger.error("未处理的异常 ", e); } - } catch (DocumentException e) { - logger.error("未处理的异常 ", e); } } - - // TODO 同一个通道如果先发送更新再发送离线可能无法正常离线 - private void executeSave(){ - try { - executeSaveForAdd(); - } catch (Exception e) { - logger.error("[存储收到的增加通道] 异常: ", e ); - } - try { - executeSaveForOnline(); - } catch (Exception e) { - logger.error("[存储收到的通道上线] 异常: ", e ); - } - try { - executeSaveForOffline(); - } catch (Exception e) { - logger.error("[存储收到的通道离线] 异常: ", e ); - } - try { - executeSaveForUpdate(); - } catch (Exception e) { - logger.error("[存储收到的更新通道] 异常: ", e ); - } - try { - executeSaveForDelete(); - } catch (Exception e) { - logger.error("[存储收到的删除通道] 异常: ", e ); - } - - dynamicTask.stop(talkKey); - } - - private void executeSaveForUpdate(){ - if (!updateChannelMap.values().isEmpty()) { - logger.info("[存储收到的更新通道], 数量: {}", updateChannelMap.size()); - ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); - deviceChannelService.batchUpdateChannel(deviceChannels); - updateChannelMap.clear(); - } - - } - - private void executeSaveForAdd(){ - if (!addChannelMap.values().isEmpty()) { - ArrayList deviceChannels = new ArrayList<>(addChannelMap.values()); - addChannelMap.clear(); - deviceChannelService.batchAddChannel(deviceChannels); - } - } - - private void executeSaveForDelete(){ - if (!deleteChannelList.isEmpty()) { - deviceChannelService.deleteChannels(deleteChannelList); - deleteChannelList.clear(); - } - } - - private void executeSaveForOnline(){ - if (!updateChannelOnlineList.isEmpty()) { - deviceChannelService.channelsOnline(updateChannelOnlineList); - updateChannelOnlineList.clear(); - } - } - - private void executeSaveForOffline(){ - if (!updateChannelOfflineList.isEmpty()) { - deviceChannelService.channelsOffline(updateChannelOfflineList); - updateChannelOfflineList.clear(); - } - } - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 1b7daf02b..05a5336a0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -1,8 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; -import com.genersoft.iot.vmp.conf.CivilCodeFileConf; -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.SipConfig; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; @@ -19,8 +17,8 @@ import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; @@ -29,7 +27,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; /** * SIP命令类型: NOTIFY请求中的移动位置请求处理 @@ -40,10 +37,6 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class); - private final Map updateChannelMap = new ConcurrentHashMap<>(); - - private final List addMobilePositionList = new CopyOnWriteArrayList(); - @Autowired private UserSetting userSetting; @@ -57,180 +50,150 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor @Autowired private IDeviceChannelService deviceChannelService; - @Autowired - private DynamicTask dynamicTask; - - @Autowired - private CivilCodeFileConf civilCodeFileConf; - - @Autowired - private SipConfig sipConfig; - - private final static String talkKey = "notify-request-for-mobile-position-task"; - - @Async("taskExecutor") - public void process(RequestEvent evt) { - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - long startTime = System.currentTimeMillis(); - // 回复 200 OK - Element rootElement = getRootElement(evt); - if (rootElement == null) { - logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest()); - return; - } - Device device = redisCatchStorage.getDevice(deviceId); - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setCreateTime(DateUtil.getNow()); - List elements = rootElement.elements(); - String channelId = null; - for (Element element : elements) { - switch (element.getName()){ - case "DeviceID": - channelId = element.getStringValue(); - if (device == null) { - device = redisCatchStorage.getDevice(channelId); - if (device == null) { - // 根据通道id查询设备Id - List deviceList = deviceChannelService.getDeviceByChannelId(channelId); - if (!deviceList.isEmpty()) { - device = deviceList.get(0); - } - } - } - if (device == null) { - logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId); - return; - } - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setChannelId(channelId); - // 兼容设备部分设备上报是通道编号与设备编号一致的情况 - if (deviceId.equals(channelId)) { - List deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); - if (deviceChannels.size() == 1) { - channelId = deviceChannels.get(0).getChannelId(); - } - } - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setChannelId(channelId); - continue; - case "Time": - String timeVal = element.getStringValue(); - if (ObjectUtils.isEmpty(timeVal)) { - mobilePosition.setTime(DateUtil.getNow()); - } else { - mobilePosition.setTime(SipUtils.parseTime(timeVal)); - } - continue; - case "Longitude": - mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); - continue; - case "Latitude": - mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); - continue; - case "Speed": - String speedVal = element.getStringValue(); - if (NumericUtil.isDouble(speedVal)) { - mobilePosition.setSpeed(Double.parseDouble(speedVal)); - } else { - mobilePosition.setSpeed(0.0); - } - continue; - case "Direction": - String directionVal = element.getStringValue(); - if (NumericUtil.isDouble(directionVal)) { - mobilePosition.setDirection(Double.parseDouble(directionVal)); - } else { - mobilePosition.setDirection(0.0); - } - continue; - case "Altitude": - String altitudeVal = element.getStringValue(); - if (NumericUtil.isDouble(altitudeVal)) { - mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); - } else { - mobilePosition.setAltitude(0.0); - } - continue; - + @Transactional + public void process(List eventList) { + if (eventList.isEmpty()) { + return; + } + Map updateChannelMap = new ConcurrentHashMap<>(); + List addMobilePositionList = new ArrayList<>(); + for (RequestEvent evt : eventList) { + try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + long startTime = System.currentTimeMillis(); + // 回复 200 OK + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest()); + return; + } + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null) { + logger.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId); + return; + } + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setDeviceId(device.getDeviceId()); + mobilePosition.setDeviceName(device.getName()); + mobilePosition.setCreateTime(DateUtil.getNow()); + List elements = rootElement.elements(); + for (Element element : elements) { + switch (element.getName()){ + case "DeviceID": + String channelId = element.getStringValue(); + if (!deviceId.equals(channelId)) { + mobilePosition.setChannelId(channelId); + } + continue; + case "Time": + String timeVal = element.getStringValue(); + if (ObjectUtils.isEmpty(timeVal)) { + mobilePosition.setTime(DateUtil.getNow()); + } else { + mobilePosition.setTime(SipUtils.parseTime(timeVal)); + } + continue; + case "Longitude": + mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); + continue; + case "Latitude": + mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); + continue; + case "Speed": + String speedVal = element.getStringValue(); + if (NumericUtil.isDouble(speedVal)) { + mobilePosition.setSpeed(Double.parseDouble(speedVal)); + } else { + mobilePosition.setSpeed(0.0); + } + continue; + case "Direction": + String directionVal = element.getStringValue(); + if (NumericUtil.isDouble(directionVal)) { + mobilePosition.setDirection(Double.parseDouble(directionVal)); + } else { + mobilePosition.setDirection(0.0); + } + continue; + case "Altitude": + String altitudeVal = element.getStringValue(); + if (NumericUtil.isDouble(altitudeVal)) { + mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); + } else { + mobilePosition.setAltitude(0.0); + } + continue; + + } } - } // logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), // mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); - mobilePosition.setReportSource("Mobile Position"); + mobilePosition.setReportSource("Mobile Position"); - // 更新device channel 的经纬度 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setChannelId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - updateChannelMap.put(deviceId + channelId, deviceChannel); - addMobilePositionList.add(mobilePosition); - if(updateChannelMap.size() > 2000) { - executeSaveChannel(); - } - if (userSetting.isSavePositionHistory()) { - if(addMobilePositionList.size() > 2000) { - executeSaveMobilePosition(); + // 更新device channel 的经纬度 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); + updateChannelMap.put(deviceId + mobilePosition.getChannelId(), deviceChannel); + addMobilePositionList.add(mobilePosition); + + + // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 + try { + eventPublisher.mobilePositionEventPublish(mobilePosition); + }catch (Exception e) { + logger.error("[向上级转发移动位置失败] ", e); } + if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) { + List channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId()); + channels.forEach(channel -> { + // 发送redis消息。 通知位置信息的变化 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); + jsonObject.put("serial", channel.getDeviceId()); + jsonObject.put("code", channel.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); + }); + }else { + // 发送redis消息。 通知位置信息的变化 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); + jsonObject.put("serial", mobilePosition.getDeviceId()); + jsonObject.put("code", mobilePosition.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); + } + } catch (DocumentException e) { + logger.error("未处理的异常 ", e); } - -// deviceChannel = deviceChannelService.updateGps(deviceChannel, device); -// -// mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); -// mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); -// mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); -// mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - -// deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); - - if (!dynamicTask.contains(talkKey)) { - dynamicTask.startDelay(talkKey, this::executeSave, 3000); - } - - } catch (DocumentException e) { - logger.error("未处理的异常 ", e); } - - - } - - private void executeSave(){ - executeSaveChannel(); - executeSaveMobilePosition(); - dynamicTask.stop(talkKey); - } - - @Async("taskExecutor") - public void executeSaveChannel(){ - dynamicTask.execute(); - try { - logger.info("[移动位置订阅]更新通道位置: {}", updateChannelMap.size()); - ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); - deviceChannelService.batchUpdateChannelGPS(deviceChannels); + if(!updateChannelMap.isEmpty()) { + List channels = new ArrayList<>(updateChannelMap.values()); + logger.info("[移动位置订阅]更新通道位置: {}", channels.size()); + deviceChannelService.batchUpdateChannelGPS(channels); updateChannelMap.clear(); - }catch (Exception e) { - } - } - @Async("taskExecutor") - public void executeSaveMobilePosition(){ - if (userSetting.isSavePositionHistory()) { + if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) { try { logger.info("[移动位置订阅] 添加通道轨迹点位: {}", addMobilePositionList.size()); deviceChannelService.batchAddMobilePosition(addMobilePositionList); - addMobilePositionList.clear(); }catch (Exception e) { logger.info("[移动位置订阅] b添加通道轨迹点位保存失败: {}", addMobilePositionList.size()); } + addMobilePositionList.clear(); } } - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index da348b785..8a4262420 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -37,6 +37,7 @@ import javax.sip.SipException; import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -102,55 +103,63 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); return; - }else { + } else { responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); } - }catch (SipException | InvalidArgumentException | ParseException e) { + } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); } boolean runed = !taskQueue.isEmpty(); taskQueue.offer(new HandlerCatchData(evt, null, null)); - if (!runed) { - taskExecutor.execute(()-> { -// logger.warn("开始处理"); - while (!taskQueue.isEmpty()) { - try { - HandlerCatchData take = taskQueue.poll(); - if (take == null) { - continue; - } - Element rootElement = getRootElement(take.getEvt()); - if (rootElement == null) { - logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest()); - continue; - } - String cmd = XmlUtil.getText(rootElement, "CmdType"); - - if (CmdType.CATALOG.equals(cmd)) { - logger.info("接收到Catalog通知"); - notifyRequestForCatalogProcessor.process(take.getEvt()); - } else if (CmdType.ALARM.equals(cmd)) { - logger.info("接收到Alarm通知"); - processNotifyAlarm(take.getEvt()); - } else if (CmdType.MOBILE_POSITION.equals(cmd)) { -// logger.info("接收到MobilePosition通知"); -// processNotifyMobilePosition(take.getEvt()); -// taskExecutor.execute(() -> { - notifyRequestForMobilePositionProcessor.process(take.getEvt()); -// }); - - } else { - logger.info("接收到消息:" + cmd); - } - } catch (DocumentException e) { - logger.error("处理NOTIFY消息时错误", e); - } + } + @Scheduled(fixedRate = 200) //每200毫秒执行一次 + public void executeTaskQueue(){ + if (taskQueue.isEmpty()) { + return; + } + try { + List catalogEventList = new ArrayList<>(); + List alarmEventList = new ArrayList<>(); + List mobilePositionEventList = new ArrayList<>(); + for (HandlerCatchData take : taskQueue) { + if (take == null) { + continue; } - }); + Element rootElement = getRootElement(take.getEvt()); + if (rootElement == null) { + logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest()); + continue; + } + String cmd = XmlUtil.getText(rootElement, "CmdType"); + + if (CmdType.CATALOG.equals(cmd)) { + catalogEventList.add(take.getEvt()); + } else if (CmdType.ALARM.equals(cmd)) { + alarmEventList.add(take.getEvt()); + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { + mobilePositionEventList.add(take.getEvt()); + } else { + logger.info("接收到消息:" + cmd); + } + } + taskQueue.clear(); + if (!alarmEventList.isEmpty()) { + processNotifyAlarm(alarmEventList); + } + if (!catalogEventList.isEmpty()) { + notifyRequestForCatalogProcessor.process(catalogEventList); + } + if (!mobilePositionEventList.isEmpty()) { + notifyRequestForMobilePositionProcessor.process(mobilePositionEventList); + } + } catch (DocumentException e) { + logger.error("处理NOTIFY消息时错误", e); } } + + /** * 处理MobilePosition移动位置Notify * @@ -253,95 +262,97 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements /*** * 处理alarm设备报警Notify - * - * @param evt */ - private void processNotifyAlarm(RequestEvent evt) { + private void processNotifyAlarm(List evtList) { if (!sipConfig.isAlarm()) { return; } - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + if (!evtList.isEmpty()) { + for (RequestEvent evt : evtList) { + try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - Element rootElement = getRootElement(evt); - if (rootElement == null) { - logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest()); - return; - } - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getText().toString(); + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest()); + return; + } + Element deviceIdElement = rootElement.element("DeviceID"); + String channelId = deviceIdElement.getText().toString(); - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null) { - logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId); - return; - } - rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); - return; - } - DeviceAlarm deviceAlarm = new DeviceAlarm(); - deviceAlarm.setDeviceId(deviceId); - deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); - deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); - String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); - if (alarmTime == null) { - logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); - return; - } - deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); - if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { - deviceAlarm.setAlarmDescription(""); - } else { - deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { - deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); - } else { - deviceAlarm.setLongitude(0.00); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { - deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); - } else { - deviceAlarm.setLatitude(0.00); - } - logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); - if ("4".equals(deviceAlarm.getAlarmMethod())) { - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setChannelId(channelId); - mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); - mobilePosition.setTime(deviceAlarm.getAlarmTime()); - mobilePosition.setLongitude(deviceAlarm.getLongitude()); - mobilePosition.setLatitude(deviceAlarm.getLatitude()); - mobilePosition.setReportSource("GPS Alarm"); + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null) { + logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId); + return; + } + rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); + return; + } + DeviceAlarm deviceAlarm = new DeviceAlarm(); + deviceAlarm.setDeviceId(deviceId); + deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); + deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); + String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); + if (alarmTime == null) { + logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); + return; + } + deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); + if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { + deviceAlarm.setAlarmDescription(""); + } else { + deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { + deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); + } else { + deviceAlarm.setLongitude(0.00); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { + deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); + } else { + deviceAlarm.setLatitude(0.00); + } + logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); + if ("4".equals(deviceAlarm.getAlarmMethod())) { + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setChannelId(channelId); + mobilePosition.setCreateTime(DateUtil.getNow()); + mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); + mobilePosition.setTime(deviceAlarm.getAlarmTime()); + mobilePosition.setLongitude(deviceAlarm.getLongitude()); + mobilePosition.setLatitude(deviceAlarm.getLatitude()); + mobilePosition.setReportSource("GPS Alarm"); - // 更新device channel 的经纬度 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setChannelId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); + // 更新device channel 的经纬度 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setChannelId(channelId); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); - deviceChannel = deviceChannelService.updateGps(deviceChannel, device); + deviceChannel = deviceChannelService.updateGps(deviceChannel, device); - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + } + + // 回复200 OK + if (redisCatchStorage.deviceIsOnline(deviceId)) { + publisher.deviceAlarmEventPublish(deviceAlarm); + } + } catch (DocumentException e) { + logger.error("未处理的异常 ", e); + } } - - // 回复200 OK - if (redisCatchStorage.deviceIsOnline(deviceId)) { - publisher.deviceAlarmEventPublish(deviceAlarm); - } - } catch (DocumentException e) { - logger.error("未处理的异常 ", e); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java index 7dd04e8ab..a8cd58132 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java @@ -102,4 +102,9 @@ public interface IDeviceChannelService { void batchAddMobilePosition(List addMobilePositionList); + void online(DeviceChannel channel); + + void offline(DeviceChannel channel); + + void delete(DeviceChannel channel); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index f7666409b..67e34a024 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import java.util.ArrayList; @@ -247,11 +248,27 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { return channelMapper.batchOnline(channels); } + @Override + public void online(DeviceChannel channel) { + channelMapper.online(channel.getDeviceId(), channel.getChannelId()); + } + @Override public int channelsOffline(List channels) { return channelMapper.batchOffline(channels); } + + @Override + public void offline(DeviceChannel channel) { + channelMapper.offline(channel.getDeviceId(), channel.getChannelId()); + } + + @Override + public void delete(DeviceChannel channel) { + channelMapper.del(channel.getDeviceId(), channel.getChannelId()); + } + @Override public DeviceChannel getOne(String deviceId, String channelId){ return channelMapper.queryChannel(deviceId, channelId); @@ -355,12 +372,45 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } @Override + @Transactional public void batchUpdateChannelGPS(List channelList) { - channelMapper.batchUpdate(channelList); + for (DeviceChannel deviceChannel : channelList) { + deviceChannel.setUpdateTime(DateUtil.getNow()); + if (deviceChannel.getGpsTime() == null) { + deviceChannel.setGpsTime(DateUtil.getNow()); + } + } + int count = 1000; + if (channelList.size() > count) { + for (int i = 0; i < channelList.size(); i+=count) { + int toIndex = i+count; + if ( i + count > channelList.size()) { + toIndex = channelList.size(); + } + List channels = channelList.subList(i, toIndex); + channelMapper.batchUpdatePosition(channels); + } + }else { + channelMapper.batchUpdatePosition(channelList); + } } @Override + @Transactional public void batchAddMobilePosition(List mobilePositions) { +// int count = 500; +// if (mobilePositions.size() > count) { +// for (int i = 0; i < mobilePositions.size(); i+=count) { +// int toIndex = i+count; +// if ( i + count > mobilePositions.size()) { +// toIndex = mobilePositions.size(); +// } +// List mobilePositionsSub = mobilePositions.subList(i, toIndex); +// deviceMobilePositionMapper.batchadd(mobilePositionsSub); +// } +// }else { +// deviceMobilePositionMapper.batchadd(mobilePositions); +// } deviceMobilePositionMapper.batchadd(mobilePositions); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index ae2233672..10d0ee1c5 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -401,6 +401,23 @@ public interface DeviceChannelMapper { " "}) int updatePosition(DeviceChannel deviceChannel); + @Update({""}) + int batchUpdatePosition(List deviceChannelList); + @Select("SELECT * FROM wvp_device_channel WHERE length(trim(stream_id)) > 0") List getAllChannelInPlay(); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java index e3d898260..c28b16ec2 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceMobilePositionMapper.java @@ -46,6 +46,20 @@ public interface DeviceMobilePositionMapper { "#{item.createTime}) " + " " + "") + void batchadd2(List mobilePositions); + + @Insert("") void batchadd(List mobilePositions); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index e42ea68b6..8b9cc0799 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -570,7 +570,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void sendMobilePositionMsg(JSONObject jsonObject) { String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION; - logger.info("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString()); +// logger.info("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString()); redisTemplate.convertAndSend(key, jsonObject); } From c21d973977a9f1d00d26179de764687ddd0ec56c Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 24 Apr 2024 14:59:41 +0800 Subject: [PATCH 4/8] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=94=B6=E5=88=B0catalog?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E6=98=AF=E6=9B=B4=E6=96=B0=E5=AF=BC=E8=87=B4?= =?UTF-8?q?=E6=98=AF=E5=90=A6=E6=9C=89=E9=9F=B3=E9=A2=91=E7=9A=84=E8=AE=BE?= =?UTF-8?q?=E7=BD=AE=E5=A4=B1=E6=95=88=E7=9A=84BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/DeviceChannel.java | 4 +- .../NotifyRequestForCatalogProcessor.java | 1 + .../request/impl/NotifyRequestProcessor.java | 105 ------------------ .../vmp/media/zlm/ZLMHttpHookListener.java | 2 +- .../impl/VideoManagerStorageImpl.java | 5 +- 5 files changed, 6 insertions(+), 111 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java index 8290a458b..6e230afef 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java @@ -535,11 +535,11 @@ public class DeviceChannel { this.subCount = subCount; } - public boolean isHasAudio() { + public Boolean getHasAudio() { return hasAudio; } - public void setHasAudio(boolean hasAudio) { + public void setHasAudio(Boolean hasAudio) { this.hasAudio = hasAudio; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index de93804c3..4d261897e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -143,6 +143,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent case CatalogEvent.UPDATE: // 更新 channel.setUpdateTime(DateUtil.getNow()); + channel.setHasAudio(null); deviceChannelService.updateChannel(deviceId,channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 8a4262420..dcf823bfe 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -25,11 +25,9 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; @@ -110,7 +108,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); } - boolean runed = !taskQueue.isEmpty(); taskQueue.offer(new HandlerCatchData(evt, null, null)); } @Scheduled(fixedRate = 200) //每200毫秒执行一次 @@ -158,108 +155,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements } } - - - /** - * 处理MobilePosition移动位置Notify - * - * @param evt - */ - @Async("taskExecutor") - public void processNotifyMobilePosition(RequestEvent evt) { - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - // 回复 200 OK - Element rootElement = getRootElement(evt); - if (rootElement == null) { - logger.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest()); - return; - } - - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setCreateTime(DateUtil.getNow()); - - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getTextTrim().toString(); - Device device = redisCatchStorage.getDevice(deviceId); - - if (device == null) { - device = redisCatchStorage.getDevice(channelId); - if (device == null) { - // 根据通道id查询设备Id - List deviceList = deviceChannelService.getDeviceByChannelId(channelId); - if (deviceList.size() > 0) { - device = deviceList.get(0); - } - } - } - if (device == null) { - logger.warn("[mobilePosition移动位置Notify] 未找到通道{}所属的设备", channelId); - return; - } - // 兼容设备部分设备上报是通道编号与设备编号一致的情况 - if(deviceId.equals(channelId)) { - List deviceChannels = deviceChannelService.queryChaneListByDeviceId(deviceId); - if (deviceChannels.size() == 1) { - channelId = deviceChannels.get(0).getChannelId(); - } - } - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } - - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setChannelId(channelId); - String time = XmlUtil.getText(rootElement, "Time"); - if (ObjectUtils.isEmpty(time)){ - mobilePosition.setTime(DateUtil.getNow()); - }else { - mobilePosition.setTime(SipUtils.parseTime(time)); - } - - mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); - mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) { - mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed"))); - } else { - mobilePosition.setSpeed(0.0); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) { - mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction"))); - } else { - mobilePosition.setDirection(0.0); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) { - mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude"))); - } else { - mobilePosition.setAltitude(0.0); - } -// logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), -// mobilePosition.getLongitude(), mobilePosition.getLatitude()); - mobilePosition.setReportSource("Mobile Position"); - - // 更新device channel 的经纬度 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setChannelId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); -// deviceChannel = deviceChannelService.updateGps(deviceChannel, device); -// -// mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); -// mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); -// mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); -// mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); - - } catch (DocumentException e) { - logger.error("未处理的异常 ", e); - } - } - /*** * 处理alarm设备报警Notify */ diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index de989167a..f3d74777b 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -289,7 +289,7 @@ public class ZLMHttpHookListener { String channelId = ssrcTransactionForAll.get(0).getChannelId(); DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { - result.setEnable_audio(deviceChannel.isHasAudio()); + result.setEnable_audio(deviceChannel.getHasAudio()); } // 如果是录像下载就设置视频间隔十秒 if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) { diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 509240f8f..b1eef31e4 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -7,7 +7,6 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; -import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; @@ -139,7 +138,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { gbIdSet.add(deviceChannel.getChannelId()); if (allChannelMap.containsKey(deviceChannel.getChannelId())) { deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId()); - deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio()); + deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).getHasAudio()); if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){ List strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId()); if (!CollectionUtils.isEmpty(strings)){ @@ -275,7 +274,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { deviceChannel.setUpdateTime(DateUtil.getNow()); if (allChannelMap.containsKey(deviceChannel.getChannelId())) { deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId()); - deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio()); + deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).getHasAudio()); if (allChannelMap.get(deviceChannel.getChannelId()).isStatus() !=deviceChannel.isStatus()){ List strings = platformChannelMapper.queryParentPlatformByChannelId(deviceChannel.getChannelId()); if (!CollectionUtils.isEmpty(strings)){ From 44aa37ad6eb9f979ff58a0cabc7cd73c2d4ac133 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 24 Apr 2024 16:12:42 +0800 Subject: [PATCH 5/8] =?UTF-8?q?=E4=BC=98=E5=8C=96notify=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E4=B8=AD=E7=9B=AE=E5=BD=95=E7=9A=84=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../NotifyRequestForCatalogProcessor.java | 176 +++++++++++++++--- 1 file changed, 154 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index 4d261897e..a0ce17c09 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -22,6 +23,7 @@ import org.springframework.transaction.annotation.Transactional; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -37,12 +39,13 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); + private final List updateChannelOnlineList = new CopyOnWriteArrayList<>(); + private final List updateChannelOfflineList = new CopyOnWriteArrayList<>(); private final Map updateChannelMap = new ConcurrentHashMap<>(); private final Map addChannelMap = new ConcurrentHashMap<>(); private final List deleteChannelList = new CopyOnWriteArrayList<>(); - @Autowired private UserSetting userSetting; @@ -55,6 +58,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent @Autowired private IDeviceChannelService deviceChannelService; + @Autowired + private DynamicTask dynamicTask; + @Autowired private SipConfig sipConfig; @@ -71,7 +77,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent Device device = redisCatchStorage.getDevice(deviceId); if (device == null || !device.isOnLine()) { - logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" )); + logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId() : "")); return; } Element rootElement = getRootElement(evt, device.getCharset()); @@ -92,9 +98,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent Element eventElement = itemDevice.element("Event"); String event; if (eventElement == null) { - logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" )); + logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId() : "")); event = CatalogEvent.ADD; - }else { + } else { event = eventElement.getText().toUpperCase(); } DeviceChannel channel = XmlUtil.channelContentHandler(itemDevice, device, event); @@ -106,61 +112,187 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent channel.setParentId(null); } channel.setDeviceId(device.getDeviceId()); - logger.info("[收到目录订阅]:{}, {}/{}",event, device.getDeviceId(), channel.getChannelId()); + logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); switch (event) { case CatalogEvent.ON: // 上线 - deviceChannelService.online(channel); + logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + updateChannelOnlineList.add(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), true); } - break; - case CatalogEvent.OFF : - case CatalogEvent.VLOST: - case CatalogEvent.DEFECT: + case CatalogEvent.OFF: // 离线 + logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); if (userSetting.getRefuseChannelStatusChannelFormNotify()) { - logger.info("[目录订阅] 离线 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); - }else { - deviceChannelService.offline(channel); + logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + } else { + updateChannelOfflineList.add(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); } } + break; + case CatalogEvent.VLOST: + // 视频丢失 + logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + } else { + updateChannelOfflineList.add(channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.DEFECT: + // 故障 + logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + } else { + updateChannelOfflineList.add(channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), channel.getChannelId(), false); + } + } + break; + case CatalogEvent.ADD: + // 增加 + logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + // 判断此通道是否存在 + DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId()); + if (deviceChannel != null) { + logger.info("[增加通道] 已存在,不发送通知只更新,设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + channel.setId(deviceChannel.getId()); + channel.setHasAudio(null); + updateChannelMap.put(channel.getChannelId(), channel); + } else { + addChannelMap.put(channel.getChannelId(), channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + } + } + break; case CatalogEvent.DEL: // 删除 - deviceChannelService.delete(channel); + logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + deleteChannelList.add(channel); if (userSetting.getDeviceStatusNotify()) { // 发送redis消息 redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), false); } break; - case CatalogEvent.ADD: case CatalogEvent.UPDATE: // 更新 - channel.setUpdateTime(DateUtil.getNow()); - channel.setHasAudio(null); - deviceChannelService.updateChannel(deviceId,channel); - if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 - redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + // 判断此通道是否存在 + DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); + if (deviceChannelForUpdate != null) { + channel.setId(deviceChannelForUpdate.getId()); + channel.setUpdateTime(DateUtil.getNow()); + channel.setHasAudio(null); + updateChannelMap.put(channel.getChannelId(), channel); + } else { + addChannelMap.put(channel.getChannelId(), channel); + if (userSetting.getDeviceStatusNotify()) { + // 发送redis消息 + redisCatchStorage.sendChannelAddOrDelete(device.getDeviceId(), channel.getChannelId(), true); + } } break; default: - logger.warn("[ NotifyCatalog ] event not found : {}", event ); + logger.warn("[ NotifyCatalog ] event not found : {}", event); } // 转发变化信息 eventPublisher.catalogEventPublish(null, channel, event); } } + } catch (DocumentException e) { logger.error("未处理的异常 ", e); } } + if (!updateChannelMap.keySet().isEmpty() + || !addChannelMap.keySet().isEmpty() + || !updateChannelOnlineList.isEmpty() + || !updateChannelOfflineList.isEmpty() + || !deleteChannelList.isEmpty()) { + executeSave(); + } + } + + public void executeSave(){ + try { + executeSaveForAdd(); + } catch (Exception e) { + logger.error("[存储收到的增加通道] 异常: ", e ); + } + try { + executeSaveForOnline(); + } catch (Exception e) { + logger.error("[存储收到的通道上线] 异常: ", e ); + } + try { + executeSaveForOffline(); + } catch (Exception e) { + logger.error("[存储收到的通道离线] 异常: ", e ); + } + try { + executeSaveForUpdate(); + } catch (Exception e) { + logger.error("[存储收到的更新通道] 异常: ", e ); + } + try { + executeSaveForDelete(); + } catch (Exception e) { + logger.error("[存储收到的删除通道] 异常: ", e ); + } + } + + private void executeSaveForUpdate(){ + if (!updateChannelMap.values().isEmpty()) { + logger.info("[存储收到的更新通道], 数量: {}", updateChannelMap.size()); + ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); + deviceChannelService.batchUpdateChannel(deviceChannels); + updateChannelMap.clear(); + } + } + + private void executeSaveForAdd(){ + if (!addChannelMap.values().isEmpty()) { + ArrayList deviceChannels = new ArrayList<>(addChannelMap.values()); + addChannelMap.clear(); + deviceChannelService.batchAddChannel(deviceChannels); + } + } + + private void executeSaveForDelete(){ + if (!deleteChannelList.isEmpty()) { + deviceChannelService.deleteChannels(deleteChannelList); + deleteChannelList.clear(); + } + } + + private void executeSaveForOnline(){ + if (!updateChannelOnlineList.isEmpty()) { + deviceChannelService.channelsOnline(updateChannelOnlineList); + updateChannelOnlineList.clear(); + } + } + + private void executeSaveForOffline(){ + if (!updateChannelOfflineList.isEmpty()) { + deviceChannelService.channelsOffline(updateChannelOfflineList); + updateChannelOfflineList.clear(); + } } } From 2113e8cf271e0d189d4ff9dd2d4d5dd7cba6e3ab Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 24 Apr 2024 20:33:52 +0800 Subject: [PATCH 6/8] =?UTF-8?q?=E4=BC=98=E5=8C=96notify=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E4=B8=AD=E7=9B=AE=E5=BD=95=E7=9A=84=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../NotifyRequestForCatalogProcessor.java | 30 +- ...tifyRequestForMobilePositionProcessor.java | 31 +- .../request/impl/NotifyRequestProcessor.java | 289 ++++++------------ .../impl/DeviceChannelServiceImpl.java | 18 +- 4 files changed, 165 insertions(+), 203 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java index a0ce17c09..6185cda4a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; @@ -18,6 +19,7 @@ import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -28,6 +30,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; /** @@ -46,6 +49,8 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent private final Map addChannelMap = new ConcurrentHashMap<>(); private final List deleteChannelList = new CopyOnWriteArrayList<>(); + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + @Autowired private UserSetting userSetting; @@ -65,11 +70,24 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent private SipConfig sipConfig; @Transactional - public void process(List evtList) { - if (evtList.isEmpty()) { + public void process(RequestEvent evt) { + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + logger.error("[notify-目录订阅] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); return; } - for (RequestEvent evt : evtList) { + taskQueue.offer(new HandlerCatchData(evt, null, null)); + } + + @Scheduled(fixedRate = 400) //每400毫秒执行一次 + public void executeTaskQueue(){ + if (taskQueue.isEmpty()) { + return; + } + for (HandlerCatchData take : taskQueue) { + if (take == null) { + continue; + } + RequestEvent evt = take.getEvt(); try { long start = System.currentTimeMillis(); FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); @@ -221,6 +239,7 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent logger.error("未处理的异常 ", e); } } + taskQueue.clear(); if (!updateChannelMap.keySet().isEmpty() || !addChannelMap.keySet().isEmpty() || !updateChannelOnlineList.isEmpty() @@ -295,4 +314,9 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent updateChannelOfflineList.clear(); } } + + @Scheduled(fixedRate = 10000) //每1秒执行一次 + public void execute(){ + logger.info("[待处理Notify-目录订阅消息数量]: {}", taskQueue.size()); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 05a5336a0..9f6acc96e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; @@ -17,6 +18,7 @@ import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; @@ -27,6 +29,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; /** * SIP命令类型: NOTIFY请求中的移动位置请求处理 @@ -37,6 +40,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForMobilePositionProcessor.class); + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Autowired private UserSetting userSetting; @@ -50,14 +54,28 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor @Autowired private IDeviceChannelService deviceChannelService; + public void process(RequestEvent evt) { + + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + logger.error("[notify-移动位置] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); + return; + } + taskQueue.offer(new HandlerCatchData(evt, null, null)); + } + + @Scheduled(fixedRate = 200) //每200毫秒执行一次 @Transactional - public void process(List eventList) { - if (eventList.isEmpty()) { + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { return; } Map updateChannelMap = new ConcurrentHashMap<>(); List addMobilePositionList = new ArrayList<>(); - for (RequestEvent evt : eventList) { + for (HandlerCatchData take : taskQueue) { + if (take == null) { + continue; + } + RequestEvent evt = take.getEvt(); try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); @@ -180,10 +198,11 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor logger.error("未处理的异常 ", e); } } + taskQueue.clear(); if(!updateChannelMap.isEmpty()) { List channels = new ArrayList<>(updateChannelMap.values()); logger.info("[移动位置订阅]更新通道位置: {}", channels.size()); - deviceChannelService.batchUpdateChannelGPS(channels); + deviceChannelService.batchUpdateChannel(channels); updateChannelMap.clear(); } if (userSetting.isSavePositionHistory() && !addMobilePositionList.isEmpty()) { @@ -196,4 +215,8 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor addMobilePositionList.clear(); } } + @Scheduled(fixedRate = 10000) + public void execute(){ + logger.info("[待处理Notify-移动位置订阅消息数量]: {}", taskQueue.size()); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index dcf823bfe..8da07a15c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -1,12 +1,9 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.genersoft.iot.vmp.conf.SipConfig; -import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; @@ -14,9 +11,7 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.service.IDeviceChannelService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -24,9 +19,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; @@ -35,9 +27,6 @@ import javax.sip.SipException; import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; /** * SIP命令类型: NOTIFY请求,这是作为上级发送订阅请求后,设备才会响应的 @@ -48,15 +37,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements private final static Logger logger = LoggerFactory.getLogger(NotifyRequestProcessor.class); - @Autowired - private UserSetting userSetting; - - @Autowired - private IVideoManagerStorage storager; - - @Autowired - private EventPublisher eventPublisher; - @Autowired private SipConfig sipConfig; @@ -80,14 +60,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @Autowired private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - - private int maxQueueCount = 30000; - @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -97,187 +69,122 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @Override public void process(RequestEvent evt) { try { - if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { - responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); - logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); - return; - } else { + responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("处理NOTIFY消息时未获取到消息体,{}", evt.getRequest()); responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + return; } + String cmd = XmlUtil.getText(rootElement, "CmdType"); + if (CmdType.CATALOG.equals(cmd)) { + notifyRequestForCatalogProcessor.process(evt); + } else if (CmdType.ALARM.equals(cmd)) { + processNotifyAlarm(evt); + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { + notifyRequestForMobilePositionProcessor.process(evt); + } else { + logger.info("接收到消息:" + cmd); + } } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); - } - taskQueue.offer(new HandlerCatchData(evt, null, null)); - } - @Scheduled(fixedRate = 200) //每200毫秒执行一次 - public void executeTaskQueue(){ - if (taskQueue.isEmpty()) { - return; - } - try { - List catalogEventList = new ArrayList<>(); - List alarmEventList = new ArrayList<>(); - List mobilePositionEventList = new ArrayList<>(); - for (HandlerCatchData take : taskQueue) { - if (take == null) { - continue; - } - Element rootElement = getRootElement(take.getEvt()); - if (rootElement == null) { - logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest()); - continue; - } - String cmd = XmlUtil.getText(rootElement, "CmdType"); - - if (CmdType.CATALOG.equals(cmd)) { - catalogEventList.add(take.getEvt()); - } else if (CmdType.ALARM.equals(cmd)) { - alarmEventList.add(take.getEvt()); - } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - mobilePositionEventList.add(take.getEvt()); - } else { - logger.info("接收到消息:" + cmd); - } - } - taskQueue.clear(); - if (!alarmEventList.isEmpty()) { - processNotifyAlarm(alarmEventList); - } - if (!catalogEventList.isEmpty()) { - notifyRequestForCatalogProcessor.process(catalogEventList); - } - if (!mobilePositionEventList.isEmpty()) { - notifyRequestForMobilePositionProcessor.process(mobilePositionEventList); - } } catch (DocumentException e) { - logger.error("处理NOTIFY消息时错误", e); + throw new RuntimeException(e); } - } + } /*** * 处理alarm设备报警Notify */ - private void processNotifyAlarm(List evtList) { + private void processNotifyAlarm(RequestEvent evt) { if (!sipConfig.isAlarm()) { return; } - if (!evtList.isEmpty()) { - for (RequestEvent evt : evtList) { - try { - FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); - String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - Element rootElement = getRootElement(evt); - if (rootElement == null) { - logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest()); - return; - } - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getText().toString(); - - Device device = redisCatchStorage.getDevice(deviceId); - if (device == null) { - logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId); - return; - } - rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); - return; - } - DeviceAlarm deviceAlarm = new DeviceAlarm(); - deviceAlarm.setDeviceId(deviceId); - deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); - deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); - String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); - if (alarmTime == null) { - logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); - return; - } - deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); - if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { - deviceAlarm.setAlarmDescription(""); - } else { - deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { - deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); - } else { - deviceAlarm.setLongitude(0.00); - } - if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { - deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); - } else { - deviceAlarm.setLatitude(0.00); - } - logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); - if ("4".equals(deviceAlarm.getAlarmMethod())) { - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setChannelId(channelId); - mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); - mobilePosition.setTime(deviceAlarm.getAlarmTime()); - mobilePosition.setLongitude(deviceAlarm.getLongitude()); - mobilePosition.setLatitude(deviceAlarm.getLatitude()); - mobilePosition.setReportSource("GPS Alarm"); - - // 更新device channel 的经纬度 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(device.getDeviceId()); - deviceChannel.setChannelId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - - deviceChannel = deviceChannelService.updateGps(deviceChannel, device); - - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); - } - - // 回复200 OK - if (redisCatchStorage.deviceIsOnline(deviceId)) { - publisher.deviceAlarmEventPublish(deviceAlarm); - } - } catch (DocumentException e) { - logger.error("未处理的异常 ", e); - } + Element rootElement = getRootElement(evt); + if (rootElement == null) { + logger.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest()); + return; } + Element deviceIdElement = rootElement.element("DeviceID"); + String channelId = deviceIdElement.getText().toString(); + + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null) { + logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId); + return; + } + rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest()); + return; + } + DeviceAlarm deviceAlarm = new DeviceAlarm(); + deviceAlarm.setDeviceId(deviceId); + deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); + deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); + String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); + if (alarmTime == null) { + logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); + return; + } + deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); + if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { + deviceAlarm.setAlarmDescription(""); + } else { + deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { + deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); + } else { + deviceAlarm.setLongitude(0.00); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { + deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); + } else { + deviceAlarm.setLatitude(0.00); + } + logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); + if ("4".equals(deviceAlarm.getAlarmMethod())) { + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setChannelId(channelId); + mobilePosition.setCreateTime(DateUtil.getNow()); + mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); + mobilePosition.setTime(deviceAlarm.getAlarmTime()); + mobilePosition.setLongitude(deviceAlarm.getLongitude()); + mobilePosition.setLatitude(deviceAlarm.getLatitude()); + mobilePosition.setReportSource("GPS Alarm"); + + // 更新device channel 的经纬度 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(device.getDeviceId()); + deviceChannel.setChannelId(channelId); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); + + deviceChannel = deviceChannelService.updateGps(deviceChannel, device); + + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + + deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + } + + // 回复200 OK + if (redisCatchStorage.deviceIsOnline(deviceId)) { + publisher.deviceAlarmEventPublish(deviceAlarm); + } + } catch (DocumentException e) { + logger.error("未处理的异常 ", e); } } - public void setCmder(SIPCommander cmder) { - } - public void setStorager(IVideoManagerStorage storager) { - this.storager = storager; - } - - public void setPublisher(EventPublisher publisher) { - this.publisher = publisher; - } - - public void setRedis(RedisUtil redis) { - } - - public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) { - } - - public IRedisCatchStorage getRedisCatchStorage() { - return redisCatchStorage; - } - - public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { - this.redisCatchStorage = redisCatchStorage; - } - - @Scheduled(fixedRate = 10000) //每1秒执行一次 - public void execute(){ - logger.info("[待处理Notify消息数量]: {}", taskQueue.size()); - } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index 67e34a024..58e7b02ae 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -275,15 +275,23 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } @Override - public void batchUpdateChannel(List channels) { + public synchronized void batchUpdateChannel(List channels) { String now = DateUtil.getNow(); for (DeviceChannel channel : channels) { channel.setUpdateTime(now); } - channelMapper.batchUpdate(channels); - for (DeviceChannel channel : channels) { - if (channel.getParentId() != null) { - channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId()); + int limitCount = 1000; + if (!channels.isEmpty()) { + if (channels.size() > limitCount) { + for (int i = 0; i < channels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > channels.size()) { + toIndex = channels.size(); + } + channelMapper.batchUpdate(channels.subList(i, toIndex)); + } + }else { + channelMapper.batchUpdate(channels); } } } From 3f6ef955f51ba13991d3e71d7fa031d39d06a689 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 26 Apr 2024 15:02:49 +0800 Subject: [PATCH 7/8] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=A9=BA=E6=8C=87?= =?UTF-8?q?=E9=92=88=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java | 4 +--- .../request/impl/NotifyRequestForMobilePositionProcessor.java | 2 +- .../iot/vmp/storager/impl/RedisCatchStorageImpl.java | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index a9ce759fd..cbf431ac0 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -520,9 +520,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { if (parentPlatform == null) { return; } - if (logger.isDebugEnabled()) { - logger.debug("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat()); - } + logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat()); String characterSet = parentPlatform.getCharacterSet(); StringBuffer deviceStatusXml = new StringBuffer(600); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 9f6acc96e..013d95eba 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -166,7 +166,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor }catch (Exception e) { logger.error("[向上级转发移动位置失败] ", e); } - if (mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) || mobilePosition.getChannelId() == null) { + if (mobilePosition.getChannelId() == null || mobilePosition.getChannelId().equals(mobilePosition.getDeviceId()) ) { List channels = deviceChannelService.queryChaneListByDeviceId(mobilePosition.getDeviceId()); channels.forEach(channel -> { // 发送redis消息。 通知位置信息的变化 diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 8b9cc0799..e42ea68b6 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -570,7 +570,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void sendMobilePositionMsg(JSONObject jsonObject) { String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION; -// logger.info("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString()); + logger.info("[redis发送通知] 发送 移动位置 {}: {}", key, jsonObject.toString()); redisTemplate.convertAndSend(key, jsonObject); } From d4cdf5510bb581f2ba0526a0b937fd20a87957cc Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 30 Apr 2024 14:39:53 +0800 Subject: [PATCH 8/8] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8B=89=E6=B5=81?= =?UTF-8?q?=E4=BB=A3=E7=90=86=E8=B0=83=E7=94=A8=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/StreamProxyServiceImpl.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index f0230f76d..3869f0b14 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -170,15 +170,19 @@ public class StreamProxyServiceImpl implements IStreamProxyService { callback.run(ErrorCode.ERROR100.getCode(), "保存失败", null); return; } - HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); - hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( - mediaInfo, param.getApp(), param.getStream(), null, null); - callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); - }); + if (param.isEnable()) { String talkKey = UUID.randomUUID().toString(); String delayTalkKey = UUID.randomUUID().toString(); + HookSubscribeForStreamChange hookSubscribeForStreamChange = HookSubscribeFactory.on_stream_changed(param.getApp(), param.getStream(), true, "rtsp", mediaInfo.getId()); + hookSubscribe.addSubscribe(hookSubscribeForStreamChange, (mediaServerItem, response) -> { + dynamicTask.stop(delayTalkKey); + hookSubscribe.removeSubscribe(hookSubscribeForStreamChange); + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( + mediaInfo, param.getApp(), param.getStream(), null, null); + callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo); + }); + dynamicTask.startDelay(delayTalkKey, ()->{ StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(param.getApp(), param.getStream(), mediaInfo.getId(), false); if (streamInfo != null) { @@ -324,6 +328,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream()); } if ("ffmpeg".equalsIgnoreCase(param.getType())){ + if (param.getTimeoutMs() == 0) { + param.setTimeoutMs(15); + } result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrcUrl().trim(), param.getDstUrl(), param.getTimeoutMs() + "", param.isEnableAudio(), param.isEnableMp4(), param.getFfmpegCmdKey()); @@ -379,6 +386,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { gbStreamMapper.del(app, stream); videoManagerStorager.deleteStreamProxy(app, stream); redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PULL", app, stream); + redisCatchStorage.removeStream(streamProxyItem.getMediaServerId(), "PUSH", app, stream); JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem); if (jsonObject != null && jsonObject.getInteger("code") == 0) { logger.info("[移除代理]: 代理: {}/{}, 从zlm移除成功", app, stream);