diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/constants/CleanOrderTopics.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/constants/CleanOrderTopics.java
new file mode 100644
index 0000000..b0e53fa
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/constants/CleanOrderTopics.java
@@ -0,0 +1,37 @@
+package com.viewsh.module.iot.core.integration.constants;
+
+/**
+ * Ops 业务事件 Topic 常量
+ *
+ * 定义 IoT → Ops 的业务事件 Topic
+ *
+ * @author AI
+ */
+public interface CleanOrderTopics {
+
+ /**
+ * 保洁工单创建事件
+ */
+ String ORDER_CREATE = "ops.order.create";
+
+ /**
+ * 保洁工单到岗事件
+ */
+ String ORDER_ARRIVE = "ops.order.arrive";
+
+ /**
+ * 保洁工单完成事件
+ */
+ String ORDER_COMPLETE = "ops.order.complete";
+
+ /**
+ * 保洁工单审计事件
+ */
+ String ORDER_AUDIT = "ops.order.audit";
+
+ /**
+ * 保洁工单确认事件(按键确认)
+ */
+ String ORDER_CONFIRM = "ops.order.confirm";
+
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderArriveEvent.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderArriveEvent.java
new file mode 100644
index 0000000..cc36343
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderArriveEvent.java
@@ -0,0 +1,79 @@
+package com.viewsh.module.iot.core.integration.event.clean;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * 保洁工单到岗事件
+ *
+ * 当工牌检测到蓝牙信标时,IoT 模块发布此事件到 Ops 模块
+ * Topic: ops.order.arrive
+ *
+ * @author AI
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CleanOrderArriveEvent {
+
+ /**
+ * 事件ID(唯一标识,用于幂等性处理)
+ */
+ @Builder.Default
+ private String eventId = UUID.randomUUID().toString();
+
+ /**
+ * 工单类型
+ */
+ private String orderType;
+
+ /**
+ * 工单ID(可选,如果已知)
+ */
+ private Long orderId;
+
+ /**
+ * 设备ID(工牌)
+ */
+ private Long deviceId;
+
+ /**
+ * 设备Key
+ */
+ private String deviceKey;
+
+ /**
+ * 区域ID
+ */
+ private Long areaId;
+
+ /**
+ * 触发来源
+ */
+ private String triggerSource;
+
+ /**
+ * 触发数据(上下文信息)
+ *
+ * 例如:{beaconMac: "F0:C8:60:1D:10:BB", rssi: -66, windowSnapshot: [-68, -66, -69]}
+ */
+ private Map triggerData;
+
+ /**
+ * 事件时间
+ */
+ @Builder.Default
+ private LocalDateTime eventTime = LocalDateTime.now();
+
+ /**
+ * 租户ID
+ */
+ private Long tenantId;
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderAuditEvent.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderAuditEvent.java
new file mode 100644
index 0000000..a153595
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderAuditEvent.java
@@ -0,0 +1,98 @@
+package com.viewsh.module.iot.core.integration.event.clean;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * 保洁工单审计事件
+ *
+ * 用于记录不改变工单状态但需要记录的关键节点
+ * 例如:离岗警告、无效作业拦截
+ * Topic: ops.order.audit
+ *
+ * @author AI
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CleanOrderAuditEvent {
+
+ /**
+ * 事件ID(唯一标识,用于幂等性处理)
+ */
+ @Builder.Default
+ private String eventId = UUID.randomUUID().toString();
+
+ /**
+ * 审计类型
+ *
+ * BEACON_ARRIVE_CONFIRMED - 信标到岗确认
+ * LEAVE_WARNING_SENT - 离岗警告已发送
+ * COMPLETE_SUPPRESSED_INVALID - 完成被抑制(作业时长不足)
+ * BEACON_COMPLETE_REQUESTED - 信标触发完成请求
+ */
+ private String auditType;
+
+ /**
+ * 工单ID(可选)
+ */
+ private Long orderId;
+
+ /**
+ * 设备ID(工牌)
+ */
+ private Long deviceId;
+
+ /**
+ * 设备Key
+ */
+ private String deviceKey;
+
+ /**
+ * 区域ID
+ */
+ private Long areaId;
+
+ /**
+ * 保洁员ID(可选)
+ */
+ private Long cleanerId;
+
+ /**
+ * 事件级别
+ *
+ * INFO - 信息
+ * WARN - 警告
+ * ERROR - 错误
+ */
+ @Builder.Default
+ private String level = "INFO";
+
+ /**
+ * 审计数据(结构化上下文)
+ */
+ private Map data;
+
+ /**
+ * 事件消息(可读描述)
+ */
+ private String message;
+
+ /**
+ * 事件时间
+ */
+ @Builder.Default
+ private LocalDateTime eventTime = LocalDateTime.now();
+
+ /**
+ * 租户ID
+ */
+ private Long tenantId;
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderCompleteEvent.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderCompleteEvent.java
new file mode 100644
index 0000000..8ce377d
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderCompleteEvent.java
@@ -0,0 +1,79 @@
+package com.viewsh.module.iot.core.integration.event.clean;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * 保洁工单完成事件
+ *
+ * 当工牌信号丢失超时时,IoT 模块发布此事件到 Ops 模块
+ * Topic: ops.order.complete
+ *
+ * @author AI
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CleanOrderCompleteEvent {
+
+ /**
+ * 事件ID(唯一标识,用于幂等性处理)
+ */
+ @Builder.Default
+ private String eventId = UUID.randomUUID().toString();
+
+ /**
+ * 工单类型
+ */
+ private String orderType;
+
+ /**
+ * 工单ID
+ */
+ private Long orderId;
+
+ /**
+ * 设备ID(工牌)
+ */
+ private Long deviceId;
+
+ /**
+ * 设备Key
+ */
+ private String deviceKey;
+
+ /**
+ * 区域ID
+ */
+ private Long areaId;
+
+ /**
+ * 触发来源
+ */
+ private String triggerSource;
+
+ /**
+ * 触发数据(上下文信息)
+ *
+ * 例如:{durationMs: 780000, lastLossTime: 1736913000000, minValidWorkMinutes: 3}
+ */
+ private Map triggerData;
+
+ /**
+ * 事件时间
+ */
+ @Builder.Default
+ private LocalDateTime eventTime = LocalDateTime.now();
+
+ /**
+ * 租户ID
+ */
+ private Long tenantId;
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderCreateEvent.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderCreateEvent.java
new file mode 100644
index 0000000..20e7078
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/clean/CleanOrderCreateEvent.java
@@ -0,0 +1,86 @@
+package com.viewsh.module.iot.core.integration.event.clean;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * 保洁工单创建事件
+ *
+ * 当客流阈值触发时,IoT 模块发布此事件到 Ops 模块
+ * Topic: ops.order.create
+ *
+ * @author AI
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CleanOrderCreateEvent {
+
+ /**
+ * 事件ID(唯一标识,用于幂等性处理)
+ */
+ @Builder.Default
+ private String eventId = UUID.randomUUID().toString();
+
+ /**
+ * 工单类型
+ */
+ private String orderType;
+
+ /**
+ * 区域ID
+ */
+ private Long areaId;
+
+ /**
+ * 触发来源
+ *
+ * IOT_TRAFFIC - 客流触发
+ * IOT_ALERT - 告警触发
+ */
+ private String triggerSource;
+
+ /**
+ * 触发设备ID
+ */
+ private Long triggerDeviceId;
+
+ /**
+ * 触发设备Key
+ */
+ private String triggerDeviceKey;
+
+ /**
+ * 触发数据(上下文信息)
+ *
+ * 例如:{actualCount: 150, threshold: 100, baseValue: 50}
+ */
+ private Map triggerData;
+
+ /**
+ * 工单优先级
+ *
+ * P0 - 紧急
+ * P1 - 重要
+ * P2 - 普通
+ */
+ private String priority;
+
+ /**
+ * 事件时间
+ */
+ @Builder.Default
+ private LocalDateTime eventTime = LocalDateTime.now();
+
+ /**
+ * 租户ID
+ */
+ private Long tenantId;
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java
index b9b02e1..7412be9 100644
--- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java
@@ -85,6 +85,12 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
@Resource
private IntegrationEventPublisher integrationEventPublisher;
+ @Resource
+ private com.viewsh.module.iot.service.rule.clean.processor.TrafficThresholdRuleProcessor trafficThresholdRuleProcessor;
+
+ @Resource
+ private com.viewsh.module.iot.service.rule.clean.processor.BeaconDetectionRuleProcessor beaconDetectionRuleProcessor;
+
// ========== 设备属性相关操作 ==========
@Override
@@ -171,10 +177,41 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
IotDevicePropertyDO.builder().value(entry.getValue()).updateTime(message.getReportTime()).build());
deviceDataRedisDAO.putAll(device.getId(), properties2);
- // 2.3 发布属性消息到 Redis Stream(供其他模块如 Ops 订阅)
+ // 2.3 调用规则处理器(保洁工单集成)
+ processRuleProcessors(device, properties);
+
+ // 2.4 发布属性消息到 Redis Stream(供其他模块如 Ops 订阅)
publishPropertyMessage(device, properties, message.getReportTime());
}
+ /**
+ * 处理规则处理器(保洁工单集成)
+ *
+ * 在设备属性上报处理流程中调用,检测是否满足工单创建/到岗/完成条件
+ *
+ * @param device 设备信息
+ * @param properties 属性数据
+ */
+ private void processRuleProcessors(IotDeviceDO device, Map properties) {
+ try {
+ // 遍历所有属性,调用规则处理器
+ for (Map.Entry entry : properties.entrySet()) {
+ String identifier = entry.getKey();
+ Object value = entry.getValue();
+
+ // 调用客流阈值规则处理器
+ trafficThresholdRuleProcessor.processPropertyChange(device.getId(), identifier, value);
+
+ // 调用蓝牙信标检测规则处理器
+ beaconDetectionRuleProcessor.processPropertyChange(device.getId(), identifier, value);
+ }
+ } catch (Exception e) {
+ // 规则处理器异常不应阻塞属性上报主流程
+ log.error("[processRuleProcessors] 规则处理器调用失败: deviceId={}, properties={}",
+ device.getId(), properties.keySet(), e);
+ }
+ }
+
/**
* 发布设备属性消息
*
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigService.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigService.java
index 4f58291..5a42f97 100644
--- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigService.java
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigService.java
@@ -23,6 +23,16 @@ public interface CleanOrderIntegrationConfigService {
*/
CleanOrderIntegrationConfig getConfigByDeviceId(Long deviceId);
+ /**
+ * 根据设备ID查询配置包装器(包含完整信息)
+ *
+ * 返回包含设备ID、区域ID、关联类型等完整信息的配置包装器
+ *
+ * @param deviceId 设备ID
+ * @return 配置包装器,如果不存在或未启用返回 null
+ */
+ AreaDeviceConfigWrapper getConfigWrapperByDeviceId(Long deviceId);
+
/**
* 根据区域ID查询所有启用的配置(带缓存)
*
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigServiceImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigServiceImpl.java
index 2d96da9..1dde87c 100644
--- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigServiceImpl.java
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/integration/clean/CleanOrderIntegrationConfigServiceImpl.java
@@ -107,6 +107,19 @@ public class CleanOrderIntegrationConfigServiceImpl implements CleanOrderIntegra
.collect(Collectors.toList());
}
+ @Override
+ public AreaDeviceConfigWrapper getConfigWrapperByDeviceId(Long deviceId) {
+ log.debug("[CleanOrderConfig] 查询设备完整配置:deviceId={}", deviceId);
+
+ OpsAreaDeviceRelationDO relation = relationMapper.selectByDeviceId(deviceId);
+
+ if (relation == null || !relation.getEnabled()) {
+ return null;
+ }
+
+ return wrapConfig(relation);
+ }
+
@Override
public void evictCache(Long deviceId) {
String cacheKey = formatDeviceKey(deviceId);
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/detector/RssiSlidingWindowDetector.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/detector/RssiSlidingWindowDetector.java
new file mode 100644
index 0000000..679b4da
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/detector/RssiSlidingWindowDetector.java
@@ -0,0 +1,151 @@
+package com.viewsh.module.iot.service.rule.clean.detector;
+
+import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+/**
+ * RSSI 滑动窗口检测器
+ *
+ * 实现"强进弱出"双阈值算法,解决蓝牙信号漂移和多径效应导致的误判问题
+ *
+ * 核心思想:
+ * 1. 进入(到岗)使用强阈值(如 -70dBm),只有信号足够强才算到达,避免路过误判
+ * 2. 退出(离岗)使用弱阈值(如 -85dBm),只有信号足够弱或彻底消失才算离开,避免边缘抖动
+ * 3. 引入状态粘性:一旦判定进入,需要满足更严格的退出条件才能判定离开
+ *
+ * @author AI
+ */
+@Slf4j
+public class RssiSlidingWindowDetector {
+
+ /**
+ * 区域状态枚举
+ */
+ public enum AreaState {
+ /**
+ * 在区域内
+ */
+ IN_AREA,
+ /**
+ * 在区域外
+ */
+ OUT_AREA
+ }
+
+ /**
+ * 检测结果枚举
+ */
+ public enum DetectionResult {
+ /**
+ * 确认到达
+ */
+ ARRIVE_CONFIRMED,
+ /**
+ * 确认离开
+ */
+ LEAVE_CONFIRMED,
+ /**
+ * 无变化
+ */
+ NO_CHANGE
+ }
+
+ /**
+ * 执行检测(强进弱出算法)
+ *
+ * @param window RSSI 滑动窗口样本(从旧到新)
+ * @param enterConfig 进入(到岗)判定配置 - 强阈值
+ * @param exitConfig 退出(离岗)判定配置 - 弱阈值
+ * @param currentState 当前状态
+ * @return 检测结果
+ */
+ public DetectionResult detect(
+ List window,
+ BeaconPresenceConfig.EnterConfig enterConfig,
+ BeaconPresenceConfig.ExitConfig exitConfig,
+ AreaState currentState) {
+
+ if (window == null || window.isEmpty()) {
+ log.debug("[RssiDetector] 窗口为空,返回无变化");
+ return DetectionResult.NO_CHANGE;
+ }
+
+ // 1. 统计满足强阈值的样本数(用于进入判定)
+ long enterHits = window.stream()
+ .filter(rssi -> rssi >= enterConfig.getRssiThreshold())
+ .count();
+
+ // 2. 统计满足弱阈值的样本数(用于退出判定)
+ // 条件:RSSI < 弱阈值 或 为缺失值(-999)
+ long exitHits = window.stream()
+ .filter(rssi -> rssi < exitConfig.getWeakRssiThreshold() || rssi == -999)
+ .count();
+
+ log.debug("[RssiDetector] 检测统计:currentState={}, windowSize={}, enterHits={}/{}, exitHits={}/{}",
+ currentState, window.size(), enterHits, enterConfig.getHitCount(),
+ exitHits, exitConfig.getHitCount());
+
+ // 3. 状态转换判定
+ if (currentState == AreaState.OUT_AREA) {
+ // 当前在区域外,检测是否进入
+ if (enterHits >= enterConfig.getHitCount()) {
+ log.info("[RssiDetector] 进入条件满足:enterHits={}, required={}",
+ enterHits, enterConfig.getHitCount());
+ return DetectionResult.ARRIVE_CONFIRMED;
+ }
+ } else {
+ // 当前在区域内,检测是否离开
+ if (exitHits >= exitConfig.getHitCount()) {
+ log.info("[RssiDetector] 离开条件满足:exitHits={}, required={}",
+ exitHits, exitConfig.getHitCount());
+ return DetectionResult.LEAVE_CONFIRMED;
+ }
+ }
+
+ return DetectionResult.NO_CHANGE;
+ }
+
+ /**
+ * 从蓝牙设备列表中提取目标信标的 RSSI
+ *
+ * @param bluetoothDevices 蓝牙设备列表
+ * 格式:[{"rssi":-52,"type":243,"mac":"F0:C8:60:1D:10:BB"},...]
+ * @param targetMac 目标信标 MAC 地址
+ * @return RSSI 值,如果未找到返回 -999(缺失值)
+ */
+ public Integer extractTargetRssi(Object bluetoothDevices, String targetMac) {
+ if (bluetoothDevices == null) {
+ return -999;
+ }
+
+ try {
+ @SuppressWarnings("unchecked")
+ List> deviceList =
+ (List>) bluetoothDevices;
+
+ return deviceList.stream()
+ .filter(device -> targetMac.equals(device.get("mac")))
+ .map(device -> (Integer) device.get("rssi"))
+ .findFirst()
+ .orElse(-999);
+ } catch (Exception e) {
+ log.error("[RssiDetector] 解析蓝牙数据失败:targetMac={}", targetMac, e);
+ return -999;
+ }
+ }
+
+ /**
+ * 判断窗口是否有效
+ *
+ * 窗口大小必须 >= 配置的窗口大小
+ *
+ * @param window 窗口样本
+ * @param windowSize 配置的窗口大小
+ * @return true - 有效,false - 无效
+ */
+ public boolean isWindowValid(List window, int windowSize) {
+ return window != null && window.size() >= windowSize;
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/BeaconDetectionRuleProcessor.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/BeaconDetectionRuleProcessor.java
new file mode 100644
index 0000000..2d21d20
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/BeaconDetectionRuleProcessor.java
@@ -0,0 +1,282 @@
+package com.viewsh.module.iot.service.rule.clean.processor;
+
+import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
+import com.viewsh.module.iot.core.integration.event.clean.CleanOrderArriveEvent;
+import com.viewsh.module.iot.core.integration.event.clean.CleanOrderAuditEvent;
+import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
+import com.viewsh.module.iot.dal.redis.clean.BeaconArrivedTimeRedisDAO;
+import com.viewsh.module.iot.dal.redis.clean.BeaconRssiWindowRedisDAO;
+import com.viewsh.module.iot.dal.redis.clean.DeviceCurrentOrderRedisDAO;
+import com.viewsh.module.iot.dal.redis.clean.SignalLossRedisDAO;
+import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
+import com.viewsh.module.iot.service.rule.clean.detector.RssiSlidingWindowDetector;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 蓝牙信标检测规则处理器
+ *
+ * 监听工牌的蓝牙属性上报,基于滑动窗口算法检测保洁员到岗/离岗
+ * 采用"强进弱出"双阈值,避免信号抖动
+ *
+ * @author AI
+ */
+@Component
+@Slf4j
+public class BeaconDetectionRuleProcessor {
+
+ @Resource
+ private BeaconRssiWindowRedisDAO windowRedisDAO;
+
+ @Resource
+ private BeaconArrivedTimeRedisDAO arrivedTimeRedisDAO;
+
+ @Resource
+ private SignalLossRedisDAO signalLossRedisDAO;
+
+ @Resource
+ private DeviceCurrentOrderRedisDAO deviceCurrentOrderRedisDAO;
+
+ @Resource
+ private CleanOrderIntegrationConfigService configService;
+
+ @Resource
+ private RssiSlidingWindowDetector detector;
+
+ @Resource
+ private RocketMQTemplate rocketMQTemplate;
+
+ /**
+ * 处理蓝牙属性上报
+ *
+ * 在设备属性上报处理流程中调用此方法
+ *
+ * @param deviceId 设备ID
+ * @param identifier 属性标识符(bluetoothDevices)
+ * @param propertyValue 属性值(蓝牙设备数组)
+ */
+ public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
+ // 1. 检查是否是蓝牙属性
+ if (!"bluetoothDevices".equals(identifier)) {
+ return;
+ }
+
+ log.debug("[BeaconDetection] 收到蓝牙属性:deviceId={}", deviceId);
+
+ // 2. 获取配置
+ CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper =
+ configService.getConfigWrapperByDeviceId(deviceId);
+
+ if (configWrapper == null || configWrapper.getConfig() == null) {
+ log.debug("[BeaconDetection] 设备无配置:deviceId={}", deviceId);
+ return;
+ }
+
+ BeaconPresenceConfig beaconConfig = configWrapper.getConfig().getBeaconPresence();
+ if (beaconConfig == null || !beaconConfig.getEnabled()) {
+ log.debug("[BeaconDetection] 未启用信标检测:deviceId={}", deviceId);
+ return;
+ }
+
+ Long areaId = configWrapper.getAreaId();
+
+ // 3. 解析蓝牙数据,提取目标信标的 RSSI
+ Integer targetRssi = detector.extractTargetRssi(propertyValue, beaconConfig.getBeaconMac());
+
+ log.debug("[BeaconDetection] 提取RSSI:deviceId={}, beaconMac={}, rssi={}",
+ deviceId, beaconConfig.getBeaconMac(), targetRssi);
+
+ // 4. 更新滑动窗口
+ BeaconPresenceConfig.WindowConfig windowConfig = beaconConfig.getWindow();
+ windowRedisDAO.updateWindow(deviceId, areaId, targetRssi, windowConfig.getSampleTtlSeconds());
+
+ // 5. 获取当前窗口样本
+ List window = windowRedisDAO.getWindow(deviceId, areaId);
+
+ // 6. 获取设备当前工单状态
+ DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrder =
+ deviceCurrentOrderRedisDAO.getCurrentOrder(deviceId);
+
+ // 7. 确定当前状态
+ RssiSlidingWindowDetector.AreaState currentState = determineState(currentOrder, areaId);
+
+ // 8. 执行检测
+ RssiSlidingWindowDetector.DetectionResult result = detector.detect(
+ window,
+ beaconConfig.getEnter(),
+ beaconConfig.getExit(),
+ currentState
+ );
+
+ // 9. 处理检测结果
+ switch (result) {
+ case ARRIVE_CONFIRMED:
+ handleArriveConfirmed(deviceId, areaId, window, beaconConfig, configWrapper);
+ break;
+ case LEAVE_CONFIRMED:
+ handleLeaveConfirmed(deviceId, areaId, window, beaconConfig);
+ break;
+ default:
+ // NO_CHANGE,不做处理
+ break;
+ }
+ }
+
+ /**
+ * 处理到达确认
+ */
+ private void handleArriveConfirmed(Long deviceId, Long areaId, List window,
+ BeaconPresenceConfig beaconConfig,
+ CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) {
+ log.info("[BeaconDetection] 到达确认:deviceId={}, areaId={}, window={}",
+ deviceId, areaId, window);
+
+ // 1. 记录到达时间
+ arrivedTimeRedisDAO.recordArrivedTime(deviceId, areaId, System.currentTimeMillis());
+
+ // 2. 清除离岗记录(如果存在)
+ signalLossRedisDAO.clearLossRecord(deviceId, areaId);
+
+ // 3. 获取当前最新的 RSSI 值
+ Integer currentRssi = window.isEmpty() ? -999 : window.get(window.size() - 1);
+
+ // 4. 构建触发数据
+ Map triggerData = new HashMap<>();
+ triggerData.put("beaconMac", beaconConfig.getBeaconMac());
+ triggerData.put("rssi", currentRssi);
+ triggerData.put("windowSnapshot", window);
+ triggerData.put("enterRssiThreshold", beaconConfig.getEnter().getRssiThreshold());
+
+ // 5. 发布到岗事件
+ if (beaconConfig.getEnter().getAutoArrival()) {
+ publishArriveEvent(deviceId, configWrapper.getDeviceKey(), areaId, triggerData);
+ }
+
+ // 6. 发布审计日志
+ publishAuditEvent("BEACON_ARRIVE_CONFIRMED", deviceId, configWrapper.getDeviceKey(), areaId,
+ "蓝牙信标自动到岗确认", triggerData);
+ }
+
+ /**
+ * 处理离开确认
+ */
+ private void handleLeaveConfirmed(Long deviceId, Long areaId, List window,
+ BeaconPresenceConfig beaconConfig) {
+ log.info("[BeaconDetection] 离开确认:deviceId={}, areaId={}, window={}",
+ deviceId, areaId, window);
+
+ BeaconPresenceConfig.ExitConfig exitConfig = beaconConfig.getExit();
+
+ // 1. 检查是否是首次丢失
+ Long firstLossTime = signalLossRedisDAO.getFirstLossTime(deviceId, areaId);
+
+ if (firstLossTime == null) {
+ // 首次丢失
+ signalLossRedisDAO.recordFirstLoss(deviceId, areaId, System.currentTimeMillis());
+
+ // 2. 发送警告
+ publishTtsEvent(deviceId, "你已离开当前区域," +
+ (exitConfig.getLossTimeoutMinutes() > 0 ?
+ exitConfig.getLossTimeoutMinutes() + "分钟内工单将自动结算" : "工单将自动结算"));
+
+ // 3. 发布审计日志
+ Map data = new HashMap<>();
+ data.put("firstLossTime", System.currentTimeMillis());
+ data.put("rssi", window.isEmpty() ? -999 : window.get(window.size() - 1));
+ data.put("warningDelayMinutes", exitConfig.getWarningDelayMinutes());
+
+ publishAuditEvent("BEACON_LEAVE_WARNING_SENT", deviceId, null, areaId,
+ "保洁员离开作业区域,已发送警告", data);
+ } else {
+ // 4. 更新最后丢失时间
+ signalLossRedisDAO.updateLastLossTime(deviceId, areaId, System.currentTimeMillis());
+
+ log.debug("[BeaconDetection] 更新最后丢失时间:deviceId={}, areaId={}", deviceId, areaId);
+ }
+ }
+
+ /**
+ * 发布到岗事件
+ */
+ private void publishArriveEvent(Long deviceId, String deviceKey, Long areaId, Map triggerData) {
+ try {
+ CleanOrderArriveEvent event = CleanOrderArriveEvent.builder()
+ .eventId(java.util.UUID.randomUUID().toString())
+ .orderType("CLEAN")
+ .deviceId(deviceId)
+ .deviceKey(deviceKey)
+ .areaId(areaId)
+ .triggerSource("IOT_BEACON")
+ .triggerData(triggerData)
+ .build();
+
+ rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_ARRIVE, MessageBuilder.withPayload(event).build());
+
+ log.info("[BeaconDetection] 发布到岗事件:eventId={}, deviceId={}, areaId={}",
+ event.getEventId(), deviceId, areaId);
+ } catch (Exception e) {
+ log.error("[BeaconDetection] 发布到岗事件失败:deviceId={}, areaId={}", deviceId, areaId, e);
+ }
+ }
+
+ /**
+ * 发布审计事件
+ */
+ private void publishAuditEvent(String auditType, Long deviceId, String deviceKey,
+ Long areaId, String message, Map data) {
+ try {
+ CleanOrderAuditEvent event = CleanOrderAuditEvent.builder()
+ .eventId(java.util.UUID.randomUUID().toString())
+ .auditType(auditType)
+ .deviceId(deviceId)
+ .deviceKey(deviceKey)
+ .areaId(areaId)
+ .message(message)
+ .data(data)
+ .build();
+
+ rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_AUDIT, MessageBuilder.withPayload(event).build());
+
+ log.debug("[BeaconDetection] 发布审计事件:auditType={}, deviceId={}, areaId={}",
+ auditType, deviceId, areaId);
+ } catch (Exception e) {
+ log.error("[BeaconDetection] 发布审计事件失败:auditType={}, deviceId={}", auditType, deviceId, e);
+ }
+ }
+
+ /**
+ * 发布 TTS 事件(通过审计事件传递)
+ */
+ private void publishTtsEvent(Long deviceId, String text) {
+ Map data = new HashMap<>();
+ data.put("tts", text);
+ data.put("timestamp", System.currentTimeMillis());
+
+ publishAuditEvent("TTS_REQUEST", deviceId, null, null, text, data);
+ }
+
+ /**
+ * 确定当前状态
+ */
+ private RssiSlidingWindowDetector.AreaState determineState(
+ DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrder, Long areaId) {
+
+ if (currentOrder == null) {
+ return RssiSlidingWindowDetector.AreaState.OUT_AREA;
+ }
+
+ // 检查工单状态和区域是否匹配
+ if ("ARRIVED".equals(currentOrder.getStatus()) && areaId.equals(currentOrder.getAreaId())) {
+ return RssiSlidingWindowDetector.AreaState.IN_AREA;
+ }
+
+ return RssiSlidingWindowDetector.AreaState.OUT_AREA;
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/SignalLossRuleProcessor.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/SignalLossRuleProcessor.java
new file mode 100644
index 0000000..4fb076d
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/SignalLossRuleProcessor.java
@@ -0,0 +1,309 @@
+package com.viewsh.module.iot.service.rule.clean.processor;
+
+import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
+import com.viewsh.module.iot.core.integration.event.clean.CleanOrderAuditEvent;
+import com.viewsh.module.iot.core.integration.event.clean.CleanOrderCompleteEvent;
+import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
+import com.viewsh.module.iot.dal.redis.clean.BeaconArrivedTimeRedisDAO;
+import com.viewsh.module.iot.dal.redis.clean.BeaconRssiWindowRedisDAO;
+import com.viewsh.module.iot.dal.redis.clean.DeviceCurrentOrderRedisDAO;
+import com.viewsh.module.iot.dal.redis.clean.SignalLossRedisDAO;
+import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.springframework.data.redis.core.StringRedisTemplate;
+
+/**
+ * 信号丢失规则处理器
+ *
+ * 定时检查离岗后N分钟则触发工单自动完成
+ * 包含作业时长有效性校验,防止"打卡即走"作弊
+ *
+ * @author AI
+ */
+@Component
+@Slf4j
+public class SignalLossRuleProcessor {
+
+ @Resource
+ private SignalLossRedisDAO signalLossRedisDAO;
+
+ @Resource
+ private BeaconArrivedTimeRedisDAO arrivedTimeRedisDAO;
+
+ @Resource
+ private BeaconRssiWindowRedisDAO windowRedisDAO;
+
+ @Resource
+ private DeviceCurrentOrderRedisDAO deviceCurrentOrderRedisDAO;
+
+ @Resource
+ private CleanOrderIntegrationConfigService configService;
+
+ @Resource
+ private RocketMQTemplate rocketMQTemplate;
+
+ @Resource
+ private StringRedisTemplate stringRedisTemplate;
+
+ /**
+ * Redis Key 模式:扫描所有离岗记录
+ */
+ private static final String LOSS_KEY_PATTERN = "iot:clean:signal:loss:*";
+
+ /**
+ * 定时检查离岗超时(每 30 秒执行一次)
+ *
+ * 遍历所有离岗记录,检查是否超过超时时间
+ * 如果超过,则触发工单完成
+ */
+ @XxlJob("signalLossCheckJob")
+ public String checkLossTimeout() {
+ try {
+ log.debug("[SignalLoss] 开始检查离岗超时");
+
+ // 1. 扫描所有离岗记录的 Key
+ Set keys = stringRedisTemplate.keys(LOSS_KEY_PATTERN);
+
+ if (keys == null || keys.isEmpty()) {
+ return "暂无";
+ }
+
+ log.debug("[SignalLoss] 发现 {} 条离岗记录", keys.size());
+
+ // 2. 遍历每条记录
+ for (String key : keys) {
+ try {
+ // 解析 deviceId 和 areaId
+ // Key 格式:iot:clean:signal:loss:{deviceId}:{areaId}
+ String[] parts = key.split(":");
+ if (parts.length < 5) {
+ continue;
+ }
+
+ Long deviceId = Long.parseLong(parts[3]);
+ Long areaId = Long.parseLong(parts[4]);
+
+ // 检查超时
+ checkTimeoutForDevice(deviceId, areaId);
+
+ } catch (Exception e) {
+ log.error("[SignalLoss] 处理离岗记录失败:key={}", key, e);
+ }
+ }
+
+ } catch (Exception e) {
+ log.error("[SignalLoss] 检查离岗超时失败", e);
+ return "ERROR: " + e.getMessage();
+ }
+
+ return "SUCCESS";
+ }
+
+ /**
+ * 检查单个设备的离岗超时
+ */
+ private void checkTimeoutForDevice(Long deviceId, Long areaId) {
+ // 1. 获取配置
+ CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper =
+ configService.getConfigWrapperByDeviceId(deviceId);
+
+ if (configWrapper == null || configWrapper.getConfig() == null ||
+ configWrapper.getConfig().getBeaconPresence() == null) {
+ log.debug("[SignalLoss] 设备无信标配置:deviceId={}", deviceId);
+ return;
+ }
+
+ BeaconPresenceConfig.ExitConfig exitConfig = configWrapper.getConfig().getBeaconPresence().getExit();
+
+ // 2. 获取首次丢失时间
+ Long firstLossTime = signalLossRedisDAO.getFirstLossTime(deviceId, areaId);
+
+ if (firstLossTime == null) {
+ return;
+ }
+
+ // 3. 获取最后丢失时间
+ Long lastLossTime = signalLossRedisDAO.getLastLossTime(deviceId, areaId);
+
+ if (lastLossTime == null) {
+ return;
+ }
+
+ // 4. 检查是否超时
+ long timeoutMillis = exitConfig.getLossTimeoutMinutes() * 60000L;
+ long elapsedMillis = System.currentTimeMillis() - firstLossTime;
+
+ if (elapsedMillis < timeoutMillis) {
+ log.debug("[SignalLoss] 未超时:deviceId={}, elapsed={}ms, timeout={}ms",
+ deviceId, elapsedMillis, timeoutMillis);
+ return;
+ }
+
+ log.info("[SignalLoss] 检测到离岗超时:deviceId={}, areaId={}, elapsed={}ms",
+ deviceId, areaId, elapsedMillis);
+
+ // 5. 有效性校验:检查作业时长
+ Long arrivedAt = arrivedTimeRedisDAO.getArrivedTime(deviceId, areaId);
+
+ if (arrivedAt == null) {
+ log.warn("[SignalLoss] 未找到到达时间记录:deviceId={}, areaId={}", deviceId, areaId);
+ signalLossRedisDAO.clearLossRecord(deviceId, areaId);
+ return;
+ }
+
+ long durationMs = lastLossTime - arrivedAt;
+ long minValidWorkMillis = exitConfig.getMinValidWorkMinutes() * 60000L;
+
+ // 6. 分支处理:有效 vs 无效作业
+ if (durationMs < minValidWorkMillis) {
+ // 作业时长不足,抑制完成
+ handleInvalidWork(deviceId, configWrapper.getDeviceKey(), areaId,
+ durationMs, minValidWorkMillis, exitConfig);
+ } else {
+ // 作业时长有效,触发完成
+ handleTimeoutComplete(deviceId, configWrapper.getDeviceKey(), areaId,
+ durationMs, lastLossTime);
+ }
+ }
+
+ /**
+ * 处理无效作业(时长不足)
+ */
+ private void handleInvalidWork(Long deviceId, String deviceKey, Long areaId,
+ Long durationMs, Long minValidWorkMillis,
+ BeaconPresenceConfig.ExitConfig exitConfig) {
+ log.warn("[SignalLoss] 作业时长不足,抑制自动完成:deviceId={}, duration={}ms, minRequired={}ms",
+ deviceId, durationMs, minValidWorkMillis);
+
+ // 1. 发送 TTS 警告
+ publishTtsEvent(deviceId, "工单作业时长异常,请回到作业区域继续完成");
+
+ // 2. 发布审计日志
+ Map data = new HashMap<>();
+ data.put("durationMs", durationMs);
+ data.put("minValidWorkMinutes", exitConfig.getMinValidWorkMinutes());
+ data.put("shortageMs", minValidWorkMillis - durationMs);
+
+ publishAuditEvent("COMPLETE_SUPPRESSED_INVALID", deviceId, deviceKey, areaId,
+ "作业时长不足,抑制自动完成", data);
+
+ // 3. 清除丢失记录(允许重新进入)
+ signalLossRedisDAO.clearLossRecord(deviceId, areaId);
+
+ // 4. 清除无效作业标记(允许下次警告)
+ signalLossRedisDAO.markInvalidWorkNotified(deviceId, areaId);
+ }
+
+ /**
+ * 处理超时自动完成
+ */
+ private void handleTimeoutComplete(Long deviceId, String deviceKey, Long areaId,
+ Long durationMs, Long lastLossTime) {
+ log.info("[SignalLoss] 触发自动完成:deviceId={}, areaId={}, duration={}ms",
+ deviceId, areaId, durationMs);
+
+ // 1. 获取当前工单
+ DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrder =
+ deviceCurrentOrderRedisDAO.getCurrentOrder(deviceId);
+
+ if (currentOrder == null) {
+ log.warn("[SignalLoss] 设备无当前工单:deviceId={}", deviceId);
+ return;
+ }
+
+ // 2. 构建触发数据
+ Map triggerData = new HashMap<>();
+ triggerData.put("durationMs", durationMs);
+ triggerData.put("lastLossTime", lastLossTime);
+ triggerData.put("completionReason", "SIGNAL_LOSS_TIMEOUT");
+
+ // 3. 发布完成事件
+ try {
+ CleanOrderCompleteEvent event = CleanOrderCompleteEvent.builder()
+ .eventId(java.util.UUID.randomUUID().toString())
+ .orderType("CLEAN")
+ .orderId(currentOrder.getOrderId())
+ .deviceId(deviceId)
+ .deviceKey(deviceKey)
+ .areaId(areaId)
+ .triggerSource("IOT_SIGNAL_LOSS")
+ .triggerData(triggerData)
+ .build();
+
+ rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_COMPLETE, MessageBuilder.withPayload(event).build());
+
+ log.info("[SignalLoss] 发布完成事件:eventId={}, orderId={}, duration={}ms",
+ event.getEventId(), currentOrder.getOrderId(), durationMs);
+ } catch (Exception e) {
+ log.error("[SignalLoss] 发布完成事件失败:deviceId={}, orderId={}",
+ deviceId, currentOrder.getOrderId(), e);
+ return;
+ }
+
+ // 4. 发布审计日志
+ Map auditData = new HashMap<>();
+ auditData.put("durationMs", durationMs);
+ auditData.put("lastLossTime", lastLossTime);
+
+ publishAuditEvent("BEACON_COMPLETE_REQUESTED", deviceId, deviceKey, areaId,
+ "信号丢失超时自动完成", auditData);
+
+ // 5. 清理 Redis 数据
+ cleanupRedisData(deviceId, areaId);
+ }
+
+ /**
+ * 清理 Redis 数据
+ */
+ private void cleanupRedisData(Long deviceId, Long areaId) {
+ signalLossRedisDAO.clearLossRecord(deviceId, areaId);
+ arrivedTimeRedisDAO.clearArrivedTime(deviceId, areaId);
+ windowRedisDAO.clearWindow(deviceId, areaId);
+
+ log.debug("[SignalLoss] 清理 Redis 数据:deviceId={}, areaId={}", deviceId, areaId);
+ }
+
+ /**
+ * 发布审计事件
+ */
+ private void publishAuditEvent(String auditType, Long deviceId, String deviceKey,
+ Long areaId, String message, Map data) {
+ try {
+ CleanOrderAuditEvent event = CleanOrderAuditEvent.builder()
+ .eventId(java.util.UUID.randomUUID().toString())
+ .auditType(auditType)
+ .deviceId(deviceId)
+ .deviceKey(deviceKey)
+ .areaId(areaId)
+ .message(message)
+ .data(data)
+ .build();
+
+ rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_AUDIT, MessageBuilder.withPayload(event).build());
+
+ log.debug("[SignalLoss] 发布审计事件:auditType={}, deviceId={}", auditType, deviceId);
+ } catch (Exception e) {
+ log.error("[SignalLoss] 发布审计事件失败:auditType={}, deviceId={}", auditType, deviceId, e);
+ }
+ }
+
+ /**
+ * 发布 TTS 事件
+ */
+ private void publishTtsEvent(Long deviceId, String text) {
+ Map data = new HashMap<>();
+ data.put("tts", text);
+ data.put("timestamp", System.currentTimeMillis());
+
+ publishAuditEvent("TTS_REQUEST", deviceId, null, null, text, data);
+ }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java
new file mode 100644
index 0000000..354eea2
--- /dev/null
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java
@@ -0,0 +1,187 @@
+package com.viewsh.module.iot.service.rule.clean.processor;
+
+import com.viewsh.framework.common.util.json.JsonUtils;
+import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
+import com.viewsh.module.iot.core.integration.event.clean.CleanOrderCreateEvent;
+import com.viewsh.module.iot.dal.dataobject.integration.clean.CleanOrderIntegrationConfig;
+import com.viewsh.module.iot.dal.dataobject.integration.clean.TrafficThresholdConfig;
+import com.viewsh.module.iot.dal.redis.clean.DeviceCurrentOrderRedisDAO;
+import com.viewsh.module.iot.dal.redis.clean.TrafficCounterBaseRedisDAO;
+import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 客流阈值规则处理器
+ *
+ * 监听设备属性上报,检测客流计数器是否达到阈值
+ * 如果达到阈值,发布工单创建事件到 Ops 模块
+ *
+ * @author AI
+ */
+@Component
+@Slf4j
+public class TrafficThresholdRuleProcessor {
+
+ @Resource
+ private CleanOrderIntegrationConfigService configService;
+
+ @Resource
+ private TrafficCounterBaseRedisDAO trafficBaseRedisDAO;
+
+ @Resource
+ private DeviceCurrentOrderRedisDAO deviceCurrentOrderRedisDAO;
+
+ @Resource
+ private RocketMQTemplate rocketMQTemplate;
+
+ /**
+ * 处理客流属性上报
+ *
+ * 在设备属性上报处理流程中调用此方法
+ *
+ * @param deviceId 设备ID
+ * @param identifier 属性标识符(如 people_in)
+ * @param propertyValue 属性值
+ */
+ public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
+ // 1. 检查是否是客流属性
+ if (!"people_in".equals(identifier)) {
+ return;
+ }
+
+ log.debug("[TrafficThreshold] 收到客流属性:deviceId={}, value={}", deviceId, propertyValue);
+
+ // 2. 获取配置
+ CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper =
+ getConfigWrapper(deviceId);
+
+ if (configWrapper == null || configWrapper.getConfig() == null) {
+ log.debug("[TrafficThreshold] 设备无配置:deviceId={}", deviceId);
+ return;
+ }
+
+ TrafficThresholdConfig thresholdConfig = configWrapper.getConfig().getTrafficThreshold();
+ if (thresholdConfig == null || !thresholdConfig.getAutoCreateOrder()) {
+ log.debug("[TrafficThreshold] 未启用客流阈值触发:deviceId={}", deviceId);
+ return;
+ }
+
+ // 3. 解析客流值
+ Long currentCount = parseTrafficCount(propertyValue);
+ if (currentCount == null) {
+ log.warn("[TrafficThreshold] 客流值解析失败:deviceId={}, value={}", deviceId, propertyValue);
+ return;
+ }
+
+ // 4. 计算实际客流(当前值 - 基准值)
+ Long baseValue = trafficBaseRedisDAO.getBaseValue(deviceId);
+ Long actualCount = currentCount - (baseValue != null ? baseValue : 0L);
+
+ // 防止负数(设备重启后计数器归零)
+ if (actualCount < 0) {
+ log.warn("[TrafficThreshold] 检测到负数客流,重置基准值:deviceId={}, currentCount={}, baseValue={}",
+ deviceId, currentCount, baseValue);
+ trafficBaseRedisDAO.setBaseValue(deviceId, currentCount);
+ actualCount = 0L;
+ }
+
+ log.debug("[TrafficThreshold] 客流统计:deviceId={}, currentCount={}, baseValue={}, actualCount={}, threshold={}",
+ deviceId, currentCount, baseValue, actualCount, thresholdConfig.getThreshold());
+
+ // 5. 阈值判定
+ if (actualCount < thresholdConfig.getThreshold()) {
+ return; // 未达标
+ }
+
+ // 6. 防重复检查(使用 Redis 分布式锁)
+ String lockKey = String.format("iot:clean:traffic:lock:%s:%s", deviceId, configWrapper.getAreaId());
+ Boolean locked = stringRedisTemplate.opsForValue()
+ .setIfAbsent(lockKey, "1", thresholdConfig.getTimeWindowSeconds(), java.util.concurrent.TimeUnit.SECONDS);
+
+ if (!locked) {
+ log.info("[TrafficThreshold] 防重复触发:deviceId={}, areaId={}", deviceId, configWrapper.getAreaId());
+ return;
+ }
+
+ // 7. 发布工单创建事件
+ publishCreateEvent(configWrapper, actualCount, baseValue, thresholdConfig.getThreshold());
+ }
+
+ /**
+ * 发布工单创建事件
+ */
+ private void publishCreateEvent(CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper,
+ Long actualCount, Long baseValue, Integer threshold) {
+ try {
+ CleanOrderCreateEvent event = CleanOrderCreateEvent.builder()
+ .orderType("CLEAN")
+ .areaId(configWrapper.getAreaId())
+ .triggerSource("IOT_TRAFFIC")
+ .triggerDeviceId(configWrapper.getDeviceId())
+ .triggerDeviceKey(configWrapper.getDeviceKey())
+ .priority(configWrapper.getConfig().getTrafficThreshold().getOrderPriority())
+ .triggerData(buildTriggerData(actualCount, baseValue, threshold))
+ .build();
+
+ rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_CREATE, MessageBuilder.withPayload(event).build());
+
+ log.info("[TrafficThreshold] 发布工单创建事件:eventId={}, areaId={}, actualCount={}, threshold={}",
+ event.getEventId(), configWrapper.getAreaId(), actualCount, threshold);
+ } catch (Exception e) {
+ log.error("[TrafficThreshold] 发布工单创建事件失败:deviceId={}, areaId={}",
+ configWrapper.getDeviceId(), configWrapper.getAreaId(), e);
+ }
+ }
+
+ /**
+ * 构建触发数据
+ */
+ private Map buildTriggerData(Long actualCount, Long baseValue, Integer threshold) {
+ Map data = new HashMap<>();
+ data.put("actualCount", actualCount);
+ data.put("baseValue", baseValue);
+ data.put("threshold", threshold);
+ data.put("exceededCount", actualCount - threshold);
+ return data;
+ }
+
+ /**
+ * 获取配置包装器
+ */
+ private CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper getConfigWrapper(Long deviceId) {
+ return configService.getConfigWrapperByDeviceId(deviceId);
+ }
+
+ /**
+ * 解析客流计数值
+ */
+ private Long parseTrafficCount(Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ }
+
+ if (value instanceof String) {
+ try {
+ return Long.parseLong((String) value);
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ }
+
+ return null;
+ }
+
+ @Resource
+ private org.springframework.data.redis.core.StringRedisTemplate stringRedisTemplate;
+}