2 Commits

Author SHA1 Message Date
lzh
fa619710ef refactor(iot): 优化事件发布机制并修复状态值解析
Some checks failed
Java CI with Maven / build (11) (push) Has been cancelled
Java CI with Maven / build (17) (push) Has been cancelled
Java CI with Maven / build (8) (push) Has been cancelled
1. IntegrationEventPublisher 只保留设备状态变更事件发布
   - 注释掉 publishPropertyChanged 和 publishEventOccurred 接口
   - RocketMQIntegrationEventPublisher 对应实现改为注释

2. IotDevicePropertyServiceImpl 属性消息发布暂停
   - 注释掉 saveDeviceProperty 中的 publishPropertyMessage 调用
   - 注释掉 publishToIntegrationEventBus 中的实际发布逻辑

3. IotDeviceMessageServiceImpl 新增状态值解析兼容
   - 新增 parseStateValue 方法支持整数和字符串格式状态值
   - 支持 "online"/"offline" 字符串解析

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 22:56:13 +08:00
lzh
842b40596d fix: 修复RocketMQ topic名称非法字符问题
RocketMQ topic 只允许 ^[%|a-zA-Z0-9_-]+$ 字符,不支持 `.`

IoT 模块 Topic 变更:
- integration.device.status → integration-device-status
- integration.device.property → integration-device-property
- integration.device.event → integration-device-event

Ops 模块 Topic 变更:
- ops.order.create → ops-order-create
- ops.order.arrive → ops-order-arrive
- ops.order.complete → ops-order-complete
- ops.order.audit → ops-order-audit
- ops.order.confirm → ops-order-confirm

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-21 22:35:22 +08:00
11 changed files with 69 additions and 60 deletions

View File

@@ -12,26 +12,26 @@ public interface CleanOrderTopics {
/** /**
* 保洁工单创建事件 * 保洁工单创建事件
*/ */
String ORDER_CREATE = "ops.order.create"; String ORDER_CREATE = "ops-order-create";
/** /**
* 保洁工单到岗事件 * 保洁工单到岗事件
*/ */
String ORDER_ARRIVE = "ops.order.arrive"; String ORDER_ARRIVE = "ops-order-arrive";
/** /**
* 保洁工单完成事件 * 保洁工单完成事件
*/ */
String ORDER_COMPLETE = "ops.order.complete"; String ORDER_COMPLETE = "ops-order-complete";
/** /**
* 保洁工单审计事件 * 保洁工单审计事件
*/ */
String ORDER_AUDIT = "ops.order.audit"; String ORDER_AUDIT = "ops-order-audit";
/** /**
* 保洁工单确认事件(按键确认) * 保洁工单确认事件(按键确认)
*/ */
String ORDER_CONFIRM = "ops.order.confirm"; String ORDER_CONFIRM = "ops-order-confirm";
} }

View File

@@ -14,21 +14,21 @@ public class IntegrationTopics {
* <p> * <p>
* 用于发布设备上线/离线状态变更事件 * 用于发布设备上线/离线状态变更事件
*/ */
public static final String DEVICE_STATUS = "integration.device.status"; public static final String DEVICE_STATUS = "integration-device-status";
/** /**
* 设备属性变更 Topic * 设备属性变更 Topic
* <p> * <p>
* 用于发布设备属性变更事件 * 用于发布设备属性变更事件
*/ */
public static final String DEVICE_PROPERTY = "integration.device.property"; public static final String DEVICE_PROPERTY = "integration-device-property";
/** /**
* 设备事件上报 Topic * 设备事件上报 Topic
* <p> * <p>
* 用于发布设备事件SOS、按键等 * 用于发布设备事件SOS、按键等
*/ */
public static final String DEVICE_EVENT = "integration.device.event"; public static final String DEVICE_EVENT = "integration-device-event";
private IntegrationTopics() { private IntegrationTopics() {
} }

View File

@@ -26,13 +26,13 @@ public interface IntegrationEventPublisher {
* *
* @param event 属性变更事件 * @param event 属性变更事件
*/ */
void publishPropertyChanged(DevicePropertyChangedEvent event); // void publishPropertyChanged(DevicePropertyChangedEvent event);
/** /**
* 发布设备事件上报事件 * 发布设备事件上报事件
* *
* @param event 设备事件 * @param event 设备事件
*/ */
void publishEventOccurred(DeviceEventOccurredEvent event); // void publishEventOccurred(DeviceEventOccurredEvent event);
} }

