feat(iot): add-iot-clean-order-integration阶段二-规则处理器

This commit is contained in:
lzh
2026-01-17 15:54:12 +08:00
parent 471cd45162
commit de427b15ab
12 changed files with 1369 additions and 1 deletions

View File

@@ -85,6 +85,12 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
@Resource
private IntegrationEventPublisher integrationEventPublisher;
@Resource
private com.viewsh.module.iot.service.rule.clean.processor.TrafficThresholdRuleProcessor trafficThresholdRuleProcessor;
@Resource
private com.viewsh.module.iot.service.rule.clean.processor.BeaconDetectionRuleProcessor beaconDetectionRuleProcessor;
// ========== 设备属性相关操作 ==========
@Override
@@ -171,10 +177,41 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
IotDevicePropertyDO.builder().value(entry.getValue()).updateTime(message.getReportTime()).build());
deviceDataRedisDAO.putAll(device.getId(), properties2);
// 2.3 发布属性消息到 Redis Stream供其他模块如 Ops 订阅
// 2.3 调用规则处理器(保洁工单集成
processRuleProcessors(device, properties);
// 2.4 发布属性消息到 Redis Stream供其他模块如 Ops 订阅)
publishPropertyMessage(device, properties, message.getReportTime());
}
/**
* 处理规则处理器(保洁工单集成)
* <p>
* 在设备属性上报处理流程中调用,检测是否满足工单创建/到岗/完成条件
*
* @param device 设备信息
* @param properties 属性数据
*/
private void processRuleProcessors(IotDeviceDO device, Map<String, Object> properties) {
try {
// 遍历所有属性,调用规则处理器
for (Map.Entry<String, Object> entry : properties.entrySet()) {
String identifier = entry.getKey();
Object value = entry.getValue();
// 调用客流阈值规则处理器
trafficThresholdRuleProcessor.processPropertyChange(device.getId(), identifier, value);
// 调用蓝牙信标检测规则处理器
beaconDetectionRuleProcessor.processPropertyChange(device.getId(), identifier, value);
}
} catch (Exception e) {
// 规则处理器异常不应阻塞属性上报主流程
log.error("[processRuleProcessors] 规则处理器调用失败: deviceId={}, properties={}",
device.getId(), properties.keySet(), e);
}
}
/**
* 发布设备属性消息
* <p>

View File

@@ -23,6 +23,16 @@ public interface CleanOrderIntegrationConfigService {
*/
CleanOrderIntegrationConfig getConfigByDeviceId(Long deviceId);
/**
* 根据设备ID查询配置包装器包含完整信息
* <p>
* 返回包含设备ID、区域ID、关联类型等完整信息的配置包装器
*
* @param deviceId 设备ID
* @return 配置包装器,如果不存在或未启用返回 null
*/
AreaDeviceConfigWrapper getConfigWrapperByDeviceId(Long deviceId);
/**
* 根据区域ID查询所有启用的配置带缓存
* <p>

View File

@@ -107,6 +107,19 @@ public class CleanOrderIntegrationConfigServiceImpl implements CleanOrderIntegra
.collect(Collectors.toList());
}
@Override
public AreaDeviceConfigWrapper getConfigWrapperByDeviceId(Long deviceId) {
log.debug("[CleanOrderConfig] 查询设备完整配置deviceId={}", deviceId);
OpsAreaDeviceRelationDO relation = relationMapper.selectByDeviceId(deviceId);
if (relation == null || !relation.getEnabled()) {
return null;
}
return wrapConfig(relation);
}
@Override
public void evictCache(Long deviceId) {
String cacheKey = formatDeviceKey(deviceId);

View File

@@ -0,0 +1,151 @@
package com.viewsh.module.iot.service.rule.clean.detector;
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* RSSI 滑动窗口检测器
* <p>
* 实现"强进弱出"双阈值算法,解决蓝牙信号漂移和多径效应导致的误判问题
* <p>
* 核心思想:
* 1. 进入(到岗)使用强阈值(如 -70dBm只有信号足够强才算到达避免路过误判
* 2. 退出(离岗)使用弱阈值(如 -85dBm只有信号足够弱或彻底消失才算离开避免边缘抖动
* 3. 引入状态粘性:一旦判定进入,需要满足更严格的退出条件才能判定离开
*
* @author AI
*/
@Slf4j
public class RssiSlidingWindowDetector {
/**
* 区域状态枚举
*/
public enum AreaState {
/**
* 在区域内
*/
IN_AREA,
/**
* 在区域外
*/
OUT_AREA
}
/**
* 检测结果枚举
*/
public enum DetectionResult {
/**
* 确认到达
*/
ARRIVE_CONFIRMED,
/**
* 确认离开
*/
LEAVE_CONFIRMED,
/**
* 无变化
*/
NO_CHANGE
}
/**
* 执行检测(强进弱出算法)
*
* @param window RSSI 滑动窗口样本(从旧到新)
* @param enterConfig 进入(到岗)判定配置 - 强阈值
* @param exitConfig 退出(离岗)判定配置 - 弱阈值
* @param currentState 当前状态
* @return 检测结果
*/
public DetectionResult detect(
List<Integer> window,
BeaconPresenceConfig.EnterConfig enterConfig,
BeaconPresenceConfig.ExitConfig exitConfig,
AreaState currentState) {
if (window == null || window.isEmpty()) {
log.debug("[RssiDetector] 窗口为空,返回无变化");
return DetectionResult.NO_CHANGE;
}
// 1. 统计满足强阈值的样本数(用于进入判定)
long enterHits = window.stream()
.filter(rssi -> rssi >= enterConfig.getRssiThreshold())
.count();
// 2. 统计满足弱阈值的样本数(用于退出判定)
// 条件RSSI < 弱阈值 或 为缺失值(-999
long exitHits = window.stream()
.filter(rssi -> rssi < exitConfig.getWeakRssiThreshold() || rssi == -999)
.count();
log.debug("[RssiDetector] 检测统计currentState={}, windowSize={}, enterHits={}/{}, exitHits={}/{}",
currentState, window.size(), enterHits, enterConfig.getHitCount(),
exitHits, exitConfig.getHitCount());
// 3. 状态转换判定
if (currentState == AreaState.OUT_AREA) {
// 当前在区域外,检测是否进入
if (enterHits >= enterConfig.getHitCount()) {
log.info("[RssiDetector] 进入条件满足enterHits={}, required={}",
enterHits, enterConfig.getHitCount());
return DetectionResult.ARRIVE_CONFIRMED;
}
} else {
// 当前在区域内,检测是否离开
if (exitHits >= exitConfig.getHitCount()) {
log.info("[RssiDetector] 离开条件满足exitHits={}, required={}",
exitHits, exitConfig.getHitCount());
return DetectionResult.LEAVE_CONFIRMED;
}
}
return DetectionResult.NO_CHANGE;
}
/**
* 从蓝牙设备列表中提取目标信标的 RSSI
*
* @param bluetoothDevices 蓝牙设备列表
* 格式:[{"rssi":-52,"type":243,"mac":"F0:C8:60:1D:10:BB"},...]
* @param targetMac 目标信标 MAC 地址
* @return RSSI 值,如果未找到返回 -999缺失值
*/
public Integer extractTargetRssi(Object bluetoothDevices, String targetMac) {
if (bluetoothDevices == null) {
return -999;
}
try {
@SuppressWarnings("unchecked")
List<java.util.Map<String, Object>> deviceList =
(List<java.util.Map<String, Object>>) bluetoothDevices;
return deviceList.stream()
.filter(device -> targetMac.equals(device.get("mac")))
.map(device -> (Integer) device.get("rssi"))
.findFirst()
.orElse(-999);
} catch (Exception e) {
log.error("[RssiDetector] 解析蓝牙数据失败targetMac={}", targetMac, e);
return -999;
}
}
/**
* 判断窗口是否有效
* <p>
* 窗口大小必须 >= 配置的窗口大小
*
* @param window 窗口样本
* @param windowSize 配置的窗口大小
* @return true - 有效false - 无效
*/
public boolean isWindowValid(List<Integer> window, int windowSize) {
return window != null && window.size() >= windowSize;
}
}

