diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotCleanRuleMessageHandler.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotCleanRuleMessageHandler.java new file mode 100644 index 0000000..c747bba --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/mq/consumer/rule/IotCleanRuleMessageHandler.java @@ -0,0 +1,59 @@ +package com.viewsh.module.iot.mq.consumer.rule; + +import com.viewsh.module.iot.core.messagebus.core.IotMessageBus; +import com.viewsh.module.iot.core.messagebus.core.IotMessageSubscriber; +import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; +import com.viewsh.module.iot.service.rule.clean.CleanRuleProcessorManager; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * 保洁规则消息处理器 + *

+ * 订阅设备消息总线,独立处理保洁相关规则,与属性保存解耦 + * + * 优化点: + * 1. 独立消费组,不阻塞属性保存主流程 + * 2. 与场景规则引擎架构一致,便于维护 + * 3. 可独立扩展消费者数量 + * + * @author AI + */ +@Component +@Slf4j +public class IotCleanRuleMessageHandler implements IotMessageSubscriber { + + @Resource + private CleanRuleProcessorManager cleanRuleProcessorManager; + + @Resource + private IotMessageBus messageBus; + + @PostConstruct + public void init() { + messageBus.register(this); + } + + @Override + public String getTopic() { + return IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC; + } + + @Override + public String getGroup() { + return "iot_clean_rule_consumer"; // 独立消费组 + } + + @Override + public void onMessage(IotDeviceMessage message) { + try { + cleanRuleProcessorManager.processMessage(message); + } catch (Exception e) { + // 规则处理异常不影响其他消息处理 + log.error("[onMessage][消息({}) 保洁规则处理异常]", message.getRequestId(), e); + } + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/CleanRuleProcessorManager.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/CleanRuleProcessorManager.java new file mode 100644 index 0000000..405f989 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/CleanRuleProcessorManager.java @@ -0,0 +1,100 @@ +package com.viewsh.module.iot.service.rule.clean; + +import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum; +import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; +import com.viewsh.module.iot.service.rule.clean.processor.BeaconDetectionRuleProcessor; +import com.viewsh.module.iot.service.rule.clean.processor.ButtonEventRuleProcessor; +import com.viewsh.module.iot.service.rule.clean.processor.TrafficThresholdRuleProcessor; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * 保洁规则处理器管理器 + *

+ * 统一管理所有保洁规则处理器,提供消息分发能力 + *

+ * 与场景规则处理器架构保持一致,直接在消息线程中同步处理 + * + * @author AI + */ +@Component +@Slf4j +public class CleanRuleProcessorManager { + + @Resource + private TrafficThresholdRuleProcessor trafficThresholdRuleProcessor; + + @Resource + private BeaconDetectionRuleProcessor beaconDetectionRuleProcessor; + + @Resource + private ButtonEventRuleProcessor buttonEventRuleProcessor; + + /** + * 处理设备消息 + *

+ * 处理属性上报和事件上报消息,其他消息类型忽略 + * + * @param message 设备消息 + */ + public void processMessage(IotDeviceMessage message) { + String method = message.getMethod(); + + // 1. 只处理属性上报和事件上报消息 + if (!IotDeviceMessageMethodEnum.PROPERTY_POST.getMethod().equals(method) + && !IotDeviceMessageMethodEnum.EVENT_POST.getMethod().equals(method)) { + return; + } + + // 2. 获取事件/属性数据 + if (!(message.getParams() instanceof Map)) { + log.warn("[processMessage][消息({}) params 格式不正确]", message.getRequestId()); + return; + } + + Long deviceId = message.getDeviceId(); + @SuppressWarnings("unchecked") + Map data = (Map) message.getParams(); + + log.debug("[processMessage][设备({}) 处理{}上报,数据数量: {}]", + deviceId, "thing.event.post".equals(method) ? "事件" : "属性", data.size()); + + // 3. 顺序处理各数据项(与场景规则处理器保持一致) + data.forEach((identifier, value) -> + processDataSafely(deviceId, identifier, value)); + } + + /** + * 安全处理单个数据项 + *

+ * 按标识符路由到对应处理器: + * - 属性:people_in → TrafficThresholdRuleProcessor + * - 属性:bluetoothDevices → BeaconDetectionRuleProcessor + * - 事件:button_event → ButtonEventRuleProcessor + * + * @param deviceId 设备ID + * @param identifier 标识符 + * @param value 数据值 + */ + private void processDataSafely(Long deviceId, String identifier, Object value) { + try { + switch (identifier) { + case "people_in" -> + trafficThresholdRuleProcessor.processPropertyChange(deviceId, identifier, value); + case "bluetoothDevices" -> + beaconDetectionRuleProcessor.processPropertyChange(deviceId, identifier, value); + case "button_event" -> + buttonEventRuleProcessor.processPropertyChange(deviceId, identifier, value); + default -> { + // 其他属性/事件忽略 + } + } + } catch (Exception e) { + log.error("[processDataSafely][设备({}) 数据({}) 处理异常]", deviceId, identifier, e); + } + } + +}