View File

@@ -1,11 +1,9 @@
package com.viewsh.module.iot.core.integration.publisher; package com.viewsh.module.iot.core.integration.publisher;
import com.viewsh.module.iot.core.integration.config.IntegrationEventProperties;
import com.viewsh.module.iot.core.integration.constants.DeviceTags; import com.viewsh.module.iot.core.integration.constants.DeviceTags;
import com.viewsh.module.iot.core.integration.constants.IntegrationTopics; import com.viewsh.module.iot.core.integration.constants.IntegrationTopics;
import com.viewsh.module.iot.core.integration.event.DeviceEventOccurredEvent;
import com.viewsh.module.iot.core.integration.event.DevicePropertyChangedEvent;
import com.viewsh.module.iot.core.integration.event.DeviceStatusChangedEvent; import com.viewsh.module.iot.core.integration.event.DeviceStatusChangedEvent;
import com.viewsh.module.iot.core.integration.config.IntegrationEventProperties;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
@@ -47,39 +45,18 @@ public class RocketMQIntegrationEventPublisher implements IntegrationEventPublis
} }
} }
@Override // @Override
public void publishPropertyChanged(DevicePropertyChangedEvent event) { // public void publishPropertyChanged(DevicePropertyChangedEvent event) {
try { // // 暂不处理属性变更事件
String tag = DeviceTags.fromProductKey(event.getProductKey()); // log.debug("[publishPropertyChanged] 跳过设备属性变更事件发布: eventId={}, deviceId={}, productKey={}",
String destination = IntegrationTopics.DEVICE_PROPERTY + ":" + tag; // event.getEventId(), event.getDeviceId(), event.getProductKey());
// }
Message<DevicePropertyChangedEvent> message = MessageBuilder.withPayload(event).build(); //
rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs()); // @Override
// public void publishEventOccurred(DeviceEventOccurredEvent event) {
log.debug("[publishPropertyChanged] 发布设备属性变更事件: eventId={}, deviceId={}, productKey={}, properties={}", // // 暂不处理设备事件上报
event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getChangedIdentifiers()); // log.debug("[publishEventOccurred] 跳过设备事件上报发布: eventId={}, deviceId={}, productKey={}, event={}",
} catch (Exception e) { // event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getEventIdentifier());
log.error("[publishPropertyChanged] 发布设备属性变更事件失败: eventId={}, deviceId={}", // }
event.getEventId(), event.getDeviceId(), e);
}
}
@Override
public void publishEventOccurred(DeviceEventOccurredEvent event) {
try {
// 使用 productKey 作为 Tag
String tag = DeviceTags.fromProductKey(event.getProductKey());
String destination = IntegrationTopics.DEVICE_EVENT + ":" + tag;
Message<DeviceEventOccurredEvent> message = MessageBuilder.withPayload(event).build();
rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs());
log.debug("[publishEventOccurred] 发布设备事件上报事件: eventId={}, deviceId={}, productKey={}, event={}",
event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getEventIdentifier());
} catch (Exception e) {
log.error("[publishEventOccurred] 发布设备事件上报事件失败: eventId={}, deviceId={}",
event.getEventId(), event.getDeviceId(), e);
}
}
} }

View File