View File

@@ -0,0 +1,282 @@
package com.viewsh.module.iot.service.rule.clean.processor;
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderArriveEvent;
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderAuditEvent;
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
import com.viewsh.module.iot.dal.redis.clean.BeaconArrivedTimeRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.BeaconRssiWindowRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.DeviceCurrentOrderRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.SignalLossRedisDAO;
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
import com.viewsh.module.iot.service.rule.clean.detector.RssiSlidingWindowDetector;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 蓝牙信标检测规则处理器
* <p>
* 监听工牌的蓝牙属性上报,基于滑动窗口算法检测保洁员到岗/离岗
* 采用"强进弱出"双阈值,避免信号抖动
*
* @author AI
*/
@Component
@Slf4j
public class BeaconDetectionRuleProcessor {
@Resource
private BeaconRssiWindowRedisDAO windowRedisDAO;
@Resource
private BeaconArrivedTimeRedisDAO arrivedTimeRedisDAO;
@Resource
private SignalLossRedisDAO signalLossRedisDAO;
@Resource
private DeviceCurrentOrderRedisDAO deviceCurrentOrderRedisDAO;
@Resource
private CleanOrderIntegrationConfigService configService;
@Resource
private RssiSlidingWindowDetector detector;
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 处理蓝牙属性上报
* <p>
* 在设备属性上报处理流程中调用此方法
*
* @param deviceId 设备ID
* @param identifier 属性标识符bluetoothDevices
* @param propertyValue 属性值(蓝牙设备数组)
*/
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
// 1. 检查是否是蓝牙属性
if (!"bluetoothDevices".equals(identifier)) {
return;
}
log.debug("[BeaconDetection] 收到蓝牙属性deviceId={}", deviceId);
// 2. 获取配置
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper =
configService.getConfigWrapperByDeviceId(deviceId);
if (configWrapper == null || configWrapper.getConfig() == null) {
log.debug("[BeaconDetection] 设备无配置deviceId={}", deviceId);
return;
}
BeaconPresenceConfig beaconConfig = configWrapper.getConfig().getBeaconPresence();
if (beaconConfig == null || !beaconConfig.getEnabled()) {
log.debug("[BeaconDetection] 未启用信标检测deviceId={}", deviceId);
return;
}
Long areaId = configWrapper.getAreaId();
// 3. 解析蓝牙数据,提取目标信标的 RSSI
Integer targetRssi = detector.extractTargetRssi(propertyValue, beaconConfig.getBeaconMac());
log.debug("[BeaconDetection] 提取RSSIdeviceId={}, beaconMac={}, rssi={}",
deviceId, beaconConfig.getBeaconMac(), targetRssi);
// 4. 更新滑动窗口
BeaconPresenceConfig.WindowConfig windowConfig = beaconConfig.getWindow();
windowRedisDAO.updateWindow(deviceId, areaId, targetRssi, windowConfig.getSampleTtlSeconds());
// 5. 获取当前窗口样本
List<Integer> window = windowRedisDAO.getWindow(deviceId, areaId);
// 6. 获取设备当前工单状态
DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrder =
deviceCurrentOrderRedisDAO.getCurrentOrder(deviceId);
// 7. 确定当前状态
RssiSlidingWindowDetector.AreaState currentState = determineState(currentOrder, areaId);
// 8. 执行检测
RssiSlidingWindowDetector.DetectionResult result = detector.detect(
window,
beaconConfig.getEnter(),
beaconConfig.getExit(),
currentState
);
// 9. 处理检测结果
switch (result) {
case ARRIVE_CONFIRMED:
handleArriveConfirmed(deviceId, areaId, window, beaconConfig, configWrapper);
break;
case LEAVE_CONFIRMED:
handleLeaveConfirmed(deviceId, areaId, window, beaconConfig);
break;
default:
// NO_CHANGE不做处理
break;
}
}
/**
* 处理到达确认
*/
private void handleArriveConfirmed(Long deviceId, Long areaId, List<Integer> window,
BeaconPresenceConfig beaconConfig,
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) {
log.info("[BeaconDetection] 到达确认deviceId={}, areaId={}, window={}",
deviceId, areaId, window);
// 1. 记录到达时间
arrivedTimeRedisDAO.recordArrivedTime(deviceId, areaId, System.currentTimeMillis());
// 2. 清除离岗记录(如果存在)
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
// 3. 获取当前最新的 RSSI 值
Integer currentRssi = window.isEmpty() ? -999 : window.get(window.size() - 1);
// 4. 构建触发数据
Map<String, Object> triggerData = new HashMap<>();
triggerData.put("beaconMac", beaconConfig.getBeaconMac());
triggerData.put("rssi", currentRssi);
triggerData.put("windowSnapshot", window);
triggerData.put("enterRssiThreshold", beaconConfig.getEnter().getRssiThreshold());
// 5. 发布到岗事件
if (beaconConfig.getEnter().getAutoArrival()) {
publishArriveEvent(deviceId, configWrapper.getDeviceKey(), areaId, triggerData);
}
// 6. 发布审计日志
publishAuditEvent("BEACON_ARRIVE_CONFIRMED", deviceId, configWrapper.getDeviceKey(), areaId,
"蓝牙信标自动到岗确认", triggerData);
}
/**
* 处理离开确认
*/
private void handleLeaveConfirmed(Long deviceId, Long areaId, List<Integer> window,
BeaconPresenceConfig beaconConfig) {
log.info("[BeaconDetection] 离开确认deviceId={}, areaId={}, window={}",
deviceId, areaId, window);
BeaconPresenceConfig.ExitConfig exitConfig = beaconConfig.getExit();
// 1. 检查是否是首次丢失
Long firstLossTime = signalLossRedisDAO.getFirstLossTime(deviceId, areaId);
if (firstLossTime == null) {
// 首次丢失
signalLossRedisDAO.recordFirstLoss(deviceId, areaId, System.currentTimeMillis());
// 2. 发送警告
publishTtsEvent(deviceId, "你已离开当前区域," +
(exitConfig.getLossTimeoutMinutes() > 0 ?
exitConfig.getLossTimeoutMinutes() + "分钟内工单将自动结算" : "工单将自动结算"));
// 3. 发布审计日志
Map<String, Object> data = new HashMap<>();
data.put("firstLossTime", System.currentTimeMillis());
data.put("rssi", window.isEmpty() ? -999 : window.get(window.size() - 1));
data.put("warningDelayMinutes", exitConfig.getWarningDelayMinutes());
publishAuditEvent("BEACON_LEAVE_WARNING_SENT", deviceId, null, areaId,
"保洁员离开作业区域,已发送警告", data);
} else {
// 4. 更新最后丢失时间
signalLossRedisDAO.updateLastLossTime(deviceId, areaId, System.currentTimeMillis());
log.debug("[BeaconDetection] 更新最后丢失时间deviceId={}, areaId={}", deviceId, areaId);
}
}
/**
* 发布到岗事件
*/
private void publishArriveEvent(Long deviceId, String deviceKey, Long areaId, Map<String, Object> triggerData) {
try {
CleanOrderArriveEvent event = CleanOrderArriveEvent.builder()
.eventId(java.util.UUID.randomUUID().toString())
.orderType("CLEAN")
.deviceId(deviceId)
.deviceKey(deviceKey)
.areaId(areaId)
.triggerSource("IOT_BEACON")
.triggerData(triggerData)
.build();
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_ARRIVE, MessageBuilder.withPayload(event).build());
log.info("[BeaconDetection] 发布到岗事件eventId={}, deviceId={}, areaId={}",
event.getEventId(), deviceId, areaId);
} catch (Exception e) {
log.error("[BeaconDetection] 发布到岗事件失败deviceId={}, areaId={}", deviceId, areaId, e);
}
}
/**
* 发布审计事件
*/
private void publishAuditEvent(String auditType, Long deviceId, String deviceKey,
Long areaId, String message, Map<String, Object> data) {
try {
CleanOrderAuditEvent event = CleanOrderAuditEvent.builder()
.eventId(java.util.UUID.randomUUID().toString())
.auditType(auditType)
.deviceId(deviceId)
.deviceKey(deviceKey)
.areaId(areaId)
.message(message)
.data(data)
.build();
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_AUDIT, MessageBuilder.withPayload(event).build());
log.debug("[BeaconDetection] 发布审计事件auditType={}, deviceId={}, areaId={}",
auditType, deviceId, areaId);
} catch (Exception e) {
log.error("[BeaconDetection] 发布审计事件失败auditType={}, deviceId={}", auditType, deviceId, e);
}
}
/**
* 发布 TTS 事件(通过审计事件传递)
*/
private void publishTtsEvent(Long deviceId, String text) {
Map<String, Object> data = new HashMap<>();
data.put("tts", text);
data.put("timestamp", System.currentTimeMillis());
publishAuditEvent("TTS_REQUEST", deviceId, null, null, text, data);
}
/**
* 确定当前状态
*/
private RssiSlidingWindowDetector.AreaState determineState(
DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrder, Long areaId) {
if (currentOrder == null) {
return RssiSlidingWindowDetector.AreaState.OUT_AREA;
}
// 检查工单状态和区域是否匹配
if ("ARRIVED".equals(currentOrder.getStatus()) && areaId.equals(currentOrder.getAreaId())) {
return RssiSlidingWindowDetector.AreaState.IN_AREA;
}
return RssiSlidingWindowDetector.AreaState.OUT_AREA;
}
}

