diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/IntegrationEventPublisher.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/IntegrationEventPublisher.java index 6915d23..41a6c56 100644 --- a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/IntegrationEventPublisher.java +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/IntegrationEventPublisher.java @@ -26,13 +26,13 @@ public interface IntegrationEventPublisher { * * @param event 属性变更事件 */ - void publishPropertyChanged(DevicePropertyChangedEvent event); +// void publishPropertyChanged(DevicePropertyChangedEvent event); /** * 发布设备事件上报事件 * * @param event 设备事件 */ - void publishEventOccurred(DeviceEventOccurredEvent event); +// void publishEventOccurred(DeviceEventOccurredEvent event); } diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/RocketMQIntegrationEventPublisher.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/RocketMQIntegrationEventPublisher.java index eb7e4fa..5510f05 100644 --- a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/RocketMQIntegrationEventPublisher.java +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/RocketMQIntegrationEventPublisher.java @@ -1,11 +1,9 @@ package com.viewsh.module.iot.core.integration.publisher; +import com.viewsh.module.iot.core.integration.config.IntegrationEventProperties; import com.viewsh.module.iot.core.integration.constants.DeviceTags; import com.viewsh.module.iot.core.integration.constants.IntegrationTopics; -import com.viewsh.module.iot.core.integration.event.DeviceEventOccurredEvent; -import com.viewsh.module.iot.core.integration.event.DevicePropertyChangedEvent; import com.viewsh.module.iot.core.integration.event.DeviceStatusChangedEvent; -import com.viewsh.module.iot.core.integration.config.IntegrationEventProperties; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.messaging.Message; @@ -47,39 +45,18 @@ public class RocketMQIntegrationEventPublisher implements IntegrationEventPublis } } - @Override - public void publishPropertyChanged(DevicePropertyChangedEvent event) { - try { - String tag = DeviceTags.fromProductKey(event.getProductKey()); - String destination = IntegrationTopics.DEVICE_PROPERTY + ":" + tag; - - Message message = MessageBuilder.withPayload(event).build(); - rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs()); - - log.debug("[publishPropertyChanged] 发布设备属性变更事件: eventId={}, deviceId={}, productKey={}, properties={}", - event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getChangedIdentifiers()); - } catch (Exception e) { - log.error("[publishPropertyChanged] 发布设备属性变更事件失败: eventId={}, deviceId={}", - event.getEventId(), event.getDeviceId(), e); - } - } - - @Override - public void publishEventOccurred(DeviceEventOccurredEvent event) { - try { - // 使用 productKey 作为 Tag - String tag = DeviceTags.fromProductKey(event.getProductKey()); - String destination = IntegrationTopics.DEVICE_EVENT + ":" + tag; - - Message message = MessageBuilder.withPayload(event).build(); - rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs()); - - log.debug("[publishEventOccurred] 发布设备事件上报事件: eventId={}, deviceId={}, productKey={}, event={}", - event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getEventIdentifier()); - } catch (Exception e) { - log.error("[publishEventOccurred] 发布设备事件上报事件失败: eventId={}, deviceId={}", - event.getEventId(), event.getDeviceId(), e); - } - } +// @Override +// public void publishPropertyChanged(DevicePropertyChangedEvent event) { +// // 暂不处理属性变更事件 +// log.debug("[publishPropertyChanged] 跳过设备属性变更事件发布: eventId={}, deviceId={}, productKey={}", +// event.getEventId(), event.getDeviceId(), event.getProductKey()); +// } +// +// @Override +// public void publishEventOccurred(DeviceEventOccurredEvent event) { +// // 暂不处理设备事件上报 +// log.debug("[publishEventOccurred] 跳过设备事件上报发布: eventId={}, deviceId={}, productKey={}, event={}", +// event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getEventIdentifier()); +// } } diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/message/IotDeviceMessageServiceImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/message/IotDeviceMessageServiceImpl.java index d987baf..eb70bf3 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/message/IotDeviceMessageServiceImpl.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/message/IotDeviceMessageServiceImpl.java @@ -14,6 +14,7 @@ import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessage import com.viewsh.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageReqVO; import com.viewsh.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageSummaryByDateRespVO; import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum; +import com.viewsh.module.iot.core.enums.IotDeviceStateEnum; import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; import com.viewsh.module.iot.core.mq.producer.IotDeviceMessageProducer; import com.viewsh.module.iot.core.util.IotDeviceMessageUtils; @@ -197,7 +198,9 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { String stateStr = IotDeviceMessageUtils.getIdentifier(message); assert stateStr != null; Assert.notEmpty(stateStr, "设备状态不能为空"); - deviceService.updateDeviceState(device, Integer.valueOf(stateStr)); + // 兼容整数和字符串格式的状态值 + Integer state = parseStateValue(stateStr); + deviceService.updateDeviceState(device, state); // TODO 芋艿:子设备的关联 return null; } @@ -269,6 +272,34 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService { }); } + /** + * 解析状态值,支持整数和字符串格式 + *

+ * 支持格式: + * - 整数:0=未激活,1=在线,2=离线 + * - 字符串:0/inactive=未激活,1/online=在线,2/offline=离线 + * + * @param stateStr 状态字符串 + * @return 状态枚举值 + */ + private Integer parseStateValue(String stateStr) { + if (stateStr == null) { + return IotDeviceStateEnum.INACTIVE.getState(); + } + try { + // 尝试直接解析为整数 + return Integer.parseInt(stateStr); + } catch (NumberFormatException e) { + // 字符格式匹配 + String lower = stateStr.toLowerCase().trim(); + return switch (lower) { + case "1", "online" -> IotDeviceStateEnum.ONLINE.getState(); + case "2", "offline" -> IotDeviceStateEnum.OFFLINE.getState(); + default -> IotDeviceStateEnum.INACTIVE.getState(); + }; + } + } + private IotDeviceMessageServiceImpl getSelf() { return SpringUtil.getBean(getClass()); } diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java index 3afb5e6..9dce672 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/device/property/IotDevicePropertyServiceImpl.java @@ -184,7 +184,8 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { processRuleProcessors(device, properties); // 2.4 发布属性消息到 Redis Stream(供其他模块如 Ops 订阅) - publishPropertyMessage(device, properties, message.getReportTime()); + // TODO: 暂停发布,后续根据需要开启 + // publishPropertyMessage(device, properties, message.getReportTime()); } /** @@ -197,7 +198,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { */ private void processRuleProcessors(IotDeviceDO device, Map properties) { try { - // 遍历所有属性,调用规��处理器 + // 遍历所有属性,调用规则处理器 for (Map.Entry entry : properties.entrySet()) { String identifier = entry.getKey(); Object value = entry.getValue(); @@ -263,7 +264,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { .eventTime(reportTime) .build(); - integrationEventPublisher.publishPropertyChanged(event); +// integrationEventPublisher.publishPropertyChanged(event); log.debug("[publishToIntegrationEventBus] 跨模块属性变更事件已发布: eventId={}, deviceId={}, productKey={}, properties={}", event.getEventId(), device.getId(), productKey, properties.keySet()); } catch (Exception e) {