From bd6a98b7665f2e98ea7e2f60cf244bbf980f0561 Mon Sep 17 00:00:00 2001 From: lzh Date: Mon, 30 Mar 2026 15:55:35 +0800 Subject: [PATCH] =?UTF-8?q?Revert=20"fix(ops):=20RocketMQ=20=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=E8=80=85=E6=B7=BB=E5=8A=A0=E7=A7=9F=E6=88=B7=E4=B8=8A?= =?UTF-8?q?=E4=B8=8B=E6=96=87=E9=98=B2=E5=BE=A1=E6=80=A7=E5=85=9C=E5=BA=95?= =?UTF-8?q?"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 7d19e7bafaf6ae96b68e9e7d7319fa59a1be86d4. --- .../BadgeDeviceStatusEventHandler.java | 323 +++++++++++------- .../CleanOrderArriveEventHandler.java | 278 ++++++++------- .../consumer/CleanOrderAuditEventHandler.java | 31 +- .../CleanOrderCompleteEventHandler.java | 321 ++++++++++------- .../CleanOrderConfirmEventHandler.java | 133 +++++--- .../CleanOrderCreateEventHandler.java | 17 +- .../IntegrationEventDeduplicationService.java | 9 +- .../dto/CleanOrderArriveEventDTO.java | 4 +- .../dto/CleanOrderAuditEventDTO.java | 4 +- .../dto/CleanOrderCompleteEventDTO.java | 4 +- .../dto/CleanOrderCreateEventDTO.java | 4 +- 11 files changed, 649 insertions(+), 479 deletions(-) diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/BadgeDeviceStatusEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/BadgeDeviceStatusEventHandler.java index 283f3ce..fa65c52 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/BadgeDeviceStatusEventHandler.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/BadgeDeviceStatusEventHandler.java @@ -1,128 +1,195 @@ -package com.viewsh.module.ops.environment.integration.consumer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.viewsh.framework.tenant.core.context.TenantContextHolder; -import com.viewsh.framework.tenant.core.util.TenantUtils; -import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO; -import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum; -import com.viewsh.module.ops.environment.integration.dto.IotDeviceStatusChangedEventDTO; -import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService; -import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder; -import com.viewsh.module.ops.service.area.AreaDeviceService; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.ConsumeMode; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; - -import java.util.Objects; -import java.util.concurrent.TimeUnit; - -@Slf4j -@Component -@RocketMQMessageListener( - topic = "integration-device-status", - consumerGroup = "ops-badge-status-consumer", - consumeMode = ConsumeMode.CONCURRENTLY, - selectorExpression = "*", - accessKey = "${rocketmq.consumer.access-key:}", - secretKey = "${rocketmq.consumer.secret-key:}" -) -public class BadgeDeviceStatusEventHandler implements RocketMQListener { - - private static final int DEDUP_TTL_SECONDS = 120; - - @Resource - private ObjectMapper objectMapper; - - @Resource - private StringRedisTemplate stringRedisTemplate; - - @Resource - private BadgeDeviceStatusService badgeDeviceStatusService; - - @Resource - private AreaDeviceService areaDeviceService; - - @Override - public void onMessage(String message) { - try { - IotDeviceStatusChangedEventDTO event = objectMapper.readValue(message, IotDeviceStatusChangedEventDTO.class); - log.debug("[BadgeDeviceStatusEventHandler] 收到设备状态变更事件: eventId={}, deviceId={}, {} -> {}", - event.getEventId(), event.getDeviceId(), event.getOldStatus(), event.getNewStatus()); - String dedupKey = OpsRedisKeyBuilder.badgeDedup(event.getTenantId(), event.getEventId()); - Boolean firstTime = stringRedisTemplate.opsForValue() - .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); - if (Boolean.FALSE.equals(firstTime)) { - log.debug("[BadgeDeviceStatusEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); - return; - } - executeInTenantContext(event.getTenantId(), () -> handleDeviceStatusChange(event)); - } catch (Exception e) { - log.error("[BadgeDeviceStatusEventHandler] 消息处理失败: message={}", message, e); - } - } - - private void handleDeviceStatusChange(IotDeviceStatusChangedEventDTO event) { - if (!isBadgeDevice(event.getDeviceId())) { - log.debug("[BadgeDeviceStatusEventHandler] 非工牌设备,忽略状态同步: deviceId={}", event.getDeviceId()); - return; - } - - Long areaId = getAreaIdByDeviceId(event.getDeviceId()); - if (event.isOnline()) { - badgeDeviceStatusService.updateBadgeOnlineStatus( - event.getDeviceId(), event.getDeviceName(), event.getNickname(), areaId, - BadgeDeviceStatusEnum.IDLE, "设备上线"); - return; - } - if (event.isOffline()) { - badgeDeviceStatusService.updateBadgeOnlineStatus( - event.getDeviceId(), event.getDeviceName(), event.getNickname(), null, - BadgeDeviceStatusEnum.OFFLINE, "设备离线"); - return; - } - log.debug("[BadgeDeviceStatusEventHandler] 忽略未处理的状态变更: deviceId={}, newStatus={}", - event.getDeviceId(), event.getNewStatus()); - } - - private boolean isBadgeDevice(Long deviceId) { - if (deviceId == null) { - return false; - } - try { - OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId); - return relation != null && "BADGE".equals(relation.getRelationType()); - } catch (Exception e) { - log.warn("[BadgeDeviceStatusEventHandler] 查询设备绑定关系失败: deviceId={}", deviceId, e); - return false; - } - } - - private Long getAreaIdByDeviceId(Long deviceId) { - if (deviceId == null) { - return null; - } - try { - OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId); - if (relation != null && "BADGE".equals(relation.getRelationType())) { - return relation.getAreaId(); - } - return null; - } catch (Exception e) { - log.warn("[BadgeDeviceStatusEventHandler] 查询设备所属区域失败: deviceId={}", deviceId, e); - return null; - } - } - - private void executeInTenantContext(Long tenantId, Runnable runnable) { - Long currentTenantId = TenantContextHolder.getTenantId(); - if (tenantId == null || Objects.equals(currentTenantId, tenantId)) { - runnable.run(); - return; - } - TenantUtils.execute(tenantId, runnable); - } -} +package com.viewsh.module.ops.environment.integration.consumer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO; +import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum; +import com.viewsh.module.ops.environment.integration.dto.IotDeviceStatusChangedEventDTO; +import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService; +import com.viewsh.module.ops.service.area.AreaDeviceService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +/** + * 工牌设备状态变更事件消费者 + *

+ * 监听 IoT 模块发布的设备状态变更事件(上线/离线) + *

+ * 调用链路: + * + *

+ * IoT 设备连接/断开
+ *     ↓
+ * IotDeviceServiceImpl.updateDeviceState()
+ *     ↓
+ * RocketMQ Topic: integration-device-status
+ *     ↓
+ * BadgeDeviceStatusEventHandler.onMessage()
+ *     - 幂等性检查
+ *     - 判断是否为工牌设备
+ *     ↓
+ * badgeDeviceStatusService.updateBadgeOnlineStatus()
+ *     - 创建/更新 Redis 状态记录
+ * 
+ * + * @author AI + */ +@Slf4j +@Component +@RocketMQMessageListener( + topic = "integration-device-status", + consumerGroup = "ops-badge-status-consumer", + consumeMode = ConsumeMode.CONCURRENTLY, + selectorExpression = "*", + accessKey = "${rocketmq.consumer.access-key:}", + secretKey = "${rocketmq.consumer.secret-key:}" +) +public class BadgeDeviceStatusEventHandler implements RocketMQListener { + + /** + * 幂等性控制 Key 模式 + */ + private static final String DEDUP_KEY_PATTERN = "ops:badge:dedup:status:%s"; + + /** + * 幂等性控制 TTL(秒) + */ + private static final int DEDUP_TTL_SECONDS = 120; + + @Resource + private ObjectMapper objectMapper; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private BadgeDeviceStatusService badgeDeviceStatusService; + + @Resource + private AreaDeviceService areaDeviceService; + + @Override + public void onMessage(String message) { + try { + // 1. JSON 反序列化 + IotDeviceStatusChangedEventDTO event = objectMapper.readValue(message, + IotDeviceStatusChangedEventDTO.class); + + log.debug("[BadgeDeviceStatusEventHandler] 收到设备状态变更事件: eventId={}, deviceId={}, {} -> {}", + event.getEventId(), event.getDeviceId(), event.getOldStatus(), event.getNewStatus()); + + // 2. 幂等性检查 + String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId()); + Boolean firstTime = stringRedisTemplate.opsForValue() + .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); + + if (Boolean.FALSE.equals(firstTime)) { + log.debug("[BadgeDeviceStatusEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); + return; + } + + // 3. 判断是否为工牌设备 + if (!isBadgeDevice(event.getDeviceId())) { + log.debug("[BadgeDeviceStatusEventHandler] 非工牌设备,跳过处理: deviceId={}", event.getDeviceId()); + return; + } + + // 4. 处理状态变更 + handleDeviceStatusChange(event); + + } catch (Exception e) { + log.error("[BadgeDeviceStatusEventHandler] 消息处理失败: message={}", message, e); + // 不抛出异常,避免消息重试导致重复处理 + } + } + + /** + * 处理设备状态变更 + */ + private void handleDeviceStatusChange(IotDeviceStatusChangedEventDTO event) { + Long deviceId = event.getDeviceId(); + String deviceCode = event.getDeviceName(); + String nickname = event.getNickname(); + + // 获取设备所属区域 + Long areaId = getAreaIdByDeviceId(deviceId); + + if (event.isOnline()) { + // 设备上线 + log.info("[BadgeDeviceStatusEventHandler] 工牌设备上线: deviceId={}, deviceCode={}, nickname={}, areaId={}", + deviceId, deviceCode, nickname, areaId); + + badgeDeviceStatusService.updateBadgeOnlineStatus( + deviceId, + deviceCode, + nickname, + areaId, + BadgeDeviceStatusEnum.IDLE, + "设备上线"); + + } else if (event.isOffline()) { + // 设备离线 + log.info("[BadgeDeviceStatusEventHandler] 工牌设备离线: deviceId={}, deviceCode={}", + deviceId, deviceCode); + + badgeDeviceStatusService.updateBadgeOnlineStatus( + deviceId, + deviceCode, + nickname, + null, + BadgeDeviceStatusEnum.OFFLINE, + "设备离线"); + + } else { + // 其他状态(如 INACTIVE),暂不处理 + log.debug("[BadgeDeviceStatusEventHandler] 忽略状态变更: deviceId={}, newStatus={}", + deviceId, event.getNewStatus()); + } + } + + /** + * 判断是否为工牌设备 + *

+ * 通过查询 ops_area_device_relation 表判断设备是否为 BADGE 类型 + */ + private boolean isBadgeDevice(Long deviceId) { + if (deviceId == null) { + return false; + } + + try { + // 通过区域设备关系判断是否为工牌设备 + OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId); + return relation != null && "BADGE".equals(relation.getRelationType()); + } catch (Exception e) { + log.warn("[BadgeDeviceStatusEventHandler] 查询设备类型失败: deviceId={}", deviceId, e); + return false; + } + } + + /** + * 获取设备所属区域ID + */ + private Long getAreaIdByDeviceId(Long deviceId) { + if (deviceId == null) { + return null; + } + + try { + OpsAreaDeviceRelationDO relation = areaDeviceService.getByDeviceId(deviceId); + if (relation != null && "BADGE".equals(relation.getRelationType())) { + return relation.getAreaId(); + } + return null; + } catch (Exception e) { + log.warn("[BadgeDeviceStatusEventHandler] 获取设备区域失败: deviceId={}", deviceId, e); + return null; + } + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderArriveEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderArriveEventHandler.java index 7fb6e99..fa891b3 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderArriveEventHandler.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderArriveEventHandler.java @@ -1,118 +1,160 @@ -package com.viewsh.module.ops.environment.integration.consumer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.viewsh.framework.tenant.core.context.TenantContextHolder; -import com.viewsh.framework.tenant.core.util.TenantUtils; -import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; -import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; -import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; -import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; -import com.viewsh.module.ops.enums.OperatorTypeEnum; -import com.viewsh.module.ops.enums.WorkOrderStatusEnum; -import com.viewsh.module.ops.environment.integration.dto.CleanOrderArriveEventDTO; -import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.ConsumeMode; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; - -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.TimeUnit; - -@Slf4j -@Component -@RocketMQMessageListener( - topic = "ops-order-arrive", - consumerGroup = "ops-clean-order-arrive-group", - consumeMode = ConsumeMode.CONCURRENTLY, - selectorExpression = "*", - accessKey = "${rocketmq.consumer.access-key:}", - secretKey = "${rocketmq.consumer.secret-key:}" -) -public class CleanOrderArriveEventHandler implements RocketMQListener { - - private static final int DEDUP_TTL_SECONDS = 300; - - @Resource - private ObjectMapper objectMapper; - - @Resource - private StringRedisTemplate stringRedisTemplate; - - @Resource - private OpsOrderMapper opsOrderMapper; - - @Resource - private OrderLifecycleManager orderLifecycleManager; - - @Override - public void onMessage(String message) { - try { - CleanOrderArriveEventDTO event = objectMapper.readValue(message, CleanOrderArriveEventDTO.class); - String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "arrive", event.getEventId()); - Boolean firstTime = stringRedisTemplate.opsForValue() - .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); - if (!Boolean.TRUE.equals(firstTime)) { - log.debug("[CleanOrderArriveEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); - return; - } - executeInTenantContext(event.getTenantId(), () -> handleOrderArrive(event)); - } catch (Exception e) { - log.error("[CleanOrderArriveEventHandler] 消息处理失败: message={}", message, e); - throw new RuntimeException("保洁工单到岗事件处理失败", e); - } - } - - private void handleOrderArrive(CleanOrderArriveEventDTO event) { - log.info("[CleanOrderArriveEventHandler] 收到到岗事件: eventId={}, orderId={}, deviceId={}, areaId={}", - event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId()); - - OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId()); - if (order == null) { - log.warn("[CleanOrderArriveEventHandler] 工单不存在: orderId={}", event.getOrderId()); - return; - } - - WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus()); - if (!currentStatus.canStartWorking()) { - log.warn("[CleanOrderArriveEventHandler] 当前状态不允许到岗: orderId={}, status={}", - event.getOrderId(), order.getStatus()); - return; - } - - Map payload = new HashMap<>(); - payload.put("deviceId", event.getDeviceId()); - payload.put("deviceKey", event.getDeviceKey()); - payload.put("areaId", event.getAreaId()); - payload.put("triggerSource", event.getTriggerSource()); - if (event.getTriggerData() != null) { - payload.putAll(event.getTriggerData()); - } - - OrderTransitionRequest request = OrderTransitionRequest.builder() - .orderId(event.getOrderId()) - .targetStatus(WorkOrderStatusEnum.ARRIVED) - .operatorType(OperatorTypeEnum.SYSTEM) - .reason("设备触发自动到岗") - .payload(payload) - .build(); - orderLifecycleManager.transition(request); - - log.info("[CleanOrderArriveEventHandler] 工单到岗完成: eventId={}, orderId={}, deviceId={}", - event.getEventId(), event.getOrderId(), event.getDeviceId()); - } - - private void executeInTenantContext(Long tenantId, Runnable runnable) { - Long currentTenantId = TenantContextHolder.getTenantId(); - if (tenantId == null || Objects.equals(currentTenantId, tenantId)) { - runnable.run(); - return; - } - TenantUtils.execute(tenantId, runnable); - } -} +package com.viewsh.module.ops.environment.integration.consumer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; +import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; +import com.viewsh.module.ops.enums.OperatorTypeEnum; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; +import com.viewsh.module.ops.environment.integration.dto.CleanOrderArriveEventDTO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * 保洁工单到岗事件消费者 + *

+ * 订阅 IoT 模块发布的保洁工单到岗事件 + *

+ * 调用链路: + *

+ * IoT 发布 RocketMQ 消息 (ops-order-arrive)
+ *     ↓
+ * CleanOrderArriveEventHandler.onMessage()
+ *     - 幂等性检查
+ *     - 状态检查
+ *     ↓
+ * orderLifecycleManager.transition(ARRIVED) [@Transactional]
+ *     - 更新工单状态
+ *     - 发布 OrderStateChangedEvent
+ *     ↓
+ * 事务提交
+ *     ↓
+ * CleanOrderEventListener.handleArrived() [@Async, AFTER_COMMIT]
+ *     - 记录到岗时间到扩展表
+ *     - 更新设备工单关联信息(areaId, beaconMac)
+ *     - 记录业务日志
+ * 
+ *

+ * 设计说明: + * - Handler 只负责消息接收和状态转换 + * - 业务日志、设备缓存更新由 CleanOrderEventListener 在事务提交后处理 + * - BadgeDeviceStatusServiceImpl 在 BEFORE_COMMIT 阶段跳过 ARRIVED(由 Listener 处理完整信息) + * + * @author AI + */ +@Slf4j +@Component +@RocketMQMessageListener( + topic = "ops-order-arrive", + consumerGroup = "ops-clean-order-arrive-group", + consumeMode = ConsumeMode.CONCURRENTLY, + selectorExpression = "*", + accessKey = "${rocketmq.consumer.access-key:}", + secretKey = "${rocketmq.consumer.secret-key:}" +) +public class CleanOrderArriveEventHandler implements RocketMQListener { + + /** + * 幂等性控制 Key 模式 + */ + private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:arrive:%s"; + + /** + * 幂等性控制 TTL(秒) + */ + private static final int DEDUP_TTL_SECONDS = 300; + + @Resource + private ObjectMapper objectMapper; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private OpsOrderMapper opsOrderMapper; + + @Resource + private OrderLifecycleManager orderLifecycleManager; + + @Override + public void onMessage(String message) { + try { + // 1. JSON 反序列化 + CleanOrderArriveEventDTO event = objectMapper.readValue(message, CleanOrderArriveEventDTO.class); + + // 2. 幂等性检查 + String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId()); + Boolean firstTime = stringRedisTemplate.opsForValue() + .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); + + if (!firstTime) { + log.debug("[CleanOrderArriveEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); + return; + } + + // 3. 业务处理 + handleOrderArrive(event); + + } catch (Exception e) { + log.error("[CleanOrderArriveEventHandler] 消息处理失败: message={}", message, e); + throw new RuntimeException("保洁工单到岗事件处理失败", e); + } + } + + /** + * 处理工单到岗 + */ + private void handleOrderArrive(CleanOrderArriveEventDTO event) { + log.info("[CleanOrderArriveEventHandler] 收到到岗事件: eventId={}, orderId={}, deviceId={}, areaId={}", + event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId()); + + // 1. 查询工单 + OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId()); + if (order == null) { + log.warn("[CleanOrderArriveEventHandler] 工单不存在: orderId={}", event.getOrderId()); + return; + } + + // 2. 检查工单状态 + WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus()); + if (!currentStatus.canStartWorking()) { + log.warn("[CleanOrderArriveEventHandler] 工单状态不允许到岗: orderId={}, status={}", + event.getOrderId(), order.getStatus()); + return; + } + + // 3. 构建状态转换请求(包含完整信息,供 Listener 使用) + Map payload = new HashMap<>(); + payload.put("deviceId", event.getDeviceId()); + payload.put("deviceKey", event.getDeviceKey()); + payload.put("areaId", event.getAreaId()); + payload.put("triggerSource", event.getTriggerSource()); + if (event.getTriggerData() != null) { + payload.putAll(event.getTriggerData()); + } + + OrderTransitionRequest request = OrderTransitionRequest.builder() + .orderId(event.getOrderId()) + .targetStatus(WorkOrderStatusEnum.ARRIVED) + .operatorType(OperatorTypeEnum.SYSTEM) + .reason("蓝牙信标自动到岗确认") + .payload(payload) + .build(); + // 4. 执行状态转换 + // 注意:业务日志和设备缓存更新由 CleanOrderEventListener 在 AFTER_COMMIT 阶段处理 + orderLifecycleManager.transition(request); + + log.info("[CleanOrderArriveEventHandler] 工单到岗成功: eventId={}, orderId={}, deviceId={}", + event.getEventId(), event.getOrderId(), event.getDeviceId()); + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderAuditEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderAuditEventHandler.java index 93f26ae..051bfb6 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderAuditEventHandler.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderAuditEventHandler.java @@ -1,8 +1,6 @@ package com.viewsh.module.ops.environment.integration.consumer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.viewsh.framework.tenant.core.context.TenantContextHolder; -import com.viewsh.framework.tenant.core.util.TenantUtils; +import com.fasterxml.jackson.databind.ObjectMapper; import com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX; import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; @@ -15,9 +13,8 @@ import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain; import com.viewsh.module.ops.infrastructure.log.enumeration.EventLevel; import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule; import com.viewsh.module.ops.infrastructure.log.enumeration.LogType; -import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord; -import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder; -import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder; +import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord; +import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; @@ -26,9 +23,8 @@ import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; -import java.util.Arrays; -import java.util.Objects; -import java.util.concurrent.TimeUnit; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; /** * 保洁工单审计事件消费者 @@ -88,7 +84,7 @@ public class CleanOrderAuditEventHandler implements RocketMQListener { CleanOrderAuditEventDTO event = objectMapper.readValue(message, CleanOrderAuditEventDTO.class); // 2. 幂等性检查 - String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "audit", event.getEventId()); + String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId()); Boolean firstTime = stringRedisTemplate.opsForValue() .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); @@ -98,7 +94,7 @@ public class CleanOrderAuditEventHandler implements RocketMQListener { } // 3. 业务处理 - executeInTenantContext(event.getTenantId(), () -> handleAuditEvent(event)); + handleAuditEvent(event); } catch (Exception e) { log.error("[CleanOrderAuditEventHandler] 消息处理失败: message={}", message, e); @@ -260,14 +256,5 @@ public class CleanOrderAuditEventHandler implements RocketMQListener { return EventLevel.WARN; } return EventLevel.INFO; - } - - private void executeInTenantContext(Long tenantId, Runnable runnable) { - Long currentTenantId = TenantContextHolder.getTenantId(); - if (tenantId == null || Objects.equals(currentTenantId, tenantId)) { - runnable.run(); - return; - } - TenantUtils.execute(tenantId, runnable); - } -} + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCompleteEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCompleteEventHandler.java index ffc8529..30ad827 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCompleteEventHandler.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCompleteEventHandler.java @@ -1,130 +1,191 @@ -package com.viewsh.module.ops.environment.integration.consumer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.viewsh.framework.tenant.core.context.TenantContextHolder; -import com.viewsh.framework.tenant.core.util.TenantUtils; -import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; -import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; -import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; -import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; -import com.viewsh.module.ops.enums.OperatorTypeEnum; -import com.viewsh.module.ops.enums.WorkOrderStatusEnum; -import com.viewsh.module.ops.environment.integration.dto.CleanOrderCompleteEventDTO; -import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.spring.annotation.ConsumeMode; -import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.core.RocketMQListener; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Component; - -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.TimeUnit; - -@Slf4j -@Component -@RocketMQMessageListener( - topic = "ops-order-complete", - consumerGroup = "ops-clean-order-complete-group", - consumeMode = ConsumeMode.CONCURRENTLY, - selectorExpression = "*", - accessKey = "${rocketmq.consumer.access-key:}", - secretKey = "${rocketmq.consumer.secret-key:}" -) -public class CleanOrderCompleteEventHandler implements RocketMQListener { - - private static final int DEDUP_TTL_SECONDS = 300; - - @Resource - private ObjectMapper objectMapper; - - @Resource - private StringRedisTemplate stringRedisTemplate; - - @Resource - private OpsOrderMapper opsOrderMapper; - - @Resource - private OrderLifecycleManager orderLifecycleManager; - - @Override - public void onMessage(String message) { - try { - CleanOrderCompleteEventDTO event = objectMapper.readValue(message, CleanOrderCompleteEventDTO.class); - String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "complete", event.getEventId()); - Boolean firstTime = stringRedisTemplate.opsForValue() - .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); - if (!Boolean.TRUE.equals(firstTime)) { - log.debug("[CleanOrderCompleteEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); - return; - } - executeInTenantContext(event.getTenantId(), () -> handleOrderComplete(event)); - } catch (Exception e) { - log.error("[CleanOrderCompleteEventHandler] 消息处理失败: message={}", message, e); - throw new RuntimeException("保洁工单完单事件处理失败", e); - } - } - - private void handleOrderComplete(CleanOrderCompleteEventDTO event) { - log.info("[CleanOrderCompleteEventHandler] 收到完单事件: eventId={}, orderId={}, deviceId={}, areaId={}", - event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId()); - - OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId()); - if (order == null) { - log.warn("[CleanOrderCompleteEventHandler] 工单不存在: orderId={}", event.getOrderId()); - return; - } - - WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus()); - if (!currentStatus.canComplete()) { - log.warn("[CleanOrderCompleteEventHandler] 当前状态不允许完单: orderId={}, status={}", - event.getOrderId(), order.getStatus()); - return; - } - - Map payload = new HashMap<>(); - payload.put("deviceId", event.getDeviceId()); - payload.put("deviceKey", event.getDeviceKey()); - payload.put("areaId", event.getAreaId()); - payload.put("triggerSource", event.getTriggerSource()); - if (event.getTriggerData() != null) { - payload.putAll(event.getTriggerData()); - } - - OrderTransitionRequest request = OrderTransitionRequest.builder() - .orderId(event.getOrderId()) - .targetStatus(WorkOrderStatusEnum.COMPLETED) - .operatorType(OperatorTypeEnum.SYSTEM) - .assigneeId(event.getDeviceId()) - .reason(buildCompletionRemark(event)) - .payload(payload) - .build(); - orderLifecycleManager.transition(request); - - log.info("[CleanOrderCompleteEventHandler] 工单完单完成: eventId={}, orderId={}", - event.getEventId(), event.getOrderId()); - } - - private String buildCompletionRemark(CleanOrderCompleteEventDTO event) { - StringBuilder remark = new StringBuilder("设备触发自动完单"); - if (event.getTriggerData() != null) { - Object durationMs = event.getTriggerData().get("durationMs"); - if (durationMs instanceof Number number) { - remark.append(",作业时长约 ").append(number.longValue() / 60000).append(" 分钟"); - } - } - return remark.toString(); - } - - private void executeInTenantContext(Long tenantId, Runnable runnable) { - Long currentTenantId = TenantContextHolder.getTenantId(); - if (tenantId == null || Objects.equals(currentTenantId, tenantId)) { - runnable.run(); - return; - } - TenantUtils.execute(tenantId, runnable); - } -} +package com.viewsh.module.ops.environment.integration.consumer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; +import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; +import com.viewsh.module.ops.enums.OperatorTypeEnum; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; +import com.viewsh.module.ops.environment.integration.dto.CleanOrderCompleteEventDTO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * 保洁工单完成事件消费者 + *

+ * 订阅 IoT 模块发布的保洁工单完成事件 + *

+ * 调用链路: + * + *

+ * IoT 发布 RocketMQ 消息 (ops-order-complete)
+ *     ↓
+ * CleanOrderCompleteEventHandler.onMessage()
+ *     - 幂等性检查
+ *     - 状态检查
+ *     ↓
+ * orderLifecycleManager.transition(COMPLETED) [@Transactional]
+ *     - 更新工单状态
+ *     - 发布 OrderStateChangedEvent
+ *     ↓
+ * 事务提交
+ *     ↓
+ * BadgeDeviceStatusEventListener.onOrderStateChanged() [BEFORE_COMMIT]
+ *     - 清除设备工单关联
+ *     - 检查等待任务,决定设备状态 (BUSY/IDLE)
+ *     ↓
+ * CleanOrderEventListener.handleCompleted() [AFTER_COMMIT]
+ *     - 记录完成时间到扩展表
+ *     - 记录业务日志
+ * CleanOrderEventListener.onOrderCompleted() [AFTER_COMMIT]
+ *     - 自动调度下一个任务
+ *     - 发送完成通知
+ * 
+ *

+ * 设计说明: + * - Handler 只负责消息接收和状态转换 + * - 设备状态由 BadgeDeviceStatusEventListener 在 BEFORE_COMMIT 阶段处理 + * - 业务日志、自动调度由 CleanOrderEventListener 在 AFTER_COMMIT 阶段处理 + * + * @author AI + */ +@Slf4j +@Component +@RocketMQMessageListener( + topic = "ops-order-complete", + consumerGroup = "ops-clean-order-complete-group", + consumeMode = ConsumeMode.CONCURRENTLY, + selectorExpression = "*", + accessKey = "${rocketmq.consumer.access-key:}", + secretKey = "${rocketmq.consumer.secret-key:}" +) +public class CleanOrderCompleteEventHandler implements RocketMQListener { + + /** + * 幂等性控制 Key 模式 + */ + private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:complete:%s"; + + /** + * 幂等性控制 TTL(秒) + */ + private static final int DEDUP_TTL_SECONDS = 300; + + @Resource + private ObjectMapper objectMapper; + + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private OpsOrderMapper opsOrderMapper; + + @Resource + private OrderLifecycleManager orderLifecycleManager; + + @Override + public void onMessage(String message) { + try { + // 1. JSON 反序列化 + CleanOrderCompleteEventDTO event = objectMapper.readValue(message, CleanOrderCompleteEventDTO.class); + + // 2. 幂等性检查 + String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId()); + Boolean firstTime = stringRedisTemplate.opsForValue() + .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); + + if (!firstTime) { + log.debug("[CleanOrderCompleteEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); + return; + } + + // 3. 业务处理 + handleOrderComplete(event); + + } catch (Exception e) { + log.error("[CleanOrderCompleteEventHandler] 消息处理失败: message={}", message, e); + throw new RuntimeException("保洁工单完成事件处理失败", e); + } + } + + /** + * 处理工单完成 + */ + private void handleOrderComplete(CleanOrderCompleteEventDTO event) { + log.info("[CleanOrderCompleteEventHandler] 收到完成事件: eventId={}, orderId={}, deviceId={}, areaId={}", + event.getEventId(), event.getOrderId(), event.getDeviceId(), event.getAreaId()); + + // 1. 查询工单 + OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId()); + if (order == null) { + log.warn("[CleanOrderCompleteEventHandler] 工单不存在: orderId={}", event.getOrderId()); + return; + } + + // 2. 检查工单状态 + WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus()); + if (!currentStatus.canComplete()) { + log.warn("[CleanOrderCompleteEventHandler] 工单状态不允许完成: orderId={}, status={}", + event.getOrderId(), order.getStatus()); + return; + } + + // 3. 计算作业时长 + String remark = buildCompletionRemark(event); + + // 4. 构建状态转换请求 + Map payload = new HashMap<>(); + payload.put("deviceId", event.getDeviceId()); + payload.put("deviceKey", event.getDeviceKey()); + payload.put("areaId", event.getAreaId()); + payload.put("triggerSource", event.getTriggerSource()); + if (event.getTriggerData() != null) { + payload.putAll(event.getTriggerData()); + } + + OrderTransitionRequest request = OrderTransitionRequest.builder() + .orderId(event.getOrderId()) + .targetStatus(WorkOrderStatusEnum.COMPLETED) + .operatorType(OperatorTypeEnum.SYSTEM) + .assigneeId(event.getDeviceId()) // 设备ID用于后续自动调度 + .reason(remark) + .payload(payload) + .build(); + + // 5. 执行状态转换 + // 注意:设备状态、业务日志、自动调度由事件监听器处理 + orderLifecycleManager.transition(request); + + log.info("[CleanOrderCompleteEventHandler] 工单完成成功: eventId={}, orderId={}", + event.getEventId(), event.getOrderId()); + } + + /** + * 构建完成备注 + */ + private String buildCompletionRemark(CleanOrderCompleteEventDTO event) { + StringBuilder remark = new StringBuilder(); + remark.append("信号丢失超时自动完成"); + + if (event.getTriggerData() != null) { + Object durationMs = event.getTriggerData().get("durationMs"); + if (durationMs != null) { + long durationMinutes = ((Number) durationMs).longValue() / 60000; + remark.append(",作业时长: ").append(durationMinutes).append("分钟"); + } + } + + return remark.toString(); + } + +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderConfirmEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderConfirmEventHandler.java index ddde5bf..3979606 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderConfirmEventHandler.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderConfirmEventHandler.java @@ -1,8 +1,6 @@ package com.viewsh.module.ops.environment.integration.consumer; import com.fasterxml.jackson.databind.ObjectMapper; -import com.viewsh.framework.tenant.core.context.TenantContextHolder; -import com.viewsh.framework.tenant.core.util.TenantUtils; import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; @@ -10,7 +8,6 @@ import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; import com.viewsh.module.ops.enums.OperatorTypeEnum; import com.viewsh.module.ops.enums.WorkOrderStatusEnum; import com.viewsh.module.ops.environment.integration.dto.CleanOrderConfirmEventDTO; -import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; @@ -19,9 +16,39 @@ import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; -import java.util.Objects; import java.util.concurrent.TimeUnit; +/** + * 保洁工单确认事件消费者 + *

+ * 订阅 IoT 模块发布的工单确认事件(如:工牌按键确认) + *

+ * 调用链路: + *

+ * IoT 发布 RocketMQ 消息 (ops-order-confirm)
+ *     ↓
+ * CleanOrderConfirmEventHandler.onMessage()
+ *     - 幂等性检查
+ *     - 状态检查
+ *     ↓
+ * orderLifecycleManager.transition(CONFIRMED) [@Transactional]
+ *     - 更新工单状态
+ *     - 发布 OrderStateChangedEvent
+ *     ↓
+ * 事务提交
+ *     ↓
+ * CleanOrderEventListener.onOrderStateChanged() [@TransactionalEventListener(AFTER_COMMIT)]
+ *     - 记录业务日志
+ *     - 发送 TTS "工单已确认,请前往作业区域开始作业"
+ * 
+ *

+ * 设计说明: + * - Handler 只负责消息接收和状态转换 + * - 日志记录和通知发送由 CleanOrderEventListener 在事务提交后处理 + * - 设备状态由 BadgeDeviceStatusServiceImpl 在 BEFORE_COMMIT 阶段同步 + * + * @author AI + */ @Slf4j @Component @RocketMQMessageListener( @@ -33,6 +60,7 @@ import java.util.concurrent.TimeUnit; ) public class CleanOrderConfirmEventHandler implements RocketMQListener { + private static final String DEDUP_KEY_PATTERN = "ops:clean:dedup:confirm:%s"; private static final int DEDUP_TTL_SECONDS = 300; @Resource @@ -50,61 +78,66 @@ public class CleanOrderConfirmEventHandler implements RocketMQListener { @Override public void onMessage(String message) { try { + // 1. JSON 反序列化 CleanOrderConfirmEventDTO event = objectMapper.readValue(message, CleanOrderConfirmEventDTO.class); - String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "confirm", event.getEventId()); + String eventId = event.getEventId(); + + // 2. 幂等性检查 + String dedupKey = String.format(DEDUP_KEY_PATTERN, eventId); Boolean firstTime = stringRedisTemplate.opsForValue() .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); + if (!Boolean.TRUE.equals(firstTime)) { - log.debug("[CleanOrderConfirmEventHandler] 重复消息,跳过处理: eventId={}", event.getEventId()); + log.debug("[CleanOrderConfirmEventHandler] 重复事件,跳过处理: eventId={}", eventId); return; } - executeInTenantContext(event.getTenantId(), () -> handleConfirmEvent(event)); + + // 3. 加载工单 + Long orderId = event.getOrderId(); + OpsOrderDO order = opsOrderMapper.selectById(orderId); + if (order == null) { + log.warn("[CleanOrderConfirmEventHandler] 工单不存在: orderId={}", orderId); + return; + } + + WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus()); + log.info("[CleanOrderConfirmEventHandler] 收到确认事件: eventId={}, orderId={}, currentStatus={}, deviceId={}", + eventId, orderId, currentStatus, event.getDeviceId()); + + // 4. 状态检查 + // 如果已在进行中 (CONFIRMED or ARRIVED),直接返回(TTS 由 Listener 处理) + if (currentStatus == WorkOrderStatusEnum.CONFIRMED || currentStatus == WorkOrderStatusEnum.ARRIVED) { + log.debug("[CleanOrderConfirmEventHandler] 工单已在进行中: orderId={}, status={}", orderId, currentStatus); + return; + } + + // 检查是否可以确认 + if (!currentStatus.canConfirm()) { + log.warn("[CleanOrderConfirmEventHandler] 当前状态无法确认工单: orderId={}, status={}", orderId, currentStatus); + return; + } + + // 5. 构建状态转换请求(包含 deviceId,供 Listener 使用) + OrderTransitionRequest request = OrderTransitionRequest.builder() + .orderId(orderId) + .targetStatus(WorkOrderStatusEnum.CONFIRMED) + .reason("工牌按键确认") + .operatorType(OperatorTypeEnum.CLEANER) + .operatorId(event.getDeviceId() != null ? event.getDeviceId() : order.getAssigneeId()) + .build(); + // 将 deviceId 放入 payload,供 Listener 使用 + request.putPayload("deviceId", event.getDeviceId()); + request.putPayload("triggerSource", "BADGE_BUTTON"); + + // 6. 执行状态转换 + // 注意:日志记录和 TTS 由 CleanOrderEventListener 在 AFTER_COMMIT 阶段处理 + orderLifecycleManager.transition(request); + + log.info("[CleanOrderConfirmEventHandler] 工单确认成功: orderId={}, deviceId={}", orderId, event.getDeviceId()); + } catch (Exception e) { log.error("[CleanOrderConfirmEventHandler] 消息处理失败: message={}", message, e); throw new RuntimeException("保洁工单确认事件处理失败", e); } } - - private void handleConfirmEvent(CleanOrderConfirmEventDTO event) { - Long orderId = event.getOrderId(); - OpsOrderDO order = opsOrderMapper.selectById(orderId); - if (order == null) { - log.warn("[CleanOrderConfirmEventHandler] 工单不存在: orderId={}", orderId); - return; - } - - WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.fromStatus(order.getStatus()); - log.info("[CleanOrderConfirmEventHandler] 收到确认事件: eventId={}, orderId={}, currentStatus={}, deviceId={}", - event.getEventId(), orderId, currentStatus, event.getDeviceId()); - if (currentStatus == WorkOrderStatusEnum.CONFIRMED || currentStatus == WorkOrderStatusEnum.ARRIVED) { - log.debug("[CleanOrderConfirmEventHandler] 工单已确认或已到岗,跳过处理: orderId={}, status={}", orderId, currentStatus); - return; - } - if (!currentStatus.canConfirm()) { - log.warn("[CleanOrderConfirmEventHandler] 当前状态不允许确认: orderId={}, status={}", orderId, currentStatus); - return; - } - - OrderTransitionRequest request = OrderTransitionRequest.builder() - .orderId(orderId) - .targetStatus(WorkOrderStatusEnum.CONFIRMED) - .reason("设备确认工单") - .operatorType(OperatorTypeEnum.CLEANER) - .operatorId(event.getDeviceId() != null ? event.getDeviceId() : order.getAssigneeId()) - .build(); - request.putPayload("deviceId", event.getDeviceId()); - request.putPayload("triggerSource", "BADGE_BUTTON"); - orderLifecycleManager.transition(request); - - log.info("[CleanOrderConfirmEventHandler] 工单确认完成: orderId={}, deviceId={}", orderId, event.getDeviceId()); - } - - private void executeInTenantContext(Long tenantId, Runnable runnable) { - Long currentTenantId = TenantContextHolder.getTenantId(); - if (tenantId == null || Objects.equals(currentTenantId, tenantId)) { - runnable.run(); - return; - } - TenantUtils.execute(tenantId, runnable); - } } diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCreateEventHandler.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCreateEventHandler.java index 3a5c018..723860c 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCreateEventHandler.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/CleanOrderCreateEventHandler.java @@ -1,8 +1,6 @@ package com.viewsh.module.ops.environment.integration.consumer; import com.fasterxml.jackson.databind.ObjectMapper; -import com.viewsh.framework.tenant.core.context.TenantContextHolder; -import com.viewsh.framework.tenant.core.util.TenantUtils; import com.viewsh.module.iot.api.device.IotDeviceControlApi; import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO; import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; @@ -20,7 +18,6 @@ import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule; import com.viewsh.module.ops.infrastructure.log.enumeration.LogType; import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord; import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder; -import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; @@ -31,7 +28,6 @@ import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -110,7 +106,7 @@ public class CleanOrderCreateEventHandler implements RocketMQListener { CleanOrderCreateEventDTO event = objectMapper.readValue(message, CleanOrderCreateEventDTO.class); // 2. 幂等性检查 - String dedupKey = OpsRedisKeyBuilder.eventDedup(event.getTenantId(), "create", event.getEventId()); + String dedupKey = String.format(DEDUP_KEY_PATTERN, event.getEventId()); Boolean firstTime = stringRedisTemplate.opsForValue() .setIfAbsent(dedupKey, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS); @@ -120,7 +116,7 @@ public class CleanOrderCreateEventHandler implements RocketMQListener { } // 3. 业务处理 - executeInTenantContext(event.getTenantId(), () -> handleOrderCreate(event)); + handleOrderCreate(event); } catch (Exception e) { log.error("[CleanOrderCreateEventHandler] 消息处理失败: message={}", message, e); @@ -509,13 +505,4 @@ public class CleanOrderCreateEventHandler implements RocketMQListener { } return null; } - - private void executeInTenantContext(Long tenantId, Runnable runnable) { - Long currentTenantId = TenantContextHolder.getTenantId(); - if (tenantId == null || Objects.equals(currentTenantId, tenantId)) { - runnable.run(); - return; - } - TenantUtils.execute(tenantId, runnable); - } } diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/IntegrationEventDeduplicationService.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/IntegrationEventDeduplicationService.java index bcea05f..37d78c8 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/IntegrationEventDeduplicationService.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/consumer/IntegrationEventDeduplicationService.java @@ -1,7 +1,6 @@ package com.viewsh.module.ops.environment.integration.consumer; import cn.hutool.core.util.StrUtil; -import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.StringRedisTemplate; @@ -20,6 +19,8 @@ import java.time.Duration; @Service public class IntegrationEventDeduplicationService { + private static final String DEDUP_KEY_PREFIX = "integration:event:dedup:"; + @Resource private StringRedisTemplate stringRedisTemplate; @@ -35,7 +36,7 @@ public class IntegrationEventDeduplicationService { return false; } - String key = OpsRedisKeyBuilder.eventDedup("integration", eventId); + String key = DEDUP_KEY_PREFIX + eventId; // setNX:如果 key 不存在则设置,返回 true Boolean success = stringRedisTemplate.opsForValue() @@ -59,7 +60,7 @@ public class IntegrationEventDeduplicationService { return; } - String key = OpsRedisKeyBuilder.eventDedup("integration", eventId); + String key = DEDUP_KEY_PREFIX + eventId; stringRedisTemplate.opsForValue().set(key, "1", Duration.ofHours(24)); } @@ -74,7 +75,7 @@ public class IntegrationEventDeduplicationService { return false; } - String key = OpsRedisKeyBuilder.eventDedup("integration", eventId); + String key = DEDUP_KEY_PREFIX + eventId; Boolean exists = stringRedisTemplate.hasKey(key); return Boolean.TRUE.equals(exists); } diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderArriveEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderArriveEventDTO.java index 7750ed9..2dd3e21 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderArriveEventDTO.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderArriveEventDTO.java @@ -48,9 +48,7 @@ public class CleanOrderArriveEventDTO { /** * 区域ID */ - private Long areaId; - - private Long tenantId; + private Long areaId; /** * 触发来源(IOT_BEACON=蓝牙信标检测) diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderAuditEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderAuditEventDTO.java index 8075a03..afbdd0d 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderAuditEventDTO.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderAuditEventDTO.java @@ -63,9 +63,7 @@ public class CleanOrderAuditEventDTO { /** * 区域ID */ - private Long areaId; - - private Long tenantId; + private Long areaId; /** * 消息内容 diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCompleteEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCompleteEventDTO.java index 56fbbc4..4634e76 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCompleteEventDTO.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCompleteEventDTO.java @@ -48,9 +48,7 @@ public class CleanOrderCompleteEventDTO { /** * 区域ID */ - private Long areaId; - - private Long tenantId; + private Long areaId; /** * 触发来源(IOT_SIGNAL_LOSS=信号丢失超时) diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCreateEventDTO.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCreateEventDTO.java index c496777..e997337 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCreateEventDTO.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/dto/CleanOrderCreateEventDTO.java @@ -53,9 +53,7 @@ public class CleanOrderCreateEventDTO { /** * 优先级(0=P0紧急 1=P1重要 2=P2普通) */ - private Integer priority; - - private Long tenantId; + private Integer priority; /** * 触发数据(JSON 格式的附加信息)