View File

@@ -0,0 +1,309 @@
package com.viewsh.module.iot.service.rule.clean.processor;
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderAuditEvent;
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderCompleteEvent;
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
import com.viewsh.module.iot.dal.redis.clean.BeaconArrivedTimeRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.BeaconRssiWindowRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.DeviceCurrentOrderRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.SignalLossRedisDAO;
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.springframework.data.redis.core.StringRedisTemplate;
/**
* 信号丢失规则处理器
* <p>
* 定时检查离岗后N分钟则触发工单自动完成
* 包含作业时长有效性校验,防止"打卡即走"作弊
*
* @author AI
*/
@Component
@Slf4j
public class SignalLossRuleProcessor {
@Resource
private SignalLossRedisDAO signalLossRedisDAO;
@Resource
private BeaconArrivedTimeRedisDAO arrivedTimeRedisDAO;
@Resource
private BeaconRssiWindowRedisDAO windowRedisDAO;
@Resource
private DeviceCurrentOrderRedisDAO deviceCurrentOrderRedisDAO;
@Resource
private CleanOrderIntegrationConfigService configService;
@Resource
private RocketMQTemplate rocketMQTemplate;
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* Redis Key 模式:扫描所有离岗记录
*/
private static final String LOSS_KEY_PATTERN = "iot:clean:signal:loss:*";
/**
* 定时检查离岗超时(每 30 秒执行一次)
* <p>
* 遍历所有离岗记录,检查是否超过超时时间
* 如果超过,则触发工单完成
*/
@XxlJob("signalLossCheckJob")
public String checkLossTimeout() {
try {
log.debug("[SignalLoss] 开始检查离岗超时");
// 1. 扫描所有离岗记录的 Key
Set<String> keys = stringRedisTemplate.keys(LOSS_KEY_PATTERN);
if (keys == null || keys.isEmpty()) {
return "暂无";
}
log.debug("[SignalLoss] 发现 {} 条离岗记录", keys.size());
// 2. 遍历每条记录
for (String key : keys) {
try {
// 解析 deviceId 和 areaId
// Key 格式iot:clean:signal:loss:{deviceId}:{areaId}
String[] parts = key.split(":");
if (parts.length < 5) {
continue;
}
Long deviceId = Long.parseLong(parts[3]);
Long areaId = Long.parseLong(parts[4]);
// 检查超时
checkTimeoutForDevice(deviceId, areaId);
} catch (Exception e) {
log.error("[SignalLoss] 处理离岗记录失败key={}", key, e);
}
}
} catch (Exception e) {
log.error("[SignalLoss] 检查离岗超时失败", e);
return "ERROR: " + e.getMessage();
}
return "SUCCESS";
}
/**
* 检查单个设备的离岗超时
*/
private void checkTimeoutForDevice(Long deviceId, Long areaId) {
// 1. 获取配置
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper =
configService.getConfigWrapperByDeviceId(deviceId);
if (configWrapper == null || configWrapper.getConfig() == null ||
configWrapper.getConfig().getBeaconPresence() == null) {
log.debug("[SignalLoss] 设备无信标配置deviceId={}", deviceId);
return;
}
BeaconPresenceConfig.ExitConfig exitConfig = configWrapper.getConfig().getBeaconPresence().getExit();
// 2. 获取首次丢失时间
Long firstLossTime = signalLossRedisDAO.getFirstLossTime(deviceId, areaId);
if (firstLossTime == null) {
return;
}
// 3. 获取最后丢失时间
Long lastLossTime = signalLossRedisDAO.getLastLossTime(deviceId, areaId);
if (lastLossTime == null) {
return;
}
// 4. 检查是否超时
long timeoutMillis = exitConfig.getLossTimeoutMinutes() * 60000L;
long elapsedMillis = System.currentTimeMillis() - firstLossTime;
if (elapsedMillis < timeoutMillis) {
log.debug("[SignalLoss] 未超时deviceId={}, elapsed={}ms, timeout={}ms",
deviceId, elapsedMillis, timeoutMillis);
return;
}
log.info("[SignalLoss] 检测到离岗超时deviceId={}, areaId={}, elapsed={}ms",
deviceId, areaId, elapsedMillis);
// 5. 有效性校验:检查作业时长
Long arrivedAt = arrivedTimeRedisDAO.getArrivedTime(deviceId, areaId);
if (arrivedAt == null) {
log.warn("[SignalLoss] 未找到到达时间记录deviceId={}, areaId={}", deviceId, areaId);
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
return;
}
long durationMs = lastLossTime - arrivedAt;
long minValidWorkMillis = exitConfig.getMinValidWorkMinutes() * 60000L;
// 6. 分支处理:有效 vs 无效作业
if (durationMs < minValidWorkMillis) {
// 作业时长不足,抑制完成
handleInvalidWork(deviceId, configWrapper.getDeviceKey(), areaId,
durationMs, minValidWorkMillis, exitConfig);
} else {
// 作业时长有效,触发完成
handleTimeoutComplete(deviceId, configWrapper.getDeviceKey(), areaId,
durationMs, lastLossTime);
}
}
/**
* 处理无效作业(时长不足)
*/
private void handleInvalidWork(Long deviceId, String deviceKey, Long areaId,
Long durationMs, Long minValidWorkMillis,
BeaconPresenceConfig.ExitConfig exitConfig) {
log.warn("[SignalLoss] 作业时长不足抑制自动完成deviceId={}, duration={}ms, minRequired={}ms",
deviceId, durationMs, minValidWorkMillis);
// 1. 发送 TTS 警告
publishTtsEvent(deviceId, "工单作业时长异常,请回到作业区域继续完成");
// 2. 发布审计日志
Map<String, Object> data = new HashMap<>();
data.put("durationMs", durationMs);
data.put("minValidWorkMinutes", exitConfig.getMinValidWorkMinutes());
data.put("shortageMs", minValidWorkMillis - durationMs);
publishAuditEvent("COMPLETE_SUPPRESSED_INVALID", deviceId, deviceKey, areaId,
"作业时长不足,抑制自动完成", data);
// 3. 清除丢失记录(允许重新进入)
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
// 4. 清除无效作业标记(允许下次警告)
signalLossRedisDAO.markInvalidWorkNotified(deviceId, areaId);
}
/**
* 处理超时自动完成
*/
private void handleTimeoutComplete(Long deviceId, String deviceKey, Long areaId,
Long durationMs, Long lastLossTime) {
log.info("[SignalLoss] 触发自动完成deviceId={}, areaId={}, duration={}ms",
deviceId, areaId, durationMs);
// 1. 获取当前工单
DeviceCurrentOrderRedisDAO.OrderCacheInfo currentOrder =
deviceCurrentOrderRedisDAO.getCurrentOrder(deviceId);
if (currentOrder == null) {
log.warn("[SignalLoss] 设备无当前工单deviceId={}", deviceId);
return;
}
// 2. 构建触发数据
Map<String, Object> triggerData = new HashMap<>();
triggerData.put("durationMs", durationMs);
triggerData.put("lastLossTime", lastLossTime);
triggerData.put("completionReason", "SIGNAL_LOSS_TIMEOUT");
// 3. 发布完成事件
try {
CleanOrderCompleteEvent event = CleanOrderCompleteEvent.builder()
.eventId(java.util.UUID.randomUUID().toString())
.orderType("CLEAN")
.orderId(currentOrder.getOrderId())
.deviceId(deviceId)
.deviceKey(deviceKey)
.areaId(areaId)
.triggerSource("IOT_SIGNAL_LOSS")
.triggerData(triggerData)
.build();
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_COMPLETE, MessageBuilder.withPayload(event).build());
log.info("[SignalLoss] 发布完成事件eventId={}, orderId={}, duration={}ms",
event.getEventId(), currentOrder.getOrderId(), durationMs);
} catch (Exception e) {
log.error("[SignalLoss] 发布完成事件失败deviceId={}, orderId={}",
deviceId, currentOrder.getOrderId(), e);
return;
}
// 4. 发布审计日志
Map<String, Object> auditData = new HashMap<>();
auditData.put("durationMs", durationMs);
auditData.put("lastLossTime", lastLossTime);
publishAuditEvent("BEACON_COMPLETE_REQUESTED", deviceId, deviceKey, areaId,
"信号丢失超时自动完成", auditData);
// 5. 清理 Redis 数据
cleanupRedisData(deviceId, areaId);
}
/**
* 清理 Redis 数据
*/
private void cleanupRedisData(Long deviceId, Long areaId) {
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
arrivedTimeRedisDAO.clearArrivedTime(deviceId, areaId);
windowRedisDAO.clearWindow(deviceId, areaId);
log.debug("[SignalLoss] 清理 Redis 数据deviceId={}, areaId={}", deviceId, areaId);
}
/**
* 发布审计事件
*/
private void publishAuditEvent(String auditType, Long deviceId, String deviceKey,
Long areaId, String message, Map<String, Object> data) {
try {
CleanOrderAuditEvent event = CleanOrderAuditEvent.builder()
.eventId(java.util.UUID.randomUUID().toString())
.auditType(auditType)
.deviceId(deviceId)
.deviceKey(deviceKey)
.areaId(areaId)
.message(message)
.data(data)
.build();
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_AUDIT, MessageBuilder.withPayload(event).build());
log.debug("[SignalLoss] 发布审计事件auditType={}, deviceId={}", auditType, deviceId);
} catch (Exception e) {
log.error("[SignalLoss] 发布审计事件失败auditType={}, deviceId={}", auditType, deviceId, e);
}
}
/**
* 发布 TTS 事件
*/
private void publishTtsEvent(Long deviceId, String text) {
Map<String, Object> data = new HashMap<>();
data.put("tts", text);
data.put("timestamp", System.currentTimeMillis());
publishAuditEvent("TTS_REQUEST", deviceId, null, null, text, data);
}
}

