fix(ops): RocketMQ 消费者添加租户上下文防御性兜底
6 个消费者添加 executeInTenantContext() 统一模式:当框架 Hook 未设置租户上下文时,从事件体 tenantId 字段兜底切换。 同步为 4 个事件 DTO 添加 tenantId 字段,去重 Key 迁移至 OpsRedisKeyBuilder.eventDedup() 实现租户隔离。 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,195 +1,128 @@
|
||||
package com.viewsh.module.ops.environment.integration.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
|
||||
import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum;
|
||||
import com.viewsh.module.ops.environment.integration.dto.IotDeviceStatusChangedEventDTO;
|
||||
import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService;
|
||||
import com.viewsh.module.ops.service.area.AreaDeviceService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 工牌设备状态变更事件消费者
|
||||
* <p>
|
||||
* 监听 IoT 模块发布的设备状态变更事件(上线/离线)
|
||||
* <p>
|
||||
* 调用链路:
|
||||
*
|
||||
* <pre>
|
||||
* IoT 设备连接/断开
|
||||
* ↓
|
||||
* IotDeviceServiceImpl.updateDeviceState()
|
||||
* ↓
|
||||
* RocketMQ Topic: integration-device-status
|
||||
* ↓
|
||||
* BadgeDeviceStatusEventHandler.onMessage()
|
||||
* - 幂等性检查
|
||||
* - 判断是否为工牌设备
|
||||
* ↓
|
||||
* badgeDeviceStatusService.updateBadgeOnlineStatus()
|
||||
* - 创建/更新 Redis 状态记录
|
||||
* </pre>
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "integration-device-status",
|
||||
consumerGroup = "ops-badge-status-consumer",
|
||||
consumeMode = ConsumeMode.CONCURRENTLY,
|
||||
selectorExpression = "*",
|
||||
accessKey = "${rocketmq.consumer.access-key:}",
|
||||
secretKey = "${rocketmq.consumer.secret-key:}"
|
||||
)
|
||||
public class BadgeDeviceStatusEventHandler implements RocketMQListener<String> {
|
||||
|
||||
/**
|
||||
* 幂等性控制 Key 模式
|
||||
*/
|
||||
private static final String DEDUP_KEY_PATTERN = "ops:badge:dedup:status:%s";
|
||||
|
||||
/**
|
||||
* 幂等性控制 TTL(秒)
|
||||
*/
|
||||
private static final int DEDUP_TTL_SECONDS = 120;
|
||||
|
||||
@Resource
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Resource
|
||||
private BadgeDeviceStatusService badgeDeviceStatusService;
|
||||
|
||||
@Resource
|
||||
private AreaDeviceService areaDeviceService;
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
try {
|
||||
// 1. JSON 反序列化
|
||||
IotDeviceStatusChangedEventDTO event = objectMapper.readValue(message,
|
||||
IotDeviceStatusChangedEventDTO.class);
|
||||
|
||||
log.debug("[BadgeDeviceStatusEventHandler] 收到设备状态变更事件: eventId={}, deviceId={}, {} -> {}",
|
||||
event.getEventId(), event.getDeviceId(), event.getOldStatus(), event.getNewStatus());
|
||||
|
||||
// 2. 幂等性检查
|
||||
String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId());
|
||||
Boolean firstTime = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
|
||||
|
||||
if (Boolean.FALSE.equals(firstTime)) {
|
||||
log.debug("[BadgeDeviceStatusEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 判断是否为工牌设备
|
||||
if (!isBadgeDevice(event.getDeviceId())) {
|
||||
log.debug("[BadgeDeviceStatusEventHandler] 非工牌设备,跳过处理: deviceId={}", event.getDeviceId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 4. 处理状态变更
|
||||
handleDeviceStatusChange(event);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[BadgeDeviceStatusEventHandler] 消息处理失败: message={}", message, e);
|
||||
// 不抛出异常,避免消息重试导致重复处理
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理设备状态变更
|
||||
*/
|
||||
private void handleDeviceStatusChange(IotDeviceStatusChangedEventDTO event) {
|
||||
Long deviceId = event.getDeviceId();
|
||||
String deviceCode = event.getDeviceName();
|
||||
String nickname = event.getNickname();
|
||||
|
||||
// 获取设备所属区域
|
||||
Long areaId = getAreaIdByDeviceId(deviceId);
|
||||
|
||||
if (event.isOnline()) {
|
||||
// 设备上线
|
||||
log.info("[BadgeDeviceStatusEventHandler] 工牌设备上线: deviceId={}, deviceCode={}, nickname={}, areaId={}",
|
||||
deviceId, deviceCode, nickname, areaId);
|
||||
|
||||
badgeDeviceStatusService.updateBadgeOnlineStatus(
|
||||
deviceId,
|
||||
deviceCode,
|
||||
nickname,
|
||||
areaId,
|
||||
BadgeDeviceStatusEnum.IDLE,
|
||||
"设备上线");
|
||||
|
||||
} else if (event.isOffline()) {
|
||||
// 设备离线
|
||||
log.info("[BadgeDeviceStatusEventHandler] 工牌设备离线: deviceId={}, deviceCode={}",
|
||||
deviceId, deviceCode);
|
||||
|
||||
badgeDeviceStatusService.updateBadgeOnlineStatus(
|
||||
deviceId,
|
||||
deviceCode,
|
||||
nickname,
|
||||
null,
|
||||
BadgeDeviceStatusEnum.OFFLINE,
|
||||
"设备离线");
|
||||
|
||||
} else {
|
||||
// 其他状态(如 INACTIVE),暂不处理
|
||||
log.debug("[BadgeDeviceStatusEventHandler] 忽略状态变更: deviceId={}, newStatus={}",
|
||||
deviceId, event.getNewStatus());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为工牌设备
|
||||
* <p>
|
||||
* 通过查询 ops_area_device_relation 表判断设备是否为 BADGE 类型
|
||||
*/
|
||||
private boolean isBadgeDevice(Long deviceId) {
|
||||
if (deviceId == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
// 通过区域设备关系判断是否为工牌设备
|
||||
OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId);
|
||||
return relation != null && "BADGE".equals(relation.getRelationType());
|
||||
} catch (Exception e) {
|
||||
log.warn("[BadgeDeviceStatusEventHandler] 查询设备类型失败: deviceId={}", deviceId, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取设备所属区域ID
|
||||
*/
|
||||
private Long getAreaIdByDeviceId(Long deviceId) {
|
||||
if (deviceId == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId);
|
||||
if (relation != null && "BADGE".equals(relation.getRelationType())) {
|
||||
return relation.getAreaId();
|
||||
}
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
log.warn("[BadgeDeviceStatusEventHandler] 获取设备区域失败: deviceId={}", deviceId, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
package com.viewsh.module.ops.environment.integration.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.viewsh.framework.tenant.core.util.TenantUtils;
|
||||
import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
|
||||
import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum;
|
||||
import com.viewsh.module.ops.environment.integration.dto.IotDeviceStatusChangedEventDTO;
|
||||
import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService;
|
||||
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
|
||||
import com.viewsh.module.ops.service.area.AreaDeviceService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "integration-device-status",
|
||||
consumerGroup = "ops-badge-status-consumer",
|
||||
consumeMode = ConsumeMode.CONCURRENTLY,
|
||||
selectorExpression = "*",
|
||||
accessKey = "${rocketmq.consumer.access-key:}",
|
||||
secretKey = "${rocketmq.consumer.secret-key:}"
|
||||
)
|
||||
public class BadgeDeviceStatusEventHandler implements RocketMQListener<String> {
|
||||
|
||||
private static final int DEDUP_TTL_SECONDS = 120;
|
||||
|
||||
@Resource
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Resource
|
||||
private BadgeDeviceStatusService badgeDeviceStatusService;
|
||||
|
||||
@Resource
|
||||
private AreaDeviceService areaDeviceService;
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
try {
|
||||
IotDeviceStatusChangedEventDTO event = objectMapper.readValue(message, IotDeviceStatusChangedEventDTO.class);
|
||||
log.debug("[BadgeDeviceStatusEventHandler] 收到设备状态变更事件: eventId={}, deviceId={}, {} -> {}",
|
||||
event.getEventId(), event.getDeviceId(), event.getOldStatus(), event.getNewStatus());
|
||||
String dedupKey = OpsRedisKeyBuilder.badgeDedup(event.getTenantId(), event.getEventId());
|
||||
Boolean firstTime = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
|
||||
if (Boolean.FALSE.equals(firstTime)) {
|
||||
log.debug("[BadgeDeviceStatusEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
|
||||
return;
|
||||
}
|
||||
executeInTenantContext(event.getTenantId(), () -> handleDeviceStatusChange(event));
|
||||
} catch (Exception e) {
|
||||
log.error("[BadgeDeviceStatusEventHandler] 消息处理失败: message={}", message, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleDeviceStatusChange(IotDeviceStatusChangedEventDTO event) {
|
||||
if (!isBadgeDevice(event.getDeviceId())) {
|
||||
log.debug("[BadgeDeviceStatusEventHandler] 非工牌设备,忽略状态同步: deviceId={}", event.getDeviceId());
|
||||
return;
|
||||
}
|
||||
|
||||
Long areaId = getAreaIdByDeviceId(event.getDeviceId());
|
||||
if (event.isOnline()) {
|
||||
badgeDeviceStatusService.updateBadgeOnlineStatus(
|
||||
event.getDeviceId(), event.getDeviceName(), event.getNickname(), areaId,
|
||||
BadgeDeviceStatusEnum.IDLE, "设备上线");
|
||||
return;
|
||||
}
|
||||
if (event.isOffline()) {
|
||||
badgeDeviceStatusService.updateBadgeOnlineStatus(
|
||||
event.getDeviceId(), event.getDeviceName(), event.getNickname(), null,
|
||||
BadgeDeviceStatusEnum.OFFLINE, "设备离线");
|
||||
return;
|
||||
}
|
||||
log.debug("[BadgeDeviceStatusEventHandler] 忽略未处理的状态变更: deviceId={}, newStatus={}",
|
||||
event.getDeviceId(), event.getNewStatus());
|
||||
}
|
||||
|
||||
private boolean isBadgeDevice(Long deviceId) {
|
||||
if (deviceId == null) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId);
|
||||
return relation != null && "BADGE".equals(relation.getRelationType());
|
||||
} catch (Exception e) {
|
||||
log.warn("[BadgeDeviceStatusEventHandler] 查询设备绑定关系失败: deviceId={}", deviceId, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private Long getAreaIdByDeviceId(Long deviceId) {
|
||||
if (deviceId == null) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId);
|
||||
if (relation != null && "BADGE".equals(relation.getRelationType())) {
|
||||
return relation.getAreaId();
|
||||
}
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
log.warn("[BadgeDeviceStatusEventHandler] 查询设备所属区域失败: deviceId={}", deviceId, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private void executeInTenantContext(Long tenantId, Runnable runnable) {
|
||||
Long currentTenantId = TenantContextHolder.getTenantId();
|
||||
if (tenantId == null || Objects.equals(currentTenantId, tenantId)) {
|
||||
runnable.run();
|
||||
return;
|
||||
}
|
||||
TenantUtils.execute(tenantId, runnable);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,160 +1,118 @@
|
||||
package com.viewsh.module.ops.environment.integration.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.OperatorTypeEnum;
|
||||
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
|
||||
import com.viewsh.module.ops.environment.integration.dto.CleanOrderArriveEventDTO;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 保洁工单到岗事件消费者
|
||||
* <p>
|
||||
* 订阅 IoT 模块发布的保洁工单到岗事件
|
||||
* <p>
|
||||
* 调用链路:
|
||||
* <pre>
|
||||
* IoT 发布 RocketMQ 消息 (ops-order-arrive)
|
||||
* ↓
|
||||
* CleanOrderArriveEventHandler.onMessage()
|
||||
* - 幂等性检查
|
||||
* - 状态检查
|
||||
* ↓
|
||||
* orderLifecycleManager.transition(ARRIVED) [@Transactional]
|
||||
* - 更新工单状态
|
||||
* - 发布 OrderStateChangedEvent
|
||||
* ↓
|
||||
* 事务提交
|
||||
* ↓
|
||||
* CleanOrderEventListener.handleArrived() [@Async, AFTER_COMMIT]
|
||||
* - 记录到岗时间到扩展表
|
||||
* - 更新设备工单关联信息(areaId, beaconMac)
|
||||
* - 记录业务日志
|
||||
* </pre>
|
||||
* <p>
|
||||
* 设计说明:
|
||||
* - Handler 只负责消息接收和状态转换
|
||||
* - 业务日志、设备缓存更新由 CleanOrderEventListener 在事务提交后处理
|
||||
* - BadgeDeviceStatusServiceImpl 在 BEFORE_COMMIT 阶段跳过 ARRIVED(由 Listener 处理完整信息)
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "ops-order-arrive",
|
||||
consumerGroup = "ops-clean-order-arrive-group",
|
||||
consumeMode = ConsumeMode.CONCURRENTLY,
|
||||
selectorExpression = "*",
|
||||
accessKey = "${rocketmq.consumer.access-key:}",
|
||||
secretKey = "${rocketmq.consumer.secret-key:}"
|
||||
)
|
||||
public class CleanOrderArriveEventHandler implements RocketMQListener<String> {
|
||||
|
||||
/**
|
||||
* 幂等性控制 Key 模式
|
||||
*/
|
||||
private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:arrive:%s";
|
||||
|
||||
/**
|
||||
* 幂等性控制 TTL(秒)
|
||||
*/
|
||||
private static final int DEDUP_TTL_SECONDS = 300;
|
||||
|
||||
@Resource
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Resource
|
||||
private OpsOrderMapper opsOrderMapper;
|
||||
|
||||
@Resource
|
||||
private OrderLifecycleManager orderLifecycleManager;
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
try {
|
||||
// 1. JSON 反序列化
|
||||
CleanOrderArriveEventDTO event = objectMapper.readValue(message, CleanOrderArriveEventDTO.class);
|
||||
|
||||
// 2. 幂等性检查
|
||||
String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId());
|
||||
Boolean firstTime = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
|
||||
|
||||
if (!firstTime) {
|
||||
log.debug("[CleanOrderArriveEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 业务处理
|
||||
handleOrderArrive(event);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[CleanOrderArriveEventHandler] 消息处理失败: message={}", message, e);
|
||||
throw new RuntimeException("保洁工单到岗事件处理失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理工单到岗
|
||||
*/
|
||||
private void handleOrderArrive(CleanOrderArriveEventDTO event) {
|
||||
log.info("[CleanOrderArriveEventHandler] 收到到岗事件: eventId={}, orderId={}, deviceId={}, areaId={}",
|
||||
event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId());
|
||||
|
||||
// 1. 查询工单
|
||||
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
|
||||
if (order == null) {
|
||||
log.warn("[CleanOrderArriveEventHandler] 工单不存在: orderId={}", event.getOrderId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 检查工单状态
|
||||
WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus());
|
||||
if (!currentStatus.canStartWorking()) {
|
||||
log.warn("[CleanOrderArriveEventHandler] 工单状态不允许到岗: orderId={}, status={}",
|
||||
event.getOrderId(), order.getStatus());
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 构建状态转换请求(包含完整信息,供 Listener 使用)
|
||||
Map<String, Object> payload = new HashMap<>();
|
||||
payload.put("deviceId", event.getDeviceId());
|
||||
payload.put("deviceKey", event.getDeviceKey());
|
||||
payload.put("areaId", event.getAreaId());
|
||||
payload.put("triggerSource", event.getTriggerSource());
|
||||
if (event.getTriggerData() != null) {
|
||||
payload.putAll(event.getTriggerData());
|
||||
}
|
||||
|
||||
OrderTransitionRequest request = OrderTransitionRequest.builder()
|
||||
.orderId(event.getOrderId())
|
||||
.targetStatus(WorkOrderStatusEnum.ARRIVED)
|
||||
.operatorType(OperatorTypeEnum.SYSTEM)
|
||||
.reason("蓝牙信标自动到岗确认")
|
||||
.payload(payload)
|
||||
.build();
|
||||
// 4. 执行状态转换
|
||||
// 注意:业务日志和设备缓存更新由 CleanOrderEventListener 在 AFTER_COMMIT 阶段处理
|
||||
orderLifecycleManager.transition(request);
|
||||
|
||||
log.info("[CleanOrderArriveEventHandler] 工单到岗成功: eventId={}, orderId={}, deviceId={}",
|
||||
event.getEventId(), event.getOrderId(), event.getDeviceId());
|
||||
}
|
||||
}
|
||||
package com.viewsh.module.ops.environment.integration.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.viewsh.framework.tenant.core.util.TenantUtils;
|
||||
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.OperatorTypeEnum;
|
||||
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
|
||||
import com.viewsh.module.ops.environment.integration.dto.CleanOrderArriveEventDTO;
|
||||
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "ops-order-arrive",
|
||||
consumerGroup = "ops-clean-order-arrive-group",
|
||||
consumeMode = ConsumeMode.CONCURRENTLY,
|
||||
selectorExpression = "*",
|
||||
accessKey = "${rocketmq.consumer.access-key:}",
|
||||
secretKey = "${rocketmq.consumer.secret-key:}"
|
||||
)
|
||||
public class CleanOrderArriveEventHandler implements RocketMQListener<String> {
|
||||
|
||||
private static final int DEDUP_TTL_SECONDS = 300;
|
||||
|
||||
@Resource
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Resource
|
||||
private OpsOrderMapper opsOrderMapper;
|
||||
|
||||
@Resource
|
||||
private OrderLifecycleManager orderLifecycleManager;
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
try {
|
||||
CleanOrderArriveEventDTO event = objectMapper.readValue(message, CleanOrderArriveEventDTO.class);
|
||||
String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "arrive", event.getEventId());
|
||||
Boolean firstTime = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
|
||||
if (!Boolean.TRUE.equals(firstTime)) {
|
||||
log.debug("[CleanOrderArriveEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
|
||||
return;
|
||||
}
|
||||
executeInTenantContext(event.getTenantId(), () -> handleOrderArrive(event));
|
||||
} catch (Exception e) {
|
||||
log.error("[CleanOrderArriveEventHandler] 消息处理失败: message={}", message, e);
|
||||
throw new RuntimeException("保洁工单到岗事件处理失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleOrderArrive(CleanOrderArriveEventDTO event) {
|
||||
log.info("[CleanOrderArriveEventHandler] 收到到岗事件: eventId={}, orderId={}, deviceId={}, areaId={}",
|
||||
event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId());
|
||||
|
||||
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
|
||||
if (order == null) {
|
||||
log.warn("[CleanOrderArriveEventHandler] 工单不存在: orderId={}", event.getOrderId());
|
||||
return;
|
||||
}
|
||||
|
||||
WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus());
|
||||
if (!currentStatus.canStartWorking()) {
|
||||
log.warn("[CleanOrderArriveEventHandler] 当前状态不允许到岗: orderId={}, status={}",
|
||||
event.getOrderId(), order.getStatus());
|
||||
return;
|
||||
}
|
||||
|
||||
Map<String, Object> payload = new HashMap<>();
|
||||
payload.put("deviceId", event.getDeviceId());
|
||||
payload.put("deviceKey", event.getDeviceKey());
|
||||
payload.put("areaId", event.getAreaId());
|
||||
payload.put("triggerSource", event.getTriggerSource());
|
||||
if (event.getTriggerData() != null) {
|
||||
payload.putAll(event.getTriggerData());
|
||||
}
|
||||
|
||||
OrderTransitionRequest request = OrderTransitionRequest.builder()
|
||||
.orderId(event.getOrderId())
|
||||
.targetStatus(WorkOrderStatusEnum.ARRIVED)
|
||||
.operatorType(OperatorTypeEnum.SYSTEM)
|
||||
.reason("设备触发自动到岗")
|
||||
.payload(payload)
|
||||
.build();
|
||||
orderLifecycleManager.transition(request);
|
||||
|
||||
log.info("[CleanOrderArriveEventHandler] 工单到岗完成: eventId={}, orderId={}, deviceId={}",
|
||||
event.getEventId(), event.getOrderId(), event.getDeviceId());
|
||||
}
|
||||
|
||||
private void executeInTenantContext(Long tenantId, Runnable runnable) {
|
||||
Long currentTenantId = TenantContextHolder.getTenantId();
|
||||
if (tenantId == null || Objects.equals(currentTenantId, tenantId)) {
|
||||
runnable.run();
|
||||
return;
|
||||
}
|
||||
TenantUtils.execute(tenantId, runnable);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package com.viewsh.module.ops.environment.integration.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.viewsh.framework.tenant.core.util.TenantUtils;
|
||||
import com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
@@ -13,8 +15,9 @@ import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.EventLevel;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.LogType;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
|
||||
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
@@ -23,8 +26,9 @@ import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.Arrays;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 保洁工单审计事件消费者
|
||||
@@ -84,7 +88,7 @@ public class CleanOrderAuditEventHandler implements RocketMQListener<String> {
|
||||
CleanOrderAuditEventDTO event = objectMapper.readValue(message, CleanOrderAuditEventDTO.class);
|
||||
|
||||
// 2. 幂等性检查
|
||||
String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId());
|
||||
String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "audit", event.getEventId());
|
||||
Boolean firstTime = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
|
||||
|
||||
@@ -94,7 +98,7 @@ public class CleanOrderAuditEventHandler implements RocketMQListener<String> {
|
||||
}
|
||||
|
||||
// 3. 业务处理
|
||||
handleAuditEvent(event);
|
||||
executeInTenantContext(event.getTenantId(), () -> handleAuditEvent(event));
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[CleanOrderAuditEventHandler] 消息处理失败: message={}", message, e);
|
||||
@@ -256,5 +260,14 @@ public class CleanOrderAuditEventHandler implements RocketMQListener<String> {
|
||||
return EventLevel.WARN;
|
||||
}
|
||||
return EventLevel.INFO;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void executeInTenantContext(Long tenantId, Runnable runnable) {
|
||||
Long currentTenantId = TenantContextHolder.getTenantId();
|
||||
if (tenantId == null || Objects.equals(currentTenantId, tenantId)) {
|
||||
runnable.run();
|
||||
return;
|
||||
}
|
||||
TenantUtils.execute(tenantId, runnable);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,191 +1,130 @@
|
||||
package com.viewsh.module.ops.environment.integration.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.OperatorTypeEnum;
|
||||
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
|
||||
import com.viewsh.module.ops.environment.integration.dto.CleanOrderCompleteEventDTO;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 保洁工单完成事件消费者
|
||||
* <p>
|
||||
* 订阅 IoT 模块发布的保洁工单完成事件
|
||||
* <p>
|
||||
* 调用链路:
|
||||
*
|
||||
* <pre>
|
||||
* IoT 发布 RocketMQ 消息 (ops-order-complete)
|
||||
* ↓
|
||||
* CleanOrderCompleteEventHandler.onMessage()
|
||||
* - 幂等性检查
|
||||
* - 状态检查
|
||||
* ↓
|
||||
* orderLifecycleManager.transition(COMPLETED) [@Transactional]
|
||||
* - 更新工单状态
|
||||
* - 发布 OrderStateChangedEvent
|
||||
* ↓
|
||||
* 事务提交
|
||||
* ↓
|
||||
* BadgeDeviceStatusEventListener.onOrderStateChanged() [BEFORE_COMMIT]
|
||||
* - 清除设备工单关联
|
||||
* - 检查等待任务,决定设备状态 (BUSY/IDLE)
|
||||
* ↓
|
||||
* CleanOrderEventListener.handleCompleted() [AFTER_COMMIT]
|
||||
* - 记录完成时间到扩展表
|
||||
* - 记录业务日志
|
||||
* CleanOrderEventListener.onOrderCompleted() [AFTER_COMMIT]
|
||||
* - 自动调度下一个任务
|
||||
* - 发送完成通知
|
||||
* </pre>
|
||||
* <p>
|
||||
* 设计说明:
|
||||
* - Handler 只负责消息接收和状态转换
|
||||
* - 设备状态由 BadgeDeviceStatusEventListener 在 BEFORE_COMMIT 阶段处理
|
||||
* - 业务日志、自动调度由 CleanOrderEventListener 在 AFTER_COMMIT 阶段处理
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "ops-order-complete",
|
||||
consumerGroup = "ops-clean-order-complete-group",
|
||||
consumeMode = ConsumeMode.CONCURRENTLY,
|
||||
selectorExpression = "*",
|
||||
accessKey = "${rocketmq.consumer.access-key:}",
|
||||
secretKey = "${rocketmq.consumer.secret-key:}"
|
||||
)
|
||||
public class CleanOrderCompleteEventHandler implements RocketMQListener<String> {
|
||||
|
||||
/**
|
||||
* 幂等性控制 Key 模式
|
||||
*/
|
||||
private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:complete:%s";
|
||||
|
||||
/**
|
||||
* 幂等性控制 TTL(秒)
|
||||
*/
|
||||
private static final int DEDUP_TTL_SECONDS = 300;
|
||||
|
||||
@Resource
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Resource
|
||||
private OpsOrderMapper opsOrderMapper;
|
||||
|
||||
@Resource
|
||||
private OrderLifecycleManager orderLifecycleManager;
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
try {
|
||||
// 1. JSON 反序列化
|
||||
CleanOrderCompleteEventDTO event = objectMapper.readValue(message, CleanOrderCompleteEventDTO.class);
|
||||
|
||||
// 2. 幂等性检查
|
||||
String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId());
|
||||
Boolean firstTime = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
|
||||
|
||||
if (!firstTime) {
|
||||
log.debug("[CleanOrderCompleteEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 业务处理
|
||||
handleOrderComplete(event);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[CleanOrderCompleteEventHandler] 消息处理失败: message={}", message, e);
|
||||
throw new RuntimeException("保洁工单完成事件处理失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理工单完成
|
||||
*/
|
||||
private void handleOrderComplete(CleanOrderCompleteEventDTO event) {
|
||||
log.info("[CleanOrderCompleteEventHandler] 收到完成事件: eventId={}, orderId={}, deviceId={}, areaId={}",
|
||||
event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId());
|
||||
|
||||
// 1. 查询工单
|
||||
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
|
||||
if (order == null) {
|
||||
log.warn("[CleanOrderCompleteEventHandler] 工单不存在: orderId={}", event.getOrderId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 检查工单状态
|
||||
WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus());
|
||||
if (!currentStatus.canComplete()) {
|
||||
log.warn("[CleanOrderCompleteEventHandler] 工单状态不允许完成: orderId={}, status={}",
|
||||
event.getOrderId(), order.getStatus());
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 计算作业时长
|
||||
String remark = buildCompletionRemark(event);
|
||||
|
||||
// 4. 构建状态转换请求
|
||||
Map<String, Object> payload = new HashMap<>();
|
||||
payload.put("deviceId", event.getDeviceId());
|
||||
payload.put("deviceKey", event.getDeviceKey());
|
||||
payload.put("areaId", event.getAreaId());
|
||||
payload.put("triggerSource", event.getTriggerSource());
|
||||
if (event.getTriggerData() != null) {
|
||||
payload.putAll(event.getTriggerData());
|
||||
}
|
||||
|
||||
OrderTransitionRequest request = OrderTransitionRequest.builder()
|
||||
.orderId(event.getOrderId())
|
||||
.targetStatus(WorkOrderStatusEnum.COMPLETED)
|
||||
.operatorType(OperatorTypeEnum.SYSTEM)
|
||||
.assigneeId(event.getDeviceId()) // 设备ID用于后续自动调度
|
||||
.reason(remark)
|
||||
.payload(payload)
|
||||
.build();
|
||||
|
||||
// 5. 执行状态转换
|
||||
// 注意:设备状态、业务日志、自动调度由事件监听器处理
|
||||
orderLifecycleManager.transition(request);
|
||||
|
||||
log.info("[CleanOrderCompleteEventHandler] 工单完成成功: eventId={}, orderId={}",
|
||||
event.getEventId(), event.getOrderId());
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建完成备注
|
||||
*/
|
||||
private String buildCompletionRemark(CleanOrderCompleteEventDTO event) {
|
||||
StringBuilder remark = new StringBuilder();
|
||||
remark.append("信号丢失超时自动完成");
|
||||
|
||||
if (event.getTriggerData() != null) {
|
||||
Object durationMs = event.getTriggerData().get("durationMs");
|
||||
if (durationMs != null) {
|
||||
long durationMinutes = ((Number) durationMs).longValue() / 60000;
|
||||
remark.append(",作业时长: ").append(durationMinutes).append("分钟");
|
||||
}
|
||||
}
|
||||
|
||||
return remark.toString();
|
||||
}
|
||||
|
||||
}
|
||||
package com.viewsh.module.ops.environment.integration.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.viewsh.framework.tenant.core.util.TenantUtils;
|
||||
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.OperatorTypeEnum;
|
||||
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
|
||||
import com.viewsh.module.ops.environment.integration.dto.CleanOrderCompleteEventDTO;
|
||||
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = "ops-order-complete",
|
||||
consumerGroup = "ops-clean-order-complete-group",
|
||||
consumeMode = ConsumeMode.CONCURRENTLY,
|
||||
selectorExpression = "*",
|
||||
accessKey = "${rocketmq.consumer.access-key:}",
|
||||
secretKey = "${rocketmq.consumer.secret-key:}"
|
||||
)
|
||||
public class CleanOrderCompleteEventHandler implements RocketMQListener<String> {
|
||||
|
||||
private static final int DEDUP_TTL_SECONDS = 300;
|
||||
|
||||
@Resource
|
||||
private ObjectMapper objectMapper;
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Resource
|
||||
private OpsOrderMapper opsOrderMapper;
|
||||
|
||||
@Resource
|
||||
private OrderLifecycleManager orderLifecycleManager;
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
try {
|
||||
CleanOrderCompleteEventDTO event = objectMapper.readValue(message, CleanOrderCompleteEventDTO.class);
|
||||
String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "complete", event.getEventId());
|
||||
Boolean firstTime = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
|
||||
if (!Boolean.TRUE.equals(firstTime)) {
|
||||
log.debug("[CleanOrderCompleteEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
|
||||
return;
|
||||
}
|
||||
executeInTenantContext(event.getTenantId(), () -> handleOrderComplete(event));
|
||||
} catch (Exception e) {
|
||||
log.error("[CleanOrderCompleteEventHandler] 消息处理失败: message={}", message, e);
|
||||
throw new RuntimeException("保洁工单完单事件处理失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleOrderComplete(CleanOrderCompleteEventDTO event) {
|
||||
log.info("[CleanOrderCompleteEventHandler] 收到完单事件: eventId={}, orderId={}, deviceId={}, areaId={}",
|
||||
event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId());
|
||||
|
||||
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
|
||||
if (order == null) {
|
||||
log.warn("[CleanOrderCompleteEventHandler] 工单不存在: orderId={}", event.getOrderId());
|
||||
return;
|
||||
}
|
||||
|
||||
WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus());
|
||||
if (!currentStatus.canComplete()) {
|
||||
log.warn("[CleanOrderCompleteEventHandler] 当前状态不允许完单: orderId={}, status={}",
|
||||
event.getOrderId(), order.getStatus());
|
||||
return;
|
||||
}
|
||||
|
||||
Map<String, Object> payload = new HashMap<>();
|
||||
payload.put("deviceId", event.getDeviceId());
|
||||
payload.put("deviceKey", event.getDeviceKey());
|
||||
payload.put("areaId", event.getAreaId());
|
||||
payload.put("triggerSource", event.getTriggerSource());
|
||||
if (event.getTriggerData() != null) {
|
||||
payload.putAll(event.getTriggerData());
|
||||
}
|
||||
|
||||
OrderTransitionRequest request = OrderTransitionRequest.builder()
|
||||
.orderId(event.getOrderId())
|
||||
.targetStatus(WorkOrderStatusEnum.COMPLETED)
|
||||
.operatorType(OperatorTypeEnum.SYSTEM)
|
||||
.assigneeId(event.getDeviceId())
|
||||
.reason(buildCompletionRemark(event))
|
||||
.payload(payload)
|
||||
.build();
|
||||
orderLifecycleManager.transition(request);
|
||||
|
||||
log.info("[CleanOrderCompleteEventHandler] 工单完单完成: eventId={}, orderId={}",
|
||||
event.getEventId(), event.getOrderId());
|
||||
}
|
||||
|
||||
private String buildCompletionRemark(CleanOrderCompleteEventDTO event) {
|
||||
StringBuilder remark = new StringBuilder("设备触发自动完单");
|
||||
if (event.getTriggerData() != null) {
|
||||
Object durationMs = event.getTriggerData().get("durationMs");
|
||||
if (durationMs instanceof Number number) {
|
||||
remark.append(",作业时长约 ").append(number.longValue() / 60000).append(" 分钟");
|
||||
}
|
||||
}
|
||||
return remark.toString();
|
||||
}
|
||||
|
||||
private void executeInTenantContext(Long tenantId, Runnable runnable) {
|
||||
Long currentTenantId = TenantContextHolder.getTenantId();
|
||||
if (tenantId == null || Objects.equals(currentTenantId, tenantId)) {
|
||||
runnable.run();
|
||||
return;
|
||||
}
|
||||
TenantUtils.execute(tenantId, runnable);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package com.viewsh.module.ops.environment.integration.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.viewsh.framework.tenant.core.util.TenantUtils;
|
||||
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
@@ -8,6 +10,7 @@ import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.OperatorTypeEnum;
|
||||
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
|
||||
import com.viewsh.module.ops.environment.integration.dto.CleanOrderConfirmEventDTO;
|
||||
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
@@ -16,39 +19,9 @@ import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 保洁工单确认事件消费者
|
||||
* <p>
|
||||
* 订阅 IoT 模块发布的工单确认事件(如:工牌按键确认)
|
||||
* <p>
|
||||
* 调用链路:
|
||||
* <pre>
|
||||
* IoT 发布 RocketMQ 消息 (ops-order-confirm)
|
||||
* ↓
|
||||
* CleanOrderConfirmEventHandler.onMessage()
|
||||
* - 幂等性检查
|
||||
* - 状态检查
|
||||
* ↓
|
||||
* orderLifecycleManager.transition(CONFIRMED) [@Transactional]
|
||||
* - 更新工单状态
|
||||
* - 发布 OrderStateChangedEvent
|
||||
* ↓
|
||||
* 事务提交
|
||||
* ↓
|
||||
* CleanOrderEventListener.onOrderStateChanged() [@TransactionalEventListener(AFTER_COMMIT)]
|
||||
* - 记录业务日志
|
||||
* - 发送 TTS "工单已确认,请前往作业区域开始作业"
|
||||
* </pre>
|
||||
* <p>
|
||||
* 设计说明:
|
||||
* - Handler 只负责消息接收和状态转换
|
||||
* - 日志记录和通知发送由 CleanOrderEventListener 在事务提交后处理
|
||||
* - 设备状态由 BadgeDeviceStatusServiceImpl 在 BEFORE_COMMIT 阶段同步
|
||||
*
|
||||
* @author AI
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
@@ -60,7 +33,6 @@ import java.util.concurrent.TimeUnit;
|
||||
)
|
||||
public class CleanOrderConfirmEventHandler implements RocketMQListener<String> {
|
||||
|
||||
private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:confirm:%s";
|
||||
private static final int DEDUP_TTL_SECONDS = 300;
|
||||
|
||||
@Resource
|
||||
@@ -78,66 +50,61 @@ public class CleanOrderConfirmEventHandler implements RocketMQListener<String> {
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
try {
|
||||
// 1. JSON 反序列化
|
||||
CleanOrderConfirmEventDTO event = objectMapper.readValue(message, CleanOrderConfirmEventDTO.class);
|
||||
String eventId = event.getEventId();
|
||||
|
||||
// 2. 幂等性检查
|
||||
String dedupKey = String.format(DEDUP_KEY_PATTERN, eventId);
|
||||
String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "confirm", event.getEventId());
|
||||
Boolean firstTime = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
|
||||
|
||||
if (!Boolean.TRUE.equals(firstTime)) {
|
||||
log.debug("[CleanOrderConfirmEventHandler] 重复事件,跳过处理: eventId={}", eventId);
|
||||
log.debug("[CleanOrderConfirmEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId());
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. 加载工单
|
||||
Long orderId = event.getOrderId();
|
||||
OpsOrderDO order = opsOrderMapper.selectById(orderId);
|
||||
if (order == null) {
|
||||
log.warn("[CleanOrderConfirmEventHandler] 工单不存在: orderId={}", orderId);
|
||||
return;
|
||||
}
|
||||
|
||||
WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus());
|
||||
log.info("[CleanOrderConfirmEventHandler] 收到确认事件: eventId={}, orderId={}, currentStatus={}, deviceId={}",
|
||||
eventId, orderId, currentStatus, event.getDeviceId());
|
||||
|
||||
// 4. 状态检查
|
||||
// 如果已在进行中 (CONFIRMED or ARRIVED),直接返回(TTS 由 Listener 处理)
|
||||
if (currentStatus == WorkOrderStatusEnum.CONFIRMED || currentStatus == WorkOrderStatusEnum.ARRIVED) {
|
||||
log.debug("[CleanOrderConfirmEventHandler] 工单已在进行中: orderId={}, status={}", orderId, currentStatus);
|
||||
return;
|
||||
}
|
||||
|
||||
// 检查是否可以确认
|
||||
if (!currentStatus.canConfirm()) {
|
||||
log.warn("[CleanOrderConfirmEventHandler] 当前状态无法确认工单: orderId={}, status={}", orderId, currentStatus);
|
||||
return;
|
||||
}
|
||||
|
||||
// 5. 构建状态转换请求(包含 deviceId,供 Listener 使用)
|
||||
OrderTransitionRequest request = OrderTransitionRequest.builder()
|
||||
.orderId(orderId)
|
||||
.targetStatus(WorkOrderStatusEnum.CONFIRMED)
|
||||
.reason("工牌按键确认")
|
||||
.operatorType(OperatorTypeEnum.CLEANER)
|
||||
.operatorId(event.getDeviceId() != null ? event.getDeviceId() : order.getAssigneeId())
|
||||
.build();
|
||||
// 将 deviceId 放入 payload,供 Listener 使用
|
||||
request.putPayload("deviceId", event.getDeviceId());
|
||||
request.putPayload("triggerSource", "BADGE_BUTTON");
|
||||
|
||||
// 6. 执行状态转换
|
||||
// 注意:日志记录和 TTS 由 CleanOrderEventListener 在 AFTER_COMMIT 阶段处理
|
||||
orderLifecycleManager.transition(request);
|
||||
|
||||
log.info("[CleanOrderConfirmEventHandler] 工单确认成功: orderId={}, deviceId={}", orderId, event.getDeviceId());
|
||||
|
||||
executeInTenantContext(event.getTenantId(), () -> handleConfirmEvent(event));
|
||||
} catch (Exception e) {
|
||||
log.error("[CleanOrderConfirmEventHandler] 消息处理失败: message={}", message, e);
|
||||
throw new RuntimeException("保洁工单确认事件处理失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleConfirmEvent(CleanOrderConfirmEventDTO event) {
|
||||
Long orderId = event.getOrderId();
|
||||
OpsOrderDO order = opsOrderMapper.selectById(orderId);
|
||||
if (order == null) {
|
||||
log.warn("[CleanOrderConfirmEventHandler] 工单不存在: orderId={}", orderId);
|
||||
return;
|
||||
}
|
||||
|
||||
WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus());
|
||||
log.info("[CleanOrderConfirmEventHandler] 收到确认事件: eventId={}, orderId={}, currentStatus={}, deviceId={}",
|
||||
event.getEventId(), orderId, currentStatus, event.getDeviceId());
|
||||
if (currentStatus == WorkOrderStatusEnum.CONFIRMED || currentStatus == WorkOrderStatusEnum.ARRIVED) {
|
||||
log.debug("[CleanOrderConfirmEventHandler] 工单已确认或已到岗,跳过处理: orderId={}, status={}", orderId, currentStatus);
|
||||
return;
|
||||
}
|
||||
if (!currentStatus.canConfirm()) {
|
||||
log.warn("[CleanOrderConfirmEventHandler] 当前状态不允许确认: orderId={}, status={}", orderId, currentStatus);
|
||||
return;
|
||||
}
|
||||
|
||||
OrderTransitionRequest request = OrderTransitionRequest.builder()
|
||||
.orderId(orderId)
|
||||
.targetStatus(WorkOrderStatusEnum.CONFIRMED)
|
||||
.reason("设备确认工单")
|
||||
.operatorType(OperatorTypeEnum.CLEANER)
|
||||
.operatorId(event.getDeviceId() != null ? event.getDeviceId() : order.getAssigneeId())
|
||||
.build();
|
||||
request.putPayload("deviceId", event.getDeviceId());
|
||||
request.putPayload("triggerSource", "BADGE_BUTTON");
|
||||
orderLifecycleManager.transition(request);
|
||||
|
||||
log.info("[CleanOrderConfirmEventHandler] 工单确认完成: orderId={}, deviceId={}", orderId, event.getDeviceId());
|
||||
}
|
||||
|
||||
private void executeInTenantContext(Long tenantId, Runnable runnable) {
|
||||
Long currentTenantId = TenantContextHolder.getTenantId();
|
||||
if (tenantId == null || Objects.equals(currentTenantId, tenantId)) {
|
||||
runnable.run();
|
||||
return;
|
||||
}
|
||||
TenantUtils.execute(tenantId, runnable);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package com.viewsh.module.ops.environment.integration.consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.viewsh.framework.tenant.core.util.TenantUtils;
|
||||
import com.viewsh.module.iot.api.device.IotDeviceControlApi;
|
||||
import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
@@ -18,6 +20,7 @@ import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.LogType;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
|
||||
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
@@ -28,6 +31,7 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@@ -106,7 +110,7 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
|
||||
CleanOrderCreateEventDTO event = objectMapper.readValue(message, CleanOrderCreateEventDTO.class);
|
||||
|
||||
// 2. 幂等性检查
|
||||
String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId());
|
||||
String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "create", event.getEventId());
|
||||
Boolean firstTime = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
|
||||
|
||||
@@ -116,7 +120,7 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
|
||||
}
|
||||
|
||||
// 3. 业务处理
|
||||
handleOrderCreate(event);
|
||||
executeInTenantContext(event.getTenantId(), () -> handleOrderCreate(event));
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[CleanOrderCreateEventHandler] 消息处理失败: message={}", message, e);
|
||||
@@ -505,4 +509,13 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void executeInTenantContext(Long tenantId, Runnable runnable) {
|
||||
Long currentTenantId = TenantContextHolder.getTenantId();
|
||||
if (tenantId == null || Objects.equals(currentTenantId, tenantId)) {
|
||||
runnable.run();
|
||||
return;
|
||||
}
|
||||
TenantUtils.execute(tenantId, runnable);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.viewsh.module.ops.environment.integration.consumer;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
@@ -19,8 +20,6 @@ import java.time.Duration;
|
||||
@Service
|
||||
public class IntegrationEventDeduplicationService {
|
||||
|
||||
private static final String DEDUP_KEY_PREFIX = "integration:event:dedup:";
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@@ -36,7 +35,7 @@ public class IntegrationEventDeduplicationService {
|
||||
return false;
|
||||
}
|
||||
|
||||
String key = DEDUP_KEY_PREFIX + eventId;
|
||||
String key = OpsRedisKeyBuilder.eventDedup("integration", eventId);
|
||||
|
||||
// setNX:如果 key 不存在则设置,返回 true
|
||||
Boolean success = stringRedisTemplate.opsForValue()
|
||||
@@ -60,7 +59,7 @@ public class IntegrationEventDeduplicationService {
|
||||
return;
|
||||
}
|
||||
|
||||
String key = DEDUP_KEY_PREFIX + eventId;
|
||||
String key = OpsRedisKeyBuilder.eventDedup("integration", eventId);
|
||||
stringRedisTemplate.opsForValue().set(key, "1", Duration.ofHours(24));
|
||||
}
|
||||
|
||||
@@ -75,7 +74,7 @@ public class IntegrationEventDeduplicationService {
|
||||
return false;
|
||||
}
|
||||
|
||||
String key = DEDUP_KEY_PREFIX + eventId;
|
||||
String key = OpsRedisKeyBuilder.eventDedup("integration", eventId);
|
||||
Boolean exists = stringRedisTemplate.hasKey(key);
|
||||
return Boolean.TRUE.equals(exists);
|
||||
}
|
||||
|
||||
@@ -48,7 +48,9 @@ public class CleanOrderArriveEventDTO {
|
||||
/**
|
||||
* 区域ID
|
||||
*/
|
||||
private Long areaId;
|
||||
private Long areaId;
|
||||
|
||||
private Long tenantId;
|
||||
|
||||
/**
|
||||
* 触发来源(IOT_BEACON=蓝牙信标检测)
|
||||
|
||||
@@ -63,7 +63,9 @@ public class CleanOrderAuditEventDTO {
|
||||
/**
|
||||
* 区域ID
|
||||
*/
|
||||
private Long areaId;
|
||||
private Long areaId;
|
||||
|
||||
private Long tenantId;
|
||||
|
||||
/**
|
||||
* 消息内容
|
||||
|
||||
@@ -48,7 +48,9 @@ public class CleanOrderCompleteEventDTO {
|
||||
/**
|
||||
* 区域ID
|
||||
*/
|
||||
private Long areaId;
|
||||
private Long areaId;
|
||||
|
||||
private Long tenantId;
|
||||
|
||||
/**
|
||||
* 触发来源(IOT_SIGNAL_LOSS=信号丢失超时)
|
||||
|
||||
@@ -53,7 +53,9 @@ public class CleanOrderCreateEventDTO {
|
||||
/**
|
||||
* 优先级(0=P0紧急 1=P1重要 2=P2普通)
|
||||
*/
|
||||
private Integer priority;
|
||||
private Integer priority;
|
||||
|
||||
private Long tenantId;
|
||||
|
||||
/**
|
||||
* 触发数据(JSON 格式的附加信息)
|
||||
|
||||
Reference in New Issue
Block a user