fix(ops): 派发入口加 FOR UPDATE 并发兜底,冲突时降级入队避免悬空

业务不变量:同一执行人在任一时刻最多只有 1 条活跃工单
(DISPATCHED/CONFIRMED/ARRIVED)。PAUSED 不纳入——P0 打断恢复
走 PAUSED→DISPATCHED,此处必须放行。

实现:

1. OpsOrderMapper.selectActiveByAssigneeForUpdate
   查询 assignee 活跃工单并对命中行加 FOR UPDATE 排他锁。必须在
   事务中调用。

2. OrderLifecycleManagerImpl.dispatch 入口校验
   事务开启后立即执行 FOR UPDATE 查询,命中则返回带错误码
   ASSIGNEE_HAS_ACTIVE_ORDER 的失败结果,不再执行责任链,
   事务 commit 空操作、锁释放;并发竞争的第二个线程会阻塞到
   第一个 commit 后看到活跃单,失败退出。

3. 新增 TransitionErrorCode 枚举 + OrderTransitionResult.errorCode
   调用方可区分需降级的冲突与硬失败,避免把"可降级"的结果
   直接抛给用户。

4. DispatchEngineImpl.executeDirectDispatch 降级逻辑
   - 冲突 + 原状态 PENDING → 调 executeEnqueueOnly 降级到 QUEUED,
     工单不悬空,等下一轮 autoDispatchNext 重挑。
   - 冲突 + 原状态已是 QUEUED(并发另一路抢先派发时回滚保留)
     → 返回 fail 但不重复入队,天然等下一轮。
   - 其他失败 → 照常 fail。

职责划分:
- 生命周期层负责"拒绝违反不变量的转换"
- 编排层负责"失败后给工单安置归宿"

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
lzh
2026-04-20 11:52:38 +08:00
parent c24b1eb641
commit b534d79434
5 changed files with 125 additions and 2 deletions

View File

@@ -9,6 +9,7 @@ import com.viewsh.module.ops.core.dispatch.strategy.ScheduleStrategy;
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult;
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OperatorTypeEnum;
@@ -435,9 +436,25 @@ public class DispatchEngineImpl implements DispatchEngine {
DispatchPath.DIRECT_DISPATCH,
result.getQueueId()
);
} else {
return DispatchResult.fail("直接派单失败: " + result.getMessage());
}
// 并发冲突兜底dispatch 入口的 FOR UPDATE 判定执行人已有活跃工单,
// 此时工单仍在原状态(通常是 PENDING。如果仍是 PENDING直接降级为入队
// 避免工单悬空;若已是 QUEUED例如从队列派发被抢先则让它继续留在队列等下一轮。
if (result.getErrorCode() == TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER) {
OpsOrderDO order = orderMapper.selectById(context.getOrderId());
String currentStatus = order != null ? order.getStatus() : null;
if (WorkOrderStatusEnum.QUEUED.getStatus().equals(currentStatus)) {
log.warn("直接派单被 FOR UPDATE 拒绝且工单已在队列中,保持 QUEUED 等待下一轮: orderId={}, assigneeId={}",
context.getOrderId(), assigneeId);
return DispatchResult.fail("并发冲突,已留在队列等待: " + result.getMessage());
}
log.warn("直接派单被 FOR UPDATE 拒绝,降级为入队: orderId={}, assigneeId={}, reason={}",
context.getOrderId(), assigneeId, result.getMessage());
return executeEnqueueOnly(context, assigneeId);
}
return DispatchResult.fail("直接派单失败: " + result.getMessage());
}
/**

View File

@@ -7,6 +7,7 @@ import com.viewsh.module.ops.core.lifecycle.handler.TransitionHandler;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult;
import com.viewsh.module.ops.core.lifecycle.model.TransitionContext;
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OperatorTypeEnum;
@@ -142,6 +143,26 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
// 设置目标状态
request.setTargetStatus(WorkOrderStatusEnum.DISPATCHED);
// 业务不变量:同一执行人在任一时刻最多只能有 1 条活跃工单
// DISPATCHED/CONFIRMED/ARRIVED。PAUSED 不纳入——P0 打断恢复走的就是
// PAUSED→DISPATCHED此处放行。对命中行加 FOR UPDATE配合 @Transactional
// 串行化并发派发;命中则本次派发被拒,由调用方决定降级策略
// DispatchEngineImpl.executeDirectDispatch 会降级为入队)。
if (request.getAssigneeId() != null) {
java.util.List<OpsOrderDO> activeOrders = opsOrderMapper.selectActiveByAssigneeForUpdate(
request.getAssigneeId(), request.getOrderId());
if (!activeOrders.isEmpty()) {
OpsOrderDO head = activeOrders.get(0);
log.warn("派发被拒:执行人已有活跃工单: assigneeId={}, requestOrderId={}, activeCount={}, sampleOrderId={}, sampleStatus={}",
request.getAssigneeId(), request.getOrderId(),
activeOrders.size(), head.getId(), head.getStatus());
return OrderTransitionResult.fail(
request.getOrderId(),
"执行人已有活跃工单: orderId=" + head.getId() + ", status=" + head.getStatus(),
TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER);
}
}
// 派单时更新工单的 assigneeId从 PENDING -> DISPATCHED
if (request.getAssigneeId() != null) {
OpsOrderDO order = opsOrderMapper.selectById(request.getOrderId());

View File

@@ -47,6 +47,14 @@ public class OrderTransitionResult {
*/
private Long queueId;
/**
* 失败错误码(仅 success=false 时有值)
* <p>
* 调用方可据此区分需降级的失败(如 ASSIGNEE_HAS_ACTIVE_ORDER与硬失败
* 未显式设置时默认为 {@link TransitionErrorCode#OTHER}。
*/
private TransitionErrorCode errorCode;
/**
* 成功结果
*/
@@ -81,6 +89,7 @@ public class OrderTransitionResult {
return OrderTransitionResult.builder()
.success(false)
.message(message)
.errorCode(TransitionErrorCode.OTHER)
.build();
}
@@ -92,6 +101,19 @@ public class OrderTransitionResult {
.success(false)
.orderId(orderId)
.message(message)
.errorCode(TransitionErrorCode.OTHER)
.build();
}
/**
* 失败结果带工单ID 和错误码)
*/
public static OrderTransitionResult fail(Long orderId, String message, TransitionErrorCode errorCode) {
return OrderTransitionResult.builder()
.success(false)
.orderId(orderId)
.message(message)
.errorCode(errorCode)
.build();
}
}

