Compare commits

2 Commits

Author SHA1 Message Date
lzh
fa619710ef refactor(iot): 优化事件发布机制并修复状态值解析
Some checks failed
Java CI with Maven / build (11) (push) Has been cancelled
Java CI with Maven / build (17) (push) Has been cancelled
Java CI with Maven / build (8) (push) Has been cancelled
1. IntegrationEventPublisher 只保留设备状态变更事件发布
   - 注释掉 publishPropertyChanged 和 publishEventOccurred 接口
   - RocketMQIntegrationEventPublisher 对应实现改为注释

2. IotDevicePropertyServiceImpl 属性消息发布暂停
   - 注释掉 saveDeviceProperty 中的 publishPropertyMessage 调用
   - 注释掉 publishToIntegrationEventBus 中的实际发布逻辑

3. IotDeviceMessageServiceImpl 新增状态值解析兼容
   - 新增 parseStateValue 方法支持整数和字符串格式状态值
   - 支持 "online"/"offline" 字符串解析

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 22:56:13 +08:00
lzh
842b40596d fix: 修复RocketMQ topic名称非法字符问题
RocketMQ topic 只允许 ^[%|a-zA-Z0-9_-]+$ 字符,不支持 `.`

IoT 模块 Topic 变更:
- integration.device.status → integration-device-status
- integration.device.property → integration-device-property
- integration.device.event → integration-device-event

Ops 模块 Topic 变更:
- ops.order.create → ops-order-create
- ops.order.arrive → ops-order-arrive
- ops.order.complete → ops-order-complete
- ops.order.audit → ops-order-audit
- ops.order.confirm → ops-order-confirm

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 22:35:22 +08:00
11 changed files with 69 additions and 60 deletions

View File

@@ -12,26 +12,26 @@ public interface CleanOrderTopics {
/**
* 保洁工单创建事件
*/
String ORDER_CREATE = "ops.order.create";
String ORDER_CREATE = "ops-order-create";
/**
* 保洁工单到岗事件
*/
String ORDER_ARRIVE = "ops.order.arrive";
String ORDER_ARRIVE = "ops-order-arrive";
/**
* 保洁工单完成事件
*/
String ORDER_COMPLETE = "ops.order.complete";
String ORDER_COMPLETE = "ops-order-complete";
/**
* 保洁工单审计事件
*/
String ORDER_AUDIT = "ops.order.audit";
String ORDER_AUDIT = "ops-order-audit";
/**
* 保洁工单确认事件(按键确认)
*/
String ORDER_CONFIRM = "ops.order.confirm";
String ORDER_CONFIRM = "ops-order-confirm";
}

View File