View File

@@ -0,0 +1,187 @@
package com.viewsh.module.iot.service.rule.clean.processor;
import com.viewsh.framework.common.util.json.JsonUtils;
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderCreateEvent;
import com.viewsh.module.iot.dal.dataobject.integration.clean.CleanOrderIntegrationConfig;
import com.viewsh.module.iot.dal.dataobject.integration.clean.TrafficThresholdConfig;
import com.viewsh.module.iot.dal.redis.clean.DeviceCurrentOrderRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.TrafficCounterBaseRedisDAO;
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* 客流阈值规则处理器
* <p>
* 监听设备属性上报,检测客流计数器是否达到阈值
* 如果达到阈值,发布工单创建事件到 Ops 模块
*
* @author AI
*/
@Component
@Slf4j
public class TrafficThresholdRuleProcessor {
@Resource
private CleanOrderIntegrationConfigService configService;
@Resource
private TrafficCounterBaseRedisDAO trafficBaseRedisDAO;
@Resource
private DeviceCurrentOrderRedisDAO deviceCurrentOrderRedisDAO;
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 处理客流属性上报
* <p>
* 在设备属性上报处理流程中调用此方法
*
* @param deviceId 设备ID
* @param identifier 属性标识符(如 people_in
* @param propertyValue 属性值
*/
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
// 1. 检查是否是客流属性
if (!"people_in".equals(identifier)) {
return;
}
log.debug("[TrafficThreshold] 收到客流属性deviceId={}, value={}", deviceId, propertyValue);
// 2. 获取配置
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper =
getConfigWrapper(deviceId);
if (configWrapper == null || configWrapper.getConfig() == null) {
log.debug("[TrafficThreshold] 设备无配置deviceId={}", deviceId);
return;
}
TrafficThresholdConfig thresholdConfig = configWrapper.getConfig().getTrafficThreshold();
if (thresholdConfig == null || !thresholdConfig.getAutoCreateOrder()) {
log.debug("[TrafficThreshold] 未启用客流阈值触发deviceId={}", deviceId);
return;
}
// 3. 解析客流值
Long currentCount = parseTrafficCount(propertyValue);
if (currentCount == null) {
log.warn("[TrafficThreshold] 客流值解析失败deviceId={}, value={}", deviceId, propertyValue);
return;
}
// 4. 计算实际客流(当前值 - 基准值)
Long baseValue = trafficBaseRedisDAO.getBaseValue(deviceId);
Long actualCount = currentCount - (baseValue != null ? baseValue : 0L);
// 防止负数(设备重启后计数器归零)
if (actualCount < 0) {
log.warn("[TrafficThreshold] 检测到负数客流重置基准值deviceId={}, currentCount={}, baseValue={}",
deviceId, currentCount, baseValue);
trafficBaseRedisDAO.setBaseValue(deviceId, currentCount);
actualCount = 0L;
}
log.debug("[TrafficThreshold] 客流统计deviceId={}, currentCount={}, baseValue={}, actualCount={}, threshold={}",
deviceId, currentCount, baseValue, actualCount, thresholdConfig.getThreshold());
// 5. 阈值判定
if (actualCount < thresholdConfig.getThreshold()) {
return; // 未达标
}
// 6. 防重复检查(使用 Redis 分布式锁)
String lockKey = String.format("iot:clean:traffic:lock:%s:%s", deviceId, configWrapper.getAreaId());
Boolean locked = stringRedisTemplate.opsForValue()
.setIfAbsent(lockKey, "1", thresholdConfig.getTimeWindowSeconds(), java.util.concurrent.TimeUnit.SECONDS);
if (!locked) {
log.info("[TrafficThreshold] 防重复触发deviceId={}, areaId={}", deviceId, configWrapper.getAreaId());
return;
}
// 7. 发布工单创建事件
publishCreateEvent(configWrapper, actualCount, baseValue, thresholdConfig.getThreshold());
}
/**
* 发布工单创建事件
*/
private void publishCreateEvent(CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper,
Long actualCount, Long baseValue, Integer threshold) {
try {
CleanOrderCreateEvent event = CleanOrderCreateEvent.builder()
.orderType("CLEAN")
.areaId(configWrapper.getAreaId())
.triggerSource("IOT_TRAFFIC")
.triggerDeviceId(configWrapper.getDeviceId())
.triggerDeviceKey(configWrapper.getDeviceKey())
.priority(configWrapper.getConfig().getTrafficThreshold().getOrderPriority())
.triggerData(buildTriggerData(actualCount, baseValue, threshold))
.build();
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_CREATE, MessageBuilder.withPayload(event).build());
log.info("[TrafficThreshold] 发布工单创建事件eventId={}, areaId={}, actualCount={}, threshold={}",
event.getEventId(), configWrapper.getAreaId(), actualCount, threshold);
} catch (Exception e) {
log.error("[TrafficThreshold] 发布工单创建事件失败deviceId={}, areaId={}",
configWrapper.getDeviceId(), configWrapper.getAreaId(), e);
}
}
/**
* 构建触发数据
*/
private Map<String, Object> buildTriggerData(Long actualCount, Long baseValue, Integer threshold) {
Map<String, Object> data = new HashMap<>();
data.put("actualCount", actualCount);
data.put("baseValue", baseValue);
data.put("threshold", threshold);
data.put("exceededCount", actualCount - threshold);
return data;
}
/**
* 获取配置包装器
*/
private CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper getConfigWrapper(Long deviceId) {
return configService.getConfigWrapperByDeviceId(deviceId);
}
/**
* 解析客流计数值
*/
private Long parseTrafficCount(Object value) {
if (value == null) {
return null;
}
if (value instanceof Number) {
return ((Number) value).longValue();
}
if (value instanceof String) {
try {
return Long.parseLong((String) value);
} catch (NumberFormatException e) {
return null;
}
}
return null;
}
@Resource
private org.springframework.data.redis.core.StringRedisTemplate stringRedisTemplate;
}