View File

@@ -0,0 +1,35 @@
package com.viewsh.module.ops.core.lifecycle.model;
/**
* 状态转换失败的错误码
* <p>
* 用于调用方区分可恢复/需降级的失败场景(如并发冲突)与真正的硬失败(状态机非法转换等),
* 避免把"可降级"的结果误当成硬错误直接向用户暴露。
*
* @author lzh
*/
public enum TransitionErrorCode {
/**
* 执行人已有活跃工单DISPATCHED/CONFIRMED/ARRIVED不应再派发。
* <p>
* 发生在 OrderLifecycleManager.dispatch 入口的 FOR UPDATE 兜底检查命中时。
* 调用方应将工单降级到 QUEUED入队等待下一轮动态派发避免 PENDING 状态悬空。
*/
ASSIGNEE_HAS_ACTIVE_ORDER,
/**
* 状态机不允许此转换(非法的状态流转)
*/
INVALID_TRANSITION,
/**
* 工单不存在
*/
ORDER_NOT_FOUND,
/**
* 其他失败(无特定归类)
*/
OTHER;
}

View File

@@ -114,6 +114,34 @@ public interface OpsOrderMapper extends BaseMapperX<OpsOrderDO> {
.orderByAsc(OpsOrderDO::getCreateTime));
}
/**
* 查询执行人名下"正在执行"的工单并对命中行加行锁SELECT ... FOR UPDATE
* <p>
* 与 {@link #selectActiveByAssignee} 的区别:
* <ul>
* <li><b>不含 PAUSED</b>——PAUSED 代表 P0 打断后挂起的旧任务,不占用"当前时间片"
* 派发时(如 P0 结束后恢复)不应被它阻塞</li>
* <li>结果行加 FOR UPDATE 排他锁,用于 dispatch 入口做业务不变量校验:
* "同一执行人在任一时刻最多只能有 1 条活跃工单"。</li>
* </ul>
* 必须在事务中调用,否则锁无意义。
*
* @param assigneeId 执行人ID
* @param excludeOrderId 排除的工单ID通常是本次正在派发的工单本身
* @return 命中的活跃工单列表(通常空列表表示可派发)
*/
default List<OpsOrderDO> selectActiveByAssigneeForUpdate(Long assigneeId, Long excludeOrderId) {
return selectList(new LambdaQueryWrapperX<OpsOrderDO>()
.eq(OpsOrderDO::getAssigneeId, assigneeId)
.in(OpsOrderDO::getStatus,
WorkOrderStatusEnum.DISPATCHED.getStatus(),
WorkOrderStatusEnum.CONFIRMED.getStatus(),
WorkOrderStatusEnum.ARRIVED.getStatus())
.ne(excludeOrderId != null, OpsOrderDO::getId, excludeOrderId)
.orderByAsc(OpsOrderDO::getCreateTime)
.last("FOR UPDATE"));
}
// ==================== 统计聚合查询 ====================
/**