@@ -14,21 +14,21 @@ public class IntegrationTopics {
* <p>
* 用于发布设备上线/离线状态变更事件
*/
public static final String DEVICE_STATUS = "integration.device.status";
public static final String DEVICE_STATUS = "integration-device-status";
/**
* 设备属性变更 Topic
* <p>
* 用于发布设备属性变更事件
*/
public static final String DEVICE_PROPERTY = "integration.device.property";
public static final String DEVICE_PROPERTY = "integration-device-property";
/**
* 设备事件上报 Topic
* <p>
* 用于发布设备事件SOS、按键等
*/
public static final String DEVICE_EVENT = "integration.device.event";
public static final String DEVICE_EVENT = "integration-device-event";
private IntegrationTopics() {
}

View File

@@ -26,13 +26,13 @@ public interface IntegrationEventPublisher {
*
* @param event 属性变更事件
*/
void publishPropertyChanged(DevicePropertyChangedEvent event);
// void publishPropertyChanged(DevicePropertyChangedEvent event);
/**
* 发布设备事件上报事件
*
* @param event 设备事件
*/
void publishEventOccurred(DeviceEventOccurredEvent event);
// void publishEventOccurred(DeviceEventOccurredEvent event);
}

View File

@@ -1,11 +1,9 @@
package com.viewsh.module.iot.core.integration.publisher;
import com.viewsh.module.iot.core.integration.config.IntegrationEventProperties;
import com.viewsh.module.iot.core.integration.constants.DeviceTags;
import com.viewsh.module.iot.core.integration.constants.IntegrationTopics;
import com.viewsh.module.iot.core.integration.event.DeviceEventOccurredEvent;
import com.viewsh.module.iot.core.integration.event.DevicePropertyChangedEvent;
import com.viewsh.module.iot.core.integration.event.DeviceStatusChangedEvent;
import com.viewsh.module.iot.core.integration.config.IntegrationEventProperties;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
@@ -47,39 +45,18 @@ public class RocketMQIntegrationEventPublisher implements IntegrationEventPublis
}
}
@Override
public void publishPropertyChanged(DevicePropertyChangedEvent event) {
try {
String tag = DeviceTags.fromProductKey(event.getProductKey());
String destination = IntegrationTopics.DEVICE_PROPERTY + ":" + tag;
Message<DevicePropertyChangedEvent> message = MessageBuilder.withPayload(event).build();
rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs());
log.debug("[publishPropertyChanged] 发布设备属性变更事件: eventId={}, deviceId={}, productKey={}, properties={}",
event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getChangedIdentifiers());
} catch (Exception e) {
log.error("[publishPropertyChanged] 发布设备属性变更事件失败: eventId={}, deviceId={}",
event.getEventId(), event.getDeviceId(), e);
}
}
@Override
public void publishEventOccurred(DeviceEventOccurredEvent event) {
try {
// 使用 productKey 作为 Tag
String tag = DeviceTags.fromProductKey(event.getProductKey());
String destination = IntegrationTopics.DEVICE_EVENT + ":" + tag;
Message<DeviceEventOccurredEvent> message = MessageBuilder.withPayload(event).build();
rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs());
log.debug("[publishEventOccurred] 发布设备事件上报事件: eventId={}, deviceId={}, productKey={}, event={}",
event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getEventIdentifier());
} catch (Exception e) {
log.error("[publishEventOccurred] 发布设备事件上报事件失败: eventId={}, deviceId={}",
event.getEventId(), event.getDeviceId(), e);
}
}
// @Override
// public void publishPropertyChanged(DevicePropertyChangedEvent event) {
// // 暂不处理属性变更事件
// log.debug("[publishPropertyChanged] 跳过设备属性变更事件发布: eventId={}, deviceId={}, productKey={}",
// event.getEventId(), event.getDeviceId(), event.getProductKey());
// }
//
// @Override
// public void publishEventOccurred(DeviceEventOccurredEvent event) {
// // 暂不处理设备事件上报
// log.debug("[publishEventOccurred] 跳过设备事件上报发布: eventId={}, deviceId={}, productKey={}, event={}",
// event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getEventIdentifier());
// }
}

View File

@@ -14,6 +14,7 @@ import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessage
import com.viewsh.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageReqVO;
import com.viewsh.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageSummaryByDateRespVO;
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
import com.viewsh.module.iot.core.enums.IotDeviceStateEnum;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.core.mq.producer.IotDeviceMessageProducer;
import com.viewsh.module.iot.core.util.IotDeviceMessageUtils;
@@ -197,7 +198,9 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
String stateStr = IotDeviceMessageUtils.getIdentifier(message);
assert stateStr != null;
Assert.notEmpty(stateStr, "设备状态不能为空");
deviceService.updateDeviceState(device, Integer.valueOf(stateStr));
// 兼容整数和字符串格式的状态值
Integer state = parseStateValue(stateStr);
deviceService.updateDeviceState(device, state);
// TODO 芋艿:子设备的关联
return null;
}
@@ -269,6 +272,34 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
});
}
/**
* 解析状态值,支持整数和字符串格式
* <p>
* 支持格式:
* - 整数0=未激活1=在线2=离线
* - 字符串0/inactive=未激活1/online=在线2/offline=离线
*
* @param stateStr 状态字符串
* @return 状态枚举值
*/
private Integer parseStateValue(String stateStr) {
if (stateStr == null) {
return IotDeviceStateEnum.INACTIVE.getState();
}
try {
// 尝试直接解析为整数
return Integer.parseInt(stateStr);
} catch (NumberFormatException e) {
// 字符格式匹配
String lower = stateStr.toLowerCase().trim();
return switch (lower) {
case "1", "online" -> IotDeviceStateEnum.ONLINE.getState();
case "2", "offline" -> IotDeviceStateEnum.OFFLINE.getState();
default -> IotDeviceStateEnum.INACTIVE.getState();
};
}
}
private IotDeviceMessageServiceImpl getSelf() {
return SpringUtil.getBean(getClass());
}

