From de427b15abc13f5a815b6f6f80ea36c2478edb6d Mon Sep 17 00:00:00 2001 From: lzh Date: Sat, 17 Jan 2026 15:54:12 +0800 Subject: [PATCH] =?UTF-8?q?feat(iot):=20add-iot-clean-order-integration?= =?UTF-8?q?=E9=98=B6=E6=AE=B5=E4=BA=8C-=E8=A7=84=E5=88=99=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../constants/CleanOrderTopics.java | 37 +++ .../event/clean/CleanOrderArriveEvent.java | 79 +++++ .../event/clean/CleanOrderAuditEvent.java | 98 ++++++ .../event/clean/CleanOrderCompleteEvent.java | 79 +++++ .../event/clean/CleanOrderCreateEvent.java | 86 +++++ .../IotDevicePropertyServiceImpl.java | 39 ++- .../CleanOrderIntegrationConfigService.java | 10 + ...leanOrderIntegrationConfigServiceImpl.java | 13 + .../detector/RssiSlidingWindowDetector.java | 151 +++++++++ .../BeaconDetectionRuleProcessor.java | 282 ++++++++++++++++ .../processor/SignalLossRuleProcessor.java | 309 ++++++++++++++++++ .../TrafficThresholdRuleProcessor.java | 187 +++++++++++ 12 files changed, 1369 insertions(+), 1 deletion(-) create mode 100644 viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/constants/CleanOrderTopics.java create mode 100644 viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderArriveEvent.java create mode 100644 viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderAuditEvent.java create mode 100644 viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderCompleteEvent.java create mode 100644 viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderCreateEvent.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/detector/RssiSlidingWindowDetector.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/BeaconDetectionRuleProcessor.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/SignalLossRuleProcessor.java create mode 100644 viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/constants/CleanOrderTopics.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/constants/CleanOrderTopics.java new file mode 100644 index 0000000..b0e53fa --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/constants/CleanOrderTopics.java @@ -0,0 +1,37 @@ +package com.viewsh.module.iot.core.integration.constants; + +/** + * Ops 业务事件 Topic 常量 + *

+ * 定义 IoT → Ops 的业务事件 Topic + * + * @author AI + */ +public interface CleanOrderTopics { + + /** + * 保洁工单创建事件 + */ + String ORDER_CREATE = "ops.order.create"; + + /** + * 保洁工单到岗事件 + */ + String ORDER_ARRIVE = "ops.order.arrive"; + + /** + * 保洁工单完成事件 + */ + String ORDER_COMPLETE = "ops.order.complete"; + + /** + * 保洁工单审计事件 + */ + String ORDER_AUDIT = "ops.order.audit"; + + /** + * 保洁工单确认事件(按键确认) + */ + String ORDER_CONFIRM = "ops.order.confirm"; + +} diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderArriveEvent.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderArriveEvent.java new file mode 100644 index 0000000..cc36343 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderArriveEvent.java @@ -0,0 +1,79 @@ +package com.viewsh.module.iot.core.integration.event.clean; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; +import java.util.Map; +import java.util.UUID; + +/** + * 保洁工单到岗事件 + *

+ * 当工牌检测到蓝牙信标时,IoT 模块发布此事件到 Ops 模块 + * Topic: ops.order.arrive + * + * @author AI + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class CleanOrderArriveEvent { + + /** + * 事件ID(唯一标识,用于幂等性处理) + */ + @Builder.Default + private String eventId = UUID.randomUUID().toString(); + + /** + * 工单类型 + */ + private String orderType; + + /** + * 工单ID(可选,如果已知) + */ + private Long orderId; + + /** + * 设备ID(工牌) + */ + private Long deviceId; + + /** + * 设备Key + */ + private String deviceKey; + + /** + * 区域ID + */ + private Long areaId; + + /** + * 触发来源 + */ + private String triggerSource; + + /** + * 触发数据(上下文信息) + *

+ * 例如:{beaconMac: "F0:C8:60:1D:10:BB", rssi: -66, windowSnapshot: [-68, -66, -69]} + */ + private Map triggerData; + + /** + * 事件时间 + */ + @Builder.Default + private LocalDateTime eventTime = LocalDateTime.now(); + + /** + * 租户ID + */ + private Long tenantId; +} diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderAuditEvent.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderAuditEvent.java new file mode 100644 index 0000000..a153595 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderAuditEvent.java @@ -0,0 +1,98 @@ +package com.viewsh.module.iot.core.integration.event.clean; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; +import java.util.Map; +import java.util.UUID; + +/** + * 保洁工单审计事件 + *

+ * 用于记录不改变工单状态但需要记录的关键节点 + * 例如:离岗警告、无效作业拦截 + * Topic: ops.order.audit + * + * @author AI + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class CleanOrderAuditEvent { + + /** + * 事件ID(唯一标识,用于幂等性处理) + */ + @Builder.Default + private String eventId = UUID.randomUUID().toString(); + + /** + * 审计类型 + *

+ * BEACON_ARRIVE_CONFIRMED - 信标到岗确认 + * LEAVE_WARNING_SENT - 离岗警告已发送 + * COMPLETE_SUPPRESSED_INVALID - 完成被抑制(作业时长不足) + * BEACON_COMPLETE_REQUESTED - 信标触发完成请求 + */ + private String auditType; + + /** + * 工单ID(可选) + */ + private Long orderId; + + /** + * 设备ID(工牌) + */ + private Long deviceId; + + /** + * 设备Key + */ + private String deviceKey; + + /** + * 区域ID + */ + private Long areaId; + + /** + * 保洁员ID(可选) + */ + private Long cleanerId; + + /** + * 事件级别 + *

+ * INFO - 信息 + * WARN - 警告 + * ERROR - 错误 + */ + @Builder.Default + private String level = "INFO"; + + /** + * 审计数据(结构化上下文) + */ + private Map data; + + /** + * 事件消息(可读描述) + */ + private String message; + + /** + * 事件时间 + */ + @Builder.Default + private LocalDateTime eventTime = LocalDateTime.now(); + + /** + * 租户ID + */ + private Long tenantId; +} diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderCompleteEvent.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderCompleteEvent.java new file mode 100644 index 0000000..8ce377d --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderCompleteEvent.java @@ -0,0 +1,79 @@ +package com.viewsh.module.iot.core.integration.event.clean; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; +import java.util.Map; +import java.util.UUID; + +/** + * 保洁工单完成事件 + *

+ * 当工牌信号丢失超时时,IoT 模块发布此事件到 Ops 模块 + * Topic: ops.order.complete + * + * @author AI + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class CleanOrderCompleteEvent { + + /** + * 事件ID(唯一标识,用于幂等性处理) + */ + @Builder.Default + private String eventId = UUID.randomUUID().toString(); + + /** + * 工单类型 + */ + private String orderType; + + /** + * 工单ID + */ + private Long orderId; + + /** + * 设备ID(工牌) + */ + private Long deviceId; + + /** + * 设备Key + */ + private String deviceKey; + + /** + * 区域ID + */ + private Long areaId; + + /** + * 触发来源 + */ + private String triggerSource; + + /** + * 触发数据(上下文信息) + *

+ * 例如:{durationMs: 780000, lastLossTime: 1736913000000, minValidWorkMinutes: 3} + */ + private Map triggerData; + + /** + * 事件时间 + */ + @Builder.Default + private LocalDateTime eventTime = LocalDateTime.now(); + + /** + * 租户ID + */ + private Long tenantId; +} diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderCreateEvent.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderCreateEvent.java new file mode 100644 index 0000000..20e7078 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderCreateEvent.java @@ -0,0 +1,86 @@ +package com.viewsh.module.iot.core.integration.event.clean; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; +import java.util.Map; +import java.util.UUID; + +/** + * 保洁工单创建事件 + *

+ * 当客流阈值触发时,IoT 模块发布此事件到 Ops 模块 + * Topic: ops.order.create + * + * @author AI + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class CleanOrderCreateEvent { + + /** + * 事件ID(唯一标识,用于幂等性处理) + */ + @Builder.Default + private String eventId = UUID.randomUUID().toString(); + + /** + * 工单类型 + */ + private String orderType; + + /** + * 区域ID + */ + private Long areaId; + + /** + * 触发来源 + *

+ * IOT_TRAFFIC - 客流触发 + * IOT_ALERT - 告警触发 + */ + private String triggerSource; + + /** + * 触发设备ID + */ + private Long triggerDeviceId; + + /** + * 触发设备Key + */ + private String triggerDeviceKey; + + /** + * 触发数据(上下文信息) + *

+ * 例如:{actualCount: 150, threshold: 100, baseValue: 50} + */ + private Map triggerData; + + /** + * 工单优先级 + *

+ * P0 - 紧急 + * P1 - 重要 + * P2 - 普通 + */ + private String priority; + + /** + * 事件时间 + */ + @Builder.Default + private LocalDateTime eventTime = LocalDateTime.now(); + + /** + * 租户ID + */ + private Long tenantId; +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java index b9b02e1..7412be9 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java @@ -85,6 +85,12 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { @Resource private IntegrationEventPublisher integrationEventPublisher; + @Resource + private com.viewsh.module.iot.service.rule.clean.processor.TrafficThresholdRuleProcessor trafficThresholdRuleProcessor; + + @Resource + private com.viewsh.module.iot.service.rule.clean.processor.BeaconDetectionRuleProcessor beaconDetectionRuleProcessor; + // ========== 设备属性相关操作 ========== @Override @@ -171,10 +177,41 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { IotDevicePropertyDO.builder().value(entry.getValue()).updateTime(message.getReportTime()).build()); deviceDataRedisDAO.putAll(device.getId(), properties2); - // 2.3 发布属性消息到 Redis Stream(供其他模块如 Ops 订阅) + // 2.3 调用规则处理器(保洁工单集成) + processRuleProcessors(device, properties); + + // 2.4 发布属性消息到 Redis Stream(供其他模块如 Ops 订阅) publishPropertyMessage(device, properties, message.getReportTime()); } + /** + * 处理规则处理器(保洁工单集成) + *

+ * 在设备属性上报处理流程中调用,检测是否满足工单创建/到岗/完成条件 + * + * @param device 设备信息 + * @param properties 属性数据 + */ + private void processRuleProcessors(IotDeviceDO device, Map properties) { + try { + // 遍历所有属性,调用规则处理器 + for (Map.Entry entry : properties.entrySet()) { + String identifier = entry.getKey(); + Object value = entry.getValue(); + + // 调用客流阈值规则处理器 + trafficThresholdRuleProcessor.processPropertyChange(device.getId(), identifier, value); + + // 调用蓝牙信标检测规则处理器 + beaconDetectionRuleProcessor.processPropertyChange(device.getId(), identifier, value); + } + } catch (Exception e) { + // 规则处理器异常不应阻塞属性上报主流程 + log.error("[processRuleProcessors] 规则处理器调用失败: deviceId={}, properties={}", + device.getId(), properties.keySet(), e); + } + } + /** * 发布设备属性消息 *

diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigService.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigService.java index 4f58291..5a42f97 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigService.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigService.java @@ -23,6 +23,16 @@ public interface CleanOrderIntegrationConfigService { */ CleanOrderIntegrationConfig getConfigByDeviceId(Long deviceId); + /** + * 根据设备ID查询配置包装器(包含完整信息) + *

+ * 返回包含设备ID、区域ID、关联类型等完整信息的配置包装器 + * + * @param deviceId 设备ID + * @return 配置包装器,如果不存在或未启用返回 null + */ + AreaDeviceConfigWrapper getConfigWrapperByDeviceId(Long deviceId); + /** * 根据区域ID查询所有启用的配置(带缓存) *

diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigServiceImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigServiceImpl.java index 2d96da9..1dde87c 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigServiceImpl.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigServiceImpl.java @@ -107,6 +107,19 @@ public class CleanOrderIntegrationConfigServiceImpl implements CleanOrderIntegra .collect(Collectors.toList()); } + @Override + public AreaDeviceConfigWrapper getConfigWrapperByDeviceId(Long deviceId) { + log.debug("[CleanOrderConfig] 查询设备完整配置:deviceId={}", deviceId); + + OpsAreaDeviceRelationDO relation = relationMapper.selectByDeviceId(deviceId); + + if (relation == null || !relation.getEnabled()) { + return null; + } + + return wrapConfig(relation); + } + @Override public void evictCache(Long deviceId) { String cacheKey = formatDeviceKey(deviceId); diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/detector/RssiSlidingWindowDetector.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/detector/RssiSlidingWindowDetector.java new file mode 100644 index 0000000..679b4da --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/detector/RssiSlidingWindowDetector.java @@ -0,0 +1,151 @@ +package com.viewsh.module.iot.service.rule.clean.detector; + +import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +/** + * RSSI 滑动窗口检测器 + *

+ * 实现"强进弱出"双阈值算法,解决蓝牙信号漂移和多径效应导致的误判问题 + *

+ * 核心思想: + * 1. 进入(到岗)使用强阈值(如 -70dBm),只有信号足够强才算到达,避免路过误判 + * 2. 退出(离岗)使用弱阈值(如 -85dBm),只有信号足够弱或彻底消失才算离开,避免边缘抖动 + * 3. 引入状态粘性:一旦判定进入,需要满足更严格的退出条件才能判定离开 + * + * @author AI + */ +@Slf4j +public class RssiSlidingWindowDetector { + + /** + * 区域状态枚举 + */ + public enum AreaState { + /** + * 在区域内 + */ + IN_AREA, + /** + * 在区域外 + */ + OUT_AREA + } + + /** + * 检测结果枚举 + */ + public enum DetectionResult { + /** + * 确认到达 + */ + ARRIVE_CONFIRMED, + /** + * 确认离开 + */ + LEAVE_CONFIRMED, + /** + * 无变化 + */ + NO_CHANGE + } + + /** + * 执行检测(强进弱出算法) + * + * @param window RSSI 滑动窗口样本(从旧到新) + * @param enterConfig 进入(到岗)判定配置 - 强阈值 + * @param exitConfig 退出(离岗)判定配置 - 弱阈值 + * @param currentState 当前状态 + * @return 检测结果 + */ + public DetectionResult detect( + List window, + BeaconPresenceConfig.EnterConfig enterConfig, + BeaconPresenceConfig.ExitConfig exitConfig, + AreaState currentState) { + + if (window == null || window.isEmpty()) { + log.debug("[RssiDetector] 窗口为空,返回无变化"); + return DetectionResult.NO_CHANGE; + } + + // 1. 统计满足强阈值的样本数(用于进入判定) + long enterHits = window.stream() + .filter(rssi -> rssi >= enterConfig.getRssiThreshold()) + .count(); + + // 2. 统计满足弱阈值的样本数(用于退出判定) + // 条件:RSSI < 弱阈值 或 为缺失值(-999) + long exitHits = window.stream() + .filter(rssi -> rssi < exitConfig.getWeakRssiThreshold() || rssi == -999) + .count(); + + log.debug("[RssiDetector] 检测统计:currentState={}, windowSize={}, enterHits={}/{}, exitHits={}/{}", + currentState, window.size(), enterHits, enterConfig.getHitCount(), + exitHits, exitConfig.getHitCount()); + + // 3. 状态转换判定 + if (currentState == AreaState.OUT_AREA) { + // 当前在区域外,检测是否进入 + if (enterHits >= enterConfig.getHitCount()) { + log.info("[RssiDetector] 进入条件满足:enterHits={}, required={}", + enterHits, enterConfig.getHitCount()); + return DetectionResult.ARRIVE_CONFIRMED; + } + } else { + // 当前在区域内,检测是否离开 + if (exitHits >= exitConfig.getHitCount()) { + log.info("[RssiDetector] 离开条件满足:exitHits={}, required={}", + exitHits, exitConfig.getHitCount()); + return DetectionResult.LEAVE_CONFIRMED; + } + } + + return DetectionResult.NO_CHANGE; + } + + /** + * 从蓝牙设备列表中提取目标信标的 RSSI + * + * @param bluetoothDevices 蓝牙设备列表 + * 格式:[{"rssi":-52,"type":243,"mac":"F0:C8:60:1D:10:BB"},...] + * @param targetMac 目标信标 MAC 地址 + * @return RSSI 值,如果未找到返回 -999(缺失值) + */ + public Integer extractTargetRssi(Object bluetoothDevices, String targetMac) { + if (bluetoothDevices == null) { + return -999; + } + + try { + @SuppressWarnings("unchecked") + List> deviceList = + (List>) bluetoothDevices; + + return deviceList.stream() + .filter(device -> targetMac.equals(device.get("mac"))) + .map(device -> (Integer) device.get("rssi")) + .findFirst() + .orElse(-999); + } catch (Exception e) { + log.error("[RssiDetector] 解析蓝牙数据失败:targetMac={}", targetMac, e); + return -999; + } + } + + /** + * 判断窗口是否有效 + *

+ * 窗口大小必须 >= 配置的窗口大小 + * + * @param window 窗口样本 + * @param windowSize 配置的窗口大小 + * @return true - 有效,false - 无效 + */ + public boolean isWindowValid(List window, int windowSize) { + return window != null && window.size() >= windowSize; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/BeaconDetectionRuleProcessor.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/BeaconDetectionRuleProcessor.java new file mode 100644 index 0000000..2d21d20 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/BeaconDetectionRuleProcessor.java @@ -0,0 +1,282 @@ +package com.viewsh.module.iot.service.rule.clean.processor; + +import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics; +import com.viewsh.module.iot.core.integration.event.clean.CleanOrderArriveEvent; +import com.viewsh.module.iot.core.integration.event.clean.CleanOrderAuditEvent; +import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig; +import com.viewsh.module.iot.dal.redis.clean.BeaconArrivedTimeRedisDAO; +import com.viewsh.module.iot.dal.redis.clean.BeaconRssiWindowRedisDAO; +import com.viewsh.module.iot.dal.redis.clean.DeviceCurrentOrderRedisDAO; +import com.viewsh.module.iot.dal.redis.clean.SignalLossRedisDAO; +import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService; +import com.viewsh.module.iot.service.rule.clean.detector.RssiSlidingWindowDetector; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 蓝牙信标检测规则处理器 + *

+ * 监听工牌的蓝牙属性上报,基于滑动窗口算法检测保洁员到岗/离岗 + * 采用"强进弱出"双阈值,避免信号抖动 + * + * @author AI + */ +@Component +@Slf4j +public class BeaconDetectionRuleProcessor { + + @Resource + private BeaconRssiWindowRedisDAO windowRedisDAO; + + @Resource + private BeaconArrivedTimeRedisDAO arrivedTimeRedisDAO; + + @Resource + private SignalLossRedisDAO signalLossRedisDAO; + + @Resource + private DeviceCurrentOrderRedisDAO deviceCurrentOrderRedisDAO; + + @Resource + private CleanOrderIntegrationConfigService configService; + + @Resource + private RssiSlidingWindowDetector detector; + + @Resource + private RocketMQTemplate rocketMQTemplate; + + /** + * 处理蓝牙属性上报 + *

+ * 在设备属性上报处理流程中调用此方法 + * + * @param deviceId 设备ID + * @param identifier 属性标识符(bluetoothDevices) + * @param propertyValue 属性值(蓝牙设备数组) + */ + public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) { + // 1. 检查是否是蓝牙属性 + if (!"bluetoothDevices".equals(identifier)) { + return; + } + + log.debug("[BeaconDetection] 收到蓝牙属性:deviceId={}", deviceId); + + // 2. 获取配置 + CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper = + configService.getConfigWrapperByDeviceId(deviceId); + + if (configWrapper == null || configWrapper.getConfig() == null) { + log.debug("[BeaconDetection] 设备无配置:deviceId={}", deviceId); + return; + } + + BeaconPresenceConfig beaconConfig = configWrapper.getConfig().getBeaconPresence(); + if (beaconConfig == null || !beaconConfig.getEnabled()) { + log.debug("[BeaconDetection] 未启用信标检测:deviceId={}", deviceId); + return; + } + + Long areaId = configWrapper.getAreaId(); + + // 3. 解析蓝牙数据,提取目标信标的 RSSI + Integer targetRssi = detector.extractTargetRssi(propertyValue, beaconConfig.getBeaconMac()); + + log.debug("[BeaconDetection] 提取RSSI:deviceId={}, beaconMac={}, rssi={}", + deviceId, beaconConfig.getBeaconMac(), targetRssi); + + // 4. 更新滑动窗口 + BeaconPresenceConfig.WindowConfig windowConfig = beaconConfig.getWindow(); + windowRedisDAO.updateWindow(deviceId, areaId, targetRssi, windowConfig.getSampleTtlSeconds()); + + // 5. 获取当前窗口样本 + List window = windowRedisDAO.getWindow(deviceId, areaId); + + // 6. 获取设备当前工单状态 + DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrder = + deviceCurrentOrderRedisDAO.getCurrentOrder(deviceId); + + // 7. 确定当前状态 + RssiSlidingWindowDetector.AreaState currentState = determineState(currentOrder, areaId); + + // 8. 执行检测 + RssiSlidingWindowDetector.DetectionResult result = detector.detect( + window, + beaconConfig.getEnter(), + beaconConfig.getExit(), + currentState + ); + + // 9. 处理检测结果 + switch (result) { + case ARRIVE_CONFIRMED: + handleArriveConfirmed(deviceId, areaId, window, beaconConfig, configWrapper); + break; + case LEAVE_CONFIRMED: + handleLeaveConfirmed(deviceId, areaId, window, beaconConfig); + break; + default: + // NO_CHANGE,不做处理 + break; + } + } + + /** + * 处理到达确认 + */ + private void handleArriveConfirmed(Long deviceId, Long areaId, List window, + BeaconPresenceConfig beaconConfig, + CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) { + log.info("[BeaconDetection] 到达确认:deviceId={}, areaId={}, window={}", + deviceId, areaId, window); + + // 1. 记录到达时间 + arrivedTimeRedisDAO.recordArrivedTime(deviceId, areaId, System.currentTimeMillis()); + + // 2. 清除离岗记录(如果存在) + signalLossRedisDAO.clearLossRecord(deviceId, areaId); + + // 3. 获取当前最新的 RSSI 值 + Integer currentRssi = window.isEmpty() ? -999 : window.get(window.size() - 1); + + // 4. 构建触发数据 + Map triggerData = new HashMap<>(); + triggerData.put("beaconMac", beaconConfig.getBeaconMac()); + triggerData.put("rssi", currentRssi); + triggerData.put("windowSnapshot", window); + triggerData.put("enterRssiThreshold", beaconConfig.getEnter().getRssiThreshold()); + + // 5. 发布到岗事件 + if (beaconConfig.getEnter().getAutoArrival()) { + publishArriveEvent(deviceId, configWrapper.getDeviceKey(), areaId, triggerData); + } + + // 6. 发布审计日志 + publishAuditEvent("BEACON_ARRIVE_CONFIRMED", deviceId, configWrapper.getDeviceKey(), areaId, + "蓝牙信标自动到岗确认", triggerData); + } + + /** + * 处理离开确认 + */ + private void handleLeaveConfirmed(Long deviceId, Long areaId, List window, + BeaconPresenceConfig beaconConfig) { + log.info("[BeaconDetection] 离开确认:deviceId={}, areaId={}, window={}", + deviceId, areaId, window); + + BeaconPresenceConfig.ExitConfig exitConfig = beaconConfig.getExit(); + + // 1. 检查是否是首次丢失 + Long firstLossTime = signalLossRedisDAO.getFirstLossTime(deviceId, areaId); + + if (firstLossTime == null) { + // 首次丢失 + signalLossRedisDAO.recordFirstLoss(deviceId, areaId, System.currentTimeMillis()); + + // 2. 发送警告 + publishTtsEvent(deviceId, "你已离开当前区域," + + (exitConfig.getLossTimeoutMinutes() > 0 ? + exitConfig.getLossTimeoutMinutes() + "分钟内工单将自动结算" : "工单将自动结算")); + + // 3. 发布审计日志 + Map data = new HashMap<>(); + data.put("firstLossTime", System.currentTimeMillis()); + data.put("rssi", window.isEmpty() ? -999 : window.get(window.size() - 1)); + data.put("warningDelayMinutes", exitConfig.getWarningDelayMinutes()); + + publishAuditEvent("BEACON_LEAVE_WARNING_SENT", deviceId, null, areaId, + "保洁员离开作业区域,已发送警告", data); + } else { + // 4. 更新最后丢失时间 + signalLossRedisDAO.updateLastLossTime(deviceId, areaId, System.currentTimeMillis()); + + log.debug("[BeaconDetection] 更新最后丢失时间:deviceId={}, areaId={}", deviceId, areaId); + } + } + + /** + * 发布到岗事件 + */ + private void publishArriveEvent(Long deviceId, String deviceKey, Long areaId, Map triggerData) { + try { + CleanOrderArriveEvent event = CleanOrderArriveEvent.builder() + .eventId(java.util.UUID.randomUUID().toString()) + .orderType("CLEAN") + .deviceId(deviceId) + .deviceKey(deviceKey) + .areaId(areaId) + .triggerSource("IOT_BEACON") + .triggerData(triggerData) + .build(); + + rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_ARRIVE, MessageBuilder.withPayload(event).build()); + + log.info("[BeaconDetection] 发布到岗事件:eventId={}, deviceId={}, areaId={}", + event.getEventId(), deviceId, areaId); + } catch (Exception e) { + log.error("[BeaconDetection] 发布到岗事件失败:deviceId={}, areaId={}", deviceId, areaId, e); + } + } + + /** + * 发布审计事件 + */ + private void publishAuditEvent(String auditType, Long deviceId, String deviceKey, + Long areaId, String message, Map data) { + try { + CleanOrderAuditEvent event = CleanOrderAuditEvent.builder() + .eventId(java.util.UUID.randomUUID().toString()) + .auditType(auditType) + .deviceId(deviceId) + .deviceKey(deviceKey) + .areaId(areaId) + .message(message) + .data(data) + .build(); + + rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_AUDIT, MessageBuilder.withPayload(event).build()); + + log.debug("[BeaconDetection] 发布审计事件:auditType={}, deviceId={}, areaId={}", + auditType, deviceId, areaId); + } catch (Exception e) { + log.error("[BeaconDetection] 发布审计事件失败:auditType={}, deviceId={}", auditType, deviceId, e); + } + } + + /** + * 发布 TTS 事件(通过审计事件传递) + */ + private void publishTtsEvent(Long deviceId, String text) { + Map data = new HashMap<>(); + data.put("tts", text); + data.put("timestamp", System.currentTimeMillis()); + + publishAuditEvent("TTS_REQUEST", deviceId, null, null, text, data); + } + + /** + * 确定当前状态 + */ + private RssiSlidingWindowDetector.AreaState determineState( + DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrder, Long areaId) { + + if (currentOrder == null) { + return RssiSlidingWindowDetector.AreaState.OUT_AREA; + } + + // 检查工单状态和区域是否匹配 + if ("ARRIVED".equals(currentOrder.getStatus()) && areaId.equals(currentOrder.getAreaId())) { + return RssiSlidingWindowDetector.AreaState.IN_AREA; + } + + return RssiSlidingWindowDetector.AreaState.OUT_AREA; + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/SignalLossRuleProcessor.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/SignalLossRuleProcessor.java new file mode 100644 index 0000000..4fb076d --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/SignalLossRuleProcessor.java @@ -0,0 +1,309 @@ +package com.viewsh.module.iot.service.rule.clean.processor; + +import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics; +import com.viewsh.module.iot.core.integration.event.clean.CleanOrderAuditEvent; +import com.viewsh.module.iot.core.integration.event.clean.CleanOrderCompleteEvent; +import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig; +import com.viewsh.module.iot.dal.redis.clean.BeaconArrivedTimeRedisDAO; +import com.viewsh.module.iot.dal.redis.clean.BeaconRssiWindowRedisDAO; +import com.viewsh.module.iot.dal.redis.clean.DeviceCurrentOrderRedisDAO; +import com.viewsh.module.iot.dal.redis.clean.SignalLossRedisDAO; +import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService; +import com.xxl.job.core.handler.annotation.XxlJob; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.springframework.data.redis.core.StringRedisTemplate; + +/** + * 信号丢失规则处理器 + *

+ * 定时检查离岗后N分钟则触发工单自动完成 + * 包含作业时长有效性校验,防止"打卡即走"作弊 + * + * @author AI + */ +@Component +@Slf4j +public class SignalLossRuleProcessor { + + @Resource + private SignalLossRedisDAO signalLossRedisDAO; + + @Resource + private BeaconArrivedTimeRedisDAO arrivedTimeRedisDAO; + + @Resource + private BeaconRssiWindowRedisDAO windowRedisDAO; + + @Resource + private DeviceCurrentOrderRedisDAO deviceCurrentOrderRedisDAO; + + @Resource + private CleanOrderIntegrationConfigService configService; + + @Resource + private RocketMQTemplate rocketMQTemplate; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + /** + * Redis Key 模式:扫描所有离岗记录 + */ + private static final String LOSS_KEY_PATTERN = "iot:clean:signal:loss:*"; + + /** + * 定时检查离岗超时(每 30 秒执行一次) + *

+ * 遍历所有离岗记录,检查是否超过超时时间 + * 如果超过,则触发工单完成 + */ + @XxlJob("signalLossCheckJob") + public String checkLossTimeout() { + try { + log.debug("[SignalLoss] 开始检查离岗超时"); + + // 1. 扫描所有离岗记录的 Key + Set keys = stringRedisTemplate.keys(LOSS_KEY_PATTERN); + + if (keys == null || keys.isEmpty()) { + return "暂无"; + } + + log.debug("[SignalLoss] 发现 {} 条离岗记录", keys.size()); + + // 2. 遍历每条记录 + for (String key : keys) { + try { + // 解析 deviceId 和 areaId + // Key 格式:iot:clean:signal:loss:{deviceId}:{areaId} + String[] parts = key.split(":"); + if (parts.length < 5) { + continue; + } + + Long deviceId = Long.parseLong(parts[3]); + Long areaId = Long.parseLong(parts[4]); + + // 检查超时 + checkTimeoutForDevice(deviceId, areaId); + + } catch (Exception e) { + log.error("[SignalLoss] 处理离岗记录失败:key={}", key, e); + } + } + + } catch (Exception e) { + log.error("[SignalLoss] 检查离岗超时失败", e); + return "ERROR: " + e.getMessage(); + } + + return "SUCCESS"; + } + + /** + * 检查单个设备的离岗超时 + */ + private void checkTimeoutForDevice(Long deviceId, Long areaId) { + // 1. 获取配置 + CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper = + configService.getConfigWrapperByDeviceId(deviceId); + + if (configWrapper == null || configWrapper.getConfig() == null || + configWrapper.getConfig().getBeaconPresence() == null) { + log.debug("[SignalLoss] 设备无信标配置:deviceId={}", deviceId); + return; + } + + BeaconPresenceConfig.ExitConfig exitConfig = configWrapper.getConfig().getBeaconPresence().getExit(); + + // 2. 获取首次丢失时间 + Long firstLossTime = signalLossRedisDAO.getFirstLossTime(deviceId, areaId); + + if (firstLossTime == null) { + return; + } + + // 3. 获取最后丢失时间 + Long lastLossTime = signalLossRedisDAO.getLastLossTime(deviceId, areaId); + + if (lastLossTime == null) { + return; + } + + // 4. 检查是否超时 + long timeoutMillis = exitConfig.getLossTimeoutMinutes() * 60000L; + long elapsedMillis = System.currentTimeMillis() - firstLossTime; + + if (elapsedMillis < timeoutMillis) { + log.debug("[SignalLoss] 未超时:deviceId={}, elapsed={}ms, timeout={}ms", + deviceId, elapsedMillis, timeoutMillis); + return; + } + + log.info("[SignalLoss] 检测到离岗超时:deviceId={}, areaId={}, elapsed={}ms", + deviceId, areaId, elapsedMillis); + + // 5. 有效性校验:检查作业时长 + Long arrivedAt = arrivedTimeRedisDAO.getArrivedTime(deviceId, areaId); + + if (arrivedAt == null) { + log.warn("[SignalLoss] 未找到到达时间记录:deviceId={}, areaId={}", deviceId, areaId); + signalLossRedisDAO.clearLossRecord(deviceId, areaId); + return; + } + + long durationMs = lastLossTime - arrivedAt; + long minValidWorkMillis = exitConfig.getMinValidWorkMinutes() * 60000L; + + // 6. 分支处理:有效 vs 无效作业 + if (durationMs < minValidWorkMillis) { + // 作业时长不足,抑制完成 + handleInvalidWork(deviceId, configWrapper.getDeviceKey(), areaId, + durationMs, minValidWorkMillis, exitConfig); + } else { + // 作业时长有效,触发完成 + handleTimeoutComplete(deviceId, configWrapper.getDeviceKey(), areaId, + durationMs, lastLossTime); + } + } + + /** + * 处理无效作业(时长不足) + */ + private void handleInvalidWork(Long deviceId, String deviceKey, Long areaId, + Long durationMs, Long minValidWorkMillis, + BeaconPresenceConfig.ExitConfig exitConfig) { + log.warn("[SignalLoss] 作业时长不足,抑制自动完成:deviceId={}, duration={}ms, minRequired={}ms", + deviceId, durationMs, minValidWorkMillis); + + // 1. 发送 TTS 警告 + publishTtsEvent(deviceId, "工单作业时长异常,请回到作业区域继续完成"); + + // 2. 发布审计日志 + Map data = new HashMap<>(); + data.put("durationMs", durationMs); + data.put("minValidWorkMinutes", exitConfig.getMinValidWorkMinutes()); + data.put("shortageMs", minValidWorkMillis - durationMs); + + publishAuditEvent("COMPLETE_SUPPRESSED_INVALID", deviceId, deviceKey, areaId, + "作业时长不足,抑制自动完成", data); + + // 3. 清除丢失记录(允许重新进入) + signalLossRedisDAO.clearLossRecord(deviceId, areaId); + + // 4. 清除无效作业标记(允许下次警告) + signalLossRedisDAO.markInvalidWorkNotified(deviceId, areaId); + } + + /** + * 处理超时自动完成 + */ + private void handleTimeoutComplete(Long deviceId, String deviceKey, Long areaId, + Long durationMs, Long lastLossTime) { + log.info("[SignalLoss] 触发自动完成:deviceId={}, areaId={}, duration={}ms", + deviceId, areaId, durationMs); + + // 1. 获取当前工单 + DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrder = + deviceCurrentOrderRedisDAO.getCurrentOrder(deviceId); + + if (currentOrder == null) { + log.warn("[SignalLoss] 设备无当前工单:deviceId={}", deviceId); + return; + } + + // 2. 构建触发数据 + Map triggerData = new HashMap<>(); + triggerData.put("durationMs", durationMs); + triggerData.put("lastLossTime", lastLossTime); + triggerData.put("completionReason", "SIGNAL_LOSS_TIMEOUT"); + + // 3. 发布完成事件 + try { + CleanOrderCompleteEvent event = CleanOrderCompleteEvent.builder() + .eventId(java.util.UUID.randomUUID().toString()) + .orderType("CLEAN") + .orderId(currentOrder.getOrderId()) + .deviceId(deviceId) + .deviceKey(deviceKey) + .areaId(areaId) + .triggerSource("IOT_SIGNAL_LOSS") + .triggerData(triggerData) + .build(); + + rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_COMPLETE, MessageBuilder.withPayload(event).build()); + + log.info("[SignalLoss] 发布完成事件:eventId={}, orderId={}, duration={}ms", + event.getEventId(), currentOrder.getOrderId(), durationMs); + } catch (Exception e) { + log.error("[SignalLoss] 发布完成事件失败:deviceId={}, orderId={}", + deviceId, currentOrder.getOrderId(), e); + return; + } + + // 4. 发布审计日志 + Map auditData = new HashMap<>(); + auditData.put("durationMs", durationMs); + auditData.put("lastLossTime", lastLossTime); + + publishAuditEvent("BEACON_COMPLETE_REQUESTED", deviceId, deviceKey, areaId, + "信号丢失超时自动完成", auditData); + + // 5. 清理 Redis 数据 + cleanupRedisData(deviceId, areaId); + } + + /** + * 清理 Redis 数据 + */ + private void cleanupRedisData(Long deviceId, Long areaId) { + signalLossRedisDAO.clearLossRecord(deviceId, areaId); + arrivedTimeRedisDAO.clearArrivedTime(deviceId, areaId); + windowRedisDAO.clearWindow(deviceId, areaId); + + log.debug("[SignalLoss] 清理 Redis 数据:deviceId={}, areaId={}", deviceId, areaId); + } + + /** + * 发布审计事件 + */ + private void publishAuditEvent(String auditType, Long deviceId, String deviceKey, + Long areaId, String message, Map data) { + try { + CleanOrderAuditEvent event = CleanOrderAuditEvent.builder() + .eventId(java.util.UUID.randomUUID().toString()) + .auditType(auditType) + .deviceId(deviceId) + .deviceKey(deviceKey) + .areaId(areaId) + .message(message) + .data(data) + .build(); + + rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_AUDIT, MessageBuilder.withPayload(event).build()); + + log.debug("[SignalLoss] 发布审计事件:auditType={}, deviceId={}", auditType, deviceId); + } catch (Exception e) { + log.error("[SignalLoss] 发布审计事件失败:auditType={}, deviceId={}", auditType, deviceId, e); + } + } + + /** + * 发布 TTS 事件 + */ + private void publishTtsEvent(Long deviceId, String text) { + Map data = new HashMap<>(); + data.put("tts", text); + data.put("timestamp", System.currentTimeMillis()); + + publishAuditEvent("TTS_REQUEST", deviceId, null, null, text, data); + } +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java new file mode 100644 index 0000000..354eea2 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java @@ -0,0 +1,187 @@ +package com.viewsh.module.iot.service.rule.clean.processor; + +import com.viewsh.framework.common.util.json.JsonUtils; +import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics; +import com.viewsh.module.iot.core.integration.event.clean.CleanOrderCreateEvent; +import com.viewsh.module.iot.dal.dataobject.integration.clean.CleanOrderIntegrationConfig; +import com.viewsh.module.iot.dal.dataobject.integration.clean.TrafficThresholdConfig; +import com.viewsh.module.iot.dal.redis.clean.DeviceCurrentOrderRedisDAO; +import com.viewsh.module.iot.dal.redis.clean.TrafficCounterBaseRedisDAO; +import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; + +/** + * 客流阈值规则处理器 + *

+ * 监听设备属性上报,检测客流计数器是否达到阈值 + * 如果达到阈值,发布工单创建事件到 Ops 模块 + * + * @author AI + */ +@Component +@Slf4j +public class TrafficThresholdRuleProcessor { + + @Resource + private CleanOrderIntegrationConfigService configService; + + @Resource + private TrafficCounterBaseRedisDAO trafficBaseRedisDAO; + + @Resource + private DeviceCurrentOrderRedisDAO deviceCurrentOrderRedisDAO; + + @Resource + private RocketMQTemplate rocketMQTemplate; + + /** + * 处理客流属性上报 + *

+ * 在设备属性上报处理流程中调用此方法 + * + * @param deviceId 设备ID + * @param identifier 属性标识符(如 people_in) + * @param propertyValue 属性值 + */ + public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) { + // 1. 检查是否是客流属性 + if (!"people_in".equals(identifier)) { + return; + } + + log.debug("[TrafficThreshold] 收到客流属性:deviceId={}, value={}", deviceId, propertyValue); + + // 2. 获取配置 + CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper = + getConfigWrapper(deviceId); + + if (configWrapper == null || configWrapper.getConfig() == null) { + log.debug("[TrafficThreshold] 设备无配置:deviceId={}", deviceId); + return; + } + + TrafficThresholdConfig thresholdConfig = configWrapper.getConfig().getTrafficThreshold(); + if (thresholdConfig == null || !thresholdConfig.getAutoCreateOrder()) { + log.debug("[TrafficThreshold] 未启用客流阈值触发:deviceId={}", deviceId); + return; + } + + // 3. 解析客流值 + Long currentCount = parseTrafficCount(propertyValue); + if (currentCount == null) { + log.warn("[TrafficThreshold] 客流值解析失败:deviceId={}, value={}", deviceId, propertyValue); + return; + } + + // 4. 计算实际客流(当前值 - 基准值) + Long baseValue = trafficBaseRedisDAO.getBaseValue(deviceId); + Long actualCount = currentCount - (baseValue != null ? baseValue : 0L); + + // 防止负数(设备重启后计数器归零) + if (actualCount < 0) { + log.warn("[TrafficThreshold] 检测到负数客流,重置基准值:deviceId={}, currentCount={}, baseValue={}", + deviceId, currentCount, baseValue); + trafficBaseRedisDAO.setBaseValue(deviceId, currentCount); + actualCount = 0L; + } + + log.debug("[TrafficThreshold] 客流统计:deviceId={}, currentCount={}, baseValue={}, actualCount={}, threshold={}", + deviceId, currentCount, baseValue, actualCount, thresholdConfig.getThreshold()); + + // 5. 阈值判定 + if (actualCount < thresholdConfig.getThreshold()) { + return; // 未达标 + } + + // 6. 防重复检查(使用 Redis 分布式锁) + String lockKey = String.format("iot:clean:traffic:lock:%s:%s", deviceId, configWrapper.getAreaId()); + Boolean locked = stringRedisTemplate.opsForValue() + .setIfAbsent(lockKey, "1", thresholdConfig.getTimeWindowSeconds(), java.util.concurrent.TimeUnit.SECONDS); + + if (!locked) { + log.info("[TrafficThreshold] 防重复触发:deviceId={}, areaId={}", deviceId, configWrapper.getAreaId()); + return; + } + + // 7. 发布工单创建事件 + publishCreateEvent(configWrapper, actualCount, baseValue, thresholdConfig.getThreshold()); + } + + /** + * 发布工单创建事件 + */ + private void publishCreateEvent(CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper, + Long actualCount, Long baseValue, Integer threshold) { + try { + CleanOrderCreateEvent event = CleanOrderCreateEvent.builder() + .orderType("CLEAN") + .areaId(configWrapper.getAreaId()) + .triggerSource("IOT_TRAFFIC") + .triggerDeviceId(configWrapper.getDeviceId()) + .triggerDeviceKey(configWrapper.getDeviceKey()) + .priority(configWrapper.getConfig().getTrafficThreshold().getOrderPriority()) + .triggerData(buildTriggerData(actualCount, baseValue, threshold)) + .build(); + + rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_CREATE, MessageBuilder.withPayload(event).build()); + + log.info("[TrafficThreshold] 发布工单创建事件:eventId={}, areaId={}, actualCount={}, threshold={}", + event.getEventId(), configWrapper.getAreaId(), actualCount, threshold); + } catch (Exception e) { + log.error("[TrafficThreshold] 发布工单创建事件失败:deviceId={}, areaId={}", + configWrapper.getDeviceId(), configWrapper.getAreaId(), e); + } + } + + /** + * 构建触发数据 + */ + private Map buildTriggerData(Long actualCount, Long baseValue, Integer threshold) { + Map data = new HashMap<>(); + data.put("actualCount", actualCount); + data.put("baseValue", baseValue); + data.put("threshold", threshold); + data.put("exceededCount", actualCount - threshold); + return data; + } + + /** + * 获取配置包装器 + */ + private CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper getConfigWrapper(Long deviceId) { + return configService.getConfigWrapperByDeviceId(deviceId); + } + + /** + * 解析客流计数值 + */ + private Long parseTrafficCount(Object value) { + if (value == null) { + return null; + } + + if (value instanceof Number) { + return ((Number) value).longValue(); + } + + if (value instanceof String) { + try { + return Long.parseLong((String) value); + } catch (NumberFormatException e) { + return null; + } + } + + return null; + } + + @Resource + private org.springframework.data.redis.core.StringRedisTemplate stringRedisTemplate; +}