diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/BadgeDeviceStatusEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/BadgeDeviceStatusEventHandler.java index fa65c52..283f3ce 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/BadgeDeviceStatusEventHandler.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/BadgeDeviceStatusEventHandler.java @@ -1,195 +1,128 @@ -package com.viewsh.module.ops.environment.integration.consumer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO; -import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum; -import com.viewsh.module.ops.environment.integration.dto.IotDeviceStatusChangedEventDTO; -import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService; -import com.viewsh.module.ops.service.area.AreaDeviceService; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.ConsumeMode; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; - -import java.util.concurrent.TimeUnit; - -/** - * 工牌设备状态变更事件消费者 - *

- * 监听 IoT 模块发布的设备状态变更事件(上线/离线) - *

- * 调用链路: - * - *

- * IoT 设备连接/断开
- *     ↓
- * IotDeviceServiceImpl.updateDeviceState()
- *     ↓
- * RocketMQ Topic: integration-device-status
- *     ↓
- * BadgeDeviceStatusEventHandler.onMessage()
- *     - 幂等性检查
- *     - 判断是否为工牌设备
- *     ↓
- * badgeDeviceStatusService.updateBadgeOnlineStatus()
- *     - 创建/更新 Redis 状态记录
- * 
- * - * @author AI - */ -@Slf4j -@Component -@RocketMQMessageListener( - topic = "integration-device-status", - consumerGroup = "ops-badge-status-consumer", - consumeMode = ConsumeMode.CONCURRENTLY, - selectorExpression = "*", - accessKey = "${rocketmq.consumer.access-key:}", - secretKey = "${rocketmq.consumer.secret-key:}" -) -public class BadgeDeviceStatusEventHandler implements RocketMQListener { - - /** - * 幂等性控制 Key 模式 - */ - private static final String DEDUP_KEY_PATTERN = "ops:badge:dedup:status:%s"; - - /** - * 幂等性控制 TTL(秒) - */ - private static final int DEDUP_TTL_SECONDS = 120; - - @Resource - private ObjectMapper objectMapper; - - @Resource - private StringRedisTemplate stringRedisTemplate; - - @Resource - private BadgeDeviceStatusService badgeDeviceStatusService; - - @Resource - private AreaDeviceService areaDeviceService; - - @Override - public void onMessage(String message) { - try { - // 1. JSON 反序列化 - IotDeviceStatusChangedEventDTO event = objectMapper.readValue(message, - IotDeviceStatusChangedEventDTO.class); - - log.debug("[BadgeDeviceStatusEventHandler] 收到设备状态变更事件: eventId={}, deviceId={}, {} -> {}", - event.getEventId(), event.getDeviceId(), event.getOldStatus(), event.getNewStatus()); - - // 2. 幂等性检查 - String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId()); - Boolean firstTime = stringRedisTemplate.opsForValue() - .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); - - if (Boolean.FALSE.equals(firstTime)) { - log.debug("[BadgeDeviceStatusEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); - return; - } - - // 3. 判断是否为工牌设备 - if (!isBadgeDevice(event.getDeviceId())) { - log.debug("[BadgeDeviceStatusEventHandler] 非工牌设备,跳过处理: deviceId={}", event.getDeviceId()); - return; - } - - // 4. 处理状态变更 - handleDeviceStatusChange(event); - - } catch (Exception e) { - log.error("[BadgeDeviceStatusEventHandler] 消息处理失败: message={}", message, e); - // 不抛出异常,避免消息重试导致重复处理 - } - } - - /** - * 处理设备状态变更 - */ - private void handleDeviceStatusChange(IotDeviceStatusChangedEventDTO event) { - Long deviceId = event.getDeviceId(); - String deviceCode = event.getDeviceName(); - String nickname = event.getNickname(); - - // 获取设备所属区域 - Long areaId = getAreaIdByDeviceId(deviceId); - - if (event.isOnline()) { - // 设备上线 - log.info("[BadgeDeviceStatusEventHandler] 工牌设备上线: deviceId={}, deviceCode={}, nickname={}, areaId={}", - deviceId, deviceCode, nickname, areaId); - - badgeDeviceStatusService.updateBadgeOnlineStatus( - deviceId, - deviceCode, - nickname, - areaId, - BadgeDeviceStatusEnum.IDLE, - "设备上线"); - - } else if (event.isOffline()) { - // 设备离线 - log.info("[BadgeDeviceStatusEventHandler] 工牌设备离线: deviceId={}, deviceCode={}", - deviceId, deviceCode); - - badgeDeviceStatusService.updateBadgeOnlineStatus( - deviceId, - deviceCode, - nickname, - null, - BadgeDeviceStatusEnum.OFFLINE, - "设备离线"); - - } else { - // 其他状态(如 INACTIVE),暂不处理 - log.debug("[BadgeDeviceStatusEventHandler] 忽略状态变更: deviceId={}, newStatus={}", - deviceId, event.getNewStatus()); - } - } - - /** - * 判断是否为工牌设备 - *

- * 通过查询 ops_area_device_relation 表判断设备是否为 BADGE 类型 - */ - private boolean isBadgeDevice(Long deviceId) { - if (deviceId == null) { - return false; - } - - try { - // 通过区域设备关系判断是否为工牌设备 - OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId); - return relation != null && "BADGE".equals(relation.getRelationType()); - } catch (Exception e) { - log.warn("[BadgeDeviceStatusEventHandler] 查询设备类型失败: deviceId={}", deviceId, e); - return false; - } - } - - /** - * 获取设备所属区域ID - */ - private Long getAreaIdByDeviceId(Long deviceId) { - if (deviceId == null) { - return null; - } - - try { - OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId); - if (relation != null && "BADGE".equals(relation.getRelationType())) { - return relation.getAreaId(); - } - return null; - } catch (Exception e) { - log.warn("[BadgeDeviceStatusEventHandler] 获取设备区域失败: deviceId={}", deviceId, e); - return null; - } - } -} +package com.viewsh.module.ops.environment.integration.consumer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.framework.tenant.core.context.TenantContextHolder; +import com.viewsh.framework.tenant.core.util.TenantUtils; +import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO; +import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum; +import com.viewsh.module.ops.environment.integration.dto.IotDeviceStatusChangedEventDTO; +import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService; +import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder; +import com.viewsh.module.ops.service.area.AreaDeviceService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Component +@RocketMQMessageListener( + topic = "integration-device-status", + consumerGroup = "ops-badge-status-consumer", + consumeMode = ConsumeMode.CONCURRENTLY, + selectorExpression = "*", + accessKey = "${rocketmq.consumer.access-key:}", + secretKey = "${rocketmq.consumer.secret-key:}" +) +public class BadgeDeviceStatusEventHandler implements RocketMQListener { + + private static final int DEDUP_TTL_SECONDS = 120; + + @Resource + private ObjectMapper objectMapper; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private BadgeDeviceStatusService badgeDeviceStatusService; + + @Resource + private AreaDeviceService areaDeviceService; + + @Override + public void onMessage(String message) { + try { + IotDeviceStatusChangedEventDTO event = objectMapper.readValue(message, IotDeviceStatusChangedEventDTO.class); + log.debug("[BadgeDeviceStatusEventHandler] 收到设备状态变更事件: eventId={}, deviceId={}, {} -> {}", + event.getEventId(), event.getDeviceId(), event.getOldStatus(), event.getNewStatus()); + String dedupKey = OpsRedisKeyBuilder.badgeDedup(event.getTenantId(), event.getEventId()); + Boolean firstTime = stringRedisTemplate.opsForValue() + .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); + if (Boolean.FALSE.equals(firstTime)) { + log.debug("[BadgeDeviceStatusEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); + return; + } + executeInTenantContext(event.getTenantId(), () -> handleDeviceStatusChange(event)); + } catch (Exception e) { + log.error("[BadgeDeviceStatusEventHandler] 消息处理失败: message={}", message, e); + } + } + + private void handleDeviceStatusChange(IotDeviceStatusChangedEventDTO event) { + if (!isBadgeDevice(event.getDeviceId())) { + log.debug("[BadgeDeviceStatusEventHandler] 非工牌设备,忽略状态同步: deviceId={}", event.getDeviceId()); + return; + } + + Long areaId = getAreaIdByDeviceId(event.getDeviceId()); + if (event.isOnline()) { + badgeDeviceStatusService.updateBadgeOnlineStatus( + event.getDeviceId(), event.getDeviceName(), event.getNickname(), areaId, + BadgeDeviceStatusEnum.IDLE, "设备上线"); + return; + } + if (event.isOffline()) { + badgeDeviceStatusService.updateBadgeOnlineStatus( + event.getDeviceId(), event.getDeviceName(), event.getNickname(), null, + BadgeDeviceStatusEnum.OFFLINE, "设备离线"); + return; + } + log.debug("[BadgeDeviceStatusEventHandler] 忽略未处理的状态变更: deviceId={}, newStatus={}", + event.getDeviceId(), event.getNewStatus()); + } + + private boolean isBadgeDevice(Long deviceId) { + if (deviceId == null) { + return false; + } + try { + OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId); + return relation != null && "BADGE".equals(relation.getRelationType()); + } catch (Exception e) { + log.warn("[BadgeDeviceStatusEventHandler] 查询设备绑定关系失败: deviceId={}", deviceId, e); + return false; + } + } + + private Long getAreaIdByDeviceId(Long deviceId) { + if (deviceId == null) { + return null; + } + try { + OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId); + if (relation != null && "BADGE".equals(relation.getRelationType())) { + return relation.getAreaId(); + } + return null; + } catch (Exception e) { + log.warn("[BadgeDeviceStatusEventHandler] 查询设备所属区域失败: deviceId={}", deviceId, e); + return null; + } + } + + private void executeInTenantContext(Long tenantId, Runnable runnable) { + Long currentTenantId = TenantContextHolder.getTenantId(); + if (tenantId == null || Objects.equals(currentTenantId, tenantId)) { + runnable.run(); + return; + } + TenantUtils.execute(tenantId, runnable); + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderArriveEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderArriveEventHandler.java index fa891b3..7fb6e99 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderArriveEventHandler.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderArriveEventHandler.java @@ -1,160 +1,118 @@ -package com.viewsh.module.ops.environment.integration.consumer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; -import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; -import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; -import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; -import com.viewsh.module.ops.enums.OperatorTypeEnum; -import com.viewsh.module.ops.enums.WorkOrderStatusEnum; -import com.viewsh.module.ops.environment.integration.dto.CleanOrderArriveEventDTO; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.ConsumeMode; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -/** - * 保洁工单到岗事件消费者 - *

- * 订阅 IoT 模块发布的保洁工单到岗事件 - *

- * 调用链路: - *

- * IoT 发布 RocketMQ 消息 (ops-order-arrive)
- *     ↓
- * CleanOrderArriveEventHandler.onMessage()
- *     - 幂等性检查
- *     - 状态检查
- *     ↓
- * orderLifecycleManager.transition(ARRIVED) [@Transactional]
- *     - 更新工单状态
- *     - 发布 OrderStateChangedEvent
- *     ↓
- * 事务提交
- *     ↓
- * CleanOrderEventListener.handleArrived() [@Async, AFTER_COMMIT]
- *     - 记录到岗时间到扩展表
- *     - 更新设备工单关联信息(areaId, beaconMac)
- *     - 记录业务日志
- * 
- *

- * 设计说明: - * - Handler 只负责消息接收和状态转换 - * - 业务日志、设备缓存更新由 CleanOrderEventListener 在事务提交后处理 - * - BadgeDeviceStatusServiceImpl 在 BEFORE_COMMIT 阶段跳过 ARRIVED(由 Listener 处理完整信息) - * - * @author AI - */ -@Slf4j -@Component -@RocketMQMessageListener( - topic = "ops-order-arrive", - consumerGroup = "ops-clean-order-arrive-group", - consumeMode = ConsumeMode.CONCURRENTLY, - selectorExpression = "*", - accessKey = "${rocketmq.consumer.access-key:}", - secretKey = "${rocketmq.consumer.secret-key:}" -) -public class CleanOrderArriveEventHandler implements RocketMQListener { - - /** - * 幂等性控制 Key 模式 - */ - private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:arrive:%s"; - - /** - * 幂等性控制 TTL(秒) - */ - private static final int DEDUP_TTL_SECONDS = 300; - - @Resource - private ObjectMapper objectMapper; - - @Resource - private StringRedisTemplate stringRedisTemplate; - - @Resource - private OpsOrderMapper opsOrderMapper; - - @Resource - private OrderLifecycleManager orderLifecycleManager; - - @Override - public void onMessage(String message) { - try { - // 1. JSON 反序列化 - CleanOrderArriveEventDTO event = objectMapper.readValue(message, CleanOrderArriveEventDTO.class); - - // 2. 幂等性检查 - String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId()); - Boolean firstTime = stringRedisTemplate.opsForValue() - .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); - - if (!firstTime) { - log.debug("[CleanOrderArriveEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); - return; - } - - // 3. 业务处理 - handleOrderArrive(event); - - } catch (Exception e) { - log.error("[CleanOrderArriveEventHandler] 消息处理失败: message={}", message, e); - throw new RuntimeException("保洁工单到岗事件处理失败", e); - } - } - - /** - * 处理工单到岗 - */ - private void handleOrderArrive(CleanOrderArriveEventDTO event) { - log.info("[CleanOrderArriveEventHandler] 收到到岗事件: eventId={}, orderId={}, deviceId={}, areaId={}", - event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId()); - - // 1. 查询工单 - OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId()); - if (order == null) { - log.warn("[CleanOrderArriveEventHandler] 工单不存在: orderId={}", event.getOrderId()); - return; - } - - // 2. 检查工单状态 - WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus()); - if (!currentStatus.canStartWorking()) { - log.warn("[CleanOrderArriveEventHandler] 工单状态不允许到岗: orderId={}, status={}", - event.getOrderId(), order.getStatus()); - return; - } - - // 3. 构建状态转换请求(包含完整信息,供 Listener 使用) - Map payload = new HashMap<>(); - payload.put("deviceId", event.getDeviceId()); - payload.put("deviceKey", event.getDeviceKey()); - payload.put("areaId", event.getAreaId()); - payload.put("triggerSource", event.getTriggerSource()); - if (event.getTriggerData() != null) { - payload.putAll(event.getTriggerData()); - } - - OrderTransitionRequest request = OrderTransitionRequest.builder() - .orderId(event.getOrderId()) - .targetStatus(WorkOrderStatusEnum.ARRIVED) - .operatorType(OperatorTypeEnum.SYSTEM) - .reason("蓝牙信标自动到岗确认") - .payload(payload) - .build(); - // 4. 执行状态转换 - // 注意:业务日志和设备缓存更新由 CleanOrderEventListener 在 AFTER_COMMIT 阶段处理 - orderLifecycleManager.transition(request); - - log.info("[CleanOrderArriveEventHandler] 工单到岗成功: eventId={}, orderId={}, deviceId={}", - event.getEventId(), event.getOrderId(), event.getDeviceId()); - } -} +package com.viewsh.module.ops.environment.integration.consumer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.framework.tenant.core.context.TenantContextHolder; +import com.viewsh.framework.tenant.core.util.TenantUtils; +import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; +import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; +import com.viewsh.module.ops.enums.OperatorTypeEnum; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; +import com.viewsh.module.ops.environment.integration.dto.CleanOrderArriveEventDTO; +import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Component +@RocketMQMessageListener( + topic = "ops-order-arrive", + consumerGroup = "ops-clean-order-arrive-group", + consumeMode = ConsumeMode.CONCURRENTLY, + selectorExpression = "*", + accessKey = "${rocketmq.consumer.access-key:}", + secretKey = "${rocketmq.consumer.secret-key:}" +) +public class CleanOrderArriveEventHandler implements RocketMQListener { + + private static final int DEDUP_TTL_SECONDS = 300; + + @Resource + private ObjectMapper objectMapper; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private OpsOrderMapper opsOrderMapper; + + @Resource + private OrderLifecycleManager orderLifecycleManager; + + @Override + public void onMessage(String message) { + try { + CleanOrderArriveEventDTO event = objectMapper.readValue(message, CleanOrderArriveEventDTO.class); + String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "arrive", event.getEventId()); + Boolean firstTime = stringRedisTemplate.opsForValue() + .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); + if (!Boolean.TRUE.equals(firstTime)) { + log.debug("[CleanOrderArriveEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); + return; + } + executeInTenantContext(event.getTenantId(), () -> handleOrderArrive(event)); + } catch (Exception e) { + log.error("[CleanOrderArriveEventHandler] 消息处理失败: message={}", message, e); + throw new RuntimeException("保洁工单到岗事件处理失败", e); + } + } + + private void handleOrderArrive(CleanOrderArriveEventDTO event) { + log.info("[CleanOrderArriveEventHandler] 收到到岗事件: eventId={}, orderId={}, deviceId={}, areaId={}", + event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId()); + + OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId()); + if (order == null) { + log.warn("[CleanOrderArriveEventHandler] 工单不存在: orderId={}", event.getOrderId()); + return; + } + + WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus()); + if (!currentStatus.canStartWorking()) { + log.warn("[CleanOrderArriveEventHandler] 当前状态不允许到岗: orderId={}, status={}", + event.getOrderId(), order.getStatus()); + return; + } + + Map payload = new HashMap<>(); + payload.put("deviceId", event.getDeviceId()); + payload.put("deviceKey", event.getDeviceKey()); + payload.put("areaId", event.getAreaId()); + payload.put("triggerSource", event.getTriggerSource()); + if (event.getTriggerData() != null) { + payload.putAll(event.getTriggerData()); + } + + OrderTransitionRequest request = OrderTransitionRequest.builder() + .orderId(event.getOrderId()) + .targetStatus(WorkOrderStatusEnum.ARRIVED) + .operatorType(OperatorTypeEnum.SYSTEM) + .reason("设备触发自动到岗") + .payload(payload) + .build(); + orderLifecycleManager.transition(request); + + log.info("[CleanOrderArriveEventHandler] 工单到岗完成: eventId={}, orderId={}, deviceId={}", + event.getEventId(), event.getOrderId(), event.getDeviceId()); + } + + private void executeInTenantContext(Long tenantId, Runnable runnable) { + Long currentTenantId = TenantContextHolder.getTenantId(); + if (tenantId == null || Objects.equals(currentTenantId, tenantId)) { + runnable.run(); + return; + } + TenantUtils.execute(tenantId, runnable); + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderAuditEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderAuditEventHandler.java index 051bfb6..93f26ae 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderAuditEventHandler.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderAuditEventHandler.java @@ -1,6 +1,8 @@ package com.viewsh.module.ops.environment.integration.consumer; -import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.framework.tenant.core.context.TenantContextHolder; +import com.viewsh.framework.tenant.core.util.TenantUtils; import com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX; import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; @@ -13,8 +15,9 @@ import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain; import com.viewsh.module.ops.infrastructure.log.enumeration.EventLevel; import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule; import com.viewsh.module.ops.infrastructure.log.enumeration.LogType; -import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord; -import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder; +import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord; +import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder; +import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; @@ -23,8 +26,9 @@ import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; +import java.util.Arrays; +import java.util.Objects; +import java.util.concurrent.TimeUnit; /** * 保洁工单审计事件消费者 @@ -84,7 +88,7 @@ public class CleanOrderAuditEventHandler implements RocketMQListener { CleanOrderAuditEventDTO event = objectMapper.readValue(message, CleanOrderAuditEventDTO.class); // 2. 幂等性检查 - String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId()); + String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "audit", event.getEventId()); Boolean firstTime = stringRedisTemplate.opsForValue() .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); @@ -94,7 +98,7 @@ public class CleanOrderAuditEventHandler implements RocketMQListener { } // 3. 业务处理 - handleAuditEvent(event); + executeInTenantContext(event.getTenantId(), () -> handleAuditEvent(event)); } catch (Exception e) { log.error("[CleanOrderAuditEventHandler] 消息处理失败: message={}", message, e); @@ -256,5 +260,14 @@ public class CleanOrderAuditEventHandler implements RocketMQListener { return EventLevel.WARN; } return EventLevel.INFO; - } -} + } + + private void executeInTenantContext(Long tenantId, Runnable runnable) { + Long currentTenantId = TenantContextHolder.getTenantId(); + if (tenantId == null || Objects.equals(currentTenantId, tenantId)) { + runnable.run(); + return; + } + TenantUtils.execute(tenantId, runnable); + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCompleteEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCompleteEventHandler.java index 30ad827..ffc8529 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCompleteEventHandler.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCompleteEventHandler.java @@ -1,191 +1,130 @@ -package com.viewsh.module.ops.environment.integration.consumer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; -import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; -import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; -import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; -import com.viewsh.module.ops.enums.OperatorTypeEnum; -import com.viewsh.module.ops.enums.WorkOrderStatusEnum; -import com.viewsh.module.ops.environment.integration.dto.CleanOrderCompleteEventDTO; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.ConsumeMode; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -/** - * 保洁工单完成事件消费者 - *

- * 订阅 IoT 模块发布的保洁工单完成事件 - *

- * 调用链路: - * - *

- * IoT 发布 RocketMQ 消息 (ops-order-complete)
- *     ↓
- * CleanOrderCompleteEventHandler.onMessage()
- *     - 幂等性检查
- *     - 状态检查
- *     ↓
- * orderLifecycleManager.transition(COMPLETED) [@Transactional]
- *     - 更新工单状态
- *     - 发布 OrderStateChangedEvent
- *     ↓
- * 事务提交
- *     ↓
- * BadgeDeviceStatusEventListener.onOrderStateChanged() [BEFORE_COMMIT]
- *     - 清除设备工单关联
- *     - 检查等待任务,决定设备状态 (BUSY/IDLE)
- *     ↓
- * CleanOrderEventListener.handleCompleted() [AFTER_COMMIT]
- *     - 记录完成时间到扩展表
- *     - 记录业务日志
- * CleanOrderEventListener.onOrderCompleted() [AFTER_COMMIT]
- *     - 自动调度下一个任务
- *     - 发送完成通知
- * 
- *

- * 设计说明: - * - Handler 只负责消息接收和状态转换 - * - 设备状态由 BadgeDeviceStatusEventListener 在 BEFORE_COMMIT 阶段处理 - * - 业务日志、自动调度由 CleanOrderEventListener 在 AFTER_COMMIT 阶段处理 - * - * @author AI - */ -@Slf4j -@Component -@RocketMQMessageListener( - topic = "ops-order-complete", - consumerGroup = "ops-clean-order-complete-group", - consumeMode = ConsumeMode.CONCURRENTLY, - selectorExpression = "*", - accessKey = "${rocketmq.consumer.access-key:}", - secretKey = "${rocketmq.consumer.secret-key:}" -) -public class CleanOrderCompleteEventHandler implements RocketMQListener { - - /** - * 幂等性控制 Key 模式 - */ - private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:complete:%s"; - - /** - * 幂等性控制 TTL(秒) - */ - private static final int DEDUP_TTL_SECONDS = 300; - - @Resource - private ObjectMapper objectMapper; - - @Resource - private StringRedisTemplate stringRedisTemplate; - - @Resource - private OpsOrderMapper opsOrderMapper; - - @Resource - private OrderLifecycleManager orderLifecycleManager; - - @Override - public void onMessage(String message) { - try { - // 1. JSON 反序列化 - CleanOrderCompleteEventDTO event = objectMapper.readValue(message, CleanOrderCompleteEventDTO.class); - - // 2. 幂等性检查 - String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId()); - Boolean firstTime = stringRedisTemplate.opsForValue() - .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); - - if (!firstTime) { - log.debug("[CleanOrderCompleteEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); - return; - } - - // 3. 业务处理 - handleOrderComplete(event); - - } catch (Exception e) { - log.error("[CleanOrderCompleteEventHandler] 消息处理失败: message={}", message, e); - throw new RuntimeException("保洁工单完成事件处理失败", e); - } - } - - /** - * 处理工单完成 - */ - private void handleOrderComplete(CleanOrderCompleteEventDTO event) { - log.info("[CleanOrderCompleteEventHandler] 收到完成事件: eventId={}, orderId={}, deviceId={}, areaId={}", - event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId()); - - // 1. 查询工单 - OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId()); - if (order == null) { - log.warn("[CleanOrderCompleteEventHandler] 工单不存在: orderId={}", event.getOrderId()); - return; - } - - // 2. 检查工单状态 - WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus()); - if (!currentStatus.canComplete()) { - log.warn("[CleanOrderCompleteEventHandler] 工单状态不允许完成: orderId={}, status={}", - event.getOrderId(), order.getStatus()); - return; - } - - // 3. 计算作业时长 - String remark = buildCompletionRemark(event); - - // 4. 构建状态转换请求 - Map payload = new HashMap<>(); - payload.put("deviceId", event.getDeviceId()); - payload.put("deviceKey", event.getDeviceKey()); - payload.put("areaId", event.getAreaId()); - payload.put("triggerSource", event.getTriggerSource()); - if (event.getTriggerData() != null) { - payload.putAll(event.getTriggerData()); - } - - OrderTransitionRequest request = OrderTransitionRequest.builder() - .orderId(event.getOrderId()) - .targetStatus(WorkOrderStatusEnum.COMPLETED) - .operatorType(OperatorTypeEnum.SYSTEM) - .assigneeId(event.getDeviceId()) // 设备ID用于后续自动调度 - .reason(remark) - .payload(payload) - .build(); - - // 5. 执行状态转换 - // 注意:设备状态、业务日志、自动调度由事件监听器处理 - orderLifecycleManager.transition(request); - - log.info("[CleanOrderCompleteEventHandler] 工单完成成功: eventId={}, orderId={}", - event.getEventId(), event.getOrderId()); - } - - /** - * 构建完成备注 - */ - private String buildCompletionRemark(CleanOrderCompleteEventDTO event) { - StringBuilder remark = new StringBuilder(); - remark.append("信号丢失超时自动完成"); - - if (event.getTriggerData() != null) { - Object durationMs = event.getTriggerData().get("durationMs"); - if (durationMs != null) { - long durationMinutes = ((Number) durationMs).longValue() / 60000; - remark.append(",作业时长: ").append(durationMinutes).append("分钟"); - } - } - - return remark.toString(); - } - -} +package com.viewsh.module.ops.environment.integration.consumer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.framework.tenant.core.context.TenantContextHolder; +import com.viewsh.framework.tenant.core.util.TenantUtils; +import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; +import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; +import com.viewsh.module.ops.enums.OperatorTypeEnum; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; +import com.viewsh.module.ops.environment.integration.dto.CleanOrderCompleteEventDTO; +import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Component +@RocketMQMessageListener( + topic = "ops-order-complete", + consumerGroup = "ops-clean-order-complete-group", + consumeMode = ConsumeMode.CONCURRENTLY, + selectorExpression = "*", + accessKey = "${rocketmq.consumer.access-key:}", + secretKey = "${rocketmq.consumer.secret-key:}" +) +public class CleanOrderCompleteEventHandler implements RocketMQListener { + + private static final int DEDUP_TTL_SECONDS = 300; + + @Resource + private ObjectMapper objectMapper; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private OpsOrderMapper opsOrderMapper; + + @Resource + private OrderLifecycleManager orderLifecycleManager; + + @Override + public void onMessage(String message) { + try { + CleanOrderCompleteEventDTO event = objectMapper.readValue(message, CleanOrderCompleteEventDTO.class); + String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "complete", event.getEventId()); + Boolean firstTime = stringRedisTemplate.opsForValue() + .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); + if (!Boolean.TRUE.equals(firstTime)) { + log.debug("[CleanOrderCompleteEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); + return; + } + executeInTenantContext(event.getTenantId(), () -> handleOrderComplete(event)); + } catch (Exception e) { + log.error("[CleanOrderCompleteEventHandler] 消息处理失败: message={}", message, e); + throw new RuntimeException("保洁工单完单事件处理失败", e); + } + } + + private void handleOrderComplete(CleanOrderCompleteEventDTO event) { + log.info("[CleanOrderCompleteEventHandler] 收到完单事件: eventId={}, orderId={}, deviceId={}, areaId={}", + event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId()); + + OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId()); + if (order == null) { + log.warn("[CleanOrderCompleteEventHandler] 工单不存在: orderId={}", event.getOrderId()); + return; + } + + WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus()); + if (!currentStatus.canComplete()) { + log.warn("[CleanOrderCompleteEventHandler] 当前状态不允许完单: orderId={}, status={}", + event.getOrderId(), order.getStatus()); + return; + } + + Map payload = new HashMap<>(); + payload.put("deviceId", event.getDeviceId()); + payload.put("deviceKey", event.getDeviceKey()); + payload.put("areaId", event.getAreaId()); + payload.put("triggerSource", event.getTriggerSource()); + if (event.getTriggerData() != null) { + payload.putAll(event.getTriggerData()); + } + + OrderTransitionRequest request = OrderTransitionRequest.builder() + .orderId(event.getOrderId()) + .targetStatus(WorkOrderStatusEnum.COMPLETED) + .operatorType(OperatorTypeEnum.SYSTEM) + .assigneeId(event.getDeviceId()) + .reason(buildCompletionRemark(event)) + .payload(payload) + .build(); + orderLifecycleManager.transition(request); + + log.info("[CleanOrderCompleteEventHandler] 工单完单完成: eventId={}, orderId={}", + event.getEventId(), event.getOrderId()); + } + + private String buildCompletionRemark(CleanOrderCompleteEventDTO event) { + StringBuilder remark = new StringBuilder("设备触发自动完单"); + if (event.getTriggerData() != null) { + Object durationMs = event.getTriggerData().get("durationMs"); + if (durationMs instanceof Number number) { + remark.append(",作业时长约 ").append(number.longValue() / 60000).append(" 分钟"); + } + } + return remark.toString(); + } + + private void executeInTenantContext(Long tenantId, Runnable runnable) { + Long currentTenantId = TenantContextHolder.getTenantId(); + if (tenantId == null || Objects.equals(currentTenantId, tenantId)) { + runnable.run(); + return; + } + TenantUtils.execute(tenantId, runnable); + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderConfirmEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderConfirmEventHandler.java index 3979606..ddde5bf 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderConfirmEventHandler.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderConfirmEventHandler.java @@ -1,6 +1,8 @@ package com.viewsh.module.ops.environment.integration.consumer; import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.framework.tenant.core.context.TenantContextHolder; +import com.viewsh.framework.tenant.core.util.TenantUtils; import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; @@ -8,6 +10,7 @@ import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; import com.viewsh.module.ops.enums.OperatorTypeEnum; import com.viewsh.module.ops.enums.WorkOrderStatusEnum; import com.viewsh.module.ops.environment.integration.dto.CleanOrderConfirmEventDTO; +import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; @@ -16,39 +19,9 @@ import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; +import java.util.Objects; import java.util.concurrent.TimeUnit; -/** - * 保洁工单确认事件消费者 - *

- * 订阅 IoT 模块发布的工单确认事件(如:工牌按键确认) - *

- * 调用链路: - *

- * IoT 发布 RocketMQ 消息 (ops-order-confirm)
- *     ↓
- * CleanOrderConfirmEventHandler.onMessage()
- *     - 幂等性检查
- *     - 状态检查
- *     ↓
- * orderLifecycleManager.transition(CONFIRMED) [@Transactional]
- *     - 更新工单状态
- *     - 发布 OrderStateChangedEvent
- *     ↓
- * 事务提交
- *     ↓
- * CleanOrderEventListener.onOrderStateChanged() [@TransactionalEventListener(AFTER_COMMIT)]
- *     - 记录业务日志
- *     - 发送 TTS "工单已确认,请前往作业区域开始作业"
- * 
- *

- * 设计说明: - * - Handler 只负责消息接收和状态转换 - * - 日志记录和通知发送由 CleanOrderEventListener 在事务提交后处理 - * - 设备状态由 BadgeDeviceStatusServiceImpl 在 BEFORE_COMMIT 阶段同步 - * - * @author AI - */ @Slf4j @Component @RocketMQMessageListener( @@ -60,7 +33,6 @@ import java.util.concurrent.TimeUnit; ) public class CleanOrderConfirmEventHandler implements RocketMQListener { - private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:confirm:%s"; private static final int DEDUP_TTL_SECONDS = 300; @Resource @@ -78,66 +50,61 @@ public class CleanOrderConfirmEventHandler implements RocketMQListener { @Override public void onMessage(String message) { try { - // 1. JSON 反序列化 CleanOrderConfirmEventDTO event = objectMapper.readValue(message, CleanOrderConfirmEventDTO.class); - String eventId = event.getEventId(); - - // 2. 幂等性检查 - String dedupKey = String.format(DEDUP_KEY_PATTERN, eventId); + String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "confirm", event.getEventId()); Boolean firstTime = stringRedisTemplate.opsForValue() .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); - if (!Boolean.TRUE.equals(firstTime)) { - log.debug("[CleanOrderConfirmEventHandler] 重复事件,跳过处理: eventId={}", eventId); + log.debug("[CleanOrderConfirmEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); return; } - - // 3. 加载工单 - Long orderId = event.getOrderId(); - OpsOrderDO order = opsOrderMapper.selectById(orderId); - if (order == null) { - log.warn("[CleanOrderConfirmEventHandler] 工单不存在: orderId={}", orderId); - return; - } - - WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus()); - log.info("[CleanOrderConfirmEventHandler] 收到确认事件: eventId={}, orderId={}, currentStatus={}, deviceId={}", - eventId, orderId, currentStatus, event.getDeviceId()); - - // 4. 状态检查 - // 如果已在进行中 (CONFIRMED or ARRIVED),直接返回(TTS 由 Listener 处理) - if (currentStatus == WorkOrderStatusEnum.CONFIRMED || currentStatus == WorkOrderStatusEnum.ARRIVED) { - log.debug("[CleanOrderConfirmEventHandler] 工单已在进行中: orderId={}, status={}", orderId, currentStatus); - return; - } - - // 检查是否可以确认 - if (!currentStatus.canConfirm()) { - log.warn("[CleanOrderConfirmEventHandler] 当前状态无法确认工单: orderId={}, status={}", orderId, currentStatus); - return; - } - - // 5. 构建状态转换请求(包含 deviceId,供 Listener 使用) - OrderTransitionRequest request = OrderTransitionRequest.builder() - .orderId(orderId) - .targetStatus(WorkOrderStatusEnum.CONFIRMED) - .reason("工牌按键确认") - .operatorType(OperatorTypeEnum.CLEANER) - .operatorId(event.getDeviceId() != null ? event.getDeviceId() : order.getAssigneeId()) - .build(); - // 将 deviceId 放入 payload,供 Listener 使用 - request.putPayload("deviceId", event.getDeviceId()); - request.putPayload("triggerSource", "BADGE_BUTTON"); - - // 6. 执行状态转换 - // 注意:日志记录和 TTS 由 CleanOrderEventListener 在 AFTER_COMMIT 阶段处理 - orderLifecycleManager.transition(request); - - log.info("[CleanOrderConfirmEventHandler] 工单确认成功: orderId={}, deviceId={}", orderId, event.getDeviceId()); - + executeInTenantContext(event.getTenantId(), () -> handleConfirmEvent(event)); } catch (Exception e) { log.error("[CleanOrderConfirmEventHandler] 消息处理失败: message={}", message, e); throw new RuntimeException("保洁工单确认事件处理失败", e); } } + + private void handleConfirmEvent(CleanOrderConfirmEventDTO event) { + Long orderId = event.getOrderId(); + OpsOrderDO order = opsOrderMapper.selectById(orderId); + if (order == null) { + log.warn("[CleanOrderConfirmEventHandler] 工单不存在: orderId={}", orderId); + return; + } + + WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus()); + log.info("[CleanOrderConfirmEventHandler] 收到确认事件: eventId={}, orderId={}, currentStatus={}, deviceId={}", + event.getEventId(), orderId, currentStatus, event.getDeviceId()); + if (currentStatus == WorkOrderStatusEnum.CONFIRMED || currentStatus == WorkOrderStatusEnum.ARRIVED) { + log.debug("[CleanOrderConfirmEventHandler] 工单已确认或已到岗,跳过处理: orderId={}, status={}", orderId, currentStatus); + return; + } + if (!currentStatus.canConfirm()) { + log.warn("[CleanOrderConfirmEventHandler] 当前状态不允许确认: orderId={}, status={}", orderId, currentStatus); + return; + } + + OrderTransitionRequest request = OrderTransitionRequest.builder() + .orderId(orderId) + .targetStatus(WorkOrderStatusEnum.CONFIRMED) + .reason("设备确认工单") + .operatorType(OperatorTypeEnum.CLEANER) + .operatorId(event.getDeviceId() != null ? event.getDeviceId() : order.getAssigneeId()) + .build(); + request.putPayload("deviceId", event.getDeviceId()); + request.putPayload("triggerSource", "BADGE_BUTTON"); + orderLifecycleManager.transition(request); + + log.info("[CleanOrderConfirmEventHandler] 工单确认完成: orderId={}, deviceId={}", orderId, event.getDeviceId()); + } + + private void executeInTenantContext(Long tenantId, Runnable runnable) { + Long currentTenantId = TenantContextHolder.getTenantId(); + if (tenantId == null || Objects.equals(currentTenantId, tenantId)) { + runnable.run(); + return; + } + TenantUtils.execute(tenantId, runnable); + } } diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCreateEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCreateEventHandler.java index 723860c..3a5c018 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCreateEventHandler.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCreateEventHandler.java @@ -1,6 +1,8 @@ package com.viewsh.module.ops.environment.integration.consumer; import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.framework.tenant.core.context.TenantContextHolder; +import com.viewsh.framework.tenant.core.util.TenantUtils; import com.viewsh.module.iot.api.device.IotDeviceControlApi; import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO; import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; @@ -18,6 +20,7 @@ import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule; import com.viewsh.module.ops.infrastructure.log.enumeration.LogType; import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord; import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder; +import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; @@ -28,6 +31,7 @@ import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -106,7 +110,7 @@ public class CleanOrderCreateEventHandler implements RocketMQListener { CleanOrderCreateEventDTO event = objectMapper.readValue(message, CleanOrderCreateEventDTO.class); // 2. 幂等性检查 - String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId()); + String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "create", event.getEventId()); Boolean firstTime = stringRedisTemplate.opsForValue() .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); @@ -116,7 +120,7 @@ public class CleanOrderCreateEventHandler implements RocketMQListener { } // 3. 业务处理 - handleOrderCreate(event); + executeInTenantContext(event.getTenantId(), () -> handleOrderCreate(event)); } catch (Exception e) { log.error("[CleanOrderCreateEventHandler] 消息处理失败: message={}", message, e); @@ -505,4 +509,13 @@ public class CleanOrderCreateEventHandler implements RocketMQListener { } return null; } + + private void executeInTenantContext(Long tenantId, Runnable runnable) { + Long currentTenantId = TenantContextHolder.getTenantId(); + if (tenantId == null || Objects.equals(currentTenantId, tenantId)) { + runnable.run(); + return; + } + TenantUtils.execute(tenantId, runnable); + } } diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/IntegrationEventDeduplicationService.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/IntegrationEventDeduplicationService.java index 37d78c8..bcea05f 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/IntegrationEventDeduplicationService.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/IntegrationEventDeduplicationService.java @@ -1,6 +1,7 @@ package com.viewsh.module.ops.environment.integration.consumer; import cn.hutool.core.util.StrUtil; +import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.StringRedisTemplate; @@ -19,8 +20,6 @@ import java.time.Duration; @Service public class IntegrationEventDeduplicationService { - private static final String DEDUP_KEY_PREFIX = "integration:event:dedup:"; - @Resource private StringRedisTemplate stringRedisTemplate; @@ -36,7 +35,7 @@ public class IntegrationEventDeduplicationService { return false; } - String key = DEDUP_KEY_PREFIX + eventId; + String key = OpsRedisKeyBuilder.eventDedup("integration", eventId); // setNX:如果 key 不存在则设置,返回 true Boolean success = stringRedisTemplate.opsForValue() @@ -60,7 +59,7 @@ public class IntegrationEventDeduplicationService { return; } - String key = DEDUP_KEY_PREFIX + eventId; + String key = OpsRedisKeyBuilder.eventDedup("integration", eventId); stringRedisTemplate.opsForValue().set(key, "1", Duration.ofHours(24)); } @@ -75,7 +74,7 @@ public class IntegrationEventDeduplicationService { return false; } - String key = DEDUP_KEY_PREFIX + eventId; + String key = OpsRedisKeyBuilder.eventDedup("integration", eventId); Boolean exists = stringRedisTemplate.hasKey(key); return Boolean.TRUE.equals(exists); } diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderArriveEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderArriveEventDTO.java index 2dd3e21..7750ed9 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderArriveEventDTO.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderArriveEventDTO.java @@ -48,7 +48,9 @@ public class CleanOrderArriveEventDTO { /** * 区域ID */ - private Long areaId; + private Long areaId; + + private Long tenantId; /** * 触发来源(IOT_BEACON=蓝牙信标检测) diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderAuditEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderAuditEventDTO.java index afbdd0d..8075a03 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderAuditEventDTO.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderAuditEventDTO.java @@ -63,7 +63,9 @@ public class CleanOrderAuditEventDTO { /** * 区域ID */ - private Long areaId; + private Long areaId; + + private Long tenantId; /** * 消息内容 diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCompleteEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCompleteEventDTO.java index 4634e76..56fbbc4 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCompleteEventDTO.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCompleteEventDTO.java @@ -48,7 +48,9 @@ public class CleanOrderCompleteEventDTO { /** * 区域ID */ - private Long areaId; + private Long areaId; + + private Long tenantId; /** * 触发来源(IOT_SIGNAL_LOSS=信号丢失超时) diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCreateEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCreateEventDTO.java index e997337..c496777 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCreateEventDTO.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCreateEventDTO.java @@ -53,7 +53,9 @@ public class CleanOrderCreateEventDTO { /** * 优先级(0=P0紧急 1=P1重要 2=P2普通) */ - private Integer priority; + private Integer priority; + + private Long tenantId; /** * 触发数据(JSON 格式的附加信息)