feat(iot): 实现集成事件基础设施

This commit is contained in:
lzh
2026-01-15 16:14:16 +08:00
parent 85f1a2cca7
commit a25c16f151
11 changed files with 449 additions and 1 deletions

View File

@@ -0,0 +1,37 @@
package com.viewsh.module.iot.core.integration.config;
import com.viewsh.module.iot.core.integration.publisher.IntegrationEventPublisher;
import com.viewsh.module.iot.core.integration.publisher.RocketMQIntegrationEventPublisher;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
/**
* 跨模块事件总线自动配置
* <p>
* 独立于 IotMessageBus用于向其他模块发布事件
*
* @author lzh
*/
@AutoConfiguration
@EnableConfigurationProperties(IntegrationEventProperties.class)
@Slf4j
public class IntegrationEventAutoConfiguration {
/**
* RocketMQ 实现
*/
@Bean
@ConditionalOnClass(RocketMQTemplate.class)
@ConditionalOnProperty(prefix = "viewsh.integration.mq", name = "enabled", havingValue = "true", matchIfMissing = true)
public IntegrationEventPublisher integrationEventPublisher(RocketMQTemplate rocketMQTemplate,
IntegrationEventProperties properties) {
log.info("[integrationEventPublisher][创建跨模块事件发布器 - RocketMQ 实现]");
return new RocketMQIntegrationEventPublisher(rocketMQTemplate, properties);
}
}

View File

@@ -0,0 +1,37 @@
package com.viewsh.module.iot.core.integration.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* 跨模块事件总线配置属性
* <p>
* 用于配置 IntegrationEventBus与 IotMessageBus 独立
*
* @author lzh
*/
@Data
@ConfigurationProperties(prefix = "viewsh.integration.mq")
public class IntegrationEventProperties {
/**
* 是否启用跨模块事件发布
*/
private Boolean enabled = true;
/**
* 生产者组名
*/
private String producerGroup = "integration-event-producer";
/**
* 发送超时时间(毫秒)
*/
private Integer sendTimeoutMs = 3000;
/**
* 消息最大长度(字节)
*/
private Integer maxMessageSize = 4 * 1024 * 1024; // 4MB
}

View File

@@ -0,0 +1,28 @@
package com.viewsh.module.iot.core.integration.constants;
/**
* 设备 Tag 工具类
* <p>
* 跨模块事件使用 productKey 作为 Tag 进行消息过滤
*
* @author lzh
*/
public final class DeviceTags {
private DeviceTags() {
}
/**
* 根据 productKey 构建 Tag
*
* @param productKey 产品 Key
* @return Tag 值
*/
public static String fromProductKey(String productKey) {
if (productKey == null || productKey.isEmpty()) {
return "unknown";
}
return productKey;
}
}

View File

@@ -0,0 +1,36 @@
package com.viewsh.module.iot.core.integration.constants;
/**
* 跨模块事件 Topic 常量
* <p>
* 定义 IntegrationEventBus 发布事件使用的 RocketMQ Topic
*
* @author lzh
*/
public class IntegrationTopics {
/**
* 设备状态变更 Topic
* <p>
* 用于发布设备上线/离线状态变更事件
*/
public static final String DEVICE_STATUS = "integration.device.status";
/**
* 设备属性变更 Topic
* <p>
* 用于发布设备属性变更事件
*/
public static final String DEVICE_PROPERTY = "integration.device.property";
/**
* 设备事件上报 Topic
* <p>
* 用于发布设备事件SOS、按键等
*/
public static final String DEVICE_EVENT = "integration.device.event";
private IntegrationTopics() {
}
}

View File

