fix(iot): 传递集成事件项目ID
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -1,376 +1,379 @@
|
||||
package com.viewsh.module.iot.service.rule.clean.processor;
|
||||
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
|
||||
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderArriveEvent;
|
||||
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderAuditEvent;
|
||||
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
|
||||
import com.viewsh.module.iot.dal.redis.clean.BadgeDeviceStatusRedisDAO;
|
||||
import com.viewsh.module.iot.dal.redis.clean.BeaconArrivedTimeRedisDAO;
|
||||
import com.viewsh.module.iot.dal.redis.clean.BeaconRssiWindowRedisDAO;
|
||||
import com.viewsh.module.iot.dal.redis.clean.SignalLossRedisDAO;
|
||||
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
|
||||
import com.viewsh.module.iot.service.rule.clean.detector.RssiSlidingWindowDetector;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* 蓝牙信标检测规则处理器
|
||||
* <p>
|
||||
* 监听工牌的蓝牙属性上报,基于滑动窗口算法检测保洁员到岗/离岗
|
||||
* 采用"强进弱出"双阈值,避免信号抖动
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class BeaconDetectionRuleProcessor {
|
||||
|
||||
@Resource
|
||||
private BeaconRssiWindowRedisDAO windowRedisDAO;
|
||||
|
||||
@Resource
|
||||
private BeaconArrivedTimeRedisDAO arrivedTimeRedisDAO;
|
||||
|
||||
@Resource
|
||||
private SignalLossRedisDAO signalLossRedisDAO;
|
||||
|
||||
@Resource
|
||||
private BadgeDeviceStatusRedisDAO badgeDeviceStatusRedisDAO;
|
||||
|
||||
@Resource
|
||||
private CleanOrderIntegrationConfigService configService;
|
||||
|
||||
@Resource
|
||||
private RssiSlidingWindowDetector detector;
|
||||
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
/**
|
||||
* 上次检测的工单ID缓存(设备ID -> 工单ID)
|
||||
* 用于检测工单切换,清理旧工单的检测状态
|
||||
*/
|
||||
private final Map<Long, Long> lastDetectedOrderCache = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 处理蓝牙属性上报
|
||||
* <p>
|
||||
* 在设备属性上报处理流程中调用此方法
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param identifier 属性标识符(bluetoothDevices)
|
||||
* @param propertyValue 属性值(蓝牙设备数组)
|
||||
*/
|
||||
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
|
||||
// 1. 检查是否是蓝牙属性
|
||||
if (!"bluetoothDevices".equals(identifier)) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("[BeaconDetection] 收到蓝牙属性:deviceId={}", deviceId);
|
||||
|
||||
// 2. 先获取当前工单状态(从中获取正确的 areaId)
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
|
||||
|
||||
if (currentOrder == null || currentOrder.getAreaId() == null) {
|
||||
log.debug("[BeaconDetection] 无当前工单,跳过检测:deviceId={}", deviceId);
|
||||
// 无工单时清理本地缓存
|
||||
lastDetectedOrderCache.remove(deviceId);
|
||||
return;
|
||||
}
|
||||
|
||||
Long areaId = currentOrder.getAreaId();
|
||||
Long orderId = currentOrder.getOrderId();
|
||||
|
||||
// 3. 检测工单切换,清理旧工单的检测状态
|
||||
Long lastOrderId = lastDetectedOrderCache.get(deviceId);
|
||||
if (lastOrderId != null && !lastOrderId.equals(orderId)) {
|
||||
log.warn("[BeaconDetection] 检测到工单切换,清理旧工单的检测状态: " +
|
||||
"deviceId={}, oldOrderId={}, newOrderId={}", deviceId, lastOrderId, orderId);
|
||||
// 清理旧的检测状态(清理当前设备的所有区域检测状态)
|
||||
cleanupAllDetectionState(deviceId);
|
||||
}
|
||||
// 更新缓存
|
||||
lastDetectedOrderCache.put(deviceId, orderId);
|
||||
|
||||
log.debug("[BeaconDetection] 从工单状态获取区域:deviceId={}, areaId={}, orderId={}",
|
||||
deviceId, areaId, orderId);
|
||||
|
||||
// 3. 获取该区域的信标配置(从 BEACON 类型的设备获取)
|
||||
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper beaconConfigWrapper = configService
|
||||
.getConfigByAreaIdAndRelationType(areaId, "BEACON");
|
||||
|
||||
if (beaconConfigWrapper == null || beaconConfigWrapper.getConfig() == null) {
|
||||
log.debug("[BeaconDetection] 区域无信标配置:areaId={}", areaId);
|
||||
return;
|
||||
}
|
||||
|
||||
BeaconPresenceConfig beaconConfig = beaconConfigWrapper.getConfig().getBeaconPresence();
|
||||
if (beaconConfig == null || !beaconConfig.getEnabled()) {
|
||||
log.debug("[BeaconDetection] 未启用信标检测:areaId={}", areaId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 4. 解析蓝牙数据,提取目标信标的 RSSI
|
||||
Integer targetRssi = detector.extractTargetRssi(propertyValue, beaconConfig);
|
||||
|
||||
log.debug("[BeaconDetection] 提取RSSI:deviceId={}, areaId={}, beaconMac={}, rssi={}",
|
||||
deviceId, areaId, beaconConfig.getBeaconMac(), targetRssi);
|
||||
|
||||
// 5. 更新滑动窗口(使用 enter 和 exit 中较大的窗口大小)
|
||||
int maxWindowSize = Math.max(
|
||||
beaconConfig.getEnter().getWindowSize(),
|
||||
beaconConfig.getExit().getWindowSize());
|
||||
windowRedisDAO.updateWindow(deviceId, areaId, targetRssi, maxWindowSize);
|
||||
|
||||
// 6. 获取当前窗口样本
|
||||
List<Integer> window = windowRedisDAO.getWindow(deviceId, areaId);
|
||||
|
||||
// 7. 确定当前状态
|
||||
RssiSlidingWindowDetector.AreaState currentState = determineState(currentOrder, areaId, deviceId);
|
||||
|
||||
// 8. 执行检测
|
||||
RssiSlidingWindowDetector.DetectionResult result = detector.detect(
|
||||
window,
|
||||
beaconConfig.getEnter(),
|
||||
beaconConfig.getExit(),
|
||||
currentState);
|
||||
|
||||
// 9. 处理检测结果
|
||||
switch (result) {
|
||||
case ARRIVE_CONFIRMED:
|
||||
handleArriveConfirmed(deviceId, areaId, window, beaconConfig, currentOrder);
|
||||
break;
|
||||
case LEAVE_CONFIRMED:
|
||||
handleLeaveConfirmed(deviceId, areaId, window, beaconConfig);
|
||||
break;
|
||||
default:
|
||||
// NO_CHANGE,不做处理
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理到达确认
|
||||
*/
|
||||
private void handleArriveConfirmed(Long deviceId, Long areaId, List<Integer> window,
|
||||
BeaconPresenceConfig beaconConfig,
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder) {
|
||||
log.info("[BeaconDetection] 到达确认:deviceId={}, areaId={}, window={}",
|
||||
deviceId, areaId, window);
|
||||
|
||||
// 1. 记录到达时间
|
||||
arrivedTimeRedisDAO.recordArrivedTime(deviceId, areaId, System.currentTimeMillis());
|
||||
|
||||
// 2. 清除离岗记录(如果存在)
|
||||
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
|
||||
|
||||
// 3. 清理 RSSI 窗口(避免历史脏数据影响新的在岗周期)
|
||||
windowRedisDAO.clearWindow(deviceId, areaId);
|
||||
log.debug("[BeaconDetection] 到岗时清理RSSI窗口:deviceId={}, areaId={}", deviceId, areaId);
|
||||
|
||||
// 4. 获取当前最新的 RSSI 值(使用原窗口快照,因为已清理)
|
||||
Integer currentRssi = window.isEmpty() ? -999 : window.get(window.size() - 1);
|
||||
|
||||
// 5. 构建触发数据
|
||||
Map<String, Object> triggerData = new HashMap<>();
|
||||
triggerData.put("beaconMac", beaconConfig.getBeaconMac());
|
||||
triggerData.put("rssi", currentRssi);
|
||||
triggerData.put("windowSnapshot", window);
|
||||
triggerData.put("enterRssiThreshold", beaconConfig.getEnter().getRssiThreshold());
|
||||
|
||||
// 6. 发布到岗事件
|
||||
if (beaconConfig.getEnter().getAutoArrival()) {
|
||||
publishArriveEvent(deviceId, currentOrder.getOrderId(), areaId, triggerData);
|
||||
}
|
||||
|
||||
// 7. 发布审计日志
|
||||
publishAuditEvent("BEACON_ARRIVE_CONFIRMED", deviceId, null, areaId, currentOrder.getOrderId(),
|
||||
"蓝牙信标自动到岗确认", triggerData);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理离开确认
|
||||
*/
|
||||
private void handleLeaveConfirmed(Long deviceId, Long areaId, List<Integer> window,
|
||||
BeaconPresenceConfig beaconConfig) {
|
||||
log.info("[BeaconDetection] 离开确认:deviceId={}, areaId={}, window={}",
|
||||
deviceId, areaId, window);
|
||||
|
||||
// 注意:离岗警告阶段不清除arrivedTime,保持IN_AREA状态
|
||||
// arrivedTime在工单完成时由SignalLossRuleProcessor.cleanupRedisData清除
|
||||
|
||||
// P0 插队校验:检查当前工单是否属于正在检查的区域
|
||||
if (isSwitchingOrder(deviceId, areaId)) {
|
||||
log.debug("[BeaconDetection][P0Interrupt] 检测到工单切换,跳过区域 {} 的离岗处理",
|
||||
areaId);
|
||||
// 清理该区域的离岗记录(避免内存泄漏)
|
||||
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
|
||||
return;
|
||||
}
|
||||
|
||||
BeaconPresenceConfig.ExitConfig exitConfig = beaconConfig.getExit();
|
||||
|
||||
// 1. 检查是否是首次丢失
|
||||
Long firstLossTime = signalLossRedisDAO.getFirstLossTime(deviceId, areaId);
|
||||
|
||||
if (firstLossTime == null) {
|
||||
// 首次丢失
|
||||
signalLossRedisDAO.recordFirstLoss(deviceId, areaId, System.currentTimeMillis());
|
||||
|
||||
// 2. 发布审计日志
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("firstLossTime", System.currentTimeMillis());
|
||||
data.put("rssi", window.isEmpty() ? -999 : window.get(window.size() - 1));
|
||||
data.put("warningDelayMinutes", exitConfig.getWarningDelayMinutes());
|
||||
|
||||
// 获取当前工单ID
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
|
||||
Long orderId = currentOrder != null ? currentOrder.getOrderId() : null;
|
||||
|
||||
publishAuditEvent("BEACON_LEAVE_WARNING_SENT", deviceId, null, areaId, orderId,
|
||||
"保洁员离开作业区域,已发送警告", data);
|
||||
} else {
|
||||
// 4. 更新最后丢失时间
|
||||
signalLossRedisDAO.updateLastLossTime(deviceId, areaId, System.currentTimeMillis());
|
||||
|
||||
log.debug("[BeaconDetection] 更新最后丢失时间:deviceId={}, areaId={}", deviceId, areaId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布到岗事件
|
||||
*/
|
||||
private void publishArriveEvent(Long deviceId, Long orderId, Long areaId, Map<String, Object> triggerData) {
|
||||
try {
|
||||
CleanOrderArriveEvent event = CleanOrderArriveEvent.builder()
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.eventId(java.util.UUID.randomUUID().toString())
|
||||
.orderType("CLEAN")
|
||||
.orderId(orderId)
|
||||
.deviceId(deviceId)
|
||||
.areaId(areaId)
|
||||
.triggerSource("IOT_BEACON")
|
||||
.triggerData(triggerData)
|
||||
.build();
|
||||
|
||||
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_ARRIVE, MessageBuilder.withPayload(event).build());
|
||||
|
||||
log.info("[BeaconDetection] 发布到岗事件:eventId={}, deviceId={}, areaId={}, orderId={}",
|
||||
event.getEventId(), deviceId, areaId, orderId);
|
||||
} catch (Exception e) {
|
||||
log.error("[BeaconDetection] 发布到岗事件失败:deviceId={}, areaId={}", deviceId, areaId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布审计事件
|
||||
*/
|
||||
private void publishAuditEvent(String auditType, Long deviceId, String deviceKey,
|
||||
Long areaId, Long orderId, String message, Map<String, Object> data) {
|
||||
try {
|
||||
CleanOrderAuditEvent event = CleanOrderAuditEvent.builder()
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.eventId(java.util.UUID.randomUUID().toString())
|
||||
.auditType(auditType)
|
||||
.deviceId(deviceId)
|
||||
.deviceKey(deviceKey)
|
||||
.areaId(areaId)
|
||||
.orderId(orderId)
|
||||
.message(message)
|
||||
.data(data)
|
||||
.build();
|
||||
|
||||
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_AUDIT, MessageBuilder.withPayload(event).build());
|
||||
|
||||
log.debug("[BeaconDetection] 发布审计事件:auditType={}, deviceId={}, areaId={}, orderId={}",
|
||||
auditType, deviceId, areaId, orderId);
|
||||
} catch (Exception e) {
|
||||
log.error("[BeaconDetection] 发布审计事件失败:auditType={}, deviceId={}", auditType, deviceId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布 TTS 事件(通过审计事件传递)
|
||||
*/
|
||||
private void publishTtsEvent(Long deviceId, String text) {
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("tts", text);
|
||||
data.put("timestamp", System.currentTimeMillis());
|
||||
|
||||
publishAuditEvent("TTS_REQUEST", deviceId, null, null, null, text, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* 确定当前状态
|
||||
*/
|
||||
private RssiSlidingWindowDetector.AreaState determineState(
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder, Long areaId, Long deviceId) {
|
||||
|
||||
// 优先检查IoT本地到岗记录(避免依赖跨模块异步同步)
|
||||
Long arrivedTime = arrivedTimeRedisDAO.getArrivedTime(deviceId, areaId);
|
||||
if (arrivedTime != null) {
|
||||
log.debug("[BeaconDetection] 本地状态:已到岗, deviceId={}, areaId={}, arrivedTime={}",
|
||||
deviceId, areaId, arrivedTime);
|
||||
return RssiSlidingWindowDetector.AreaState.IN_AREA;
|
||||
}
|
||||
|
||||
// 降级:检查Ops模块的工单状态(向后兼容)
|
||||
if (currentOrder != null
|
||||
&& "ARRIVED".equals(currentOrder.getStatus())
|
||||
&& areaId.equals(currentOrder.getAreaId())) {
|
||||
// 同步本地状态(修复历史数据)
|
||||
arrivedTimeRedisDAO.recordArrivedTime(deviceId, areaId, System.currentTimeMillis());
|
||||
log.info("[BeaconDetection] 从Ops状态同步本地到岗记录:deviceId={}, areaId={}",
|
||||
deviceId, areaId);
|
||||
return RssiSlidingWindowDetector.AreaState.IN_AREA;
|
||||
}
|
||||
|
||||
return RssiSlidingWindowDetector.AreaState.OUT_AREA;
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否正在切换工单(P0 插队场景)
|
||||
* <p>
|
||||
* 如果当前工单的区域ID与正在检查的区域不一致,说明保洁员已切换到其他区域的工单
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param areaId 正在检查的区域ID
|
||||
* @return true-工单切换场景,false-正常离岗场景
|
||||
*/
|
||||
private boolean isSwitchingOrder(Long deviceId, Long areaId) {
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
|
||||
return currentOrder != null && !currentOrder.getAreaId().equals(areaId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理设备所有区域的检测状态
|
||||
* <p>
|
||||
* 用于工单切换场景,清理本地缓存。
|
||||
* Redis 数据(arrivedTime、signalLoss、rssiWindow)由以下路径清理:
|
||||
* <ul>
|
||||
* <li>工单完成时:SignalLossRuleProcessor.cleanupRedisData()</li>
|
||||
* <li>自然过期:Redis TTL 自动清理</li>
|
||||
* <li>新数据覆盖:每次检测都会更新滑动窗口</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
*/
|
||||
private void cleanupAllDetectionState(Long deviceId) {
|
||||
if (deviceId == null) {
|
||||
return;
|
||||
}
|
||||
// 清理本地缓存
|
||||
lastDetectedOrderCache.remove(deviceId);
|
||||
log.info("[BeaconDetection] 已清理设备工单切换检测状态: deviceId={}", deviceId);
|
||||
}
|
||||
}
|
||||
package com.viewsh.module.iot.service.rule.clean.processor;
|
||||
|
||||
import com.viewsh.framework.tenant.core.context.ProjectContextHolder;
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
|
||||
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderArriveEvent;
|
||||
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderAuditEvent;
|
||||
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
|
||||
import com.viewsh.module.iot.dal.redis.clean.BadgeDeviceStatusRedisDAO;
|
||||
import com.viewsh.module.iot.dal.redis.clean.BeaconArrivedTimeRedisDAO;
|
||||
import com.viewsh.module.iot.dal.redis.clean.BeaconRssiWindowRedisDAO;
|
||||
import com.viewsh.module.iot.dal.redis.clean.SignalLossRedisDAO;
|
||||
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
|
||||
import com.viewsh.module.iot.service.rule.clean.detector.RssiSlidingWindowDetector;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* 蓝牙信标检测规则处理器
|
||||
* <p>
|
||||
* 监听工牌的蓝牙属性上报,基于滑动窗口算法检测保洁员到岗/离岗
|
||||
* 采用"强进弱出"双阈值,避免信号抖动
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class BeaconDetectionRuleProcessor {
|
||||
|
||||
@Resource
|
||||
private BeaconRssiWindowRedisDAO windowRedisDAO;
|
||||
|
||||
@Resource
|
||||
private BeaconArrivedTimeRedisDAO arrivedTimeRedisDAO;
|
||||
|
||||
@Resource
|
||||
private SignalLossRedisDAO signalLossRedisDAO;
|
||||
|
||||
@Resource
|
||||
private BadgeDeviceStatusRedisDAO badgeDeviceStatusRedisDAO;
|
||||
|
||||
@Resource
|
||||
private CleanOrderIntegrationConfigService configService;
|
||||
|
||||
@Resource
|
||||
private RssiSlidingWindowDetector detector;
|
||||
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
/**
|
||||
* 上次检测的工单ID缓存(设备ID -> 工单ID)
|
||||
* 用于检测工单切换,清理旧工单的检测状态
|
||||
*/
|
||||
private final Map<Long, Long> lastDetectedOrderCache = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 处理蓝牙属性上报
|
||||
* <p>
|
||||
* 在设备属性上报处理流程中调用此方法
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param identifier 属性标识符(bluetoothDevices)
|
||||
* @param propertyValue 属性值(蓝牙设备数组)
|
||||
*/
|
||||
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
|
||||
// 1. 检查是否是蓝牙属性
|
||||
if (!"bluetoothDevices".equals(identifier)) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("[BeaconDetection] 收到蓝牙属性:deviceId={}", deviceId);
|
||||
|
||||
// 2. 先获取当前工单状态(从中获取正确的 areaId)
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
|
||||
|
||||
if (currentOrder == null || currentOrder.getAreaId() == null) {
|
||||
log.debug("[BeaconDetection] 无当前工单,跳过检测:deviceId={}", deviceId);
|
||||
// 无工单时清理本地缓存
|
||||
lastDetectedOrderCache.remove(deviceId);
|
||||
return;
|
||||
}
|
||||
|
||||
Long areaId = currentOrder.getAreaId();
|
||||
Long orderId = currentOrder.getOrderId();
|
||||
|
||||
// 3. 检测工单切换,清理旧工单的检测状态
|
||||
Long lastOrderId = lastDetectedOrderCache.get(deviceId);
|
||||
if (lastOrderId != null && !lastOrderId.equals(orderId)) {
|
||||
log.warn("[BeaconDetection] 检测到工单切换,清理旧工单的检测状态: " +
|
||||
"deviceId={}, oldOrderId={}, newOrderId={}", deviceId, lastOrderId, orderId);
|
||||
// 清理旧的检测状态(清理当前设备的所有区域检测状态)
|
||||
cleanupAllDetectionState(deviceId);
|
||||
}
|
||||
// 更新缓存
|
||||
lastDetectedOrderCache.put(deviceId, orderId);
|
||||
|
||||
log.debug("[BeaconDetection] 从工单状态获取区域:deviceId={}, areaId={}, orderId={}",
|
||||
deviceId, areaId, orderId);
|
||||
|
||||
// 3. 获取该区域的信标配置(从 BEACON 类型的设备获取)
|
||||
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper beaconConfigWrapper = configService
|
||||
.getConfigByAreaIdAndRelationType(areaId, "BEACON");
|
||||
|
||||
if (beaconConfigWrapper == null || beaconConfigWrapper.getConfig() == null) {
|
||||
log.debug("[BeaconDetection] 区域无信标配置:areaId={}", areaId);
|
||||
return;
|
||||
}
|
||||
|
||||
BeaconPresenceConfig beaconConfig = beaconConfigWrapper.getConfig().getBeaconPresence();
|
||||
if (beaconConfig == null || !beaconConfig.getEnabled()) {
|
||||
log.debug("[BeaconDetection] 未启用信标检测:areaId={}", areaId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 4. 解析蓝牙数据,提取目标信标的 RSSI
|
||||
Integer targetRssi = detector.extractTargetRssi(propertyValue, beaconConfig);
|
||||
|
||||
log.debug("[BeaconDetection] 提取RSSI:deviceId={}, areaId={}, beaconMac={}, rssi={}",
|
||||
deviceId, areaId, beaconConfig.getBeaconMac(), targetRssi);
|
||||
|
||||
// 5. 更新滑动窗口(使用 enter 和 exit 中较大的窗口大小)
|
||||
int maxWindowSize = Math.max(
|
||||
beaconConfig.getEnter().getWindowSize(),
|
||||
beaconConfig.getExit().getWindowSize());
|
||||
windowRedisDAO.updateWindow(deviceId, areaId, targetRssi, maxWindowSize);
|
||||
|
||||
// 6. 获取当前窗口样本
|
||||
List<Integer> window = windowRedisDAO.getWindow(deviceId, areaId);
|
||||
|
||||
// 7. 确定当前状态
|
||||
RssiSlidingWindowDetector.AreaState currentState = determineState(currentOrder, areaId, deviceId);
|
||||
|
||||
// 8. 执行检测
|
||||
RssiSlidingWindowDetector.DetectionResult result = detector.detect(
|
||||
window,
|
||||
beaconConfig.getEnter(),
|
||||
beaconConfig.getExit(),
|
||||
currentState);
|
||||
|
||||
// 9. 处理检测结果
|
||||
switch (result) {
|
||||
case ARRIVE_CONFIRMED:
|
||||
handleArriveConfirmed(deviceId, areaId, window, beaconConfig, currentOrder);
|
||||
break;
|
||||
case LEAVE_CONFIRMED:
|
||||
handleLeaveConfirmed(deviceId, areaId, window, beaconConfig);
|
||||
break;
|
||||
default:
|
||||
// NO_CHANGE,不做处理
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理到达确认
|
||||
*/
|
||||
private void handleArriveConfirmed(Long deviceId, Long areaId, List<Integer> window,
|
||||
BeaconPresenceConfig beaconConfig,
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder) {
|
||||
log.info("[BeaconDetection] 到达确认:deviceId={}, areaId={}, window={}",
|
||||
deviceId, areaId, window);
|
||||
|
||||
// 1. 记录到达时间
|
||||
arrivedTimeRedisDAO.recordArrivedTime(deviceId, areaId, System.currentTimeMillis());
|
||||
|
||||
// 2. 清除离岗记录(如果存在)
|
||||
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
|
||||
|
||||
// 3. 清理 RSSI 窗口(避免历史脏数据影响新的在岗周期)
|
||||
windowRedisDAO.clearWindow(deviceId, areaId);
|
||||
log.debug("[BeaconDetection] 到岗时清理RSSI窗口:deviceId={}, areaId={}", deviceId, areaId);
|
||||
|
||||
// 4. 获取当前最新的 RSSI 值(使用原窗口快照,因为已清理)
|
||||
Integer currentRssi = window.isEmpty() ? -999 : window.get(window.size() - 1);
|
||||
|
||||
// 5. 构建触发数据
|
||||
Map<String, Object> triggerData = new HashMap<>();
|
||||
triggerData.put("beaconMac", beaconConfig.getBeaconMac());
|
||||
triggerData.put("rssi", currentRssi);
|
||||
triggerData.put("windowSnapshot", window);
|
||||
triggerData.put("enterRssiThreshold", beaconConfig.getEnter().getRssiThreshold());
|
||||
|
||||
// 6. 发布到岗事件
|
||||
if (beaconConfig.getEnter().getAutoArrival()) {
|
||||
publishArriveEvent(deviceId, currentOrder.getOrderId(), areaId, triggerData);
|
||||
}
|
||||
|
||||
// 7. 发布审计日志
|
||||
publishAuditEvent("BEACON_ARRIVE_CONFIRMED", deviceId, null, areaId, currentOrder.getOrderId(),
|
||||
"蓝牙信标自动到岗确认", triggerData);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理离开确认
|
||||
*/
|
||||
private void handleLeaveConfirmed(Long deviceId, Long areaId, List<Integer> window,
|
||||
BeaconPresenceConfig beaconConfig) {
|
||||
log.info("[BeaconDetection] 离开确认:deviceId={}, areaId={}, window={}",
|
||||
deviceId, areaId, window);
|
||||
|
||||
// 注意:离岗警告阶段不清除arrivedTime,保持IN_AREA状态
|
||||
// arrivedTime在工单完成时由SignalLossRuleProcessor.cleanupRedisData清除
|
||||
|
||||
// P0 插队校验:检查当前工单是否属于正在检查的区域
|
||||
if (isSwitchingOrder(deviceId, areaId)) {
|
||||
log.debug("[BeaconDetection][P0Interrupt] 检测到工单切换,跳过区域 {} 的离岗处理",
|
||||
areaId);
|
||||
// 清理该区域的离岗记录(避免内存泄漏)
|
||||
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
|
||||
return;
|
||||
}
|
||||
|
||||
BeaconPresenceConfig.ExitConfig exitConfig = beaconConfig.getExit();
|
||||
|
||||
// 1. 检查是否是首次丢失
|
||||
Long firstLossTime = signalLossRedisDAO.getFirstLossTime(deviceId, areaId);
|
||||
|
||||
if (firstLossTime == null) {
|
||||
// 首次丢失
|
||||
signalLossRedisDAO.recordFirstLoss(deviceId, areaId, System.currentTimeMillis());
|
||||
|
||||
// 2. 发布审计日志
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("firstLossTime", System.currentTimeMillis());
|
||||
data.put("rssi", window.isEmpty() ? -999 : window.get(window.size() - 1));
|
||||
data.put("warningDelayMinutes", exitConfig.getWarningDelayMinutes());
|
||||
|
||||
// 获取当前工单ID
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
|
||||
Long orderId = currentOrder != null ? currentOrder.getOrderId() : null;
|
||||
|
||||
publishAuditEvent("BEACON_LEAVE_WARNING_SENT", deviceId, null, areaId, orderId,
|
||||
"保洁员离开作业区域,已发送警告", data);
|
||||
} else {
|
||||
// 4. 更新最后丢失时间
|
||||
signalLossRedisDAO.updateLastLossTime(deviceId, areaId, System.currentTimeMillis());
|
||||
|
||||
log.debug("[BeaconDetection] 更新最后丢失时间:deviceId={}, areaId={}", deviceId, areaId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布到岗事件
|
||||
*/
|
||||
private void publishArriveEvent(Long deviceId, Long orderId, Long areaId, Map<String, Object> triggerData) {
|
||||
try {
|
||||
CleanOrderArriveEvent event = CleanOrderArriveEvent.builder()
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.projectId(ProjectContextHolder.getProjectId())
|
||||
.eventId(java.util.UUID.randomUUID().toString())
|
||||
.orderType("CLEAN")
|
||||
.orderId(orderId)
|
||||
.deviceId(deviceId)
|
||||
.areaId(areaId)
|
||||
.triggerSource("IOT_BEACON")
|
||||
.triggerData(triggerData)
|
||||
.build();
|
||||
|
||||
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_ARRIVE, MessageBuilder.withPayload(event).build());
|
||||
|
||||
log.info("[BeaconDetection] 发布到岗事件:eventId={}, deviceId={}, areaId={}, orderId={}",
|
||||
event.getEventId(), deviceId, areaId, orderId);
|
||||
} catch (Exception e) {
|
||||
log.error("[BeaconDetection] 发布到岗事件失败:deviceId={}, areaId={}", deviceId, areaId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布审计事件
|
||||
*/
|
||||
private void publishAuditEvent(String auditType, Long deviceId, String deviceKey,
|
||||
Long areaId, Long orderId, String message, Map<String, Object> data) {
|
||||
try {
|
||||
CleanOrderAuditEvent event = CleanOrderAuditEvent.builder()
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.projectId(ProjectContextHolder.getProjectId())
|
||||
.eventId(java.util.UUID.randomUUID().toString())
|
||||
.auditType(auditType)
|
||||
.deviceId(deviceId)
|
||||
.deviceKey(deviceKey)
|
||||
.areaId(areaId)
|
||||
.orderId(orderId)
|
||||
.message(message)
|
||||
.data(data)
|
||||
.build();
|
||||
|
||||
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_AUDIT, MessageBuilder.withPayload(event).build());
|
||||
|
||||
log.debug("[BeaconDetection] 发布审计事件:auditType={}, deviceId={}, areaId={}, orderId={}",
|
||||
auditType, deviceId, areaId, orderId);
|
||||
} catch (Exception e) {
|
||||
log.error("[BeaconDetection] 发布审计事件失败:auditType={}, deviceId={}", auditType, deviceId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布 TTS 事件(通过审计事件传递)
|
||||
*/
|
||||
private void publishTtsEvent(Long deviceId, String text) {
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("tts", text);
|
||||
data.put("timestamp", System.currentTimeMillis());
|
||||
|
||||
publishAuditEvent("TTS_REQUEST", deviceId, null, null, null, text, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* 确定当前状态
|
||||
*/
|
||||
private RssiSlidingWindowDetector.AreaState determineState(
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder, Long areaId, Long deviceId) {
|
||||
|
||||
// 优先检查IoT本地到岗记录(避免依赖跨模块异步同步)
|
||||
Long arrivedTime = arrivedTimeRedisDAO.getArrivedTime(deviceId, areaId);
|
||||
if (arrivedTime != null) {
|
||||
log.debug("[BeaconDetection] 本地状态:已到岗, deviceId={}, areaId={}, arrivedTime={}",
|
||||
deviceId, areaId, arrivedTime);
|
||||
return RssiSlidingWindowDetector.AreaState.IN_AREA;
|
||||
}
|
||||
|
||||
// 降级:检查Ops模块的工单状态(向后兼容)
|
||||
if (currentOrder != null
|
||||
&& "ARRIVED".equals(currentOrder.getStatus())
|
||||
&& areaId.equals(currentOrder.getAreaId())) {
|
||||
// 同步本地状态(修复历史数据)
|
||||
arrivedTimeRedisDAO.recordArrivedTime(deviceId, areaId, System.currentTimeMillis());
|
||||
log.info("[BeaconDetection] 从Ops状态同步本地到岗记录:deviceId={}, areaId={}",
|
||||
deviceId, areaId);
|
||||
return RssiSlidingWindowDetector.AreaState.IN_AREA;
|
||||
}
|
||||
|
||||
return RssiSlidingWindowDetector.AreaState.OUT_AREA;
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否正在切换工单(P0 插队场景)
|
||||
* <p>
|
||||
* 如果当前工单的区域ID与正在检查的区域不一致,说明保洁员已切换到其他区域的工单
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param areaId 正在检查的区域ID
|
||||
* @return true-工单切换场景,false-正常离岗场景
|
||||
*/
|
||||
private boolean isSwitchingOrder(Long deviceId, Long areaId) {
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
|
||||
return currentOrder != null && !currentOrder.getAreaId().equals(areaId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理设备所有区域的检测状态
|
||||
* <p>
|
||||
* 用于工单切换场景,清理本地缓存。
|
||||
* Redis 数据(arrivedTime、signalLoss、rssiWindow)由以下路径清理:
|
||||
* <ul>
|
||||
* <li>工单完成时:SignalLossRuleProcessor.cleanupRedisData()</li>
|
||||
* <li>自然过期:Redis TTL 自动清理</li>
|
||||
* <li>新数据覆盖:每次检测都会更新滑动窗口</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
*/
|
||||
private void cleanupAllDetectionState(Long deviceId) {
|
||||
if (deviceId == null) {
|
||||
return;
|
||||
}
|
||||
// 清理本地缓存
|
||||
lastDetectedOrderCache.remove(deviceId);
|
||||
log.info("[BeaconDetection] 已清理设备工单切换检测状态: deviceId={}", deviceId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,297 +1,300 @@
|
||||
package com.viewsh.module.iot.service.rule.clean.processor;
|
||||
|
||||
import com.viewsh.framework.common.util.json.JsonUtils;
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
|
||||
import com.viewsh.module.iot.dal.dataobject.integration.clean.ButtonEventConfig;
|
||||
import com.viewsh.module.iot.dal.redis.clean.BadgeDeviceStatusRedisDAO;
|
||||
import com.viewsh.module.iot.service.device.IotDeviceService;
|
||||
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* 按键事件规则处理器
|
||||
* <p>
|
||||
* 监听设备按键事件上报,处理保洁员工牌的按键交互
|
||||
* <p>
|
||||
* 支持的按键类型:
|
||||
* - 确认键(confirmKeyId):保洁员确认接收工单
|
||||
* - 查询键(queryKeyId):保洁员查询当前工单信息
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class ButtonEventRuleProcessor {
|
||||
|
||||
@Resource
|
||||
private BadgeDeviceStatusRedisDAO badgeDeviceStatusRedisDAO;
|
||||
|
||||
@Resource
|
||||
private IotDeviceService deviceService;
|
||||
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
/**
|
||||
* 处理按键事件属性上报
|
||||
* <p>
|
||||
* 在设备属性上报处理流程中调用此方法
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param identifier 属性标识符(如 button_event)
|
||||
* @param propertyValue 属性值
|
||||
*/
|
||||
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
|
||||
// 1. 检查是否是按键事件属性
|
||||
if (!"button_event".equals(identifier)) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("[ButtonEvent] 收到按键事件:deviceId={}, value={}", deviceId, propertyValue);
|
||||
|
||||
// 2. 获取设备按键配置(从设备 config 字段读取)
|
||||
ButtonEventConfig buttonConfig = getButtonConfig(deviceId);
|
||||
if (buttonConfig == null || !buttonConfig.getEnabled()) {
|
||||
log.debug("[ButtonEvent] 未启用按键事件处理:deviceId={}", deviceId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 解析按键ID
|
||||
Integer buttonId = parseButtonId(propertyValue);
|
||||
if (buttonId == null) {
|
||||
log.warn("[ButtonEvent] 按键ID解析失败:deviceId={}, value={}", deviceId, propertyValue);
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("[ButtonEvent] 按键解析成功:deviceId={}, buttonId={}", deviceId, buttonId);
|
||||
|
||||
// 4. 匹配按键类型并处理(确认键和查询键统一路由到同一逻辑)
|
||||
if (buttonId.equals(buttonConfig.getConfirmKeyId())
|
||||
|| buttonId.equals(buttonConfig.getQueryKeyId())) {
|
||||
// 所有已知按键统一走绿色按键逻辑(根据工单状态智能判断行为)
|
||||
handleGreenButton(deviceId, buttonId);
|
||||
} else {
|
||||
log.debug("[ButtonEvent] 未配置的按键:deviceId={}, buttonId={}", deviceId, buttonId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理绿色按键(统一按键逻辑)
|
||||
* <p>
|
||||
* 根据当前工单状态智能判断行为:
|
||||
* - 无工单:发布查询事件(Ops 端播报"没有工单")
|
||||
* - DISPATCHED:发布确认事件(触发确认状态转换 + 停止循环 + 播报地点)
|
||||
* - CONFIRMED/ARRIVED:发布查询事件(播报地点)
|
||||
* - 其他状态:发布查询事件(兜底处理)
|
||||
*/
|
||||
private void handleGreenButton(Long deviceId, Integer buttonId) {
|
||||
log.info("[ButtonEvent] 绿色按键按下:deviceId={}, buttonId={}", deviceId, buttonId);
|
||||
|
||||
// 1. 查询设备当前工单
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
|
||||
if (currentOrder == null) {
|
||||
// 无工单 → 发布查询事件(Ops 端播报"没有工单")
|
||||
log.info("[ButtonEvent] 设备无当前工单:deviceId={}", deviceId);
|
||||
publishQueryEvent(deviceId, null, buttonId, "当前无工单");
|
||||
return;
|
||||
}
|
||||
|
||||
Long orderId = currentOrder.getOrderId();
|
||||
String orderStatus = currentOrder.getStatus();
|
||||
|
||||
// 2. 根据工单状态智能分派
|
||||
if ("DISPATCHED".equals(orderStatus)) {
|
||||
// DISPATCHED → 发布确认事件(触发确认 + 停止循环 + 播报地点)
|
||||
// 防重复检查
|
||||
String dedupKey = String.format("iot:clean:button:dedup:confirm:%s:%s", deviceId, orderId);
|
||||
Boolean firstTime = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(dedupKey, "1", 10, java.util.concurrent.TimeUnit.SECONDS);
|
||||
|
||||
if (!Boolean.TRUE.equals(firstTime)) {
|
||||
// 重复确认不再静默,改为发查询事件给保洁员反馈(播报地点)
|
||||
log.info("[ButtonEvent] 确认操作重复,转为查询:deviceId={}, orderId={}", deviceId, orderId);
|
||||
publishQueryEvent(deviceId, orderId, buttonId, "重复确认,查询当前工单");
|
||||
return;
|
||||
}
|
||||
|
||||
publishConfirmEvent(deviceId, orderId, buttonId);
|
||||
log.info("[ButtonEvent] DISPATCHED状态,发布确认事件:deviceId={}, orderId={}", deviceId, orderId);
|
||||
} else {
|
||||
// CONFIRMED / ARRIVED / 其他状态 → 发布查询事件(播报地点)
|
||||
publishQueryEvent(deviceId, orderId, buttonId, "查询当前工单");
|
||||
log.info("[ButtonEvent] {}状态,发布查询事件:deviceId={}, orderId={}", orderStatus, deviceId, orderId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布工单确认事件
|
||||
*/
|
||||
private void publishConfirmEvent(Long deviceId, Long orderId, Integer buttonId) {
|
||||
try {
|
||||
String deviceKey = getDeviceKey(deviceId);
|
||||
|
||||
Map<String, Object> event = new HashMap<>();
|
||||
event.put("eventId", UUID.randomUUID().toString());
|
||||
event.put("tenantId", TenantContextHolder.getTenantId());
|
||||
event.put("orderType", "CLEAN");
|
||||
event.put("orderId", orderId);
|
||||
event.put("deviceId", deviceId);
|
||||
event.put("deviceKey", deviceKey);
|
||||
event.put("areaId", null); // areaId 由 Ops 模块从当前工单获取
|
||||
event.put("triggerSource", "IOT_BUTTON_CONFIRM");
|
||||
event.put("buttonId", buttonId);
|
||||
|
||||
rocketMQTemplate.syncSend(
|
||||
CleanOrderTopics.ORDER_CONFIRM,
|
||||
MessageBuilder.withPayload(event).build()
|
||||
);
|
||||
|
||||
log.info("[ButtonEvent] 确认事件已发布:eventId={}, orderId={}, deviceId={}",
|
||||
event.get("eventId"), orderId, deviceId);
|
||||
} catch (Exception e) {
|
||||
log.error("[ButtonEvent] 发布确认事件失败:deviceId={}, orderId={}",
|
||||
deviceId, orderId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布工单查询事件
|
||||
*/
|
||||
private void publishQueryEvent(Long deviceId, Long orderId, Integer buttonId, String message) {
|
||||
try {
|
||||
String deviceKey = getDeviceKey(deviceId);
|
||||
|
||||
Map<String, Object> event = new HashMap<>();
|
||||
event.put("eventId", UUID.randomUUID().toString());
|
||||
event.put("tenantId", TenantContextHolder.getTenantId());
|
||||
event.put("orderType", "CLEAN");
|
||||
event.put("orderId", orderId);
|
||||
event.put("deviceId", deviceId);
|
||||
event.put("deviceKey", deviceKey);
|
||||
event.put("areaId", null); // areaId 由 Ops 模块从当前工单获取
|
||||
event.put("triggerSource", "IOT_BUTTON_QUERY");
|
||||
event.put("buttonId", buttonId);
|
||||
event.put("message", message);
|
||||
|
||||
rocketMQTemplate.syncSend(
|
||||
CleanOrderTopics.ORDER_AUDIT, // 查询事件使用审计主题
|
||||
MessageBuilder.withPayload(event).build()
|
||||
);
|
||||
|
||||
log.info("[ButtonEvent] 查询事件已发布:eventId={}, orderId={}, deviceId={}, message={}",
|
||||
event.get("eventId"), orderId, deviceId, message);
|
||||
} catch (Exception e) {
|
||||
log.error("[ButtonEvent] 发布查询事件失败:deviceId={}, orderId={}",
|
||||
deviceId, orderId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取设备按键配置
|
||||
* <p>
|
||||
* 从设备的 config 字段读取按键事件配置
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @return 按键配置,如果未配置返回 null
|
||||
*/
|
||||
private ButtonEventConfig getButtonConfig(Long deviceId) {
|
||||
try {
|
||||
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
|
||||
if (device == null || device.getConfig() == null) {
|
||||
log.debug("[ButtonEvent] 设备不存在或无配置:deviceId={}", deviceId);
|
||||
return null;
|
||||
}
|
||||
|
||||
// 从设备 config JSON 中解析 buttonEvent 配置
|
||||
// 注意:使用 JsonUtils.parseObject 直接解析整个 config 为 Map,然后提取 buttonEvent
|
||||
// 避免 先转JSON字符串再解析回对象 的双重转换
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> configMap = JsonUtils.parseObject(device.getConfig(), Map.class);
|
||||
if (configMap == null || !configMap.containsKey("buttonEvent")) {
|
||||
log.debug("[ButtonEvent] 设备配置中无 buttonEvent:deviceId={}", deviceId);
|
||||
return null;
|
||||
}
|
||||
|
||||
// 将 buttonEvent 对象转为 JSON 字符串再解析为目标类型
|
||||
// TODO: 后续可优化为直接转换,避免序列化/反序列化开销
|
||||
Object buttonEventObj = configMap.get("buttonEvent");
|
||||
return JsonUtils.parseObject(JsonUtils.toJsonString(buttonEventObj), ButtonEventConfig.class);
|
||||
} catch (Exception e) {
|
||||
log.error("[ButtonEvent] 获取按键配置失败:deviceId={}", deviceId, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析按键ID
|
||||
* <p>
|
||||
* 支持两种格式:
|
||||
* 1. 属性上报:value 直接是按键ID(如 1)
|
||||
* 2. 事件上报:value 是 Map,包含 keyId 字段(如 {keyId: 1, keyState: 1})
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Integer parseButtonId(Object value) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 事件上报格式:value 是 Map,包含 keyId 字段
|
||||
if (value instanceof Map) {
|
||||
Map<String, Object> map = (Map<String, Object>) value;
|
||||
Object keyId = map.get("keyId");
|
||||
if (keyId instanceof Number) {
|
||||
return ((Number) keyId).intValue();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// 属性上报格式:value 直接是按键ID
|
||||
if (value instanceof Number) {
|
||||
return ((Number) value).intValue();
|
||||
}
|
||||
|
||||
if (value instanceof String) {
|
||||
try {
|
||||
return Integer.parseInt((String) value);
|
||||
} catch (NumberFormatException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取设备 Key(从 IoT 设备缓存获取 serialNumber)
|
||||
* <p>
|
||||
* deviceKey 在 ops_area_device_relation 表中是冗余字段,
|
||||
* 实际来源是 iot_device.serialNumber
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @return deviceKey(serialNumber),获取<EFBFBD><EFBFBD>败返回 null
|
||||
*/
|
||||
private String getDeviceKey(Long deviceId) {
|
||||
try {
|
||||
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
|
||||
if (device != null) {
|
||||
return device.getSerialNumber();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[ButtonEvent] 获取 deviceKey 失败:deviceId={}", deviceId, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
}
|
||||
package com.viewsh.module.iot.service.rule.clean.processor;
|
||||
|
||||
import com.viewsh.framework.common.util.json.JsonUtils;
|
||||
import com.viewsh.framework.tenant.core.context.ProjectContextHolder;
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
|
||||
import com.viewsh.module.iot.dal.dataobject.integration.clean.ButtonEventConfig;
|
||||
import com.viewsh.module.iot.dal.redis.clean.BadgeDeviceStatusRedisDAO;
|
||||
import com.viewsh.module.iot.service.device.IotDeviceService;
|
||||
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* 按键事件规则处理器
|
||||
* <p>
|
||||
* 监听设备按键事件上报,处理保洁员工牌的按键交互
|
||||
* <p>
|
||||
* 支持的按键类型:
|
||||
* - 确认键(confirmKeyId):保洁员确认接收工单
|
||||
* - 查询键(queryKeyId):保洁员查询当前工单信息
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class ButtonEventRuleProcessor {
|
||||
|
||||
@Resource
|
||||
private BadgeDeviceStatusRedisDAO badgeDeviceStatusRedisDAO;
|
||||
|
||||
@Resource
|
||||
private IotDeviceService deviceService;
|
||||
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
/**
|
||||
* 处理按键事件属性上报
|
||||
* <p>
|
||||
* 在设备属性上报处理流程中调用此方法
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param identifier 属性标识符(如 button_event)
|
||||
* @param propertyValue 属性值
|
||||
*/
|
||||
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
|
||||
// 1. 检查是否是按键事件属性
|
||||
if (!"button_event".equals(identifier)) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("[ButtonEvent] 收到按键事件:deviceId={}, value={}", deviceId, propertyValue);
|
||||
|
||||
// 2. 获取设备按键配置(从设备 config 字段读取)
|
||||
ButtonEventConfig buttonConfig = getButtonConfig(deviceId);
|
||||
if (buttonConfig == null || !buttonConfig.getEnabled()) {
|
||||
log.debug("[ButtonEvent] 未启用按键事件处理:deviceId={}", deviceId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 解析按键ID
|
||||
Integer buttonId = parseButtonId(propertyValue);
|
||||
if (buttonId == null) {
|
||||
log.warn("[ButtonEvent] 按键ID解析失败:deviceId={}, value={}", deviceId, propertyValue);
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("[ButtonEvent] 按键解析成功:deviceId={}, buttonId={}", deviceId, buttonId);
|
||||
|
||||
// 4. 匹配按键类型并处理(确认键和查询键统一路由到同一逻辑)
|
||||
if (buttonId.equals(buttonConfig.getConfirmKeyId())
|
||||
|| buttonId.equals(buttonConfig.getQueryKeyId())) {
|
||||
// 所有已知按键统一走绿色按键逻辑(根据工单状态智能判断行为)
|
||||
handleGreenButton(deviceId, buttonId);
|
||||
} else {
|
||||
log.debug("[ButtonEvent] 未配置的按键:deviceId={}, buttonId={}", deviceId, buttonId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理绿色按键(统一按键逻辑)
|
||||
* <p>
|
||||
* 根据当前工单状态智能判断行为:
|
||||
* - 无工单:发布查询事件(Ops 端播报"没有工单")
|
||||
* - DISPATCHED:发布确认事件(触发确认状态转换 + 停止循环 + 播报地点)
|
||||
* - CONFIRMED/ARRIVED:发布查询事件(播报地点)
|
||||
* - 其他状态:发布查询事件(兜底处理)
|
||||
*/
|
||||
private void handleGreenButton(Long deviceId, Integer buttonId) {
|
||||
log.info("[ButtonEvent] 绿色按键按下:deviceId={}, buttonId={}", deviceId, buttonId);
|
||||
|
||||
// 1. 查询设备当前工单
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
|
||||
if (currentOrder == null) {
|
||||
// 无工单 → 发布查询事件(Ops 端播报"没有工单")
|
||||
log.info("[ButtonEvent] 设备无当前工单:deviceId={}", deviceId);
|
||||
publishQueryEvent(deviceId, null, buttonId, "当前无工单");
|
||||
return;
|
||||
}
|
||||
|
||||
Long orderId = currentOrder.getOrderId();
|
||||
String orderStatus = currentOrder.getStatus();
|
||||
|
||||
// 2. 根据工单状态智能分派
|
||||
if ("DISPATCHED".equals(orderStatus)) {
|
||||
// DISPATCHED → 发布确认事件(触发确认 + 停止循环 + 播报地点)
|
||||
// 防重复检查
|
||||
String dedupKey = String.format("iot:clean:button:dedup:confirm:%s:%s", deviceId, orderId);
|
||||
Boolean firstTime = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(dedupKey, "1", 10, java.util.concurrent.TimeUnit.SECONDS);
|
||||
|
||||
if (!Boolean.TRUE.equals(firstTime)) {
|
||||
// 重复确认不再静默,改为发查询事件给保洁员反馈(播报地点)
|
||||
log.info("[ButtonEvent] 确认操作重复,转为查询:deviceId={}, orderId={}", deviceId, orderId);
|
||||
publishQueryEvent(deviceId, orderId, buttonId, "重复确认,查询当前工单");
|
||||
return;
|
||||
}
|
||||
|
||||
publishConfirmEvent(deviceId, orderId, buttonId);
|
||||
log.info("[ButtonEvent] DISPATCHED状态,发布确认事件:deviceId={}, orderId={}", deviceId, orderId);
|
||||
} else {
|
||||
// CONFIRMED / ARRIVED / 其他状态 → 发布查询事件(播报地点)
|
||||
publishQueryEvent(deviceId, orderId, buttonId, "查询当前工单");
|
||||
log.info("[ButtonEvent] {}状态,发布查询事件:deviceId={}, orderId={}", orderStatus, deviceId, orderId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布工单确认事件
|
||||
*/
|
||||
private void publishConfirmEvent(Long deviceId, Long orderId, Integer buttonId) {
|
||||
try {
|
||||
String deviceKey = getDeviceKey(deviceId);
|
||||
|
||||
Map<String, Object> event = new HashMap<>();
|
||||
event.put("eventId", UUID.randomUUID().toString());
|
||||
event.put("tenantId", TenantContextHolder.getTenantId());
|
||||
event.put("projectId", ProjectContextHolder.getProjectId());
|
||||
event.put("orderType", "CLEAN");
|
||||
event.put("orderId", orderId);
|
||||
event.put("deviceId", deviceId);
|
||||
event.put("deviceKey", deviceKey);
|
||||
event.put("areaId", null); // areaId 由 Ops 模块从当前工单获取
|
||||
event.put("triggerSource", "IOT_BUTTON_CONFIRM");
|
||||
event.put("buttonId", buttonId);
|
||||
|
||||
rocketMQTemplate.syncSend(
|
||||
CleanOrderTopics.ORDER_CONFIRM,
|
||||
MessageBuilder.withPayload(event).build()
|
||||
);
|
||||
|
||||
log.info("[ButtonEvent] 确认事件已发布:eventId={}, orderId={}, deviceId={}",
|
||||
event.get("eventId"), orderId, deviceId);
|
||||
} catch (Exception e) {
|
||||
log.error("[ButtonEvent] 发布确认事件失败:deviceId={}, orderId={}",
|
||||
deviceId, orderId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布工单查询事件
|
||||
*/
|
||||
private void publishQueryEvent(Long deviceId, Long orderId, Integer buttonId, String message) {
|
||||
try {
|
||||
String deviceKey = getDeviceKey(deviceId);
|
||||
|
||||
Map<String, Object> event = new HashMap<>();
|
||||
event.put("eventId", UUID.randomUUID().toString());
|
||||
event.put("tenantId", TenantContextHolder.getTenantId());
|
||||
event.put("projectId", ProjectContextHolder.getProjectId());
|
||||
event.put("orderType", "CLEAN");
|
||||
event.put("orderId", orderId);
|
||||
event.put("deviceId", deviceId);
|
||||
event.put("deviceKey", deviceKey);
|
||||
event.put("areaId", null); // areaId 由 Ops 模块从当前工单获取
|
||||
event.put("triggerSource", "IOT_BUTTON_QUERY");
|
||||
event.put("buttonId", buttonId);
|
||||
event.put("message", message);
|
||||
|
||||
rocketMQTemplate.syncSend(
|
||||
CleanOrderTopics.ORDER_AUDIT, // 查询事件使用审计主题
|
||||
MessageBuilder.withPayload(event).build()
|
||||
);
|
||||
|
||||
log.info("[ButtonEvent] 查询事件已发布:eventId={}, orderId={}, deviceId={}, message={}",
|
||||
event.get("eventId"), orderId, deviceId, message);
|
||||
} catch (Exception e) {
|
||||
log.error("[ButtonEvent] 发布查询事件失败:deviceId={}, orderId={}",
|
||||
deviceId, orderId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取设备按键配置
|
||||
* <p>
|
||||
* 从设备的 config 字段读取按键事件配置
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @return 按键配置,如果未配置返回 null
|
||||
*/
|
||||
private ButtonEventConfig getButtonConfig(Long deviceId) {
|
||||
try {
|
||||
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
|
||||
if (device == null || device.getConfig() == null) {
|
||||
log.debug("[ButtonEvent] 设备不存在或无配置:deviceId={}", deviceId);
|
||||
return null;
|
||||
}
|
||||
|
||||
// 从设备 config JSON 中解析 buttonEvent 配置
|
||||
// 注意:使用 JsonUtils.parseObject 直接解析整个 config 为 Map,然后提取 buttonEvent
|
||||
// 避免 先转JSON字符串再解析回对象 的双重转换
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> configMap = JsonUtils.parseObject(device.getConfig(), Map.class);
|
||||
if (configMap == null || !configMap.containsKey("buttonEvent")) {
|
||||
log.debug("[ButtonEvent] 设备配置中无 buttonEvent:deviceId={}", deviceId);
|
||||
return null;
|
||||
}
|
||||
|
||||
// 将 buttonEvent 对象转为 JSON 字符串再解析为目标类型
|
||||
// TODO: 后续可优化为直接转换,避免序列化/反序列化开销
|
||||
Object buttonEventObj = configMap.get("buttonEvent");
|
||||
return JsonUtils.parseObject(JsonUtils.toJsonString(buttonEventObj), ButtonEventConfig.class);
|
||||
} catch (Exception e) {
|
||||
log.error("[ButtonEvent] 获取按键配置失败:deviceId={}", deviceId, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析按键ID
|
||||
* <p>
|
||||
* 支持两种格式:
|
||||
* 1. 属性上报:value 直接是按键ID(如 1)
|
||||
* 2. 事件上报:value 是 Map,包含 keyId 字段(如 {keyId: 1, keyState: 1})
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Integer parseButtonId(Object value) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 事件上报格式:value 是 Map,包含 keyId 字段
|
||||
if (value instanceof Map) {
|
||||
Map<String, Object> map = (Map<String, Object>) value;
|
||||
Object keyId = map.get("keyId");
|
||||
if (keyId instanceof Number) {
|
||||
return ((Number) keyId).intValue();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// 属性上报格式:value 直接是按键ID
|
||||
if (value instanceof Number) {
|
||||
return ((Number) value).intValue();
|
||||
}
|
||||
|
||||
if (value instanceof String) {
|
||||
try {
|
||||
return Integer.parseInt((String) value);
|
||||
} catch (NumberFormatException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取设备 Key(从 IoT 设备缓存获取 serialNumber)
|
||||
* <p>
|
||||
* deviceKey 在 ops_area_device_relation 表中是冗余字段,
|
||||
* 实际来源是 iot_device.serialNumber
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @return deviceKey(serialNumber),获取<E88EB7><E58F96>败返回 null
|
||||
*/
|
||||
private String getDeviceKey(Long deviceId) {
|
||||
try {
|
||||
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
|
||||
if (device != null) {
|
||||
return device.getSerialNumber();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[ButtonEvent] 获取 deviceKey 失败:deviceId={}", deviceId, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
}
|
||||
|
||||
@@ -1,384 +1,387 @@
|
||||
package com.viewsh.module.iot.service.rule.clean.processor;
|
||||
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
|
||||
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderAuditEvent;
|
||||
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderCompleteEvent;
|
||||
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
|
||||
import com.viewsh.module.iot.dal.redis.clean.BadgeDeviceStatusRedisDAO;
|
||||
import com.viewsh.module.iot.dal.redis.clean.BeaconArrivedTimeRedisDAO;
|
||||
import com.viewsh.module.iot.dal.redis.clean.BeaconRssiWindowRedisDAO;
|
||||
import com.viewsh.module.iot.dal.redis.clean.SignalLossRedisDAO;
|
||||
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
|
||||
import com.viewsh.module.iot.service.device.IotDeviceService;
|
||||
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import com.viewsh.framework.tenant.core.util.TenantUtils;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
|
||||
/**
|
||||
* 信号丢失规则处理器
|
||||
* <p>
|
||||
* 定时检查离岗后N分钟则触发工单自动完成
|
||||
* 包含作业时长有效性校验,防止"打卡即走"作弊
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class SignalLossRuleProcessor {
|
||||
|
||||
@Resource
|
||||
private SignalLossRedisDAO signalLossRedisDAO;
|
||||
|
||||
@Resource
|
||||
private BeaconArrivedTimeRedisDAO arrivedTimeRedisDAO;
|
||||
|
||||
@Resource
|
||||
private BeaconRssiWindowRedisDAO windowRedisDAO;
|
||||
|
||||
@Resource
|
||||
private BadgeDeviceStatusRedisDAO badgeDeviceStatusRedisDAO;
|
||||
|
||||
@Resource
|
||||
private CleanOrderIntegrationConfigService configService;
|
||||
|
||||
@Resource
|
||||
private IotDeviceService deviceService;
|
||||
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
/**
|
||||
* Redis Key 模式:扫描所有离岗记录
|
||||
*/
|
||||
private static final String LOSS_KEY_PATTERN = "iot:clean:signal:loss:*";
|
||||
|
||||
/**
|
||||
* 定时检查离岗超时(每 30 秒执行一次)
|
||||
* <p>
|
||||
* 遍历所有离岗记录,检查是否超过超时时间
|
||||
* 如果超过,则触发工单完成
|
||||
*/
|
||||
@XxlJob("signalLossCheckJob")
|
||||
public String checkLossTimeout() {
|
||||
// TODO: 设置租户上下文(单租户场景使用固定租户ID=1)
|
||||
// 确保后续发送的 RocketMQ 消息正确携带租户信息
|
||||
return doCheckLossTimeout();
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行离岗超时检查
|
||||
*/
|
||||
private String doCheckLossTimeout() {
|
||||
try {
|
||||
log.debug("[SignalLoss] 开始检查离岗超时");
|
||||
|
||||
// 1. 扫描所有离岗记录的 Key
|
||||
Set<String> keys = stringRedisTemplate.keys(LOSS_KEY_PATTERN);
|
||||
|
||||
if (keys == null || keys.isEmpty()) {
|
||||
return "暂无";
|
||||
}
|
||||
|
||||
log.debug("[SignalLoss] 发现 {} 条离岗记录", keys.size());
|
||||
|
||||
// 2. 遍历每条记录
|
||||
for (String key : keys) {
|
||||
try {
|
||||
// 解析 deviceId 和 areaId
|
||||
// Key 格式:iot:clean:signal:loss:{deviceId}:{areaId}
|
||||
String[] parts = key.split(":");
|
||||
if (parts.length < 6) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Long deviceId = Long.parseLong(parts[4]);
|
||||
Long areaId = Long.parseLong(parts[5]);
|
||||
|
||||
// 检查超时
|
||||
IotDeviceDO device = deviceService.getDevice(deviceId);
|
||||
if (device == null || device.getTenantId() == null) {
|
||||
log.warn("[SignalLoss] 璁惧涓嶅瓨鍦ㄦ垨缂哄皯绉熸埛淇℃伅: deviceId={}", deviceId);
|
||||
continue;
|
||||
}
|
||||
TenantUtils.execute(device.getTenantId(), () -> checkTimeoutForDevice(deviceId, areaId));
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[SignalLoss] 处理离岗记录失败:key={}", key, e);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[SignalLoss] 检查离岗超时失败", e);
|
||||
return "ERROR: " + e.getMessage();
|
||||
}
|
||||
|
||||
return "SUCCESS";
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查单个设备的离岗超时
|
||||
*/
|
||||
private void checkTimeoutForDevice(Long deviceId, Long areaId) {
|
||||
// P0 插队校验:检查当前工单是否属于正在检查的区域
|
||||
if (isSwitchingOrder(deviceId, areaId)) {
|
||||
log.debug("[SignalLoss][P0Interrupt] 检测到工单切换,跳过区域 {} 的超时检查",
|
||||
areaId);
|
||||
// 清理该区域的离岗记录(避免内存泄漏)
|
||||
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 1. 获取该区域的信标配置(从 BEACON 类型的设备获取)
|
||||
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper beaconConfigWrapper = configService
|
||||
.getConfigByAreaIdAndRelationType(areaId, "BEACON");
|
||||
|
||||
if (beaconConfigWrapper == null || beaconConfigWrapper.getConfig() == null ||
|
||||
beaconConfigWrapper.getConfig().getBeaconPresence() == null) {
|
||||
log.debug("[SignalLoss] 区域无信标配置:areaId={}", areaId);
|
||||
return;
|
||||
}
|
||||
|
||||
BeaconPresenceConfig.ExitConfig exitConfig = beaconConfigWrapper.getConfig().getBeaconPresence().getExit();
|
||||
|
||||
// 2. 获取 deviceKey(从 IoT 设备缓存获取 serialNumber)
|
||||
String badgeDeviceKey = getDeviceKey(deviceId);
|
||||
|
||||
// 3. 获取首次丢失时间
|
||||
Long firstLossTime = signalLossRedisDAO.getFirstLossTime(deviceId, areaId);
|
||||
|
||||
if (firstLossTime == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 获取最后丢失时间
|
||||
Long lastLossTime = signalLossRedisDAO.getLastLossTime(deviceId, areaId);
|
||||
|
||||
if (lastLossTime == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 4. 检查是否超时
|
||||
long timeoutMillis = exitConfig.getLossTimeoutMinutes() * 60000L;
|
||||
long elapsedMillis = System.currentTimeMillis() - firstLossTime;
|
||||
|
||||
if (elapsedMillis < timeoutMillis) {
|
||||
log.debug("[SignalLoss] 未超时:deviceId={}, elapsed={}ms, timeout={}ms",
|
||||
deviceId, elapsedMillis, timeoutMillis);
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("[SignalLoss] 检测到离岗超时:deviceId={}, areaId={}, elapsed={}ms",
|
||||
deviceId, areaId, elapsedMillis);
|
||||
|
||||
// 5. 有效性校验:检查作业时长
|
||||
Long arrivedAt = arrivedTimeRedisDAO.getArrivedTime(deviceId, areaId);
|
||||
|
||||
if (arrivedAt == null) {
|
||||
log.warn("[SignalLoss] 未找到到达时间记录:deviceId={}, areaId={}", deviceId, areaId);
|
||||
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
|
||||
return;
|
||||
}
|
||||
|
||||
long durationMs = lastLossTime - arrivedAt;
|
||||
long minValidWorkMillis = exitConfig.getMinValidWorkMinutes() * 60000L;
|
||||
|
||||
// 6. 分支处理:有效 vs 无效作业
|
||||
// TODO 暂时取消作业时长不足抑制自动完成的逻辑,所有情况均触发完成
|
||||
// if (durationMs < minValidWorkMillis) {
|
||||
// // 作业时长不足,抑制完成
|
||||
// handleInvalidWork(deviceId, badgeDeviceKey, areaId,
|
||||
// durationMs, minValidWorkMillis, exitConfig);
|
||||
// } else {
|
||||
// 作业时长有效,触发完成
|
||||
handleTimeoutComplete(deviceId, badgeDeviceKey, areaId,
|
||||
durationMs, lastLossTime);
|
||||
// }
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理无效作业(时长不足)
|
||||
*/
|
||||
private void handleInvalidWork(Long deviceId, String deviceKey, Long areaId,
|
||||
Long durationMs, Long minValidWorkMillis,
|
||||
BeaconPresenceConfig.ExitConfig exitConfig) {
|
||||
log.warn("[SignalLoss] 作业时长不足,抑制自动完成:deviceId={}, duration={}ms, minRequired={}ms",
|
||||
deviceId, durationMs, minValidWorkMillis);
|
||||
|
||||
// 1. 发送 TTS 警告
|
||||
publishTtsEvent(deviceId, "工单作业时长异常,请回到作业区域继续完成");
|
||||
|
||||
// 2. 发布审计日志
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("durationMs", durationMs);
|
||||
data.put("minValidWorkMinutes", exitConfig.getMinValidWorkMinutes());
|
||||
data.put("shortageMs", minValidWorkMillis - durationMs);
|
||||
|
||||
// 获取当前工单ID
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
|
||||
Long orderId = currentOrder != null ? currentOrder.getOrderId() : null;
|
||||
|
||||
publishAuditEvent("COMPLETE_SUPPRESSED_INVALID", deviceId, deviceKey, areaId, orderId,
|
||||
"作业时长不足,抑制自动完成", data);
|
||||
|
||||
// 3. 清除丢失记录(允许重新进入)
|
||||
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
|
||||
|
||||
// 4. 清除无效作业标记(允许下次警告)
|
||||
signalLossRedisDAO.markInvalidWorkNotified(deviceId, areaId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理超时自动完成
|
||||
*/
|
||||
private void handleTimeoutComplete(Long deviceId, String deviceKey, Long areaId,
|
||||
Long durationMs, Long lastLossTime) {
|
||||
log.info("[SignalLoss] 触发自动完成:deviceId={}, areaId={}, duration={}ms",
|
||||
deviceId, areaId, durationMs);
|
||||
|
||||
// 1. 获取当前工单
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
|
||||
|
||||
if (currentOrder == null) {
|
||||
log.warn("[SignalLoss] 设备无当前工单:deviceId={}", deviceId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 构建触发数据
|
||||
Map<String, Object> triggerData = new HashMap<>();
|
||||
triggerData.put("durationMs", durationMs);
|
||||
triggerData.put("lastLossTime", lastLossTime);
|
||||
triggerData.put("completionReason", "SIGNAL_LOSS_TIMEOUT");
|
||||
|
||||
// 3. 发布完成事件
|
||||
try {
|
||||
CleanOrderCompleteEvent event = CleanOrderCompleteEvent.builder()
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.eventId(java.util.UUID.randomUUID().toString())
|
||||
.orderType("CLEAN")
|
||||
.orderId(currentOrder.getOrderId())
|
||||
.deviceId(deviceId)
|
||||
.deviceKey(deviceKey)
|
||||
.areaId(areaId)
|
||||
.triggerSource("IOT_SIGNAL_LOSS")
|
||||
.triggerData(triggerData)
|
||||
.build();
|
||||
|
||||
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_COMPLETE, MessageBuilder.withPayload(event).build());
|
||||
|
||||
log.info("[SignalLoss] 发布完成事件:eventId={}, orderId={}, duration={}ms",
|
||||
event.getEventId(), currentOrder.getOrderId(), durationMs);
|
||||
} catch (Exception e) {
|
||||
log.error("[SignalLoss] 发布完成事件失败:deviceId={}, orderId={}",
|
||||
deviceId, currentOrder.getOrderId(), e);
|
||||
return;
|
||||
}
|
||||
|
||||
// 4. 发布审计日志
|
||||
Map<String, Object> auditData = new HashMap<>();
|
||||
auditData.put("durationMs", durationMs);
|
||||
auditData.put("lastLossTime", lastLossTime);
|
||||
|
||||
publishAuditEvent("BEACON_COMPLETE_REQUESTED", deviceId, deviceKey, areaId, currentOrder.getOrderId(),
|
||||
"信号丢失超时自动完成", auditData);
|
||||
|
||||
// 5. 清理 Redis 数据
|
||||
cleanupRedisData(deviceId, areaId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理 Redis 数据
|
||||
*/
|
||||
private void cleanupRedisData(Long deviceId, Long areaId) {
|
||||
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
|
||||
arrivedTimeRedisDAO.clearArrivedTime(deviceId, areaId);
|
||||
windowRedisDAO.clearWindow(deviceId, areaId);
|
||||
|
||||
log.debug("[SignalLoss] 清理 Redis 数据:deviceId={}, areaId={}", deviceId, areaId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布审计事件
|
||||
*/
|
||||
private void publishAuditEvent(String auditType, Long deviceId, String deviceKey,
|
||||
Long areaId, Long orderId, String message, Map<String, Object> data) {
|
||||
try {
|
||||
CleanOrderAuditEvent event = CleanOrderAuditEvent.builder()
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.eventId(java.util.UUID.randomUUID().toString())
|
||||
.auditType(auditType)
|
||||
.deviceId(deviceId)
|
||||
.deviceKey(deviceKey)
|
||||
.areaId(areaId)
|
||||
.orderId(orderId)
|
||||
.message(message)
|
||||
.data(data)
|
||||
.build();
|
||||
|
||||
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_AUDIT, MessageBuilder.withPayload(event).build());
|
||||
|
||||
log.debug("[SignalLoss] 发布审计事件:auditType={}, deviceId={}, orderId={}", auditType, deviceId, orderId);
|
||||
} catch (Exception e) {
|
||||
log.error("[SignalLoss] 发布审计事件失败:auditType={}, deviceId={}", auditType, deviceId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布 TTS 事件
|
||||
*/
|
||||
private void publishTtsEvent(Long deviceId, String text) {
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("tts", text);
|
||||
data.put("timestamp", System.currentTimeMillis());
|
||||
|
||||
publishAuditEvent("TTS_REQUEST", deviceId, null, null, null, text, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否正在切换工单(P0 插队场景)
|
||||
* <p>
|
||||
* 如果当前工单的区域ID与正在检查的区域不一致,说明保洁员已切换到其他区域的工单
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param areaId 正在检查的区域ID
|
||||
* @return true-工单切换场景,false-正常离岗场景
|
||||
*/
|
||||
private boolean isSwitchingOrder(Long deviceId, Long areaId) {
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
|
||||
return currentOrder != null && !currentOrder.getAreaId().equals(areaId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取设备 Key(从 IoT 设备缓存获取 serialNumber)
|
||||
* <p>
|
||||
* deviceKey 在 ops_area_device_relation 表中是冗余字段,
|
||||
* 实际来源是 iot_device.serialNumber
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @return deviceKey(serialNumber),获取失败返回 null
|
||||
*/
|
||||
private String getDeviceKey(Long deviceId) {
|
||||
try {
|
||||
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
|
||||
if (device != null) {
|
||||
return device.getSerialNumber();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[SignalLoss] 获取 deviceKey 失败:deviceId={}", deviceId, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
package com.viewsh.module.iot.service.rule.clean.processor;
|
||||
|
||||
import com.viewsh.framework.tenant.core.context.ProjectContextHolder;
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
|
||||
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderAuditEvent;
|
||||
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderCompleteEvent;
|
||||
import com.viewsh.module.iot.dal.dataobject.integration.clean.BeaconPresenceConfig;
|
||||
import com.viewsh.module.iot.dal.redis.clean.BadgeDeviceStatusRedisDAO;
|
||||
import com.viewsh.module.iot.dal.redis.clean.BeaconArrivedTimeRedisDAO;
|
||||
import com.viewsh.module.iot.dal.redis.clean.BeaconRssiWindowRedisDAO;
|
||||
import com.viewsh.module.iot.dal.redis.clean.SignalLossRedisDAO;
|
||||
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
|
||||
import com.viewsh.module.iot.service.device.IotDeviceService;
|
||||
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import com.viewsh.framework.tenant.core.util.TenantUtils;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
|
||||
/**
|
||||
* 信号丢失规则处理器
|
||||
* <p>
|
||||
* 定时检查离岗后N分钟则触发工单自动完成
|
||||
* 包含作业时长有效性校验,防止"打卡即走"作弊
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class SignalLossRuleProcessor {
|
||||
|
||||
@Resource
|
||||
private SignalLossRedisDAO signalLossRedisDAO;
|
||||
|
||||
@Resource
|
||||
private BeaconArrivedTimeRedisDAO arrivedTimeRedisDAO;
|
||||
|
||||
@Resource
|
||||
private BeaconRssiWindowRedisDAO windowRedisDAO;
|
||||
|
||||
@Resource
|
||||
private BadgeDeviceStatusRedisDAO badgeDeviceStatusRedisDAO;
|
||||
|
||||
@Resource
|
||||
private CleanOrderIntegrationConfigService configService;
|
||||
|
||||
@Resource
|
||||
private IotDeviceService deviceService;
|
||||
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
/**
|
||||
* Redis Key 模式:扫描所有离岗记录
|
||||
*/
|
||||
private static final String LOSS_KEY_PATTERN = "iot:clean:signal:loss:*";
|
||||
|
||||
/**
|
||||
* 定时检查离岗超时(每 30 秒执行一次)
|
||||
* <p>
|
||||
* 遍历所有离岗记录,检查是否超过超时时间
|
||||
* 如果超过,则触发工单完成
|
||||
*/
|
||||
@XxlJob("signalLossCheckJob")
|
||||
public String checkLossTimeout() {
|
||||
// TODO: 设置租户上下文(单租户场景使用固定租户ID=1)
|
||||
// 确保后续发送的 RocketMQ 消息正确携带租户信息
|
||||
return doCheckLossTimeout();
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行离岗超时检查
|
||||
*/
|
||||
private String doCheckLossTimeout() {
|
||||
try {
|
||||
log.debug("[SignalLoss] 开始检查离岗超时");
|
||||
|
||||
// 1. 扫描所有离岗记录的 Key
|
||||
Set<String> keys = stringRedisTemplate.keys(LOSS_KEY_PATTERN);
|
||||
|
||||
if (keys == null || keys.isEmpty()) {
|
||||
return "暂无";
|
||||
}
|
||||
|
||||
log.debug("[SignalLoss] 发现 {} 条离岗记录", keys.size());
|
||||
|
||||
// 2. 遍历每条记录
|
||||
for (String key : keys) {
|
||||
try {
|
||||
// 解析 deviceId 和 areaId
|
||||
// Key 格式:iot:clean:signal:loss:{deviceId}:{areaId}
|
||||
String[] parts = key.split(":");
|
||||
if (parts.length < 6) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Long deviceId = Long.parseLong(parts[4]);
|
||||
Long areaId = Long.parseLong(parts[5]);
|
||||
|
||||
// 检查超时
|
||||
IotDeviceDO device = deviceService.getDevice(deviceId);
|
||||
if (device == null || device.getTenantId() == null) {
|
||||
log.warn("[SignalLoss] 璁惧涓嶅瓨鍦ㄦ垨缂哄皯绉熸埛淇℃伅: deviceId={}", deviceId);
|
||||
continue;
|
||||
}
|
||||
TenantUtils.execute(device.getTenantId(), () -> checkTimeoutForDevice(deviceId, areaId));
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[SignalLoss] 处理离岗记录失败:key={}", key, e);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[SignalLoss] 检查离岗超时失败", e);
|
||||
return "ERROR: " + e.getMessage();
|
||||
}
|
||||
|
||||
return "SUCCESS";
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查单个设备的离岗超时
|
||||
*/
|
||||
private void checkTimeoutForDevice(Long deviceId, Long areaId) {
|
||||
// P0 插队校验:检查当前工单是否属于正在检查的区域
|
||||
if (isSwitchingOrder(deviceId, areaId)) {
|
||||
log.debug("[SignalLoss][P0Interrupt] 检测到工单切换,跳过区域 {} 的超时检查",
|
||||
areaId);
|
||||
// 清理该区域的离岗记录(避免内存泄漏)
|
||||
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 1. 获取该区域的信标配置(从 BEACON 类型的设备获取)
|
||||
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper beaconConfigWrapper = configService
|
||||
.getConfigByAreaIdAndRelationType(areaId, "BEACON");
|
||||
|
||||
if (beaconConfigWrapper == null || beaconConfigWrapper.getConfig() == null ||
|
||||
beaconConfigWrapper.getConfig().getBeaconPresence() == null) {
|
||||
log.debug("[SignalLoss] 区域无信标配置:areaId={}", areaId);
|
||||
return;
|
||||
}
|
||||
|
||||
BeaconPresenceConfig.ExitConfig exitConfig = beaconConfigWrapper.getConfig().getBeaconPresence().getExit();
|
||||
|
||||
// 2. 获取 deviceKey(从 IoT 设备缓存获取 serialNumber)
|
||||
String badgeDeviceKey = getDeviceKey(deviceId);
|
||||
|
||||
// 3. 获取首次丢失时间
|
||||
Long firstLossTime = signalLossRedisDAO.getFirstLossTime(deviceId, areaId);
|
||||
|
||||
if (firstLossTime == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 获取最后丢失时间
|
||||
Long lastLossTime = signalLossRedisDAO.getLastLossTime(deviceId, areaId);
|
||||
|
||||
if (lastLossTime == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 4. 检查是否超时
|
||||
long timeoutMillis = exitConfig.getLossTimeoutMinutes() * 60000L;
|
||||
long elapsedMillis = System.currentTimeMillis() - firstLossTime;
|
||||
|
||||
if (elapsedMillis < timeoutMillis) {
|
||||
log.debug("[SignalLoss] 未超时:deviceId={}, elapsed={}ms, timeout={}ms",
|
||||
deviceId, elapsedMillis, timeoutMillis);
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("[SignalLoss] 检测到离岗超时:deviceId={}, areaId={}, elapsed={}ms",
|
||||
deviceId, areaId, elapsedMillis);
|
||||
|
||||
// 5. 有效性校验:检查作业时长
|
||||
Long arrivedAt = arrivedTimeRedisDAO.getArrivedTime(deviceId, areaId);
|
||||
|
||||
if (arrivedAt == null) {
|
||||
log.warn("[SignalLoss] 未找到到达时间记录:deviceId={}, areaId={}", deviceId, areaId);
|
||||
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
|
||||
return;
|
||||
}
|
||||
|
||||
long durationMs = lastLossTime - arrivedAt;
|
||||
long minValidWorkMillis = exitConfig.getMinValidWorkMinutes() * 60000L;
|
||||
|
||||
// 6. 分支处理:有效 vs 无效作业
|
||||
// TODO 暂时取消作业时长不足抑制自动完成的逻辑,所有情况均触发完成
|
||||
// if (durationMs < minValidWorkMillis) {
|
||||
// // 作业时长不足,抑制完成
|
||||
// handleInvalidWork(deviceId, badgeDeviceKey, areaId,
|
||||
// durationMs, minValidWorkMillis, exitConfig);
|
||||
// } else {
|
||||
// 作业时长有效,触发完成
|
||||
handleTimeoutComplete(deviceId, badgeDeviceKey, areaId,
|
||||
durationMs, lastLossTime);
|
||||
// }
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理无效作业(时长不足)
|
||||
*/
|
||||
private void handleInvalidWork(Long deviceId, String deviceKey, Long areaId,
|
||||
Long durationMs, Long minValidWorkMillis,
|
||||
BeaconPresenceConfig.ExitConfig exitConfig) {
|
||||
log.warn("[SignalLoss] 作业时长不足,抑制自动完成:deviceId={}, duration={}ms, minRequired={}ms",
|
||||
deviceId, durationMs, minValidWorkMillis);
|
||||
|
||||
// 1. 发送 TTS 警告
|
||||
publishTtsEvent(deviceId, "工单作业时长异常,请回到作业区域继续完成");
|
||||
|
||||
// 2. 发布审计日志
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("durationMs", durationMs);
|
||||
data.put("minValidWorkMinutes", exitConfig.getMinValidWorkMinutes());
|
||||
data.put("shortageMs", minValidWorkMillis - durationMs);
|
||||
|
||||
// 获取当前工单ID
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
|
||||
Long orderId = currentOrder != null ? currentOrder.getOrderId() : null;
|
||||
|
||||
publishAuditEvent("COMPLETE_SUPPRESSED_INVALID", deviceId, deviceKey, areaId, orderId,
|
||||
"作业时长不足,抑制自动完成", data);
|
||||
|
||||
// 3. 清除丢失记录(允许重新进入)
|
||||
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
|
||||
|
||||
// 4. 清除无效作业标记(允许下次警告)
|
||||
signalLossRedisDAO.markInvalidWorkNotified(deviceId, areaId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理超时自动完成
|
||||
*/
|
||||
private void handleTimeoutComplete(Long deviceId, String deviceKey, Long areaId,
|
||||
Long durationMs, Long lastLossTime) {
|
||||
log.info("[SignalLoss] 触发自动完成:deviceId={}, areaId={}, duration={}ms",
|
||||
deviceId, areaId, durationMs);
|
||||
|
||||
// 1. 获取当前工单
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
|
||||
|
||||
if (currentOrder == null) {
|
||||
log.warn("[SignalLoss] 设备无当前工单:deviceId={}", deviceId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 构建触发数据
|
||||
Map<String, Object> triggerData = new HashMap<>();
|
||||
triggerData.put("durationMs", durationMs);
|
||||
triggerData.put("lastLossTime", lastLossTime);
|
||||
triggerData.put("completionReason", "SIGNAL_LOSS_TIMEOUT");
|
||||
|
||||
// 3. 发布完成事件
|
||||
try {
|
||||
CleanOrderCompleteEvent event = CleanOrderCompleteEvent.builder()
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.projectId(ProjectContextHolder.getProjectId())
|
||||
.eventId(java.util.UUID.randomUUID().toString())
|
||||
.orderType("CLEAN")
|
||||
.orderId(currentOrder.getOrderId())
|
||||
.deviceId(deviceId)
|
||||
.deviceKey(deviceKey)
|
||||
.areaId(areaId)
|
||||
.triggerSource("IOT_SIGNAL_LOSS")
|
||||
.triggerData(triggerData)
|
||||
.build();
|
||||
|
||||
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_COMPLETE, MessageBuilder.withPayload(event).build());
|
||||
|
||||
log.info("[SignalLoss] 发布完成事件:eventId={}, orderId={}, duration={}ms",
|
||||
event.getEventId(), currentOrder.getOrderId(), durationMs);
|
||||
} catch (Exception e) {
|
||||
log.error("[SignalLoss] 发布完成事件失败:deviceId={}, orderId={}",
|
||||
deviceId, currentOrder.getOrderId(), e);
|
||||
return;
|
||||
}
|
||||
|
||||
// 4. 发布审计日志
|
||||
Map<String, Object> auditData = new HashMap<>();
|
||||
auditData.put("durationMs", durationMs);
|
||||
auditData.put("lastLossTime", lastLossTime);
|
||||
|
||||
publishAuditEvent("BEACON_COMPLETE_REQUESTED", deviceId, deviceKey, areaId, currentOrder.getOrderId(),
|
||||
"信号丢失超时自动完成", auditData);
|
||||
|
||||
// 5. 清理 Redis 数据
|
||||
cleanupRedisData(deviceId, areaId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理 Redis 数据
|
||||
*/
|
||||
private void cleanupRedisData(Long deviceId, Long areaId) {
|
||||
signalLossRedisDAO.clearLossRecord(deviceId, areaId);
|
||||
arrivedTimeRedisDAO.clearArrivedTime(deviceId, areaId);
|
||||
windowRedisDAO.clearWindow(deviceId, areaId);
|
||||
|
||||
log.debug("[SignalLoss] 清理 Redis 数据:deviceId={}, areaId={}", deviceId, areaId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布审计事件
|
||||
*/
|
||||
private void publishAuditEvent(String auditType, Long deviceId, String deviceKey,
|
||||
Long areaId, Long orderId, String message, Map<String, Object> data) {
|
||||
try {
|
||||
CleanOrderAuditEvent event = CleanOrderAuditEvent.builder()
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.projectId(ProjectContextHolder.getProjectId())
|
||||
.eventId(java.util.UUID.randomUUID().toString())
|
||||
.auditType(auditType)
|
||||
.deviceId(deviceId)
|
||||
.deviceKey(deviceKey)
|
||||
.areaId(areaId)
|
||||
.orderId(orderId)
|
||||
.message(message)
|
||||
.data(data)
|
||||
.build();
|
||||
|
||||
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_AUDIT, MessageBuilder.withPayload(event).build());
|
||||
|
||||
log.debug("[SignalLoss] 发布审计事件:auditType={}, deviceId={}, orderId={}", auditType, deviceId, orderId);
|
||||
} catch (Exception e) {
|
||||
log.error("[SignalLoss] 发布审计事件失败:auditType={}, deviceId={}", auditType, deviceId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布 TTS 事件
|
||||
*/
|
||||
private void publishTtsEvent(Long deviceId, String text) {
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("tts", text);
|
||||
data.put("timestamp", System.currentTimeMillis());
|
||||
|
||||
publishAuditEvent("TTS_REQUEST", deviceId, null, null, null, text, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否正在切换工单(P0 插队场景)
|
||||
* <p>
|
||||
* 如果当前工单的区域ID与正在检查的区域不一致,说明保洁员已切换到其他区域的工单
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param areaId 正在检查的区域ID
|
||||
* @return true-工单切换场景,false-正常离岗场景
|
||||
*/
|
||||
private boolean isSwitchingOrder(Long deviceId, Long areaId) {
|
||||
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
|
||||
return currentOrder != null && !currentOrder.getAreaId().equals(areaId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取设备 Key(从 IoT 设备缓存获取 serialNumber)
|
||||
* <p>
|
||||
* deviceKey 在 ops_area_device_relation 表中是冗余字段,
|
||||
* 实际来源是 iot_device.serialNumber
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @return deviceKey(serialNumber),获取失败返回 null
|
||||
*/
|
||||
private String getDeviceKey(Long deviceId) {
|
||||
try {
|
||||
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
|
||||
if (device != null) {
|
||||
return device.getSerialNumber();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[SignalLoss] 获取 deviceKey 失败:deviceId={}", deviceId, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,260 +1,262 @@
|
||||
package com.viewsh.module.iot.service.rule.clean.processor;
|
||||
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
|
||||
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderCreateEvent;
|
||||
import com.viewsh.module.iot.dal.dataobject.integration.clean.TrafficThresholdConfig;
|
||||
import com.viewsh.module.iot.dal.redis.clean.TrafficCounterRedisDAO;
|
||||
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 客流阈值规则处理器
|
||||
* <p>
|
||||
* 监听设备属性上报,将增量原子累加到 Redis 阈值计数器,
|
||||
* 达到阈值后触发工单创建事件并重置计数器。
|
||||
* <p>
|
||||
* 同时维护当日累积统计(不因工单触发而重置),用于统计报表。
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class TrafficThresholdRuleProcessor {
|
||||
|
||||
@Resource
|
||||
private CleanOrderIntegrationConfigService configService;
|
||||
|
||||
@Resource
|
||||
private TrafficCounterRedisDAO trafficCounterRedisDAO;
|
||||
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
/**
|
||||
* 处理客流属性上报
|
||||
* <p>
|
||||
* 支持 people_in 和 people_out 两个属性:
|
||||
* - people_in:累加到当日统计 + 阈值计数器(需配置)
|
||||
* - people_out:累加到当日统计
|
||||
* <p>
|
||||
* 支持两种上报模式(通过 configData.trafficThreshold.reportMode 配置):
|
||||
* - INCREMENTAL(默认):上报值直接作为增量
|
||||
* - CUMULATIVE:上报值为累计值,自动计算差值得到增量
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param identifier 属性标识符(people_in 或 people_out)
|
||||
* @param propertyValue 属性值(增量或累计值,取决于 reportMode)
|
||||
*/
|
||||
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
|
||||
// 1. 校验属性类型
|
||||
if (!"people_in".equals(identifier) && !"people_out".equals(identifier)) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("[TrafficThreshold] 收到客流属性:deviceId={}, identifier={}, value={}",
|
||||
deviceId, identifier, propertyValue);
|
||||
|
||||
// 2. 解析原始值
|
||||
Long rawValue = parseTrafficCount(propertyValue);
|
||||
if (rawValue == null || rawValue <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 获取配置,判断上报模式
|
||||
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper = getConfigWrapper(deviceId);
|
||||
TrafficThresholdConfig thresholdConfig = resolveThresholdConfig(configWrapper);
|
||||
|
||||
// 4. 根据上报模式计算增量
|
||||
long increment;
|
||||
if (thresholdConfig != null && thresholdConfig.isCumulative()) {
|
||||
increment = resolveIncrement(deviceId, identifier, rawValue);
|
||||
} else {
|
||||
increment = rawValue;
|
||||
}
|
||||
if (increment <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 5. 累加到当日统计(统计与工单触发解耦)
|
||||
LocalDate today = LocalDate.now();
|
||||
if ("people_in".equals(identifier)) {
|
||||
trafficCounterRedisDAO.incrementDaily(deviceId, today, increment, 0);
|
||||
} else {
|
||||
trafficCounterRedisDAO.incrementDaily(deviceId, today, 0, increment);
|
||||
}
|
||||
log.debug("[TrafficThreshold] 当日统计累加:deviceId={}, identifier={}, increment={}",
|
||||
deviceId, identifier, increment);
|
||||
|
||||
// 6. 以下为工单触发逻辑,仅 people_in 参与
|
||||
if (!"people_in".equals(identifier)) {
|
||||
return;
|
||||
}
|
||||
if (thresholdConfig == null || !Boolean.TRUE.equals(thresholdConfig.getAutoCreateOrder())) {
|
||||
return;
|
||||
}
|
||||
|
||||
Long areaId = configWrapper.getAreaId();
|
||||
handlePeopleIn(deviceId, areaId, increment, today, thresholdConfig, configWrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* 从配置包装器中提取客流阈值配置
|
||||
*
|
||||
* @return 阈值配置,无配置时返回 null
|
||||
*/
|
||||
private TrafficThresholdConfig resolveThresholdConfig(
|
||||
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) {
|
||||
if (configWrapper == null || configWrapper.getConfig() == null) {
|
||||
return null;
|
||||
}
|
||||
return configWrapper.getConfig().getTrafficThreshold();
|
||||
}
|
||||
|
||||
/**
|
||||
* 累计值转增量
|
||||
* <p>
|
||||
* 通过 Redis 存储上次上报的累计值,计算差值得到本次增量。
|
||||
* 处理三种场景:首次上报、正常递增、设备重启归零。
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param identifier 属性标识符
|
||||
* @param currentValue 本次上报的累计值
|
||||
* @return 增量值;首次上报返回 0
|
||||
*/
|
||||
private long resolveIncrement(Long deviceId, String identifier, long currentValue) {
|
||||
Long lastValue = trafficCounterRedisDAO.getLastCumulativeValue(deviceId, identifier);
|
||||
|
||||
// 无论是否能算出增量,都记录当前值
|
||||
trafficCounterRedisDAO.setLastCumulativeValue(deviceId, identifier, currentValue);
|
||||
|
||||
if (lastValue == null) {
|
||||
// 首次上报:无历史基准,不计入统计
|
||||
log.info("[TrafficThreshold] 累计值设备首次上报,建立基准:deviceId={}, identifier={}, value={}",
|
||||
deviceId, identifier, currentValue);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (currentValue >= lastValue) {
|
||||
return currentValue - lastValue;
|
||||
}
|
||||
|
||||
// currentValue < lastValue → 设备重启归零
|
||||
log.info("[TrafficThreshold] 检测到设备重启:deviceId={}, identifier={}, last={}, current={}",
|
||||
deviceId, identifier, lastValue, currentValue);
|
||||
return currentValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 people_in 增量
|
||||
*/
|
||||
private void handlePeopleIn(Long deviceId, Long areaId, long increment, LocalDate today,
|
||||
TrafficThresholdConfig thresholdConfig,
|
||||
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) {
|
||||
// 1. 原子累加到阈值计数器,返回累积值(当日统计已在 processPropertyChange 中完成)
|
||||
Long accumulated = trafficCounterRedisDAO.incrementThreshold(deviceId, areaId, increment);
|
||||
|
||||
log.debug("[TrafficThreshold] people_in 阈值累加:deviceId={}, areaId={}, increment={}, accumulated={}, threshold={}",
|
||||
deviceId, areaId, increment, accumulated, thresholdConfig.getThreshold());
|
||||
|
||||
// 3. 阈值判定
|
||||
if (accumulated < thresholdConfig.getThreshold()) {
|
||||
return; // 未达标
|
||||
}
|
||||
|
||||
// 4. 防重复检查(使用 Redis 分布式锁)
|
||||
String lockKey = String.format("iot:clean:traffic:lock:%s:%s", deviceId, areaId);
|
||||
Boolean locked = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(lockKey, "1", thresholdConfig.getTimeWindowSeconds(), TimeUnit.SECONDS);
|
||||
|
||||
if (Boolean.FALSE.equals(locked)) {
|
||||
log.info("[TrafficThreshold] 防重复触发:deviceId={}, areaId={}", deviceId, areaId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 5. 发布工单创建事件
|
||||
// 注意:阈值计数器将在 Ops 模块工单创建成功后重置,确保事务一致性
|
||||
publishCreateEvent(configWrapper, accumulated, thresholdConfig.getThreshold());
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布工单创建事件
|
||||
*/
|
||||
private void publishCreateEvent(CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper,
|
||||
Long accumulated, Integer threshold) {
|
||||
try {
|
||||
CleanOrderCreateEvent event = CleanOrderCreateEvent.builder()
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.orderType("CLEAN")
|
||||
.areaId(configWrapper.getAreaId())
|
||||
.triggerSource("IOT_TRAFFIC")
|
||||
.triggerDeviceId(configWrapper.getDeviceId())
|
||||
.triggerDeviceKey(configWrapper.getDeviceKey())
|
||||
.priority(configWrapper.getConfig().getTrafficThreshold().getOrderPriority())
|
||||
.triggerData(buildTriggerData(accumulated, threshold))
|
||||
.build();
|
||||
|
||||
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_CREATE, MessageBuilder.withPayload(event).build());
|
||||
|
||||
log.info("[TrafficThreshold] 发布工单创建事件:eventId={}, areaId={}, accumulated={}, threshold={}",
|
||||
event.getEventId(), configWrapper.getAreaId(), accumulated, threshold);
|
||||
} catch (Exception e) {
|
||||
log.error("[TrafficThreshold] 发布工单创建事件失败:deviceId={}, areaId={}",
|
||||
configWrapper.getDeviceId(), configWrapper.getAreaId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建触发数据
|
||||
*/
|
||||
private Map<String, Object> buildTriggerData(Long accumulated, Integer threshold) {
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("accumulated", accumulated);
|
||||
data.put("threshold", threshold);
|
||||
data.put("exceededCount", accumulated - threshold);
|
||||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取配置包装器
|
||||
*/
|
||||
private CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper getConfigWrapper(Long deviceId) {
|
||||
return configService.getConfigWrapperByDeviceId(deviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析客流计数值
|
||||
*/
|
||||
private Long parseTrafficCount(Object value) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (value instanceof Number) {
|
||||
return ((Number) value).longValue();
|
||||
}
|
||||
|
||||
if (value instanceof String) {
|
||||
try {
|
||||
return Long.parseLong((String) value);
|
||||
} catch (NumberFormatException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
package com.viewsh.module.iot.service.rule.clean.processor;
|
||||
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.viewsh.framework.tenant.core.context.ProjectContextHolder;
|
||||
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
|
||||
import com.viewsh.module.iot.core.integration.event.clean.CleanOrderCreateEvent;
|
||||
import com.viewsh.module.iot.dal.dataobject.integration.clean.TrafficThresholdConfig;
|
||||
import com.viewsh.module.iot.dal.redis.clean.TrafficCounterRedisDAO;
|
||||
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 客流阈值规则处理器
|
||||
* <p>
|
||||
* 监听设备属性上报,将增量原子累加到 Redis 阈值计数器,
|
||||
* 达到阈值后触发工单创建事件并重置计数器。
|
||||
* <p>
|
||||
* 同时维护当日累积统计(不因工单触发而重置),用于统计报表。
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class TrafficThresholdRuleProcessor {
|
||||
|
||||
@Resource
|
||||
private CleanOrderIntegrationConfigService configService;
|
||||
|
||||
@Resource
|
||||
private TrafficCounterRedisDAO trafficCounterRedisDAO;
|
||||
|
||||
@Resource
|
||||
private RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
/**
|
||||
* 处理客流属性上报
|
||||
* <p>
|
||||
* 支持 people_in 和 people_out 两个属性:
|
||||
* - people_in:累加到当日统计 + 阈值计数器(需配置)
|
||||
* - people_out:累加到当日统计
|
||||
* <p>
|
||||
* 支持两种上报模式(通过 configData.trafficThreshold.reportMode 配置):
|
||||
* - INCREMENTAL(默认):上报值直接作为增量
|
||||
* - CUMULATIVE:上报值为累计值,自动计算差值得到增量
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param identifier 属性标识符(people_in 或 people_out)
|
||||
* @param propertyValue 属性值(增量或累计值,取决于 reportMode)
|
||||
*/
|
||||
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
|
||||
// 1. 校验属性类型
|
||||
if (!"people_in".equals(identifier) && !"people_out".equals(identifier)) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("[TrafficThreshold] 收到客流属性:deviceId={}, identifier={}, value={}",
|
||||
deviceId, identifier, propertyValue);
|
||||
|
||||
// 2. 解析原始值
|
||||
Long rawValue = parseTrafficCount(propertyValue);
|
||||
if (rawValue == null || rawValue <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 获取配置,判断上报模式
|
||||
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper = getConfigWrapper(deviceId);
|
||||
TrafficThresholdConfig thresholdConfig = resolveThresholdConfig(configWrapper);
|
||||
|
||||
// 4. 根据上报模式计算增量
|
||||
long increment;
|
||||
if (thresholdConfig != null && thresholdConfig.isCumulative()) {
|
||||
increment = resolveIncrement(deviceId, identifier, rawValue);
|
||||
} else {
|
||||
increment = rawValue;
|
||||
}
|
||||
if (increment <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 5. 累加到当日统计(统计与工单触发解耦)
|
||||
LocalDate today = LocalDate.now();
|
||||
if ("people_in".equals(identifier)) {
|
||||
trafficCounterRedisDAO.incrementDaily(deviceId, today, increment, 0);
|
||||
} else {
|
||||
trafficCounterRedisDAO.incrementDaily(deviceId, today, 0, increment);
|
||||
}
|
||||
log.debug("[TrafficThreshold] 当日统计累加:deviceId={}, identifier={}, increment={}",
|
||||
deviceId, identifier, increment);
|
||||
|
||||
// 6. 以下为工单触发逻辑,仅 people_in 参与
|
||||
if (!"people_in".equals(identifier)) {
|
||||
return;
|
||||
}
|
||||
if (thresholdConfig == null || !Boolean.TRUE.equals(thresholdConfig.getAutoCreateOrder())) {
|
||||
return;
|
||||
}
|
||||
|
||||
Long areaId = configWrapper.getAreaId();
|
||||
handlePeopleIn(deviceId, areaId, increment, today, thresholdConfig, configWrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* 从配置包装器中提取客流阈值配置
|
||||
*
|
||||
* @return 阈值配置,无配置时返回 null
|
||||
*/
|
||||
private TrafficThresholdConfig resolveThresholdConfig(
|
||||
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) {
|
||||
if (configWrapper == null || configWrapper.getConfig() == null) {
|
||||
return null;
|
||||
}
|
||||
return configWrapper.getConfig().getTrafficThreshold();
|
||||
}
|
||||
|
||||
/**
|
||||
* 累计值转增量
|
||||
* <p>
|
||||
* 通过 Redis 存储上次上报的累计值,计算差值得到本次增量。
|
||||
* 处理三种场景:首次上报、正常递增、设备重启归零。
|
||||
*
|
||||
* @param deviceId 设备ID
|
||||
* @param identifier 属性标识符
|
||||
* @param currentValue 本次上报的累计值
|
||||
* @return 增量值;首次上报返回 0
|
||||
*/
|
||||
private long resolveIncrement(Long deviceId, String identifier, long currentValue) {
|
||||
Long lastValue = trafficCounterRedisDAO.getLastCumulativeValue(deviceId, identifier);
|
||||
|
||||
// 无论是否能算出增量,都记录当前值
|
||||
trafficCounterRedisDAO.setLastCumulativeValue(deviceId, identifier, currentValue);
|
||||
|
||||
if (lastValue == null) {
|
||||
// 首次上报:无历史基准,不计入统计
|
||||
log.info("[TrafficThreshold] 累计值设备首次上报,建立基准:deviceId={}, identifier={}, value={}",
|
||||
deviceId, identifier, currentValue);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (currentValue >= lastValue) {
|
||||
return currentValue - lastValue;
|
||||
}
|
||||
|
||||
// currentValue < lastValue → 设备重启归零
|
||||
log.info("[TrafficThreshold] 检测到设备重启:deviceId={}, identifier={}, last={}, current={}",
|
||||
deviceId, identifier, lastValue, currentValue);
|
||||
return currentValue;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 people_in 增量
|
||||
*/
|
||||
private void handlePeopleIn(Long deviceId, Long areaId, long increment, LocalDate today,
|
||||
TrafficThresholdConfig thresholdConfig,
|
||||
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) {
|
||||
// 1. 原子累加到阈值计数器,返回累积值(当日统计已在 processPropertyChange 中完成)
|
||||
Long accumulated = trafficCounterRedisDAO.incrementThreshold(deviceId, areaId, increment);
|
||||
|
||||
log.debug("[TrafficThreshold] people_in 阈值累加:deviceId={}, areaId={}, increment={}, accumulated={}, threshold={}",
|
||||
deviceId, areaId, increment, accumulated, thresholdConfig.getThreshold());
|
||||
|
||||
// 3. 阈值判定
|
||||
if (accumulated < thresholdConfig.getThreshold()) {
|
||||
return; // 未达标
|
||||
}
|
||||
|
||||
// 4. 防重复检查(使用 Redis 分布式锁)
|
||||
String lockKey = String.format("iot:clean:traffic:lock:%s:%s", deviceId, areaId);
|
||||
Boolean locked = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(lockKey, "1", thresholdConfig.getTimeWindowSeconds(), TimeUnit.SECONDS);
|
||||
|
||||
if (Boolean.FALSE.equals(locked)) {
|
||||
log.info("[TrafficThreshold] 防重复触发:deviceId={}, areaId={}", deviceId, areaId);
|
||||
return;
|
||||
}
|
||||
|
||||
// 5. 发布工单创建事件
|
||||
// 注意:阈值计数器将在 Ops 模块工单创建成功后重置,确保事务一致性
|
||||
publishCreateEvent(configWrapper, accumulated, thresholdConfig.getThreshold());
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布工单创建事件
|
||||
*/
|
||||
private void publishCreateEvent(CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper,
|
||||
Long accumulated, Integer threshold) {
|
||||
try {
|
||||
CleanOrderCreateEvent event = CleanOrderCreateEvent.builder()
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.projectId(ProjectContextHolder.getProjectId())
|
||||
.orderType("CLEAN")
|
||||
.areaId(configWrapper.getAreaId())
|
||||
.triggerSource("IOT_TRAFFIC")
|
||||
.triggerDeviceId(configWrapper.getDeviceId())
|
||||
.triggerDeviceKey(configWrapper.getDeviceKey())
|
||||
.priority(configWrapper.getConfig().getTrafficThreshold().getOrderPriority())
|
||||
.triggerData(buildTriggerData(accumulated, threshold))
|
||||
.build();
|
||||
|
||||
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_CREATE, MessageBuilder.withPayload(event).build());
|
||||
|
||||
log.info("[TrafficThreshold] 发布工单创建事件:eventId={}, areaId={}, accumulated={}, threshold={}",
|
||||
event.getEventId(), configWrapper.getAreaId(), accumulated, threshold);
|
||||
} catch (Exception e) {
|
||||
log.error("[TrafficThreshold] 发布工单创建事件失败:deviceId={}, areaId={}",
|
||||
configWrapper.getDeviceId(), configWrapper.getAreaId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建触发数据
|
||||
*/
|
||||
private Map<String, Object> buildTriggerData(Long accumulated, Integer threshold) {
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("accumulated", accumulated);
|
||||
data.put("threshold", threshold);
|
||||
data.put("exceededCount", accumulated - threshold);
|
||||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取配置包装器
|
||||
*/
|
||||
private CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper getConfigWrapper(Long deviceId) {
|
||||
return configService.getConfigWrapperByDeviceId(deviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析客流计数值
|
||||
*/
|
||||
private Long parseTrafficCount(Object value) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (value instanceof Number) {
|
||||
return ((Number) value).longValue();
|
||||
}
|
||||
|
||||
if (value instanceof String) {
|
||||
try {
|
||||
return Long.parseLong((String) value);
|
||||
} catch (NumberFormatException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.viewsh.framework.tenant.core.context.ProjectContextHolder;
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
@@ -449,6 +450,7 @@ public class TrajectoryDetectionProcessor {
|
||||
.enterRssi(enterRssi)
|
||||
.eventTime((eventTime != null ? eventTime : LocalDateTime.now()).toString())
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.projectId(ProjectContextHolder.getProjectId())
|
||||
.build();
|
||||
|
||||
rocketMQTemplate.syncSend(TrajectoryTopics.TRAJECTORY_ENTER,
|
||||
@@ -475,6 +477,7 @@ public class TrajectoryDetectionProcessor {
|
||||
.enterTimestamp(enterTimestamp)
|
||||
.eventTime((eventTime != null ? eventTime : LocalDateTime.now()).toString())
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.projectId(ProjectContextHolder.getProjectId())
|
||||
.build();
|
||||
|
||||
rocketMQTemplate.syncSend(TrajectoryTopics.TRAJECTORY_LEAVE,
|
||||
|
||||
Reference in New Issue
Block a user