diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/DeviceEventEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/DeviceEventEventHandler.java new file mode 100644 index 0000000..2163139 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/DeviceEventEventHandler.java @@ -0,0 +1,152 @@ +package com.viewsh.module.ops.environment.integration.consumer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.ops.environment.integration.dto.DeviceEventOccurredEventDTO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.springframework.stereotype.Component; + +/** + * 设备事件上报消费者 + *

+ * 订阅 IoT 模块发布的设备事件上报(SOS、按键等) + *

+ * RocketMQ 配置: + * - Topic: integration.device.event + * - Tag: productKey(可配置,如 cleaner_badge_v1) + * + * @author lzh + */ +@Slf4j +@Component +@org.apache.rocketmq.spring.annotation.RocketMQMessageListener( + topic = "integration.device.event", + consumerGroup = "ops-device-event-group", + consumeMode = ConsumeMode.CONCURRENTLY, + selectorExpression = "*" +) +public class DeviceEventEventHandler implements org.apache.rocketmq.spring.core.RocketMQListener { + + @Resource + private ObjectMapper objectMapper; + + @Resource + private IntegrationEventDeduplicationService deduplicationService; + + @Override + public void onMessage(String message) { + try { + // 1. JSON 反序列化 + DeviceEventOccurredEventDTO event = objectMapper.readValue(message, DeviceEventOccurredEventDTO.class); + + // 2. 幂等性检查 + if (!deduplicationService.tryConsume(event.getEventId())) { + log.debug("[DeviceEventEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); + return; + } + + // 3. 业务处理 + handleEventOccurred(event); + + } catch (Exception e) { + log.error("[DeviceEventEventHandler] 消息处理失败: message={}", message, e); + throw new RuntimeException("设备事件上报处理失败", e); + } + } + + /** + * 处理设备事件 + */ + private void handleEventOccurred(DeviceEventOccurredEventDTO event) { + Long deviceId = event.getDeviceId(); + String eventIdentifier = event.getEventIdentifier(); + + log.info("[DeviceEventEventHandler] 设备事件: deviceId={}, event={}, productKey={}", + deviceId, eventIdentifier, event.getProductKey()); + + // 根据事件类型分发处理 + switch (eventIdentifier) { + case "sos": + handleSosEvent(event); + break; + case "button_click": + handleButtonClick(event); + break; + case "fall_detected": + handleFallDetected(event); + break; + case "confirm_order": + handleConfirmOrder(event); + break; + case "complete_order": + handleCompleteOrder(event); + break; + case "pause_order": + handlePauseOrder(event); + break; + default: + log.debug("[DeviceEventEventHandler] 未知事件类型: {}", eventIdentifier); + break; + } + + log.debug("[DeviceEventEventHandler] 处理完成: deviceId={}, event={}", deviceId, eventIdentifier); + } + + /** + * 处理 SOS 告警事件 + */ + private void handleSosEvent(DeviceEventOccurredEventDTO event) { + log.warn("[DeviceEventEventHandler] SOS 告警: deviceId={}, productKey={}", + event.getDeviceId(), event.getProductKey()); + + // TODO: 触发紧急工单或告警 + // 1. 查找设备关联的保洁员 + // 2. 发送告警通知 + // 3. 创建紧急工单 + } + + /** + * 处理按键点击事件 + */ + private void handleButtonClick(DeviceEventOccurredEventDTO event) { + log.info("[DeviceEventEventHandler] 按键点击: deviceId={}", event.getDeviceId()); + } + + /** + * 处理跌倒检测事件 + */ + private void handleFallDetected(DeviceEventOccurredEventDTO event) { + log.warn("[DeviceEventEventHandler] 跌倒检测: deviceId={}", event.getDeviceId()); + + // TODO: 触发紧急告警 + } + + /** + * 处理工单确认事件 + */ + private void handleConfirmOrder(DeviceEventOccurredEventDTO event) { + log.info("[DeviceEventEventHandler] 工单确认: deviceId={}", event.getDeviceId()); + + // TODO: 更新工单状态为已确认 + } + + /** + * 处理工单完成事件 + */ + private void handleCompleteOrder(DeviceEventOccurredEventDTO event) { + log.info("[DeviceEventEventHandler] 工单完成: deviceId={}", event.getDeviceId()); + + // TODO: 更新工单状态为已完成 + } + + /** + * 处理工单暂停事件 + */ + private void handlePauseOrder(DeviceEventOccurredEventDTO event) { + log.info("[DeviceEventEventHandler] 工单暂停: deviceId={}", event.getDeviceId()); + + // TODO: 更新工单状态为已暂停 + } + +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/DevicePropertyEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/DevicePropertyEventHandler.java new file mode 100644 index 0000000..91d4715 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/DevicePropertyEventHandler.java @@ -0,0 +1,117 @@ +package com.viewsh.module.ops.environment.integration.consumer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.ops.environment.integration.dto.DevicePropertyChangedEventDTO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +/** + * 设备属性变更事件消费者 + *

+ * 订阅 IoT 模块发布的设备属性变更事件 + *

+ * RocketMQ 配置: + * - Topic: integration.device.property + * - Tag: productKey(可配置,如 cleaner_badge_v1) + * + * @author lzh + */ +@Slf4j +@Component +@org.apache.rocketmq.spring.annotation.RocketMQMessageListener( + topic = "integration.device.property", + consumerGroup = "ops-device-property-group", + consumeMode = ConsumeMode.CONCURRENTLY, + selectorExpression = "*" +) +public class DevicePropertyEventHandler implements RocketMQListener { + + @Resource + private ObjectMapper objectMapper; + + @Resource + private IntegrationEventDeduplicationService deduplicationService; + + @Override + public void onMessage(String message) { + try { + // 1. JSON 反序列化 + DevicePropertyChangedEventDTO event = objectMapper.readValue(message, DevicePropertyChangedEventDTO.class); + + // 2. 幂等性检查 + if (!deduplicationService.tryConsume(event.getEventId())) { + log.debug("[DevicePropertyEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); + return; + } + + // 3. 业务处理 + handlePropertyChanged(event); + + } catch (Exception e) { + log.error("[DevicePropertyEventHandler] 消息处理失败: message={}", message, e); + throw new RuntimeException("设备属性变更事件处理失败", e); + } + } + + /** + * 处理设备属性变更 + */ + private void handlePropertyChanged(DevicePropertyChangedEventDTO event) { + Long deviceId = event.getDeviceId(); + var properties = event.getProperties(); + + log.info("[DevicePropertyEventHandler] 设备属性变更: deviceId={}, properties={}, productKey={}", + deviceId, properties.keySet(), event.getProductKey()); + + // 处理特定属性变更 + if (properties.containsKey("battery")) { + handleBatteryChange(deviceId, properties.get("battery")); + } + + if (properties.containsKey("signal")) { + handleSignalChange(deviceId, properties.get("signal")); + } + + // 客流传感器数据检查 + if (properties.containsKey("people_count")) { + handlePeopleCountChange(deviceId, properties.get("people_count")); + } + + log.debug("[DevicePropertyEventHandler] 处理完成: deviceId={}", deviceId); + } + + /** + * 处理电量变化 + */ + private void handleBatteryChange(Long deviceId, Object batteryValue) { + log.debug("[DevicePropertyEventHandler] 电量变化: deviceId={}, battery={}", deviceId, batteryValue); + + // TODO: 如果电量低于阈值,提示更换 + // if (batteryValue instanceof Number && ((Number) batteryValue).intValue() < 20) { + // // 触发低电量提醒 + // } + } + + /** + * 处理信号强度变化 + */ + private void handleSignalChange(Long deviceId, Object signalValue) { + log.debug("[DevicePropertyEventHandler] 信号变化: deviceId={}, signal={}", deviceId, signalValue); + } + + /** + * 处理客流数量变化 + */ + private void handlePeopleCountChange(Long deviceId, Object peopleCount) { + log.debug("[DevicePropertyEventHandler] 客流变化: deviceId={}, peopleCount={}", deviceId, peopleCount); + + // TODO: 客流达标触发保洁 + // if (peopleCount instanceof Number && ((Number) peopleCount).intValue() > threshold) { + // // 触发保洁工单 + // } + } + +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/DeviceStatusEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/DeviceStatusEventHandler.java new file mode 100644 index 0000000..5c8ad69 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/DeviceStatusEventHandler.java @@ -0,0 +1,118 @@ +package com.viewsh.module.ops.environment.integration.consumer; + +import cn.hutool.core.util.StrUtil; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.ops.environment.integration.dto.DeviceStatusChangedEventDTO; +import com.viewsh.module.ops.environment.service.cleaner.CleanerStatusService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.springframework.stereotype.Component; + +/** + * 设备状态变更事件消费者 + *

+ * 订阅 IoT 模块发布的设备状态变更事件,同步更新保洁员在线状态 + *

+ * RocketMQ 配置: + * - Topic: integration.device.status + * - Tag: productKey(可配置,如 cleaner_badge_v1) + * - 消费模式: CONCURRENTLY(并发消费) + * + * @author lzh + */ +@Slf4j +@Component +@org.apache.rocketmq.spring.annotation.RocketMQMessageListener( + topic = "integration.device.status", + consumerGroup = "ops-device-status-group", + consumeMode = ConsumeMode.CONCURRENTLY, + // TAG 过滤将在配置文件中设置,或使用 selectorExpression + selectorExpression = "*" +) +public class DeviceStatusEventHandler implements org.apache.rocketmq.spring.core.RocketMQListener { + + @Resource + private CleanerStatusService cleanerStatusService; + + @Resource + private ObjectMapper objectMapper; + + @Resource + private IntegrationEventDeduplicationService deduplicationService; + + @Override + public void onMessage(String message) { + try { + // 1. JSON 反序列化 + DeviceStatusChangedEventDTO event = objectMapper.readValue(message, DeviceStatusChangedEventDTO.class); + + // 2. 幂等性检查 + if (!deduplicationService.tryConsume(event.getEventId())) { + log.debug("[DeviceStatusEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); + return; + } + + // 3. 业务处理 + handleStatusChanged(event); + + } catch (Exception e) { + log.error("[DeviceStatusEventHandler] 消息处理失败: message={}", message, e); + // 抛出异常让 RocketMQ 重试 + throw new RuntimeException("设备状态变更事件处理失败", e); + } + } + + /** + * 处理设备状态变更 + */ + private void handleStatusChanged(DeviceStatusChangedEventDTO event) { + Long deviceId = event.getDeviceId(); + Integer newStatus = event.getNewStatus(); + + log.info("[DeviceStatusEventHandler] 设备状态变更: deviceId={}, newStatus={}, productKey={}", + deviceId, newStatus, event.getProductKey()); + + // 只处理在线/离线状态变更 + // 1 = 在线, 0 = 离线 + if (newStatus == 1) { + // 设备上线 -> 保洁员上线 + handleDeviceOnline(event); + } else if (newStatus == 0) { + // 设备离线 -> 保洁员离线 + handleDeviceOffline(event); + } + + log.debug("[DeviceStatusEventHandler] 处理完成: deviceId={}, status={}", deviceId, newStatus); + } + + /** + * 处理设备上线 + */ + private void handleDeviceOnline(DeviceStatusChangedEventDTO event) { + // TODO: 根据设备ID查找对应的保洁员 + // Long cleanerId = getCleanerIdByDeviceId(event.getDeviceId()); + // if (cleanerId != null) { + // cleanerStatusService.updateOnlineStatus(cleanerId, true); + // } + + log.debug("[DeviceStatusEventHandler] 设备上线: deviceId={}, reason={}", + event.getDeviceId(), event.getReason()); + } + + /** + * 处理设备离线 + */ + private void handleDeviceOffline(DeviceStatusChangedEventDTO event) { + // TODO: 根据设备ID查找对应的保洁员 + // Long cleanerId = getCleanerIdByDeviceId(event.getDeviceId()); + // if (cleanerId != null) { + // cleanerStatusService.updateOnlineStatus(cleanerId, false); + // } + + log.debug("[DeviceStatusEventHandler] 设备离线: deviceId={}, reason={}", + event.getDeviceId(), event.getReason()); + } + +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/IntegrationEventDeduplicationService.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/IntegrationEventDeduplicationService.java new file mode 100644 index 0000000..37d78c8 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/IntegrationEventDeduplicationService.java @@ -0,0 +1,83 @@ +package com.viewsh.module.ops.environment.integration.consumer; + +import cn.hutool.core.util.StrUtil; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import java.time.Duration; + +/** + * 跨模块事件幂等性服务 + *

+ * 使用 eventId 防止重复消费同一事件 + * + * @author lzh + */ +@Slf4j +@Service +public class IntegrationEventDeduplicationService { + + private static final String DEDUP_KEY_PREFIX = "integration:event:dedup:"; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + /** + * 尝试消费事件(幂等性检查) + * + * @param eventId 事件ID + * @return true-首次消费,false-重复消费 + */ + public boolean tryConsume(String eventId) { + if (StrUtil.isEmpty(eventId)) { + log.warn("[IntegrationEventDeduplication] eventId 为空,拒绝消费"); + return false; + } + + String key = DEDUP_KEY_PREFIX + eventId; + + // setNX:如果 key 不存在则设置,返回 true + Boolean success = stringRedisTemplate.opsForValue() + .setIfAbsent(key, "1", Duration.ofHours(24)); + + if (Boolean.FALSE.equals(success)) { + log.debug("[IntegrationEventDeduplication] 重复事件: eventId={}", eventId); + return false; + } + + return true; + } + + /** + * 标记事件已消费 + * + * @param eventId 事件ID + */ + public void markConsumed(String eventId) { + if (StrUtil.isEmpty(eventId)) { + return; + } + + String key = DEDUP_KEY_PREFIX + eventId; + stringRedisTemplate.opsForValue().set(key, "1", Duration.ofHours(24)); + } + + /** + * 检查事件是否已消费 + * + * @param eventId 事件ID + * @return true-已消费,false-未消费 + */ + public boolean isConsumed(String eventId) { + if (StrUtil.isEmpty(eventId)) { + return false; + } + + String key = DEDUP_KEY_PREFIX + eventId; + Boolean exists = stringRedisTemplate.hasKey(key); + return Boolean.TRUE.equals(exists); + } + +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/BaseDeviceEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/BaseDeviceEventDTO.java new file mode 100644 index 0000000..25b8069 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/BaseDeviceEventDTO.java @@ -0,0 +1,60 @@ +package com.viewsh.module.ops.environment.integration.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * 设备事件基类 DTO + *

+ * 用于反序列化从 RocketMQ 接收的设备事件消息 + * + * @author lzh + */ +@Data +public class BaseDeviceEventDTO { + + /** + * 事件ID(用于幂等性去重) + */ + @JsonProperty("eventId") + private String eventId; + + /** + * 设备ID + */ + @JsonProperty("deviceId") + private Long deviceId; + + /** + * 设备名称 + */ + @JsonProperty("deviceName") + private String deviceName; + + /** + * 产品ID + */ + @JsonProperty("productId") + private Long productId; + + /** + * 产品标识符(RocketMQ Tag) + */ + @JsonProperty("productKey") + private String productKey; + + /** + * 租户ID + */ + @JsonProperty("tenantId") + private Long tenantId; + + /** + * 事件时间 + */ + @JsonProperty("eventTime") + private LocalDateTime eventTime; + +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/DeviceEventOccurredEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/DeviceEventOccurredEventDTO.java new file mode 100644 index 0000000..ac5b1e2 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/DeviceEventOccurredEventDTO.java @@ -0,0 +1,40 @@ +package com.viewsh.module.ops.environment.integration.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.util.Map; + +/** + * 设备事件上报 DTO + *

+ * 用于反序列化从 RocketMQ 接收的设备事件上报消息 + * + * @author lzh + */ +@Data +@EqualsAndHashCode(callSuper = true) +public class DeviceEventOccurredEventDTO extends BaseDeviceEventDTO { + + /** + * 事件标识符 + * 例如: button_click(按键点击), sos(紧急求救), fall_detected(跌倒检测)等 + */ + @JsonProperty("eventIdentifier") + private String eventIdentifier; + + /** + * 事件类型 + * 例如: ALARM(告警事件), CONTROL(控��事件), INFO(信息事件) + */ + @JsonProperty("eventType") + private String eventType; + + /** + * 事件参数 + */ + @JsonProperty("eventParams") + private Map eventParams; + +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/DevicePropertyChangedEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/DevicePropertyChangedEventDTO.java new file mode 100644 index 0000000..b95176c --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/DevicePropertyChangedEventDTO.java @@ -0,0 +1,33 @@ +package com.viewsh.module.ops.environment.integration.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.util.Map; +import java.util.Set; + +/** + * 设备属性变更事件 DTO + *

+ * 用于反序列化从 RocketMQ 接收的设备属性变更消息 + * + * @author lzh + */ +@Data +@EqualsAndHashCode(callSuper = true) +public class DevicePropertyChangedEventDTO extends BaseDeviceEventDTO { + + /** + * 变更的属性数据 + */ + @JsonProperty("properties") + private Map properties; + + /** + * 变更的属性标识符集合 + */ + @JsonProperty("changedIdentifiers") + private Set changedIdentifiers; + +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/DeviceStatusChangedEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/DeviceStatusChangedEventDTO.java new file mode 100644 index 0000000..84625a6 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/DeviceStatusChangedEventDTO.java @@ -0,0 +1,36 @@ +package com.viewsh.module.ops.environment.integration.dto; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * 设备状态变更事件 DTO + *

+ * 用于反序列化从 RocketMQ 接收的设备状态变更消息 + * + * @author lzh + */ +@Data +@EqualsAndHashCode(callSuper = true) +public class DeviceStatusChangedEventDTO extends BaseDeviceEventDTO { + + /** + * 旧状态 + */ + @JsonProperty("oldStatus") + private Integer oldStatus; + + /** + * 新状态 + */ + @JsonProperty("newStatus") + private Integer newStatus; + + /** + * 变更原因 + */ + @JsonProperty("reason") + private String reason; + +}