refactor(iot): 优化事件发布机制并修复状态值解析
1. IntegrationEventPublisher 只保留设备状态变更事件发布 - 注释掉 publishPropertyChanged 和 publishEventOccurred 接口 - RocketMQIntegrationEventPublisher 对应实现改为注释 2. IotDevicePropertyServiceImpl 属性消息发布暂停 - 注释掉 saveDeviceProperty 中的 publishPropertyMessage 调用 - 注释掉 publishToIntegrationEventBus 中的实际发布逻辑 3. IotDeviceMessageServiceImpl 新增状态值解析兼容 - 新增 parseStateValue 方法支持整数和字符串格式状态值 - 支持 "online"/"offline" 字符串解析 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -26,13 +26,13 @@ public interface IntegrationEventPublisher {
|
||||
*
|
||||
* @param event 属性变更事件
|
||||
*/
|
||||
void publishPropertyChanged(DevicePropertyChangedEvent event);
|
||||
// void publishPropertyChanged(DevicePropertyChangedEvent event);
|
||||
|
||||
/**
|
||||
* 发布设备事件上报事件
|
||||
*
|
||||
* @param event 设备事件
|
||||
*/
|
||||
void publishEventOccurred(DeviceEventOccurredEvent event);
|
||||
// void publishEventOccurred(DeviceEventOccurredEvent event);
|
||||
|
||||
}
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
package com.viewsh.module.iot.core.integration.publisher;
|
||||
|
||||
import com.viewsh.module.iot.core.integration.config.IntegrationEventProperties;
|
||||
import com.viewsh.module.iot.core.integration.constants.DeviceTags;
|
||||
import com.viewsh.module.iot.core.integration.constants.IntegrationTopics;
|
||||
import com.viewsh.module.iot.core.integration.event.DeviceEventOccurredEvent;
|
||||
import com.viewsh.module.iot.core.integration.event.DevicePropertyChangedEvent;
|
||||
import com.viewsh.module.iot.core.integration.event.DeviceStatusChangedEvent;
|
||||
import com.viewsh.module.iot.core.integration.config.IntegrationEventProperties;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.messaging.Message;
|
||||
@@ -47,39 +45,18 @@ public class RocketMQIntegrationEventPublisher implements IntegrationEventPublis
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishPropertyChanged(DevicePropertyChangedEvent event) {
|
||||
try {
|
||||
String tag = DeviceTags.fromProductKey(event.getProductKey());
|
||||
String destination = IntegrationTopics.DEVICE_PROPERTY + ":" + tag;
|
||||
|
||||
Message<DevicePropertyChangedEvent> message = MessageBuilder.withPayload(event).build();
|
||||
rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs());
|
||||
|
||||
log.debug("[publishPropertyChanged] 发布设备属性变更事件: eventId={}, deviceId={}, productKey={}, properties={}",
|
||||
event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getChangedIdentifiers());
|
||||
} catch (Exception e) {
|
||||
log.error("[publishPropertyChanged] 发布设备属性变更事件失败: eventId={}, deviceId={}",
|
||||
event.getEventId(), event.getDeviceId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishEventOccurred(DeviceEventOccurredEvent event) {
|
||||
try {
|
||||
// 使用 productKey 作为 Tag
|
||||
String tag = DeviceTags.fromProductKey(event.getProductKey());
|
||||
String destination = IntegrationTopics.DEVICE_EVENT + ":" + tag;
|
||||
|
||||
Message<DeviceEventOccurredEvent> message = MessageBuilder.withPayload(event).build();
|
||||
rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs());
|
||||
|
||||
log.debug("[publishEventOccurred] 发布设备事件上报事件: eventId={}, deviceId={}, productKey={}, event={}",
|
||||
event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getEventIdentifier());
|
||||
} catch (Exception e) {
|
||||
log.error("[publishEventOccurred] 发布设备事件上报事件失败: eventId={}, deviceId={}",
|
||||
event.getEventId(), event.getDeviceId(), e);
|
||||
}
|
||||
}
|
||||
// @Override
|
||||
// public void publishPropertyChanged(DevicePropertyChangedEvent event) {
|
||||
// // 暂不处理属性变更事件
|
||||
// log.debug("[publishPropertyChanged] 跳过设备属性变更事件发布: eventId={}, deviceId={}, productKey={}",
|
||||
// event.getEventId(), event.getDeviceId(), event.getProductKey());
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void publishEventOccurred(DeviceEventOccurredEvent event) {
|
||||
// // 暂不处理设备事件上报
|
||||
// log.debug("[publishEventOccurred] 跳过设备事件上报发布: eventId={}, deviceId={}, productKey={}, event={}",
|
||||
// event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getEventIdentifier());
|
||||
// }
|
||||
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessage
|
||||
import com.viewsh.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageReqVO;
|
||||
import com.viewsh.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageSummaryByDateRespVO;
|
||||
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||
import com.viewsh.module.iot.core.enums.IotDeviceStateEnum;
|
||||
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import com.viewsh.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import com.viewsh.module.iot.core.util.IotDeviceMessageUtils;
|
||||
@@ -197,7 +198,9 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
||||
String stateStr = IotDeviceMessageUtils.getIdentifier(message);
|
||||
assert stateStr != null;
|
||||
Assert.notEmpty(stateStr, "设备状态不能为空");
|
||||
deviceService.updateDeviceState(device, Integer.valueOf(stateStr));
|
||||
// 兼容整数和字符串格式的状态值
|
||||
Integer state = parseStateValue(stateStr);
|
||||
deviceService.updateDeviceState(device, state);
|
||||
// TODO 芋艿:子设备的关联
|
||||
return null;
|
||||
}
|
||||
@@ -269,6 +272,34 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析状态值,支持整数和字符串格式
|
||||
* <p>
|
||||
* 支持格式:
|
||||
* - 整数:0=未激活,1=在线,2=离线
|
||||
* - 字符串:0/inactive=未激活,1/online=在线,2/offline=离线
|
||||
*
|
||||
* @param stateStr 状态字符串
|
||||
* @return 状态枚举值
|
||||
*/
|
||||
private Integer parseStateValue(String stateStr) {
|
||||
if (stateStr == null) {
|
||||
return IotDeviceStateEnum.INACTIVE.getState();
|
||||
}
|
||||
try {
|
||||
// 尝试直接解析为整数
|
||||
return Integer.parseInt(stateStr);
|
||||
} catch (NumberFormatException e) {
|
||||
// 字符格式匹配
|
||||
String lower = stateStr.toLowerCase().trim();
|
||||
return switch (lower) {
|
||||
case "1", "online" -> IotDeviceStateEnum.ONLINE.getState();
|
||||
case "2", "offline" -> IotDeviceStateEnum.OFFLINE.getState();
|
||||
default -> IotDeviceStateEnum.INACTIVE.getState();
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private IotDeviceMessageServiceImpl getSelf() {
|
||||
return SpringUtil.getBean(getClass());
|
||||
}
|
||||
|
||||
@@ -184,7 +184,8 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
|
||||
processRuleProcessors(device, properties);
|
||||
|
||||
// 2.4 发布属性消息到 Redis Stream(供其他模块如 Ops 订阅)
|
||||
publishPropertyMessage(device, properties, message.getReportTime());
|
||||
// TODO: 暂停发布,后续根据需要开启
|
||||
// publishPropertyMessage(device, properties, message.getReportTime());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -197,7 +198,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
|
||||
*/
|
||||
private void processRuleProcessors(IotDeviceDO device, Map<String, Object> properties) {
|
||||
try {
|
||||
// 遍历所有属性,调用规<EFBFBD><EFBFBD>处理器
|
||||
// 遍历所有属性,调用规则处理器
|
||||
for (Map.Entry<String, Object> entry : properties.entrySet()) {
|
||||
String identifier = entry.getKey();
|
||||
Object value = entry.getValue();
|
||||
@@ -263,7 +264,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
|
||||
.eventTime(reportTime)
|
||||
.build();
|
||||
|
||||
integrationEventPublisher.publishPropertyChanged(event);
|
||||
// integrationEventPublisher.publishPropertyChanged(event);
|
||||
log.debug("[publishToIntegrationEventBus] 跨模块属性变更事件已发布: eventId={}, deviceId={}, productKey={}, properties={}",
|
||||
event.getEventId(), device.getId(), productKey, properties.keySet());
|
||||
} catch (Exception e) {
|
||||
|
||||
Reference in New Issue
Block a user