@@ -14,6 +14,7 @@ import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessage
import com.viewsh.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageReqVO; import com.viewsh.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageReqVO;
import com.viewsh.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageSummaryByDateRespVO; import com.viewsh.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageSummaryByDateRespVO;
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum; import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
import com.viewsh.module.iot.core.enums.IotDeviceStateEnum;
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage; import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
import com.viewsh.module.iot.core.mq.producer.IotDeviceMessageProducer; import com.viewsh.module.iot.core.mq.producer.IotDeviceMessageProducer;
import com.viewsh.module.iot.core.util.IotDeviceMessageUtils; import com.viewsh.module.iot.core.util.IotDeviceMessageUtils;
@@ -197,7 +198,9 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
String stateStr = IotDeviceMessageUtils.getIdentifier(message); String stateStr = IotDeviceMessageUtils.getIdentifier(message);
assert stateStr != null; assert stateStr != null;
Assert.notEmpty(stateStr, "设备状态不能为空"); Assert.notEmpty(stateStr, "设备状态不能为空");
deviceService.updateDeviceState(device, Integer.valueOf(stateStr)); // 兼容整数和字符串格式的状态值
Integer state = parseStateValue(stateStr);
deviceService.updateDeviceState(device, state);
// TODO 芋艿:子设备的关联 // TODO 芋艿:子设备的关联
return null; return null;
} }
@@ -269,6 +272,34 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
}); });
} }
/**
* 解析状态值,支持整数和字符串格式
* <p>
* 支持格式:
* - 整数0=未激活1=在线2=离线
* - 字符串0/inactive=未激活1/online=在线2/offline=离线
*
* @param stateStr 状态字符串
* @return 状态枚举值
*/
private Integer parseStateValue(String stateStr) {
if (stateStr == null) {
return IotDeviceStateEnum.INACTIVE.getState();
}
try {
// 尝试直接解析为整数
return Integer.parseInt(stateStr);
} catch (NumberFormatException e) {
// 字符格式匹配
String lower = stateStr.toLowerCase().trim();
return switch (lower) {
case "1", "online" -> IotDeviceStateEnum.ONLINE.getState();
case "2", "offline" -> IotDeviceStateEnum.OFFLINE.getState();
default -> IotDeviceStateEnum.INACTIVE.getState();
};
}
}
private IotDeviceMessageServiceImpl getSelf() { private IotDeviceMessageServiceImpl getSelf() {
return SpringUtil.getBean(getClass()); return SpringUtil.getBean(getClass());
} }

View File

