Compare commits
2 Commits
4e387e410c
...
fa619710ef
| Author | SHA1 | Date | |
|---|---|---|---|
| fa619710ef | |||
| 842b40596d |
@@ -12,26 +12,26 @@ public interface CleanOrderTopics {
|
||||
/**
|
||||
* 保洁工单创建事件
|
||||
*/
|
||||
String ORDER_CREATE = "ops.order.create";
|
||||
String ORDER_CREATE = "ops-order-create";
|
||||
|
||||
/**
|
||||
* 保洁工单到岗事件
|
||||
*/
|
||||
String ORDER_ARRIVE = "ops.order.arrive";
|
||||
String ORDER_ARRIVE = "ops-order-arrive";
|
||||
|
||||
/**
|
||||
* 保洁工单完成事件
|
||||
*/
|
||||
String ORDER_COMPLETE = "ops.order.complete";
|
||||
String ORDER_COMPLETE = "ops-order-complete";
|
||||
|
||||
/**
|
||||
* 保洁工单审计事件
|
||||
*/
|
||||
String ORDER_AUDIT = "ops.order.audit";
|
||||
String ORDER_AUDIT = "ops-order-audit";
|
||||
|
||||
/**
|
||||
* 保洁工单确认事件(按键确认)
|
||||
*/
|
||||
String ORDER_CONFIRM = "ops.order.confirm";
|
||||
String ORDER_CONFIRM = "ops-order-confirm";
|
||||
|
||||
}
|
||||
|
||||
@@ -14,21 +14,21 @@ public class IntegrationTopics {
|
||||
* <p>
|
||||
* 用于发布设备上线/离线状态变更事件
|
||||
*/
|
||||
public static final String DEVICE_STATUS = "integration.device.status";
|
||||
public static final String DEVICE_STATUS = "integration-device-status";
|
||||
|
||||
/**
|
||||
* 设备属性变更 Topic
|
||||
* <p>
|
||||
* 用于发布设备属性变更事件
|
||||
*/
|
||||
public static final String DEVICE_PROPERTY = "integration.device.property";
|
||||
public static final String DEVICE_PROPERTY = "integration-device-property";
|
||||
|
||||
/**
|
||||
* 设备事件上报 Topic
|
||||
* <p>
|
||||
* 用于发布设备事件(SOS、按键等)
|
||||
*/
|
||||
public static final String DEVICE_EVENT = "integration.device.event";
|
||||
public static final String DEVICE_EVENT = "integration-device-event";
|
||||
|
||||
private IntegrationTopics() {
|
||||
}
|
||||
|
||||
@@ -26,13 +26,13 @@ public interface IntegrationEventPublisher {
|
||||
*
|
||||
* @param event 属性变更事件
|
||||
*/
|
||||
void publishPropertyChanged(DevicePropertyChangedEvent event);
|
||||
// void publishPropertyChanged(DevicePropertyChangedEvent event);
|
||||
|
||||
/**
|
||||
* 发布设备事件上报事件
|
||||
*
|
||||
* @param event 设备事件
|
||||
*/
|
||||
void publishEventOccurred(DeviceEventOccurredEvent event);
|
||||
// void publishEventOccurred(DeviceEventOccurredEvent event);
|
||||
|
||||
}
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
package com.viewsh.module.iot.core.integration.publisher;
|
||||
|
||||
import com.viewsh.module.iot.core.integration.config.IntegrationEventProperties;
|
||||
import com.viewsh.module.iot.core.integration.constants.DeviceTags;
|
||||
import com.viewsh.module.iot.core.integration.constants.IntegrationTopics;
|
||||
import com.viewsh.module.iot.core.integration.event.DeviceEventOccurredEvent;
|
||||
import com.viewsh.module.iot.core.integration.event.DevicePropertyChangedEvent;
|
||||
import com.viewsh.module.iot.core.integration.event.DeviceStatusChangedEvent;
|
||||
import com.viewsh.module.iot.core.integration.config.IntegrationEventProperties;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.messaging.Message;
|
||||
@@ -47,39 +45,18 @@ public class RocketMQIntegrationEventPublisher implements IntegrationEventPublis
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishPropertyChanged(DevicePropertyChangedEvent event) {
|
||||
try {
|
||||
String tag = DeviceTags.fromProductKey(event.getProductKey());
|
||||
String destination = IntegrationTopics.DEVICE_PROPERTY + ":" + tag;
|
||||
|
||||
Message<DevicePropertyChangedEvent> message = MessageBuilder.withPayload(event).build();
|
||||
rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs());
|
||||
|
||||
log.debug("[publishPropertyChanged] 发布设备属性变更事件: eventId={}, deviceId={}, productKey={}, properties={}",
|
||||
event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getChangedIdentifiers());
|
||||
} catch (Exception e) {
|
||||
log.error("[publishPropertyChanged] 发布设备属性变更事件失败: eventId={}, deviceId={}",
|
||||
event.getEventId(), event.getDeviceId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishEventOccurred(DeviceEventOccurredEvent event) {
|
||||
try {
|
||||
// 使用 productKey 作为 Tag
|
||||
String tag = DeviceTags.fromProductKey(event.getProductKey());
|
||||
String destination = IntegrationTopics.DEVICE_EVENT + ":" + tag;
|
||||
|
||||
Message<DeviceEventOccurredEvent> message = MessageBuilder.withPayload(event).build();
|
||||
rocketMQTemplate.syncSend(destination, message, properties.getSendTimeoutMs());
|
||||
|
||||
log.debug("[publishEventOccurred] 发布设备事件上报事件: eventId={}, deviceId={}, productKey={}, event={}",
|
||||
event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getEventIdentifier());
|
||||
} catch (Exception e) {
|
||||
log.error("[publishEventOccurred] 发布设备事件上报事件失败: eventId={}, deviceId={}",
|
||||
event.getEventId(), event.getDeviceId(), e);
|
||||
}
|
||||
}
|
||||
// @Override
|
||||
// public void publishPropertyChanged(DevicePropertyChangedEvent event) {
|
||||
// // 暂不处理属性变更事件
|
||||
// log.debug("[publishPropertyChanged] 跳过设备属性变更事件发布: eventId={}, deviceId={}, productKey={}",
|
||||
// event.getEventId(), event.getDeviceId(), event.getProductKey());
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void publishEventOccurred(DeviceEventOccurredEvent event) {
|
||||
// // 暂不处理设备事件上报
|
||||
// log.debug("[publishEventOccurred] 跳过设备事件上报发布: eventId={}, deviceId={}, productKey={}, event={}",
|
||||
// event.getEventId(), event.getDeviceId(), event.getProductKey(), event.getEventIdentifier());
|
||||
// }
|
||||
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import com.viewsh.module.iot.controller.admin.device.vo.message.IotDeviceMessage
|
||||
import com.viewsh.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageReqVO;
|
||||
import com.viewsh.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageSummaryByDateRespVO;
|
||||
import com.viewsh.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
||||
import com.viewsh.module.iot.core.enums.IotDeviceStateEnum;
|
||||
import com.viewsh.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import com.viewsh.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import com.viewsh.module.iot.core.util.IotDeviceMessageUtils;
|
||||
@@ -197,7 +198,9 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
||||
String stateStr = IotDeviceMessageUtils.getIdentifier(message);
|
||||
assert stateStr != null;
|
||||
Assert.notEmpty(stateStr, "设备状态不能为空");
|
||||
deviceService.updateDeviceState(device, Integer.valueOf(stateStr));
|
||||
// 兼容整数和字符串格式的状态值
|
||||
Integer state = parseStateValue(stateStr);
|
||||
deviceService.updateDeviceState(device, state);
|
||||
// TODO 芋艿:子设备的关联
|
||||
return null;
|
||||
}
|
||||
@@ -269,6 +272,34 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析状态值,支持整数和字符串格式
|
||||
* <p>
|
||||
* 支持格式:
|
||||
* - 整数:0=未激活,1=在线,2=离线
|
||||
* - 字符串:0/inactive=未激活,1/online=在线,2/offline=离线
|
||||
*
|
||||
* @param stateStr 状态字符串
|
||||
* @return 状态枚举值
|
||||
*/
|
||||
private Integer parseStateValue(String stateStr) {
|
||||
if (stateStr == null) {
|
||||
return IotDeviceStateEnum.INACTIVE.getState();
|
||||
}
|
||||
try {
|
||||
// 尝试直接解析为整数
|
||||
return Integer.parseInt(stateStr);
|
||||
} catch (NumberFormatException e) {
|
||||
// 字符格式匹配
|
||||
String lower = stateStr.toLowerCase().trim();
|
||||
return switch (lower) {
|
||||
case "1", "online" -> IotDeviceStateEnum.ONLINE.getState();
|
||||
case "2", "offline" -> IotDeviceStateEnum.OFFLINE.getState();
|
||||
default -> IotDeviceStateEnum.INACTIVE.getState();
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private IotDeviceMessageServiceImpl getSelf() {
|
||||
return SpringUtil.getBean(getClass());
|
||||
}
|
||||
|
||||
@@ -184,7 +184,8 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
|
||||
processRuleProcessors(device, properties);
|
||||
|
||||
// 2.4 发布属性消息到 Redis Stream(供其他模块如 Ops 订阅)
|
||||
publishPropertyMessage(device, properties, message.getReportTime());
|
||||
// TODO: 暂停发布,后续根据需要开启
|
||||
// publishPropertyMessage(device, properties, message.getReportTime());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -197,7 +198,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
|
||||
*/
|
||||
private void processRuleProcessors(IotDeviceDO device, Map<String, Object> properties) {
|
||||
try {
|
||||
// 遍历所有属性,调用规<EFBFBD><EFBFBD>处理器
|
||||
// 遍历所有属性,调用规则处理器
|
||||
for (Map.Entry<String, Object> entry : properties.entrySet()) {
|
||||
String identifier = entry.getKey();
|
||||
Object value = entry.getValue();
|
||||
@@ -263,7 +264,7 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
|
||||
.eventTime(reportTime)
|
||||
.build();
|
||||
|
||||
integrationEventPublisher.publishPropertyChanged(event);
|
||||
// integrationEventPublisher.publishPropertyChanged(event);
|
||||
log.debug("[publishToIntegrationEventBus] 跨模块属性变更事件已发布: eventId={}, deviceId={}, productKey={}, properties={}",
|
||||
event.getEventId(), device.getId(), productKey, properties.keySet());
|
||||
} catch (Exception e) {
|
||||
|
||||
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
|
||||
* 订阅 IoT 模块发布的保洁工单到岗事件
|
||||
* <p>
|
||||
* RocketMQ 配置:
|
||||
* - Topic: ops.order.arrive
|
||||
* - Topic: ops-order-arrive
|
||||
* - ConsumerGroup: ops-clean-order-arrive-group
|
||||
*
|
||||
* @author AI
|
||||
@@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit;
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "ops.order.arrive",
|
||||
topic = "ops-order-arrive",
|
||||
consumerGroup = "ops-clean-order-arrive-group",
|
||||
consumeMode = ConsumeMode.CONCURRENTLY,
|
||||
selectorExpression = "*"
|
||||
|
||||
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
|
||||
* 用于记录非状态变更的业务审计日志(如警告发送、抑制操作等)
|
||||
* <p>
|
||||
* RocketMQ 配置:
|
||||
* - Topic: ops.order.audit
|
||||
* - Topic: ops-order-audit
|
||||
* - ConsumerGroup: ops-clean-order-audit-group
|
||||
*
|
||||
* @author AI
|
||||
@@ -37,7 +37,7 @@ import java.util.concurrent.TimeUnit;
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "ops.order.audit",
|
||||
topic = "ops-order-audit",
|
||||
consumerGroup = "ops-clean-order-audit-group",
|
||||
consumeMode = ConsumeMode.CONCURRENTLY,
|
||||
selectorExpression = "*"
|
||||
|
||||
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
|
||||
* 订阅 IoT 模块发布的保洁工单完成事件
|
||||
* <p>
|
||||
* RocketMQ 配置:
|
||||
* - Topic: ops.order.complete
|
||||
* - Topic: ops-order-complete
|
||||
* - ConsumerGroup: ops-clean-order-complete-group
|
||||
*
|
||||
* @author AI
|
||||
@@ -35,7 +35,7 @@ import java.util.concurrent.TimeUnit;
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "ops.order.complete",
|
||||
topic = "ops-order-complete",
|
||||
consumerGroup = "ops-clean-order-complete-group",
|
||||
consumeMode = ConsumeMode.CONCURRENTLY,
|
||||
selectorExpression = "*"
|
||||
|
||||
@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "ops.order.confirm",
|
||||
topic = "ops-order-confirm",
|
||||
consumerGroup = "ops-clean-order-confirm-group",
|
||||
consumeMode = ConsumeMode.CONCURRENTLY
|
||||
)
|
||||
|
||||
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
|
||||
* 订阅 IoT 模块发布的保洁工单创建事件
|
||||
* <p>
|
||||
* RocketMQ 配置:
|
||||
* - Topic: ops.order.create
|
||||
* - Topic: ops-order-create
|
||||
* - ConsumerGroup: ops-clean-order-create-group
|
||||
*
|
||||
* @author AI
|
||||
@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "ops.order.create",
|
||||
topic = "ops-order-create",
|
||||
consumerGroup = "ops-clean-order-create-group",
|
||||
consumeMode = ConsumeMode.CONCURRENTLY,
|
||||
selectorExpression = "*"
|
||||
|
||||
Reference in New Issue
Block a user