refactor(iot): 适配规则处理器使用新的配置查询方法

- ButtonEventRuleProcessor: 改为从设备 config 读取按键配置
- TrafficThresholdRuleProcessor: 使用 getConfigWrapperByDeviceId 方法
- BeaconDetectionRuleProcessor: 使用 getConfigByAreaIdAndRelationType 方法
- SignalLossRuleProcessor: 使用 getConfigByAreaIdAndRelationType 方法
- CleanRuleProcessorManager: 更新处理流程
- 添加性能优化 TODO 注释

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
lzh
2026-02-01 00:57:23 +08:00
parent 89e68eceb2
commit 3839da2966
5 changed files with 146 additions and 93 deletions

View File

@@ -78,9 +78,9 @@ public class CleanRuleProcessorManager {
* <p>
* 事件上报的 params 结构:
* {
* "identifier": "button_event",
* "eventTime": 1234567890,
* "params": { keyId: 1, keyState: 1 }
* "identifier": "button_event",
* "eventTime": 1234567890,
* "params": { keyId: 1, keyState: 1 }
* }
*/
private void processEventData(Long deviceId, Map<String, Object> data) {
@@ -95,8 +95,7 @@ public class CleanRuleProcessorManager {
// 路由到对应处理器
switch (identifier) {
case "button_event" ->
buttonEventRuleProcessor.processPropertyChange(deviceId, identifier, params);
case "button_event" -> buttonEventRuleProcessor.processPropertyChange(deviceId, identifier, params);
default -> {
// 其他事件忽略
}
@@ -118,12 +117,9 @@ public class CleanRuleProcessorManager {
private void processDataSafely(Long deviceId, String identifier, Object value) {
try {
switch (identifier) {
case "people_in" ->
trafficThresholdRuleProcessor.processPropertyChange(deviceId, identifier, value);
case "people_in" -> trafficThresholdRuleProcessor.processPropertyChange(deviceId, identifier, value);
case "bluetoothDevices" ->
beaconDetectionRuleProcessor.processPropertyChange(deviceId, identifier, value);
case "button_event" ->
buttonEventRuleProcessor.processPropertyChange(deviceId, identifier, value);
beaconDetectionRuleProcessor.processPropertyChange(deviceId, identifier, value);
default -> {
// 其他属性/事件忽略
}

View File

@@ -70,16 +70,17 @@ public class BeaconDetectionRuleProcessor {
log.debug("[BeaconDetection] 收到蓝牙属性deviceId={}", deviceId);
// 2. 获取工牌设备的配置包含区域ID
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper badgeConfigWrapper = configService
.getConfigWrapperByDeviceId(deviceId);
// 2. 获取当前工单状态(从中获取正确的 areaId
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
if (badgeConfigWrapper == null || badgeConfigWrapper.getAreaId() == null) {
log.debug("[BeaconDetection] 工牌设备无区域配置deviceId={}", deviceId);
if (currentOrder == null || currentOrder.getAreaId() == null) {
log.debug("[BeaconDetection] 无当前工单,跳过检测deviceId={}", deviceId);
return;
}
Long areaId = badgeConfigWrapper.getAreaId();
Long areaId = currentOrder.getAreaId();
log.debug("[BeaconDetection] 从工单状态获取区域deviceId={}, areaId={}, orderId={}",
deviceId, areaId, currentOrder.getOrderId());
// 3. 获取该区域的信标配置(从 BEACON 类型的设备获取)
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper beaconConfigWrapper = configService
@@ -96,24 +97,21 @@ public class BeaconDetectionRuleProcessor {
return;
}
// 3. 解析蓝牙数据,提取目标信标的 RSSI
// 4. 解析蓝牙数据,提取目标信标的 RSSI
Integer targetRssi = detector.extractTargetRssi(propertyValue, beaconConfig.getBeaconMac());
log.debug("[BeaconDetection] 提取RSSIdeviceId={}, beaconMac={}, rssi={}",
deviceId, beaconConfig.getBeaconMac(), targetRssi);
log.debug("[BeaconDetection] 提取RSSIdeviceId={}, areaId={}, beaconMac={}, rssi={}",
deviceId, areaId, beaconConfig.getBeaconMac(), targetRssi);
// 4. 更新滑动窗口(使用 enter 和 exit 中较大的窗口大小)
// 5. 更新滑动窗口(使用 enter 和 exit 中较大的窗口大小)
int maxWindowSize = Math.max(
beaconConfig.getEnter().getWindowSize(),
beaconConfig.getExit().getWindowSize());
windowRedisDAO.updateWindow(deviceId, areaId, targetRssi, maxWindowSize);
// 5. 获取当前窗口样本
// 6. 获取当前窗口样本
List<Integer> window = windowRedisDAO.getWindow(deviceId, areaId);
// 6. 获取设备当前工单状态
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
// 7. 确定当前状态
RssiSlidingWindowDetector.AreaState currentState = determineState(currentOrder, areaId, deviceId);
@@ -127,7 +125,7 @@ public class BeaconDetectionRuleProcessor {
// 9. 处理检测结果
switch (result) {
case ARRIVE_CONFIRMED:
handleArriveConfirmed(deviceId, areaId, window, beaconConfig, badgeConfigWrapper);
handleArriveConfirmed(deviceId, areaId, window, beaconConfig, currentOrder);
break;
case LEAVE_CONFIRMED:
handleLeaveConfirmed(deviceId, areaId, window, beaconConfig);
@@ -142,8 +140,8 @@ public class BeaconDetectionRuleProcessor {
* 处理到达确认
*/
private void handleArriveConfirmed(Long deviceId, Long areaId, List<Integer> window,
BeaconPresenceConfig beaconConfig,
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper badgeConfigWrapper) {
BeaconPresenceConfig beaconConfig,
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder) {
log.info("[BeaconDetection] 到达确认deviceId={}, areaId={}, window={}",
deviceId, areaId, window);
@@ -160,20 +158,20 @@ public class BeaconDetectionRuleProcessor {
// 4. 获取当前最新的 RSSI 值(使用原窗口快照,因为已清理)
Integer currentRssi = window.isEmpty() ? -999 : window.get(window.size() - 1);
// 4. 构建触发数据
// 5. 构建触发数据
Map<String, Object> triggerData = new HashMap<>();
triggerData.put("beaconMac", beaconConfig.getBeaconMac());
triggerData.put("rssi", currentRssi);
triggerData.put("windowSnapshot", window);
triggerData.put("enterRssiThreshold", beaconConfig.getEnter().getRssiThreshold());
// 5. 发布到岗事件
// 6. 发布到岗事件
if (beaconConfig.getEnter().getAutoArrival()) {
publishArriveEvent(deviceId, badgeConfigWrapper.getDeviceKey(), areaId, triggerData);
publishArriveEvent(deviceId, currentOrder.getOrderId(), areaId, triggerData);
}
// 6. 发布审计日志
publishAuditEvent("BEACON_ARRIVE_CONFIRMED", deviceId, badgeConfigWrapper.getDeviceKey(), areaId,
// 7. 发布审计日志
publishAuditEvent("BEACON_ARRIVE_CONFIRMED", deviceId, null, areaId,
"蓝牙信标自动到岗确认", triggerData);
}
@@ -181,7 +179,7 @@ public class BeaconDetectionRuleProcessor {
* 处理离开确认
*/
private void handleLeaveConfirmed(Long deviceId, Long areaId, List<Integer> window,
BeaconPresenceConfig beaconConfig) {
BeaconPresenceConfig beaconConfig) {
log.info("[BeaconDetection] 离开确认deviceId={}, areaId={}, window={}",
deviceId, areaId, window);
@@ -230,21 +228,13 @@ public class BeaconDetectionRuleProcessor {
/**
* 发布到岗事件
*/
private void publishArriveEvent(Long deviceId, String deviceKey, Long areaId, Map<String, Object> triggerData) {
private void publishArriveEvent(Long deviceId, Long orderId, Long areaId, Map<String, Object> triggerData) {
try {
// 获取当前工单信息,没有工单不发布事件
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
if (currentOrder == null || currentOrder.getOrderId() == null) {
log.warn("[BeaconDetection] 设备无当前工单,跳过到岗事件: deviceId={}, areaId={}", deviceId, areaId);
return;
}
CleanOrderArriveEvent event = CleanOrderArriveEvent.builder()
.eventId(java.util.UUID.randomUUID().toString())
.orderType("CLEAN")
.orderId(currentOrder.getOrderId())
.orderId(orderId)
.deviceId(deviceId)
.deviceKey(deviceKey)
.areaId(areaId)
.triggerSource("IOT_BEACON")
.triggerData(triggerData)
@@ -253,7 +243,7 @@ public class BeaconDetectionRuleProcessor {
rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_ARRIVE, MessageBuilder.withPayload(event).build());
log.info("[BeaconDetection] 发布到岗事件eventId={}, deviceId={}, areaId={}, orderId={}",
event.getEventId(), deviceId, areaId, currentOrder.getOrderId());
event.getEventId(), deviceId, areaId, orderId);
} catch (Exception e) {
log.error("[BeaconDetection] 发布到岗事件失败deviceId={}, areaId={}", deviceId, areaId, e);
}
@@ -263,7 +253,7 @@ public class BeaconDetectionRuleProcessor {
* 发布审计事件
*/
private void publishAuditEvent(String auditType, Long deviceId, String deviceKey,
Long areaId, String message, Map<String, Object> data) {
Long areaId, String message, Map<String, Object> data) {
try {
CleanOrderAuditEvent event = CleanOrderAuditEvent.builder()
.eventId(java.util.UUID.randomUUID().toString())

View File

@@ -1,9 +1,11 @@
package com.viewsh.module.iot.service.rule.clean.processor;
import com.viewsh.framework.common.util.json.JsonUtils;
import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics;
import com.viewsh.module.iot.dal.dataobject.integration.clean.ButtonEventConfig;
import com.viewsh.module.iot.dal.redis.clean.BadgeDeviceStatusRedisDAO;
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
@@ -31,10 +33,10 @@ import java.util.UUID;
public class ButtonEventRuleProcessor {
@Resource
private CleanOrderIntegrationConfigService configService;
private BadgeDeviceStatusRedisDAO badgeDeviceStatusRedisDAO;
@Resource
private BadgeDeviceStatusRedisDAO badgeDeviceStatusRedisDAO;
private IotDeviceService deviceService;
@Resource
private RocketMQTemplate rocketMQTemplate;
@@ -56,16 +58,8 @@ public class ButtonEventRuleProcessor {
log.debug("[ButtonEvent] 收到按键事件deviceId={}, value={}", deviceId, propertyValue);
// 2. 获取配置
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper =
getConfigWrapper(deviceId);
if (configWrapper == null || configWrapper.getConfig() == null) {
log.debug("[ButtonEvent] 设备无配置deviceId={}", deviceId);
return;
}
ButtonEventConfig buttonConfig = configWrapper.getConfig().getButtonEvent();
// 2. 获取设备按键配置(从设备 config 字段读取)
ButtonEventConfig buttonConfig = getButtonConfig(deviceId);
if (buttonConfig == null || !buttonConfig.getEnabled()) {
log.debug("[ButtonEvent] 未启用按键事件处理deviceId={}", deviceId);
return;
@@ -83,10 +77,10 @@ public class ButtonEventRuleProcessor {
// 4. 匹配按键类型并处理
if (buttonId.equals(buttonConfig.getConfirmKeyId())) {
// 确认键
handleConfirmButton(configWrapper, buttonId);
handleConfirmButton(deviceId, buttonId);
} else if (buttonId.equals(buttonConfig.getQueryKeyId())) {
// 查询键
handleQueryButton(configWrapper, buttonId);
handleQueryButton(deviceId, buttonId);
} else {
log.debug("[ButtonEvent] 未配置的按键deviceId={}, buttonId={}", deviceId, buttonId);
}
@@ -97,9 +91,7 @@ public class ButtonEventRuleProcessor {
* <p>
* 保洁员按下确认键,确认接收工单
*/
private void handleConfirmButton(CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper,
Integer buttonId) {
Long deviceId = configWrapper.getDeviceId();
private void handleConfirmButton(Long deviceId, Integer buttonId) {
log.info("[ButtonEvent] 确认键按下deviceId={}, buttonId={}", deviceId, buttonId);
@@ -123,7 +115,7 @@ public class ButtonEventRuleProcessor {
}
// 3. 发布工单确认事件
publishConfirmEvent(configWrapper, orderId, buttonId);
publishConfirmEvent(deviceId, orderId, buttonId);
log.info("[ButtonEvent] 发布工单确认事件deviceId={}, orderId={}", deviceId, orderId);
}
@@ -133,9 +125,7 @@ public class ButtonEventRuleProcessor {
* <p>
* 保洁员按下查询键,查询当前工单信息
*/
private void handleQueryButton(CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper,
Integer buttonId) {
Long deviceId = configWrapper.getDeviceId();
private void handleQueryButton(Long deviceId, Integer buttonId) {
log.info("[ButtonEvent] 查询键按下deviceId={}, buttonId={}", deviceId, buttonId);
@@ -144,12 +134,12 @@ public class ButtonEventRuleProcessor {
if (currentOrder == null) {
log.info("[ButtonEvent] 设备无当前工单deviceId={}", deviceId);
// 发布查询结果事件(无工单)
publishQueryEvent(configWrapper, null, buttonId, "当前无工单");
publishQueryEvent(deviceId, null, buttonId, "当前无工单");
return;
}
// 2. 发布查询事件
publishQueryEvent(configWrapper, currentOrder.getOrderId(), buttonId, "查询当前工单");
publishQueryEvent(deviceId, currentOrder.getOrderId(), buttonId, "查询当前工单");
log.info("[ButtonEvent] 发布工单查询事件deviceId={}, orderId={}", deviceId, currentOrder.getOrderId());
}
@@ -157,16 +147,17 @@ public class ButtonEventRuleProcessor {
/**
* 发布工单确认事件
*/
private void publishConfirmEvent(CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper,
Long orderId, Integer buttonId) {
private void publishConfirmEvent(Long deviceId, Long orderId, Integer buttonId) {
try {
String deviceKey = getDeviceKey(deviceId);
Map<String, Object> event = new HashMap<>();
event.put("eventId", UUID.randomUUID().toString());
event.put("orderType", "CLEAN");
event.put("orderId", orderId);
event.put("deviceId", configWrapper.getDeviceId());
event.put("deviceKey", configWrapper.getDeviceKey());
event.put("areaId", configWrapper.getAreaId());
event.put("deviceId", deviceId);
event.put("deviceKey", deviceKey);
event.put("areaId", null); // areaId 由 Ops 模块从当前工单获取
event.put("triggerSource", "IOT_BUTTON_CONFIRM");
event.put("buttonId", buttonId);
@@ -176,26 +167,27 @@ public class ButtonEventRuleProcessor {
);
log.info("[ButtonEvent] 确认事件已发布eventId={}, orderId={}, deviceId={}",
event.get("eventId"), orderId, configWrapper.getDeviceId());
event.get("eventId"), orderId, deviceId);
} catch (Exception e) {
log.error("[ButtonEvent] 发布确认事件失败deviceId={}, orderId={}",
configWrapper.getDeviceId(), orderId, e);
deviceId, orderId, e);
}
}
/**
* 发布工单查询事件
*/
private void publishQueryEvent(CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper,
Long orderId, Integer buttonId, String message) {
private void publishQueryEvent(Long deviceId, Long orderId, Integer buttonId, String message) {
try {
String deviceKey = getDeviceKey(deviceId);
Map<String, Object> event = new HashMap<>();
event.put("eventId", UUID.randomUUID().toString());
event.put("orderType", "CLEAN");
event.put("orderId", orderId);
event.put("deviceId", configWrapper.getDeviceId());
event.put("deviceKey", configWrapper.getDeviceKey());
event.put("areaId", configWrapper.getAreaId());
event.put("deviceId", deviceId);
event.put("deviceKey", deviceKey);
event.put("areaId", null); // areaId 由 Ops 模块从当前工单获取
event.put("triggerSource", "IOT_BUTTON_QUERY");
event.put("buttonId", buttonId);
event.put("message", message);
@@ -206,18 +198,47 @@ public class ButtonEventRuleProcessor {
);
log.info("[ButtonEvent] 查询事件已发布eventId={}, orderId={}, deviceId={}, message={}",
event.get("eventId"), orderId, configWrapper.getDeviceId(), message);
event.get("eventId"), orderId, deviceId, message);
} catch (Exception e) {
log.error("[ButtonEvent] 发布查询事件失败deviceId={}, orderId={}",
configWrapper.getDeviceId(), orderId, e);
deviceId, orderId, e);
}
}
/**
* 获取配置包装器
* 获取设备按键配置
* <p>
* 从设备的 config 字段读取按键事件配置
*
* @param deviceId 设备ID
* @return 按键配置,如果未配置返回 null
*/
private CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper getConfigWrapper(Long deviceId) {
return configService.getConfigWrapperByDeviceId(deviceId);
private ButtonEventConfig getButtonConfig(Long deviceId) {
try {
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
if (device == null || device.getConfig() == null) {
log.debug("[ButtonEvent] 设备不存在或无配置deviceId={}", deviceId);
return null;
}
// 从设备 config JSON 中解析 buttonEvent 配置
// 注意:使用 JsonUtils.parseObject 直接解析整个 config 为 Map然后提取 buttonEvent
// 避免 先转JSON字符串再解析回对象 的双重转换
@SuppressWarnings("unchecked")
Map<String, Object> configMap = JsonUtils.parseObject(device.getConfig(), Map.class);
if (configMap == null || !configMap.containsKey("buttonEvent")) {
log.debug("[ButtonEvent] 设备配置中无 buttonEventdeviceId={}", deviceId);
return null;
}
// 将 buttonEvent 对象转为 JSON 字符串再解析为目标类型
// TODO: 后续可优化为直接转换,避免序列化/反序列化开销
Object buttonEventObj = configMap.get("buttonEvent");
return JsonUtils.parseObject(JsonUtils.toJsonString(buttonEventObj), ButtonEventConfig.class);
} catch (Exception e) {
log.error("[ButtonEvent] 获取按键配置失败deviceId={}", deviceId, e);
return null;
}
}
/**
@@ -259,6 +280,27 @@ public class ButtonEventRuleProcessor {
return null;
}
/**
* 获取设备 Key从 IoT 设备缓存获取 serialNumber
* <p>
* deviceKey 在 ops_area_device_relation 表中是冗余字段,
* 实际来源是 iot_device.serialNumber
*
* @param deviceId 设备ID
* @return deviceKeyserialNumber获取<E88EB7><E58F96>败返回 null
*/
private String getDeviceKey(Long deviceId) {
try {
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
if (device != null) {
return device.getSerialNumber();
}
} catch (Exception e) {
log.warn("[ButtonEvent] 获取 deviceKey 失败deviceId={}", deviceId, e);
}
return null;
}
@Resource
private StringRedisTemplate stringRedisTemplate;
}

View File

@@ -9,6 +9,8 @@ import com.viewsh.module.iot.dal.redis.clean.BeaconArrivedTimeRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.BeaconRssiWindowRedisDAO;
import com.viewsh.module.iot.dal.redis.clean.SignalLossRedisDAO;
import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService;
import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
@@ -49,6 +51,9 @@ public class SignalLossRuleProcessor {
@Resource
private CleanOrderIntegrationConfigService configService;
@Resource
private IotDeviceService deviceService;
@Resource
private RocketMQTemplate rocketMQTemplate;
@@ -143,13 +148,10 @@ public class SignalLossRuleProcessor {
BeaconPresenceConfig.ExitConfig exitConfig = beaconConfigWrapper.getConfig().getBeaconPresence().getExit();
// 2. 获取工牌设备信息(用于获取 deviceKey
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper badgeConfigWrapper = configService
.getConfigWrapperByDeviceId(deviceId);
// 2. 获取 deviceKey从 IoT 设备缓存获取 serialNumber
String badgeDeviceKey = getDeviceKey(deviceId);
String badgeDeviceKey = (badgeConfigWrapper != null) ? badgeConfigWrapper.getDeviceKey() : null;
// 2. 获取首次丢失时间
// 3. 获取首次丢失时间
Long firstLossTime = signalLossRedisDAO.getFirstLossTime(deviceId, areaId);
if (firstLossTime == null) {
@@ -344,4 +346,25 @@ public class SignalLossRuleProcessor {
BadgeDeviceStatusRedisDAO.OrderInfo currentOrder = badgeDeviceStatusRedisDAO.getCurrentOrder(deviceId);
return currentOrder != null && !currentOrder.getAreaId().equals(areaId);
}
/**
* 获取设备 Key从 IoT 设备缓存获取 serialNumber
* <p>
* deviceKey 在 ops_area_device_relation 表中是冗余字段,
* 实际来源是 iot_device.serialNumber
*
* @param deviceId 设备ID
* @return deviceKeyserialNumber获取失败返回 null
*/
private String getDeviceKey(Long deviceId) {
try {
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
if (device != null) {
return device.getSerialNumber();
}
} catch (Exception e) {
log.warn("[SignalLoss] 获取 deviceKey 失败deviceId={}", deviceId, e);
}
return null;
}
}

View File

@@ -56,7 +56,7 @@ public class TrafficThresholdRuleProcessor {
log.debug("[TrafficThreshold] 收到客流属性deviceId={}, value={}", deviceId, propertyValue);
// 2. 获取配置
// 2. 获取设备关联信息(包含 areaId
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper =
getConfigWrapper(deviceId);
@@ -153,6 +153,8 @@ public class TrafficThresholdRuleProcessor {
/**
* 获取配置包装器
* <p>
* 通过 deviceId 直接获取配置(适用于一对一关系的设备,如 TRAFFIC_COUNTER
*/
private CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper getConfigWrapper(Long deviceId) {
return configService.getConfigWrapperByDeviceId(deviceId);