diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/config/IntegrationEventAutoConfiguration.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/config/IntegrationEventAutoConfiguration.java new file mode 100644 index 0000000..b881371 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/config/IntegrationEventAutoConfiguration.java @@ -0,0 +1,37 @@ +package com.viewsh.module.iot.core.integration.config; + +import com.viewsh.module.iot.core.integration.publisher.IntegrationEventPublisher; +import com.viewsh.module.iot.core.integration.publisher.RocketMQIntegrationEventPublisher; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; + +/** + * 跨模块事件总线自动配置 + *

+ * 独立于 IotMessageBus,用于向其他模块发布事件 + * + * @author lzh + */ +@AutoConfiguration +@EnableConfigurationProperties(IntegrationEventProperties.class) +@Slf4j +public class IntegrationEventAutoConfiguration { + + /** + * RocketMQ 实现 + */ + @Bean + @ConditionalOnClass(RocketMQTemplate.class) + @ConditionalOnProperty(prefix = "viewsh.integration.mq", name = "enabled", havingValue = "true", matchIfMissing = true) + public IntegrationEventPublisher integrationEventPublisher(RocketMQTemplate rocketMQTemplate, + IntegrationEventProperties properties) { + log.info("[integrationEventPublisher][创建跨模块事件发布器 - RocketMQ 实现]"); + return new RocketMQIntegrationEventPublisher(rocketMQTemplate, properties); + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/config/IntegrationEventProperties.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/config/IntegrationEventProperties.java new file mode 100644 index 0000000..c4d7972 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/config/IntegrationEventProperties.java @@ -0,0 +1,37 @@ +package com.viewsh.module.iot.core.integration.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * 跨模块事件总线配置属性 + *

+ * 用于配置 IntegrationEventBus,与 IotMessageBus 独立 + * + * @author lzh + */ +@Data +@ConfigurationProperties(prefix = "viewsh.integration.mq") +public class IntegrationEventProperties { + + /** + * 是否启用跨模块事件发布 + */ + private Boolean enabled = true; + + /** + * 生产者组名 + */ + private String producerGroup = "integration-event-producer"; + + /** + * 发送超时时间(毫秒) + */ + private Integer sendTimeoutMs = 3000; + + /** + * 消息最大长度(字节) + */ + private Integer maxMessageSize = 4 * 1024 * 1024; // 4MB + +} diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/constants/DeviceTags.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/constants/DeviceTags.java new file mode 100644 index 0000000..1e113bd --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/constants/DeviceTags.java @@ -0,0 +1,28 @@ +package com.viewsh.module.iot.core.integration.constants; + +/** + * 设备 Tag 工具类 + *

+ * 跨模块事件使用 productKey 作为 Tag 进行消息过滤 + * + * @author lzh + */ +public final class DeviceTags { + + private DeviceTags() { + } + + /** + * 根据 productKey 构建 Tag + * + * @param productKey 产品 Key + * @return Tag 值 + */ + public static String fromProductKey(String productKey) { + if (productKey == null || productKey.isEmpty()) { + return "unknown"; + } + return productKey; + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/constants/IntegrationTopics.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/constants/IntegrationTopics.java new file mode 100644 index 0000000..a6e0b60 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/constants/IntegrationTopics.java @@ -0,0 +1,36 @@ +package com.viewsh.module.iot.core.integration.constants; + +/** + * 跨模块事件 Topic 常量 + *

+ * 定义 IntegrationEventBus 发布事件使用的 RocketMQ Topic + * + * @author lzh + */ +public class IntegrationTopics { + + /** + * 设备状态变更 Topic + *

+ * 用于发布设备上线/离线状态变更事件 + */ + public static final String DEVICE_STATUS = "integration.device.status"; + + /** + * 设备属性变更 Topic + *

+ * 用于发布设备属性变更事件 + */ + public static final String DEVICE_PROPERTY = "integration.device.property"; + + /** + * 设备事件上报 Topic + *

+ * 用于发布设备事件(SOS、按键等) + */ + public static final String DEVICE_EVENT = "integration.device.event"; + + private IntegrationTopics() { + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/BaseDeviceEvent.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/BaseDeviceEvent.java new file mode 100644 index 0000000..74893f6 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/BaseDeviceEvent.java @@ -0,0 +1,61 @@ +package com.viewsh.module.iot.core.integration.event; + +import lombok.AllArgsConstructor; +import lombok.experimental.SuperBuilder; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; +import java.util.UUID; + +/** + * 跨模块设备事件基类 + *

+ * 用于 IntegrationEventBus,发布到 RocketMQ 供其他模块消费 + * + * @author lzh + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@SuperBuilder +public abstract class BaseDeviceEvent { + + /** + * 事件ID(唯一标识,用于幂等性处理) + */ + @Builder.Default + private String eventId = UUID.randomUUID().toString(); + + /** + * 设备ID + */ + private Long deviceId; + + /** + * 设备名称(deviceName) + */ + private String deviceName; + + /** + * 产品ID + */ + private Long productId; + + /** + * 产品标识符(productKey,用作 RocketMQ Tag) + */ + private String productKey; + + /** + * 租户ID + */ + private Long tenantId; + + /** + * 事件时间 + */ + private LocalDateTime eventTime; + +} diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/DeviceEventOccurredEvent.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/DeviceEventOccurredEvent.java new file mode 100644 index 0000000..6b73833 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/DeviceEventOccurredEvent.java @@ -0,0 +1,46 @@ +package com.viewsh.module.iot.core.integration.event; + +import lombok.AllArgsConstructor; +import lombok.experimental.SuperBuilder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +import java.util.Map; + +/** + * 设备事件上报事件 + *

+ * 用于通知业务模块(如 Ops 模块)设备事件发生 + *

+ * 发布到 RocketMQ Topic: integration.device.event + * Tag: productKey + * + * @author lzh + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@SuperBuilder +@EqualsAndHashCode(callSuper = true) +public class DeviceEventOccurredEvent extends BaseDeviceEvent { + + /** + * 事件标识符 + * 例如: button_click(按键点击), sos(紧急求救), fall_detected(跌倒检测)等 + */ + private String eventIdentifier; + + /** + * 事件类型(用于 Tag 过滤的补充分类) + * 例如: ALARM(告警事件), CONTROL(控制事件), INFO(信息事件) + */ + private String eventType; + + /** + * 事件参数 + * 例如: {count: 1, duration: 1000} + */ + private Map eventParams; + +} diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/DevicePropertyChangedEvent.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/DevicePropertyChangedEvent.java new file mode 100644 index 0000000..e25b000 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/DevicePropertyChangedEvent.java @@ -0,0 +1,40 @@ +package com.viewsh.module.iot.core.integration.event; + +import lombok.AllArgsConstructor; +import lombok.experimental.SuperBuilder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +import java.util.Map; + +/** + * 设备属性变更事件 + *

+ * 用于通知业务模块(如 Ops 模块)设备属性变化 + *

+ * 发布到 RocketMQ Topic: integration.device.property + * Tag: productKey + * + * @author lzh + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@SuperBuilder +@EqualsAndHashCode(callSuper = true) +public class DevicePropertyChangedEvent extends BaseDeviceEvent { + + /** + * 属性数据(变更后的属性) + * 例如: {battery: 80, signal: 4, temperature: 25.5} + */ + private Map properties; + + /** + * 变更的属性标识符列表 + * 用于快速识别哪些属性发生了变化 + */ + private java.util.Set changedIdentifiers; + +} diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/DeviceStatusChangedEvent.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/DeviceStatusChangedEvent.java new file mode 100644 index 0000000..a0f2020 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/event/DeviceStatusChangedEvent.java @@ -0,0 +1,39 @@ +package com.viewsh.module.iot.core.integration.event; + +import lombok.AllArgsConstructor; +import lombok.experimental.SuperBuilder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +/** + * 设备状态变更事件 + *

+ * 发布到 RocketMQ Topic: integration.device.status + * Tag: productKey + * + * @author lzh + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@SuperBuilder +@EqualsAndHashCode(callSuper = true) +public class DeviceStatusChangedEvent extends BaseDeviceEvent { + + /** + * 旧状态(1=ONLINE, 2=OFFLINE, 0=INACTIVE) + */ + private Integer oldStatus; + + /** + * 新状态(1=ONLINE, 2=OFFLINE, 0=INACTIVE) + */ + private Integer newStatus; + + /** + * 状态变更原因 + */ + private String reason; + +} diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/IntegrationEventPublisher.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/IntegrationEventPublisher.java new file mode 100644 index 0000000..6915d23 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/IntegrationEventPublisher.java @@ -0,0 +1,38 @@ +package com.viewsh.module.iot.core.integration.publisher; + + +import com.viewsh.module.iot.core.integration.event.DeviceEventOccurredEvent; +import com.viewsh.module.iot.core.integration.event.DevicePropertyChangedEvent; +import com.viewsh.module.iot.core.integration.event.DeviceStatusChangedEvent; + +/** + * 跨模块事件发布器接口 + *

+ * 用于向其他模块(如 Ops 模块)发布设备事件 + * + * @author lzh + */ +public interface IntegrationEventPublisher { + + /** + * 发布设备状态变更事件 + * + * @param event 状态变更事件 + */ + void publishStatusChanged(DeviceStatusChangedEvent event); + + /** + * 发布设备属性变更事件 + * + * @param event 属性变更事件 + */ + void publishPropertyChanged(DevicePropertyChangedEvent event); + + /** + * 发布设备事件上报事件 + * + * @param event 设备事件 + */ + void publishEventOccurred(DeviceEventOccurredEvent event); + +} diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/RocketMQIntegrationEventPublisher.java b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/RocketMQIntegrationEventPublisher.java new file mode 100644 index 0000000..eb7e4fa --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/java/com/viewsh/module/iot/core/integration/publisher/RocketMQIntegrationEventPublisher.java @@ -0,0 +1,85 @@ +package com.viewsh.module.iot.core.integration.publisher; + +import com.viewsh.module.iot.core.integration.constants.DeviceTags; +import com.viewsh.module.iot.core.integration.constants.IntegrationTopics; +import com.viewsh.module.iot.core.integration.event.DeviceEventOccurredEvent; +import com.viewsh.module.iot.core.integration.event.DevicePropertyChangedEvent; +import com.viewsh.module.iot.core.integration.event.DeviceStatusChangedEvent; +import com.viewsh.module.iot.core.integration.config.IntegrationEventProperties; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +/** + * 基于 RocketMQ 的跨模块事件发布器实现 + *

+ * Tag 设计: 使用 productKey 作为 Tag 进行消息过滤 + * + * @author lzh + */ +@Slf4j +public class RocketMQIntegrationEventPublisher implements IntegrationEventPublisher { + + private final RocketMQTemplate rocketMQTemplate; + private final IntegrationEventProperties properties; + + public RocketMQIntegrationEventPublisher(RocketMQTemplate rocketMQTemplate, + IntegrationEventProperties properties) { + this.rocketMQTemplate = rocketMQTemplate; + this.properties = properties; + } + + @Override + public void publishStatusChanged(DeviceStatusChangedEvent event) { + try { + String tag = DeviceTags.fromProductKey(event.getProductKey()); + String destination = IntegrationTopics.DEVICE_STATUS + ":" + tag; + + Message message = MessageBuilder.withPayload(event).build(); + rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs()); + + log.debug("[publishStatusChanged] 发布设备状态变更事件: eventId={}, deviceId={}, productKey={}, status={}", + event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getNewStatus()); + } catch (Exception e) { + log.error("[publishStatusChanged] 发布设备状态变更事件失败: eventId={}, deviceId={}", + event.getEventId(), event.getDeviceId(), e); + } + } + + @Override + public void publishPropertyChanged(DevicePropertyChangedEvent event) { + try { + String tag = DeviceTags.fromProductKey(event.getProductKey()); + String destination = IntegrationTopics.DEVICE_PROPERTY + ":" + tag; + + Message message = MessageBuilder.withPayload(event).build(); + rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs()); + + log.debug("[publishPropertyChanged] 发布设备属性变更事件: eventId={}, deviceId={}, productKey={}, properties={}", + event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getChangedIdentifiers()); + } catch (Exception e) { + log.error("[publishPropertyChanged] 发布设备属性变更事件失败: eventId={}, deviceId={}", + event.getEventId(), event.getDeviceId(), e); + } + } + + @Override + public void publishEventOccurred(DeviceEventOccurredEvent event) { + try { + // 使用 productKey 作为 Tag + String tag = DeviceTags.fromProductKey(event.getProductKey()); + String destination = IntegrationTopics.DEVICE_EVENT + ":" + tag; + + Message message = MessageBuilder.withPayload(event).build(); + rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs()); + + log.debug("[publishEventOccurred] 发布设备事件上报事件: eventId={}, deviceId={}, productKey={}, event={}", + event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getEventIdentifier()); + } catch (Exception e) { + log.error("[publishEventOccurred] 发布设备事件上报事件失败: eventId={}, deviceId={}", + event.getEventId(), event.getDeviceId(), e); + } + } + +} diff --git a/viewsh-module-iot/viewsh-module-iot-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/viewsh-module-iot/viewsh-module-iot-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 5e6bb0c..08d3aac 100644 --- a/viewsh-module-iot/viewsh-module-iot-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/viewsh-module-iot/viewsh-module-iot-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1,2 @@ -com.viewsh.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration \ No newline at end of file +com.viewsh.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration +com.viewsh.module.iot.core.integration.config.IntegrationEventAutoConfiguration \ No newline at end of file