@@ -184,7 +184,8 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
processRuleProcessors(device, properties); processRuleProcessors(device, properties);
// 2.4 发布属性消息到 Redis Stream供其他模块如 Ops 订阅) // 2.4 发布属性消息到 Redis Stream供其他模块如 Ops 订阅)
publishPropertyMessage(device, properties, message.getReportTime()); // TODO: 暂停发布,后续根据需要开启
// publishPropertyMessage(device, properties, message.getReportTime());
} }
/** /**
@@ -197,7 +198,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
*/ */
private void processRuleProcessors(IotDeviceDO device, Map<String, Object> properties) { private void processRuleProcessors(IotDeviceDO device, Map<String, Object> properties) {
try { try {
// 遍历所有属性,调用规<EFBFBD><EFBFBD>处理器 // 遍历所有属性,调用规处理器
for (Map.Entry<String, Object> entry : properties.entrySet()) { for (Map.Entry<String, Object> entry : properties.entrySet()) {
String identifier = entry.getKey(); String identifier = entry.getKey();
Object value = entry.getValue(); Object value = entry.getValue();
@@ -263,7 +264,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
.eventTime(reportTime) .eventTime(reportTime)
.build(); .build();
integrationEventPublisher.publishPropertyChanged(event); // integrationEventPublisher.publishPropertyChanged(event);
log.debug("[publishToIntegrationEventBus] 跨模块属性变更事件已发布: eventId={}, deviceId={}, productKey={}, properties={}", log.debug("[publishToIntegrationEventBus] 跨模块属性变更事件已发布: eventId={}, deviceId={}, productKey={}, properties={}",
event.getEventId(), device.getId(), productKey, properties.keySet()); event.getEventId(), device.getId(), productKey, properties.keySet());
} catch (Exception e) { } catch (Exception e) {

View File

@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
* 订阅 IoT 模块发布的保洁工单到岗事件 * 订阅 IoT 模块发布的保洁工单到岗事件
* <p> * <p>
* RocketMQ 配置: * RocketMQ 配置:
* - Topic: ops.order.arrive * - Topic: ops-order-arrive
* - ConsumerGroup: ops-clean-order-arrive-group * - ConsumerGroup: ops-clean-order-arrive-group
* *
* @author AI * @author AI
@@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@Component @Component
@RocketMQMessageListener( @RocketMQMessageListener(
topic = "ops.order.arrive", topic = "ops-order-arrive",
consumerGroup = "ops-clean-order-arrive-group", consumerGroup = "ops-clean-order-arrive-group",
consumeMode = ConsumeMode.CONCURRENTLY, consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*" selectorExpression = "*"

View File

@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
* 用于记录非状态变更的业务审计日志(如警告发送、抑制操作等) * 用于记录非状态变更的业务审计日志(如警告发送、抑制操作等)
* <p> * <p>
* RocketMQ 配置: * RocketMQ 配置:
* - Topic: ops.order.audit * - Topic: ops-order-audit
* - ConsumerGroup: ops-clean-order-audit-group * - ConsumerGroup: ops-clean-order-audit-group
* *
* @author AI * @author AI
@@ -37,7 +37,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@Component @Component
@RocketMQMessageListener( @RocketMQMessageListener(
topic = "ops.order.audit", topic = "ops-order-audit",
consumerGroup = "ops-clean-order-audit-group", consumerGroup = "ops-clean-order-audit-group",
consumeMode = ConsumeMode.CONCURRENTLY, consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*" selectorExpression = "*"

View File

@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
* 订阅 IoT 模块发布的保洁工单完成事件 * 订阅 IoT 模块发布的保洁工单完成事件
* <p> * <p>
* RocketMQ 配置: * RocketMQ 配置:
* - Topic: ops.order.complete * - Topic: ops-order-complete
* - ConsumerGroup: ops-clean-order-complete-group * - ConsumerGroup: ops-clean-order-complete-group
* *
* @author AI * @author AI
@@ -35,7 +35,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@Component @Component
@RocketMQMessageListener( @RocketMQMessageListener(
topic = "ops.order.complete", topic = "ops-order-complete",
consumerGroup = "ops-clean-order-complete-group", consumerGroup = "ops-clean-order-complete-group",
consumeMode = ConsumeMode.CONCURRENTLY, consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*" selectorExpression = "*"

View File

@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@Component @Component
@RocketMQMessageListener( @RocketMQMessageListener(
topic = "ops.order.confirm", topic = "ops-order-confirm",
consumerGroup = "ops-clean-order-confirm-group", consumerGroup = "ops-clean-order-confirm-group",
consumeMode = ConsumeMode.CONCURRENTLY consumeMode = ConsumeMode.CONCURRENTLY
) )

View File

@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
* 订阅 IoT 模块发布的保洁工单创建事件 * 订阅 IoT 模块发布的保洁工单创建事件
* <p> * <p>
* RocketMQ 配置: * RocketMQ 配置:
* - Topic: ops.order.create * - Topic: ops-order-create
* - ConsumerGroup: ops-clean-order-create-group * - ConsumerGroup: ops-clean-order-create-group
* *
* @author AI * @author AI
@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@Component @Component
@RocketMQMessageListener( @RocketMQMessageListener(
topic = "ops.order.create", topic = "ops-order-create",
consumerGroup = "ops-clean-order-create-group", consumerGroup = "ops-clean-order-create-group",
consumeMode = ConsumeMode.CONCURRENTLY, consumeMode = ConsumeMode.CONCURRENTLY,
selectorExpression = "*" selectorExpression = "*"