chore: 【ops】保洁与IOT对接设备消费MQ消息

This commit is contained in:
lzh
2026-01-09 17:41:14 +08:00
parent f26bd15f5b
commit 2792226c95
8 changed files with 639 additions and 0 deletions

View File

@@ -0,0 +1,152 @@
package com.viewsh.module.ops.environment.integration.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.module.ops.environment.integration.dto.DeviceEventOccurredEventDTO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.springframework.stereotype.Component;
/**
* 设备事件上报消费者
* <p>
* 订阅 IoT 模块发布的设备事件上报SOS、按键等
* <p>
* RocketMQ 配置:
* - Topic: integration.device.event
* - Tag: productKey可配置如 cleaner_badge_v1
*
* @author lzh
*/
@Slf4j
@Component
@org.apache.rocketmq.spring.annotation.RocketMQMessageListener(
topic = "integration.device.event",
consumerGroup = "ops-device-event-group",
consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*"
)
public class DeviceEventEventHandler implements org.apache.rocketmq.spring.core.RocketMQListener<String> {
@Resource
private ObjectMapper objectMapper;
@Resource
private IntegrationEventDeduplicationService deduplicationService;
@Override
public void onMessage(String message) {
try {
// 1. JSON 反序列化
DeviceEventOccurredEventDTO event = objectMapper.readValue(message, DeviceEventOccurredEventDTO.class);
// 2. 幂等性检查
if (!deduplicationService.tryConsume(event.getEventId())) {
log.debug("[DeviceEventEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
return;
}
// 3. 业务处理
handleEventOccurred(event);
} catch (Exception e) {
log.error("[DeviceEventEventHandler] 消息处理失败: message={}", message, e);
throw new RuntimeException("设备事件上报处理失败", e);
}
}
/**
* 处理设备事件
*/
private void handleEventOccurred(DeviceEventOccurredEventDTO event) {
Long deviceId = event.getDeviceId();
String eventIdentifier = event.getEventIdentifier();
log.info("[DeviceEventEventHandler] 设备事件: deviceId={}, event={}, productKey={}",
deviceId, eventIdentifier, event.getProductKey());
// 根据事件类型分发处理
switch (eventIdentifier) {
case "sos":
handleSosEvent(event);
break;
case "button_click":
handleButtonClick(event);
break;
case "fall_detected":
handleFallDetected(event);
break;
case "confirm_order":
handleConfirmOrder(event);
break;
case "complete_order":
handleCompleteOrder(event);
break;
case "pause_order":
handlePauseOrder(event);
break;
default:
log.debug("[DeviceEventEventHandler] 未知事件类型: {}", eventIdentifier);
break;
}
log.debug("[DeviceEventEventHandler] 处理完成: deviceId={}, event={}", deviceId, eventIdentifier);
}
/**
* 处理 SOS 告警事件
*/
private void handleSosEvent(DeviceEventOccurredEventDTO event) {
log.warn("[DeviceEventEventHandler] SOS 告警: deviceId={}, productKey={}",
event.getDeviceId(), event.getProductKey());
// TODO: 触发紧急工单或告警
// 1. 查找设备关联的保洁员
// 2. 发送告警通知
// 3. 创建紧急工单
}
/**
* 处理按键点击事件
*/
private void handleButtonClick(DeviceEventOccurredEventDTO event) {
log.info("[DeviceEventEventHandler] 按键点击: deviceId={}", event.getDeviceId());
}
/**
* 处理跌倒检测事件
*/
private void handleFallDetected(DeviceEventOccurredEventDTO event) {
log.warn("[DeviceEventEventHandler] 跌倒检测: deviceId={}", event.getDeviceId());
// TODO: 触发紧急告警
}
/**
* 处理工单确认事件
*/
private void handleConfirmOrder(DeviceEventOccurredEventDTO event) {
log.info("[DeviceEventEventHandler] 工单确认: deviceId={}", event.getDeviceId());
// TODO: 更新工单状态为已确认
}
/**
* 处理工单完成事件
*/
private void handleCompleteOrder(DeviceEventOccurredEventDTO event) {
log.info("[DeviceEventEventHandler] 工单完成: deviceId={}", event.getDeviceId());
// TODO: 更新工单状态为已完成
}
/**
* 处理工单暂停事件
*/
private void handlePauseOrder(DeviceEventOccurredEventDTO event) {
log.info("[DeviceEventEventHandler] 工单暂停: deviceId={}", event.getDeviceId());
// TODO: 更新工单状态为已暂停
}
}

View File

@@ -0,0 +1,117 @@
package com.viewsh.module.ops.environment.integration.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.module.ops.environment.integration.dto.DevicePropertyChangedEventDTO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 设备属性变更事件消费者
* <p>
* 订阅 IoT 模块发布的设备属性变更事件
* <p>
* RocketMQ 配置:
* - Topic: integration.device.property
* - Tag: productKey可配置如 cleaner_badge_v1
*
* @author lzh
*/
@Slf4j
@Component
@org.apache.rocketmq.spring.annotation.RocketMQMessageListener(
topic = "integration.device.property",
consumerGroup = "ops-device-property-group",
consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*"
)
public class DevicePropertyEventHandler implements RocketMQListener<String> {
@Resource
private ObjectMapper objectMapper;
@Resource
private IntegrationEventDeduplicationService deduplicationService;
@Override
public void onMessage(String message) {
try {
// 1. JSON 反序列化
DevicePropertyChangedEventDTO event = objectMapper.readValue(message, DevicePropertyChangedEventDTO.class);
// 2. 幂等性检查
if (!deduplicationService.tryConsume(event.getEventId())) {
log.debug("[DevicePropertyEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
return;
}
// 3. 业务处理
handlePropertyChanged(event);
} catch (Exception e) {
log.error("[DevicePropertyEventHandler] 消息处理失败: message={}", message, e);
throw new RuntimeException("设备属性变更事件处理失败", e);
}
}
/**
* 处理设备属性变更
*/
private void handlePropertyChanged(DevicePropertyChangedEventDTO event) {
Long deviceId = event.getDeviceId();
var properties = event.getProperties();
log.info("[DevicePropertyEventHandler] 设备属性变更: deviceId={}, properties={}, productKey={}",
deviceId, properties.keySet(), event.getProductKey());
// 处理特定属性变更
if (properties.containsKey("battery")) {
handleBatteryChange(deviceId, properties.get("battery"));
}
if (properties.containsKey("signal")) {
handleSignalChange(deviceId, properties.get("signal"));
}
// 客流传感器数据检查
if (properties.containsKey("people_count")) {
handlePeopleCountChange(deviceId, properties.get("people_count"));
}
log.debug("[DevicePropertyEventHandler] 处理完成: deviceId={}", deviceId);
}
/**
* 处理电量变化
*/
private void handleBatteryChange(Long deviceId, Object batteryValue) {
log.debug("[DevicePropertyEventHandler] 电量变化: deviceId={}, battery={}", deviceId, batteryValue);
// TODO: 如果电量低于阈值,提示更换
// if (batteryValue instanceof Number && ((Number) batteryValue).intValue() < 20) {
// // 触发低电量提醒
// }
}
/**
* 处理信号强度变化
*/
private void handleSignalChange(Long deviceId, Object signalValue) {
log.debug("[DevicePropertyEventHandler] 信号变化: deviceId={}, signal={}", deviceId, signalValue);
}
/**
* 处理客流数量变化
*/
private void handlePeopleCountChange(Long deviceId, Object peopleCount) {
log.debug("[DevicePropertyEventHandler] 客流变化: deviceId={}, peopleCount={}", deviceId, peopleCount);
// TODO: 客流达标触发保洁
// if (peopleCount instanceof Number && ((Number) peopleCount).intValue() > threshold) {
// // 触发保洁工单
// }
}
}

View File

@@ -0,0 +1,118 @@
package com.viewsh.module.ops.environment.integration.consumer;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.module.ops.environment.integration.dto.DeviceStatusChangedEventDTO;
import com.viewsh.module.ops.environment.service.cleaner.CleanerStatusService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.springframework.stereotype.Component;
/**
* 设备状态变更事件消费者
* <p>
* 订阅 IoT 模块发布的设备状态变更事件,同步更新保洁员在线状态
* <p>
* RocketMQ 配置:
* - Topic: integration.device.status
* - Tag: productKey可配置如 cleaner_badge_v1
* - 消费模式: CONCURRENTLY并发消费
*
* @author lzh
*/
@Slf4j
@Component
@org.apache.rocketmq.spring.annotation.RocketMQMessageListener(
topic = "integration.device.status",
consumerGroup = "ops-device-status-group",
consumeMode = ConsumeMode.CONCURRENTLY,
// TAG 过滤将在配置文件中设置,或使用 selectorExpression
selectorExpression = "*"
)
public class DeviceStatusEventHandler implements org.apache.rocketmq.spring.core.RocketMQListener<String> {
@Resource
private CleanerStatusService cleanerStatusService;
@Resource
private ObjectMapper objectMapper;
@Resource
private IntegrationEventDeduplicationService deduplicationService;
@Override
public void onMessage(String message) {
try {
// 1. JSON 反序列化
DeviceStatusChangedEventDTO event = objectMapper.readValue(message, DeviceStatusChangedEventDTO.class);
// 2. 幂等性检查
if (!deduplicationService.tryConsume(event.getEventId())) {
log.debug("[DeviceStatusEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
return;
}
// 3. 业务处理
handleStatusChanged(event);
} catch (Exception e) {
log.error("[DeviceStatusEventHandler] 消息处理失败: message={}", message, e);
// 抛出异常让 RocketMQ 重试
throw new RuntimeException("设备状态变更事件处理失败", e);
}
}
/**
* 处理设备状态变更
*/
private void handleStatusChanged(DeviceStatusChangedEventDTO event) {
Long deviceId = event.getDeviceId();
Integer newStatus = event.getNewStatus();
log.info("[DeviceStatusEventHandler] 设备状态变更: deviceId={}, newStatus={}, productKey={}",
deviceId, newStatus, event.getProductKey());
// 只处理在线/离线状态变更
// 1 = 在线, 0 = 离线
if (newStatus == 1) {
// 设备上线 -> 保洁员上线
handleDeviceOnline(event);
} else if (newStatus == 0) {
// 设备离线 -> 保洁员离线
handleDeviceOffline(event);
}
log.debug("[DeviceStatusEventHandler] 处理完成: deviceId={}, status={}", deviceId, newStatus);
}
/**
* 处理设备上线
*/
private void handleDeviceOnline(DeviceStatusChangedEventDTO event) {
// TODO: 根据设备ID查找对应的保洁员
// Long cleanerId = getCleanerIdByDeviceId(event.getDeviceId());
// if (cleanerId != null) {
// cleanerStatusService.updateOnlineStatus(cleanerId, true);
// }
log.debug("[DeviceStatusEventHandler] 设备上线: deviceId={}, reason={}",
event.getDeviceId(), event.getReason());
}
/**
* 处理设备离线
*/
private void handleDeviceOffline(DeviceStatusChangedEventDTO event) {
// TODO: 根据设备ID查找对应的保洁员
// Long cleanerId = getCleanerIdByDeviceId(event.getDeviceId());
// if (cleanerId != null) {
// cleanerStatusService.updateOnlineStatus(cleanerId, false);
// }
log.debug("[DeviceStatusEventHandler] 设备离线: deviceId={}, reason={}",
event.getDeviceId(), event.getReason());
}
}

View File

@@ -0,0 +1,83 @@
package com.viewsh.module.ops.environment.integration.consumer;
import cn.hutool.core.util.StrUtil;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
/**
* 跨模块事件幂等性服务
* <p>
* 使用 eventId 防止重复消费同一事件
*
* @author lzh
*/
@Slf4j
@Service
public class IntegrationEventDeduplicationService {
private static final String DEDUP_KEY_PREFIX = "integration:event:dedup:";
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 尝试消费事件(幂等性检查)
*
* @param eventId 事件ID
* @return true-首次消费false-重复消费
*/
public boolean tryConsume(String eventId) {
if (StrUtil.isEmpty(eventId)) {
log.warn("[IntegrationEventDeduplication] eventId 为空,拒绝消费");
return false;
}
String key = DEDUP_KEY_PREFIX + eventId;
// setNX如果 key 不存在则设置,返回 true
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(key, "1", Duration.ofHours(24));
if (Boolean.FALSE.equals(success)) {
log.debug("[IntegrationEventDeduplication] 重复事件: eventId={}", eventId);
return false;
}
return true;
}
/**
* 标记事件已消费
*
* @param eventId 事件ID
*/
public void markConsumed(String eventId) {
if (StrUtil.isEmpty(eventId)) {
return;
}
String key = DEDUP_KEY_PREFIX + eventId;
stringRedisTemplate.opsForValue().set(key, "1", Duration.ofHours(24));
}
/**
* 检查事件是否已消费
*
* @param eventId 事件ID
* @return true-已消费false-未消费
*/
public boolean isConsumed(String eventId) {
if (StrUtil.isEmpty(eventId)) {
return false;
}
String key = DEDUP_KEY_PREFIX + eventId;
Boolean exists = stringRedisTemplate.hasKey(key);
return Boolean.TRUE.equals(exists);
}
}

View File

@@ -0,0 +1,60 @@
package com.viewsh.module.ops.environment.integration.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 设备事件基类 DTO
* <p>
* 用于反序列化从 RocketMQ 接收的设备事件消息
*
* @author lzh
*/
@Data
public class BaseDeviceEventDTO {
/**
* 事件ID用于幂等性去重
*/
@JsonProperty("eventId")
private String eventId;
/**
* 设备ID
*/
@JsonProperty("deviceId")
private Long deviceId;
/**
* 设备名称
*/
@JsonProperty("deviceName")
private String deviceName;
/**
* 产品ID
*/
@JsonProperty("productId")
private Long productId;
/**
* 产品标识符RocketMQ Tag
*/
@JsonProperty("productKey")
private String productKey;
/**
* 租户ID
*/
@JsonProperty("tenantId")
private Long tenantId;
/**
* 事件时间
*/
@JsonProperty("eventTime")
private LocalDateTime eventTime;
}

View File

@@ -0,0 +1,40 @@
package com.viewsh.module.ops.environment.integration.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Map;
/**
* 设备事件上报 DTO
* <p>
* 用于反序列化从 RocketMQ 接收的设备事件上报消息
*
* @author lzh
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class DeviceEventOccurredEventDTO extends BaseDeviceEventDTO {
/**
* 事件标识符
* 例如: button_click按键点击, sos紧急求救, fall_detected跌倒检测
*/
@JsonProperty("eventIdentifier")
private String eventIdentifier;
/**
* 事件类型
* 例如: ALARM告警事件, CONTROL<EFBC88><E68EA7>事件, INFO信息事件
*/
@JsonProperty("eventType")
private String eventType;
/**
* 事件参数
*/
@JsonProperty("eventParams")
private Map<String, Object> eventParams;
}

View File

@@ -0,0 +1,33 @@
package com.viewsh.module.ops.environment.integration.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Map;
import java.util.Set;
/**
* 设备属性变更事件 DTO
* <p>
* 用于反序列化从 RocketMQ 接收的设备属性变更消息
*
* @author lzh
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class DevicePropertyChangedEventDTO extends BaseDeviceEventDTO {
/**
* 变更的属性数据
*/
@JsonProperty("properties")
private Map<String, Object> properties;
/**
* 变更的属性标识符集合
*/
@JsonProperty("changedIdentifiers")
private Set<String> changedIdentifiers;
}

View File

@@ -0,0 +1,36 @@
package com.viewsh.module.ops.environment.integration.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* 设备状态变更事件 DTO
* <p>
* 用于反序列化从 RocketMQ 接收的设备状态变更消息
*
* @author lzh
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class DeviceStatusChangedEventDTO extends BaseDeviceEventDTO {
/**
* 旧状态
*/
@JsonProperty("oldStatus")
private Integer oldStatus;
/**
* 新状态
*/
@JsonProperty("newStatus")
private Integer newStatus;
/**
* 变更原因
*/
@JsonProperty("reason")
private String reason;
}