@@ -0,0 +1,61 @@
package com.viewsh.module.iot.core.integration.event;
import lombok.AllArgsConstructor;
import lombok.experimental.SuperBuilder;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.util.UUID;
/**
* 跨模块设备事件基类
* <p>
* 用于 IntegrationEventBus发布到 RocketMQ 供其他模块消费
*
* @author lzh
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public abstract class BaseDeviceEvent {
/**
* 事件ID唯一标识用于幂等性处理
*/
@Builder.Default
private String eventId = UUID.randomUUID().toString();
/**
* 设备ID
*/
private Long deviceId;
/**
* 设备名称deviceName
*/
private String deviceName;
/**
* 产品ID
*/
private Long productId;
/**
* 产品标识符productKey用作 RocketMQ Tag
*/
private String productKey;
/**
* 租户ID
*/
private Long tenantId;
/**
* 事件时间
*/
private LocalDateTime eventTime;
}

View File

@@ -0,0 +1,46 @@
package com.viewsh.module.iot.core.integration.event;
import lombok.AllArgsConstructor;
import lombok.experimental.SuperBuilder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import java.util.Map;
/**
* 设备事件上报事件
* <p>
* 用于通知业务模块(如 Ops 模块)设备事件发生
* <p>
* 发布到 RocketMQ Topic: integration.device.event
* Tag: productKey
*
* @author lzh
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
public class DeviceEventOccurredEvent extends BaseDeviceEvent {
/**
* 事件标识符
* 例如: button_click按键点击, sos紧急求救, fall_detected跌倒检测
*/
private String eventIdentifier;
/**
* 事件类型(用于 Tag 过滤的补充分类)
* 例如: ALARM告警事件, CONTROL控制事件, INFO信息事件
*/
private String eventType;
/**
* 事件参数
* 例如: {count: 1, duration: 1000}
*/
private Map<String, Object> eventParams;
}

View File

@@ -0,0 +1,40 @@
package com.viewsh.module.iot.core.integration.event;
import lombok.AllArgsConstructor;
import lombok.experimental.SuperBuilder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import java.util.Map;
/**
* 设备属性变更事件
* <p>
* 用于通知业务模块(如 Ops 模块)设备属性变化
* <p>
* 发布到 RocketMQ Topic: integration.device.property
* Tag: productKey
*
* @author lzh
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
public class DevicePropertyChangedEvent extends BaseDeviceEvent {
/**
* 属性数据(变更后的属性)
* 例如: {battery: 80, signal: 4, temperature: 25.5}
*/
private Map<String, Object> properties;
/**
* 变更的属性标识符列表
* 用于快速识别哪些属性发生了变化
*/
private java.util.Set<String> changedIdentifiers;
}

View File

@@ -0,0 +1,39 @@
package com.viewsh.module.iot.core.integration.event;
import lombok.AllArgsConstructor;
import lombok.experimental.SuperBuilder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
/**
* 设备状态变更事件
* <p>
* 发布到 RocketMQ Topic: integration.device.status
* Tag: productKey
*
* @author lzh
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
@EqualsAndHashCode(callSuper = true)
public class DeviceStatusChangedEvent extends BaseDeviceEvent {
/**
* 旧状态1=ONLINE, 2=OFFLINE, 0=INACTIVE
*/
private Integer oldStatus;
/**
* 新状态1=ONLINE, 2=OFFLINE, 0=INACTIVE
*/
private Integer newStatus;
/**
* 状态变更原因
*/
private String reason;
}

View File

@@ -0,0 +1,38 @@
package com.viewsh.module.iot.core.integration.publisher;
import com.viewsh.module.iot.core.integration.event.DeviceEventOccurredEvent;
import com.viewsh.module.iot.core.integration.event.DevicePropertyChangedEvent;
import com.viewsh.module.iot.core.integration.event.DeviceStatusChangedEvent;
/**
* 跨模块事件发布器接口
* <p>
* 用于向其他模块(如 Ops 模块)发布设备事件
*
* @author lzh
*/
public interface IntegrationEventPublisher {
/**
* 发布设备状态变更事件
*
* @param event 状态变更事件
*/
void publishStatusChanged(DeviceStatusChangedEvent event);
/**
* 发布设备属性变更事件
*
* @param event 属性变更事件
*/
void publishPropertyChanged(DevicePropertyChangedEvent event);
/**
* 发布设备事件上报事件
*
* @param event 设备事件
*/
void publishEventOccurred(DeviceEventOccurredEvent event);
}

