From b534d79434f6cbb091e39e3bd410335097731700 Mon Sep 17 00:00:00 2001 From: lzh Date: Mon, 20 Apr 2026 11:52:38 +0800 Subject: [PATCH] =?UTF-8?q?fix(ops):=20=E6=B4=BE=E5=8F=91=E5=85=A5?= =?UTF-8?q?=E5=8F=A3=E5=8A=A0=20FOR=20UPDATE=20=E5=B9=B6=E5=8F=91=E5=85=9C?= =?UTF-8?q?=E5=BA=95=EF=BC=8C=E5=86=B2=E7=AA=81=E6=97=B6=E9=99=8D=E7=BA=A7?= =?UTF-8?q?=E5=85=A5=E9=98=9F=E9=81=BF=E5=85=8D=E6=82=AC=E7=A9=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 业务不变量:同一执行人在任一时刻最多只有 1 条活跃工单 (DISPATCHED/CONFIRMED/ARRIVED)。PAUSED 不纳入——P0 打断恢复 走 PAUSED→DISPATCHED,此处必须放行。 实现: 1. OpsOrderMapper.selectActiveByAssigneeForUpdate 查询 assignee 活跃工单并对命中行加 FOR UPDATE 排他锁。必须在 事务中调用。 2. OrderLifecycleManagerImpl.dispatch 入口校验 事务开启后立即执行 FOR UPDATE 查询,命中则返回带错误码 ASSIGNEE_HAS_ACTIVE_ORDER 的失败结果,不再执行责任链, 事务 commit 空操作、锁释放;并发竞争的第二个线程会阻塞到 第一个 commit 后看到活跃单,失败退出。 3. 新增 TransitionErrorCode 枚举 + OrderTransitionResult.errorCode 调用方可区分需降级的冲突与硬失败,避免把"可降级"的结果 直接抛给用户。 4. DispatchEngineImpl.executeDirectDispatch 降级逻辑 - 冲突 + 原状态 PENDING → 调 executeEnqueueOnly 降级到 QUEUED, 工单不悬空,等下一轮 autoDispatchNext 重挑。 - 冲突 + 原状态已是 QUEUED(并发另一路抢先派发时回滚保留) → 返回 fail 但不重复入队,天然等下一轮。 - 其他失败 → 照常 fail。 职责划分: - 生命周期层负责"拒绝违反不变量的转换" - 编排层负责"失败后给工单安置归宿" Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ops/core/dispatch/DispatchEngineImpl.java | 21 +++++++++-- .../lifecycle/OrderLifecycleManagerImpl.java | 21 +++++++++++ .../model/OrderTransitionResult.java | 22 ++++++++++++ .../lifecycle/model/TransitionErrorCode.java | 35 +++++++++++++++++++ .../dal/mysql/workorder/OpsOrderMapper.java | 28 +++++++++++++++ 5 files changed, 125 insertions(+), 2 deletions(-) create mode 100644 viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/model/TransitionErrorCode.java diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java 
b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java index ba7d98cf..e233e02c 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java @@ -9,6 +9,7 @@ import com.viewsh.module.ops.core.dispatch.strategy.ScheduleStrategy; import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult; +import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode; import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; import com.viewsh.module.ops.enums.OperatorTypeEnum; @@ -435,9 +436,25 @@ public class DispatchEngineImpl implements DispatchEngine { DispatchPath.DIRECT_DISPATCH, result.getQueueId() ); - } else { - return DispatchResult.fail("直接派单失败: " + result.getMessage()); } + + // 并发冲突兜底:dispatch 入口的 FOR UPDATE 判定执行人已有活跃工单, + // 此时工单仍在原状态(通常是 PENDING)。如果仍是 PENDING,直接降级为入队, + // 避免工单悬空;若已是 QUEUED(例如从队列派发被抢先),则让它继续留在队列等下一轮。 + if (result.getErrorCode() == TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER) { + OpsOrderDO order = orderMapper.selectById(context.getOrderId()); + String currentStatus = order != null ? 
order.getStatus() : null; + if (WorkOrderStatusEnum.QUEUED.getStatus().equals(currentStatus)) { + log.warn("直接派单被 FOR UPDATE 拒绝且工单已在队列中,保持 QUEUED 等待下一轮: orderId={}, assigneeId={}", + context.getOrderId(), assigneeId); + return DispatchResult.fail("并发冲突,已留在队列等待: " + result.getMessage()); + } + log.warn("直接派单被 FOR UPDATE 拒绝,降级为入队: orderId={}, assigneeId={}, reason={}", + context.getOrderId(), assigneeId, result.getMessage()); + return executeEnqueueOnly(context, assigneeId); + } + + return DispatchResult.fail("直接派单失败: " + result.getMessage()); } /** diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java index 3864f49f..d9d4e104 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java @@ -7,6 +7,7 @@ import com.viewsh.module.ops.core.lifecycle.handler.TransitionHandler; import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult; import com.viewsh.module.ops.core.lifecycle.model.TransitionContext; +import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode; import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; import com.viewsh.module.ops.enums.OperatorTypeEnum; @@ -142,6 +143,26 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager { // 设置目标状态 request.setTargetStatus(WorkOrderStatusEnum.DISPATCHED); + // 业务不变量:同一执行人在任一时刻最多只能有 1 条活跃工单 + // (DISPATCHED/CONFIRMED/ARRIVED)。PAUSED 不纳入——P0 打断恢复走的就是 + // PAUSED→DISPATCHED,此处放行。对命中行加 FOR UPDATE,配合 @Transactional + // 
串行化并发派发;命中则本次派发被拒,由调用方决定降级策略 + // (DispatchEngineImpl.executeDirectDispatch 会降级为入队)。 + if (request.getAssigneeId() != null) { + java.util.List activeOrders = opsOrderMapper.selectActiveByAssigneeForUpdate( + request.getAssigneeId(), request.getOrderId()); + if (!activeOrders.isEmpty()) { + OpsOrderDO head = activeOrders.get(0); + log.warn("派发被拒:执行人已有活跃工单: assigneeId={}, requestOrderId={}, activeCount={}, sampleOrderId={}, sampleStatus={}", + request.getAssigneeId(), request.getOrderId(), + activeOrders.size(), head.getId(), head.getStatus()); + return OrderTransitionResult.fail( + request.getOrderId(), + "执行人已有活跃工单: orderId=" + head.getId() + ", status=" + head.getStatus(), + TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER); + } + } + // 派单时更新工单的 assigneeId(从 PENDING -> DISPATCHED) if (request.getAssigneeId() != null) { OpsOrderDO order = opsOrderMapper.selectById(request.getOrderId()); diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/model/OrderTransitionResult.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/model/OrderTransitionResult.java index e32a9c91..66e81d66 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/model/OrderTransitionResult.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/model/OrderTransitionResult.java @@ -47,6 +47,14 @@ public class OrderTransitionResult { */ private Long queueId; + /** + * 失败错误码(仅 success=false 时有值) + *

+ * 调用方可据此区分需降级的失败(如 ASSIGNEE_HAS_ACTIVE_ORDER)与硬失败, + * 未显式设置时默认为 {@link TransitionErrorCode#OTHER}。 + */ + private TransitionErrorCode errorCode; + /** * 成功结果 */ @@ -81,6 +89,7 @@ public class OrderTransitionResult { return OrderTransitionResult.builder() .success(false) .message(message) + .errorCode(TransitionErrorCode.OTHER) .build(); } @@ -92,6 +101,19 @@ public class OrderTransitionResult { .success(false) .orderId(orderId) .message(message) + .errorCode(TransitionErrorCode.OTHER) + .build(); + } + + /** + * 失败结果(带工单ID 和错误码) + */ + public static OrderTransitionResult fail(Long orderId, String message, TransitionErrorCode errorCode) { + return OrderTransitionResult.builder() + .success(false) + .orderId(orderId) + .message(message) + .errorCode(errorCode) .build(); } } diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/model/TransitionErrorCode.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/model/TransitionErrorCode.java new file mode 100644 index 00000000..0ae7fac1 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/model/TransitionErrorCode.java @@ -0,0 +1,35 @@ +package com.viewsh.module.ops.core.lifecycle.model; + +/** + * 状态转换失败的错误码 + *

+ * 用于调用方区分可恢复/需降级的失败场景(如并发冲突)与真正的硬失败(状态机非法转换等), + * 避免把"可降级"的结果误当成硬错误直接向用户暴露。 + * + * @author lzh + */ +public enum TransitionErrorCode { + + /** + * 执行人已有活跃工单(DISPATCHED/CONFIRMED/ARRIVED),不应再派发。 + *

+ * 发生在 OrderLifecycleManager.dispatch 入口的 FOR UPDATE 兜底检查命中时。 + * 调用方应将工单降级到 QUEUED(入队等待下一轮动态派发),避免 PENDING 状态悬空。 + */ + ASSIGNEE_HAS_ACTIVE_ORDER, + + /** + * 状态机不允许此转换(非法的状态流转) + */ + INVALID_TRANSITION, + + /** + * 工单不存在 + */ + ORDER_NOT_FOUND, + + /** + * 其他失败(无特定归类) + */ + OTHER; +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/workorder/OpsOrderMapper.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/workorder/OpsOrderMapper.java index 6df51062..e19a7262 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/workorder/OpsOrderMapper.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/workorder/OpsOrderMapper.java @@ -114,6 +114,34 @@ public interface OpsOrderMapper extends BaseMapperX { .orderByAsc(OpsOrderDO::getCreateTime)); } + /** + * 查询执行人名下"正在执行"的工单,并对命中行加行锁(SELECT ... FOR UPDATE) + *

<p>
+ * 与 {@link #selectActiveByAssignee} 的区别:
+ * <ul>
+ * <li>不含 PAUSED——PAUSED 代表 P0 打断后挂起的旧任务,不占用"当前时间片",
+ * 派发时(如 P0 结束后恢复)不应被它阻塞</li>
+ * <li>结果行加 FOR UPDATE 排他锁,用于 dispatch 入口做业务不变量校验:
+ * "同一执行人在任一时刻最多只能有 1 条活跃工单"。</li>
+ * </ul>
+ * 必须在事务中调用,否则锁无意义。 + * + * @param assigneeId 执行人ID + * @param excludeOrderId 排除的工单ID(通常是本次正在派发的工单本身) + * @return 命中的活跃工单列表(通常空列表表示可派发) + */ + default List selectActiveByAssigneeForUpdate(Long assigneeId, Long excludeOrderId) { + return selectList(new LambdaQueryWrapperX() + .eq(OpsOrderDO::getAssigneeId, assigneeId) + .in(OpsOrderDO::getStatus, + WorkOrderStatusEnum.DISPATCHED.getStatus(), + WorkOrderStatusEnum.CONFIRMED.getStatus(), + WorkOrderStatusEnum.ARRIVED.getStatus()) + .ne(excludeOrderId != null, OpsOrderDO::getId, excludeOrderId) + .orderByAsc(OpsOrderDO::getCreateTime) + .last("FOR UPDATE")); + } + // ==================== 统计聚合查询 ==================== /**