feat(iot): 新增保洁规则消息订阅者,实现规则处理异步解耦

新增 IotCleanRuleMessageHandler 订阅设备消息总线,与属性保存主流程解耦
新增 CleanRuleProcessorManager 统一管理保洁规则处理器

优化点:
- 独立消费组,不阻塞属性保存主流程
- 与场景规则引擎架构一致,便于维护
- 支持 PROPERTY_POST 和 EVENT_POST 两种消息类型

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
lzh
2026-01-25 09:38:12 +08:00
parent 7cf8e5ed2f
commit 22d7a31194
2 changed files with 159 additions and 0 deletions

View File

@@ -0,0 +1,59 @@
package com.viewsh.module.iot.mq.consumer.rule;
import com.viewsh.module.iot.core.messagebus.core.IotMessageBus;
import com.viewsh.module.iot.core.messagebus.core.IotMessageSubscriber;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.service.rule.clean.CleanRuleProcessorManager;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 保洁规则消息处理器
* <p>
* 订阅设备消息总线,独立处理保洁相关规则,与属性保存解耦
*
* 优化点:
* 1. 独立消费组,不阻塞属性保存主流程
* 2. 与场景规则引擎架构一致,便于维护
* 3. 可独立扩展消费者数量
*
* @author AI
*/
@Component
@Slf4j
public class IotCleanRuleMessageHandler implements IotMessageSubscriber<IotDeviceMessage> {
@Resource
private CleanRuleProcessorManager cleanRuleProcessorManager;
@Resource
private IotMessageBus messageBus;
@PostConstruct
public void init() {
messageBus.register(this);
}
@Override
public String getTopic() {
return IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC;
}
@Override
public String getGroup() {
return "iot_clean_rule_consumer"; // 独立消费组
}
@Override
public void onMessage(IotDeviceMessage message) {
try {
cleanRuleProcessorManager.processMessage(message);
} catch (Exception e) {
// 规则处理异常不影响其他消息处理
log.error("[onMessage][消息({}) 保洁规则处理异常]", message.getRequestId(), e);
}
}
}

View File

@@ -0,0 +1,100 @@
package com.viewsh.module.iot.service.rule.clean;
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.service.rule.clean.processor.BeaconDetectionRuleProcessor;
import com.viewsh.module.iot.service.rule.clean.processor.ButtonEventRuleProcessor;
import com.viewsh.module.iot.service.rule.clean.processor.TrafficThresholdRuleProcessor;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 保洁规则处理器管理器
* <p>
* 统一管理所有保洁规则处理器,提供消息分发能力
* <p>
* 与场景规则处理器架构保持一致,直接在消息线程中同步处理
*
* @author AI
*/
@Component
@Slf4j
public class CleanRuleProcessorManager {
@Resource
private TrafficThresholdRuleProcessor trafficThresholdRuleProcessor;
@Resource
private BeaconDetectionRuleProcessor beaconDetectionRuleProcessor;
@Resource
private ButtonEventRuleProcessor buttonEventRuleProcessor;
/**
* 处理设备消息
* <p>
* 处理属性上报和事件上报消息,其他消息类型忽略
*
* @param message 设备消息
*/
public void processMessage(IotDeviceMessage message) {
String method = message.getMethod();
// 1. 只处理属性上报和事件上报消息
if (!IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod().equals(method)
&& !IotDeviceMessageMethodEnum.EVENT_POST.getMethod().equals(method)) {
return;
}
// 2. 获取事件/属性数据
if (!(message.getParams() instanceof Map)) {
log.warn("[processMessage][消息({}) params 格式不正确]", message.getRequestId());
return;
}
Long deviceId = message.getDeviceId();
@SuppressWarnings("unchecked")
Map<String, Object> data = (Map<String, Object>) message.getParams();
log.debug("[processMessage][设备({}) 处理{}上报,数据数量: {}]",
deviceId, "thing.event.post".equals(method) ? "事件" : "属性", data.size());
// 3. 顺序处理各数据项(与场景规则处理器保持一致)
data.forEach((identifier, value) ->
processDataSafely(deviceId, identifier, value));
}
/**
* 安全处理单个数据项
* <p>
* 按标识符路由到对应处理器:
* - 属性people_in → TrafficThresholdRuleProcessor
* - 属性bluetoothDevices → BeaconDetectionRuleProcessor
* - 事件button_event → ButtonEventRuleProcessor
*
* @param deviceId 设备ID
* @param identifier 标识符
* @param value 数据值
*/
private void processDataSafely(Long deviceId, String identifier, Object value) {
try {
switch (identifier) {
case "people_in" ->
trafficThresholdRuleProcessor.processPropertyChange(deviceId, identifier, value);
case "bluetoothDevices" ->
beaconDetectionRuleProcessor.processPropertyChange(deviceId, identifier, value);
case "button_event" ->
buttonEventRuleProcessor.processPropertyChange(deviceId, identifier, value);
default -> {
// 其他属性/事件忽略
}
}
} catch (Exception e) {
log.error("[processDataSafely][设备({}) 数据({}) 处理异常]", deviceId, identifier, e);
}
}
}