View File

@@ -0,0 +1,85 @@
package com.viewsh.module.iot.core.integration.publisher;
import com.viewsh.module.iot.core.integration.constants.DeviceTags;
import com.viewsh.module.iot.core.integration.constants.IntegrationTopics;
import com.viewsh.module.iot.core.integration.event.DeviceEventOccurredEvent;
import com.viewsh.module.iot.core.integration.event.DevicePropertyChangedEvent;
import com.viewsh.module.iot.core.integration.event.DeviceStatusChangedEvent;
import com.viewsh.module.iot.core.integration.config.IntegrationEventProperties;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
/**
* 基于 RocketMQ 的跨模块事件发布器实现
* <p>
* Tag 设计: 使用 productKey 作为 Tag 进行消息过滤
*
* @author lzh
*/
@Slf4j
public class RocketMQIntegrationEventPublisher implements IntegrationEventPublisher {
private final RocketMQTemplate rocketMQTemplate;
private final IntegrationEventProperties properties;
public RocketMQIntegrationEventPublisher(RocketMQTemplate rocketMQTemplate,
IntegrationEventProperties properties) {
this.rocketMQTemplate = rocketMQTemplate;
this.properties = properties;
}
@Override
public void publishStatusChanged(DeviceStatusChangedEvent event) {
try {
String tag = DeviceTags.fromProductKey(event.getProductKey());
String destination = IntegrationTopics.DEVICE_STATUS + ":" + tag;
Message<DeviceStatusChangedEvent> message = MessageBuilder.withPayload(event).build();
rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs());
log.debug("[publishStatusChanged] 发布设备状态变更事件: eventId={}, deviceId={}, productKey={}, status={}",
event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getNewStatus());
} catch (Exception e) {
log.error("[publishStatusChanged] 发布设备状态变更事件失败: eventId={}, deviceId={}",
event.getEventId(), event.getDeviceId(), e);
}
}
@Override
public void publishPropertyChanged(DevicePropertyChangedEvent event) {
try {
String tag = DeviceTags.fromProductKey(event.getProductKey());
String destination = IntegrationTopics.DEVICE_PROPERTY + ":" + tag;
Message<DevicePropertyChangedEvent> message = MessageBuilder.withPayload(event).build();
rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs());
log.debug("[publishPropertyChanged] 发布设备属性变更事件: eventId={}, deviceId={}, productKey={}, properties={}",
event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getChangedIdentifiers());
} catch (Exception e) {
log.error("[publishPropertyChanged] 发布设备属性变更事件失败: eventId={}, deviceId={}",
event.getEventId(), event.getDeviceId(), e);
}
}
@Override
public void publishEventOccurred(DeviceEventOccurredEvent event) {
try {
// 使用 productKey 作为 Tag
String tag = DeviceTags.fromProductKey(event.getProductKey());
String destination = IntegrationTopics.DEVICE_EVENT + ":" + tag;
Message<DeviceEventOccurredEvent> message = MessageBuilder.withPayload(event).build();
rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs());
log.debug("[publishEventOccurred] 发布设备事件上报事件: eventId={}, deviceId={}, productKey={}, event={}",
event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getEventIdentifier());
} catch (Exception e) {
log.error("[publishEventOccurred] 发布设备事件上报事件失败: eventId={}, deviceId={}",
event.getEventId(), event.getDeviceId(), e);
}
}
}

View File

@@ -1 +1,2 @@
com.viewsh.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration
com.viewsh.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration
com.viewsh.module.iot.core.integration.config.IntegrationEventAutoConfiguration