View File

@@ -184,7 +184,8 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
processRuleProcessors(device, properties);
// 2.4 发布属性消息到 Redis Stream供其他模块如 Ops 订阅)
publishPropertyMessage(device, properties, message.getReportTime());
// TODO: 暂停发布,后续根据需要开启
// publishPropertyMessage(device, properties, message.getReportTime());
}
/**
@@ -197,7 +198,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
*/
private void processRuleProcessors(IotDeviceDO device, Map<String, Object> properties) {
try {
// 遍历所有属性,调用规<EFBFBD><EFBFBD>处理器
// 遍历所有属性,调用规处理器
for (Map.Entry<String, Object> entry : properties.entrySet()) {
String identifier = entry.getKey();
Object value = entry.getValue();
@@ -263,7 +264,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
.eventTime(reportTime)
.build();
integrationEventPublisher.publishPropertyChanged(event);
// integrationEventPublisher.publishPropertyChanged(event);
log.debug("[publishToIntegrationEventBus] 跨模块属性变更事件已发布: eventId={}, deviceId={}, productKey={}, properties={}",
event.getEventId(), device.getId(), productKey, properties.keySet());
} catch (Exception e) {

View File

@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
* 订阅 IoT 模块发布的保洁工单到岗事件
* <p>
* RocketMQ 配置:
* - Topic: ops.order.arrive
* - Topic: ops-order-arrive
* - ConsumerGroup: ops-clean-order-arrive-group
*
* @author AI
@@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RocketMQMessageListener(
topic = "ops.order.arrive",
topic = "ops-order-arrive",
consumerGroup = "ops-clean-order-arrive-group",
consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*"

View File

@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
* 用于记录非状态变更的业务审计日志(如警告发送、抑制操作等)
* <p>
* RocketMQ 配置:
* - Topic: ops.order.audit
* - Topic: ops-order-audit
* - ConsumerGroup: ops-clean-order-audit-group
*
* @author AI
@@ -37,7 +37,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RocketMQMessageListener(
topic = "ops.order.audit",
topic = "ops-order-audit",
consumerGroup = "ops-clean-order-audit-group",
consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*"

View File

@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
* 订阅 IoT 模块发布的保洁工单完成事件
* <p>
* RocketMQ 配置:
* - Topic: ops.order.complete
* - Topic: ops-order-complete
* - ConsumerGroup: ops-clean-order-complete-group
*
* @author AI
@@ -35,7 +35,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RocketMQMessageListener(
topic = "ops.order.complete",
topic = "ops-order-complete",
consumerGroup = "ops-clean-order-complete-group",
consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*"

View File

@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RocketMQMessageListener(
topic = "ops.order.confirm",
topic = "ops-order-confirm",
consumerGroup = "ops-clean-order-confirm-group",
consumeMode = ConsumeMode.CONCURRENTLY
)

View File

@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
* 订阅 IoT 模块发布的保洁工单创建事件
* <p>
* RocketMQ 配置:
* - Topic: ops.order.create
* - Topic: ops-order-create
* - ConsumerGroup: ops-clean-order-create-group
*
* @author AI
@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RocketMQMessageListener(
topic = "ops.order.create",
topic = "ops-order-create",
consumerGroup = "ops-clean-order-create-group",
consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*"