From fa619710efb563bba21fb1829004d3139b4727fe Mon Sep 17 00:00:00 2001 From: lzh Date: Wed, 21 Jan 2026 22:56:13 +0800 Subject: [PATCH] =?UTF-8?q?refactor(iot):=20=E4=BC=98=E5=8C=96=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E5=8F=91=E5=B8=83=E6=9C=BA=E5=88=B6=E5=B9=B6=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E7=8A=B6=E6=80=81=E5=80=BC=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. IntegrationEventPublisher 只保留设备状态变更事件发布 - 注释掉 publishPropertyChanged 和 publishEventOccurred 接口 - RocketMQIntegrationEventPublisher 对应实现改为注释 2. IotDevicePropertyServiceImpl 属性消息发布暂停 - 注释掉 saveDeviceProperty 中的 publishPropertyMessage 调用 - 注释掉 publishToIntegrationEventBus 中的实际发布逻辑 3. IotDeviceMessageServiceImpl 新增状态值解析兼容 - 新增 parseStateValue 方法支持整数和字符串格式状态值 - 支持 "online"/"offline" 字符串解析 Co-Authored-By: Claude Opus 4.5 --- .../publisher/IntegrationEventPublisher.java | 4 +- .../RocketMQIntegrationEventPublisher.java | 51 +++++-------------- .../message/IotDeviceMessageServiceImpl.java | 33 +++++++++++- .../IotDevicePropertyServiceImpl.java | 7 +-- 4 files changed, 52 insertions(+), 43 deletions(-) diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/IntegrationEventPublisher.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/IntegrationEventPublisher.java index 6915d23..41a6c56 100644 --- a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/IntegrationEventPublisher.java +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/IntegrationEventPublisher.java @@ -26,13 +26,13 @@ public interface IntegrationEventPublisher { * * @param event 属性变更事件 */ - void publishPropertyChanged(DevicePropertyChangedEvent event); +// void publishPropertyChanged(DevicePropertyChangedEvent event); /** * 发布设备事件上报事件 * * @param event 设备事件 */ - void publishEventOccurred(DeviceEventOccurredEvent event); +// void publishEventOccurred(DeviceEventOccurredEvent event); } diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/RocketMQIntegrationEventPublisher.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/RocketMQIntegrationEventPublisher.java index eb7e4fa..5510f05 100644 --- a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/RocketMQIntegrationEventPublisher.java +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/RocketMQIntegrationEventPublisher.java @@ -1,11 +1,9 @@ package com.viewsh.module.iot.core.integration.publisher; +import com.viewsh.module.iot.core.integration.config.IntegrationEventProperties; import com.viewsh.module.iot.core.integration.constants.DeviceTags; import com.viewsh.module.iot.core.integration.constants.IntegrationTopics; -import com.viewsh.module.iot.core.integration.event.DeviceEventOccurredEvent; -import com.viewsh.module.iot.core.integration.event.DevicePropertyChangedEvent; import com.viewsh.module.iot.core.integration.event.DeviceStatusChangedEvent; -import com.viewsh.module.iot.core.integration.config.IntegrationEventProperties; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.messaging.Message; @@ -47,39 +45,18 @@ public class RocketMQIntegrationEventPublisher implements IntegrationEventPublis } } - @Override - public void publishPropertyChanged(DevicePropertyChangedEvent event) { - try { - String tag = DeviceTags.fromProductKey(event.getProductKey()); - String destination = IntegrationTopics.DEVICE_PROPERTY + ":" + tag; - - Message message = MessageBuilder.withPayload(event).build(); - rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs()); - - log.debug("[publishPropertyChanged] 发布设备属性变更事件: eventId={}, deviceId={}, productKey={}, properties={}", - event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getChangedIdentifiers()); - } catch (Exception e) { - log.error("[publishPropertyChanged] 发布设备属性变更事件失败: eventId={}, deviceId={}", - event.getEventId(), event.getDeviceId(), e); - } - } - - @Override - public void publishEventOccurred(DeviceEventOccurredEvent event) { - try { - // 使用 productKey 作为 Tag - String tag = DeviceTags.fromProductKey(event.getProductKey()); - String destination = IntegrationTopics.DEVICE_EVENT + ":" + tag; - - Message message = MessageBuilder.withPayload(event).build(); - rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs()); - - log.debug("[publishEventOccurred] 发布设备事件上报事件: eventId={}, deviceId={}, productKey={}, event={}", - event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getEventIdentifier()); - } catch (Exception e) { - log.error("[publishEventOccurred] 发布设备事件上报事件失败: eventId={}, deviceId={}", - event.getEventId(), event.getDeviceId(), e); - } - } +// @Override +// public void publishPropertyChanged(DevicePropertyChangedEvent event) { +// // 暂不处理属性变更事件 +// log.debug("[publishPropertyChanged] 跳过设备属性变更事件发布: eventId={}, deviceId={}, productKey={}", +// event.getEventId(), event.getDeviceId(), event.getProductKey()); +// } +// +// @Override +// public void publishEventOccurred(DeviceEventOccurredEvent event) { +// // 暂不处理设备事件上报 +// log.debug("[publishEventOccurred] 跳过设备事件上报发布: eventId={}, deviceId={}, productKey={}, event={}", +// event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getEventIdentifier()); +// } } diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/message/IotDeviceMessageServiceImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/message/IotDeviceMessageServiceImpl.java index d987baf..eb70bf3 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/message/IotDeviceMessageServiceImpl.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/message/IotDeviceMessageServiceImpl.java @@ -14,6 +14,7 @@ import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessage import com.viewsh.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageReqVO; import com.viewsh.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageSummaryByDateRespVO; import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum; +import com.viewsh.module.iot.core.enums.IotDeviceStateEnum; import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; import com.viewsh.module.iot.core.mq.producer.IotDeviceMessageProducer; import com.viewsh.module.iot.core.util.IotDeviceMessageUtils; @@ -197,7 +198,9 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { String stateStr = IotDeviceMessageUtils.getIdentifier(message); assert stateStr != null; Assert.notEmpty(stateStr, "设备状态不能为空"); - deviceService.updateDeviceState(device, Integer.valueOf(stateStr)); + // 兼容整数和字符串格式的状态值 + Integer state = parseStateValue(stateStr); + deviceService.updateDeviceState(device, state); // TODO 芋艿:子设备的关联 return null; } @@ -269,6 +272,34 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { }); } + /** + * 解析状态值,支持整数和字符串格式 + *

+ * 支持格式: + * - 整数:0=未激活,1=在线,2=离线 + * - 字符串:0/inactive=未激活,1/online=在线,2/offline=离线 + * + * @param stateStr 状态字符串 + * @return 状态枚举值 + */ + private Integer parseStateValue(String stateStr) { + if (stateStr == null) { + return IotDeviceStateEnum.INACTIVE.getState(); + } + try { + // 尝试直接解析为整数 + return Integer.parseInt(stateStr); + } catch (NumberFormatException e) { + // 字符格式匹配 + String lower = stateStr.toLowerCase().trim(); + return switch (lower) { + case "1", "online" -> IotDeviceStateEnum.ONLINE.getState(); + case "2", "offline" -> IotDeviceStateEnum.OFFLINE.getState(); + default -> IotDeviceStateEnum.INACTIVE.getState(); + }; + } + } + private IotDeviceMessageServiceImpl getSelf() { return SpringUtil.getBean(getClass()); } diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java index 3afb5e6..9dce672 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java @@ -184,7 +184,8 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { processRuleProcessors(device, properties); // 2.4 发布属性消息到 Redis Stream(供其他模块如 Ops 订阅) - publishPropertyMessage(device, properties, message.getReportTime()); + // TODO: 暂停发布,后续根据需要开启 + // publishPropertyMessage(device, properties, message.getReportTime()); } /** @@ -197,7 +198,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { */ private void processRuleProcessors(IotDeviceDO device, Map properties) { try { - // 遍历所有属性,调用规��处理器 + // 遍历所有属性,调用规则处理器 for (Map.Entry entry : properties.entrySet()) { String identifier = entry.getKey(); Object value = entry.getValue(); @@ -263,7 +264,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { .eventTime(reportTime) .build(); - integrationEventPublisher.publishPropertyChanged(event); +// integrationEventPublisher.publishPropertyChanged(event); log.debug("[publishToIntegrationEventBus] 跨模块属性变更事件已发布: eventId={}, deviceId={}, productKey={}, properties={}", event.getEventId(), device.getId(), productKey, properties.keySet()); } catch (Exception e) {