From ba6f94a2795a4205edc4ac37049b782553a38850 Mon Sep 17 00:00:00 2001 From: lzh Date: Mon, 20 Apr 2026 14:51:32 +0800 Subject: [PATCH] =?UTF-8?q?fix(ops):=20review=20=E5=A4=8D=E7=9B=98?= =?UTF-8?q?=E8=A1=A5=E9=BD=90=20FOR=20UPDATE=20=E8=A6=86=E7=9B=96=E9=9D=A2?= =?UTF-8?q?=20+=20=E6=B8=85=E7=90=86=E6=B3=A8=E8=A7=A3/=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=AD=BB=E8=A7=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 今日 review 发现 Bug #2 的 FOR UPDATE 防线只装在 dispatch() 上,但同文件另有两条 路径绕过它: 1. P1 — DispatchEngineImpl.autoDispatchNext 调 transition() 派发队列下一单, 不走 FOR UPDATE。idle 校验和 transition 之间存在竞争窗口,能再次让同 assignee 挂两条 DISPATCHED。改调 dispatch(),天然继承串行化。 补测 autoDispatchNext_whenDispatchingFromQueue_shouldGoThroughDispatchNotTransition 锁定该不变量。 2. P2 — OrderLifecycleManagerImpl.resumeOrder/resumeInterruptedOrder 同样走 transition(),P0 恢复与并发派发竞争时可能产生两条 DISPATCHED。改为先 selectById 取 assigneeId,改调 dispatch() 让同一检查生效。 顺手清理 3 个误导: - DispatchEngineImpl.executePushAndEnqueue 原先忽略内部 dispatch 的返回值, 并发场景下会输出假的“已推送等待任务”日志误导运维,改为按 result.isSuccess() 分支打印。 - OrderTransitionAuditListener.writeRollbackAudit 的 @Transactional(REQUIRES_NEW) 是死注解(由 onAfterRollback 自调用,Spring 代理无法拦截;且 AFTER_ROLLBACK 本就无事务),移除并更新 Javadoc 说明实际行为。 - OrderQueueServiceEnhanced.triggerQueueRebuildAfterCommit 的自调用绕过 @Transactional 是设计意图(最终一致即可),补 Javadoc 解释事务边界, 避免后续误判为 bug。 测试:ops-biz 56 个相关用例全部通过,含新增的 P1 锁定测试。 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ops/core/dispatch/DispatchEngineImpl.java | 15 ++++++-- .../lifecycle/OrderLifecycleManagerImpl.java | 11 ++++-- .../audit/OrderTransitionAuditListener.java | 11 +++--- .../queue/OrderQueueServiceEnhanced.java | 15 ++++++++ .../dispatch/DispatchEngineIdleCheckTest.java | 37 +++++++++++++++++++ 5 files changed, 77 insertions(+), 12 deletions(-) diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java index e233e02c..b65f4582 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java @@ -246,7 +246,9 @@ public class DispatchEngineImpl implements DispatchEngine { .reason("等待队列动态重排后自动派发") .build(); - OrderTransitionResult result = orderLifecycleManager.transition(request); + // 走 dispatch() 而不是 transition():dispatch 内部会先做 FOR UPDATE 不变量检查 + // (Bug #2 防线),避免 autoDispatchNext 在"从队列派发"这一类入口绕过串行化。 + OrderTransitionResult result = orderLifecycleManager.dispatch(request); if (result.isSuccess()) { return DispatchResult.success("已按队列总分派发下一单", assigneeId); @@ -477,8 +479,15 @@ public class DispatchEngineImpl implements DispatchEngine { .reason("自动推送等待任务") .build(); - orderLifecycleManager.dispatch(dispatchRequest); - log.info("已推送等待任务: taskId={}", firstWaiting.getId()); + OrderTransitionResult pushResult = orderLifecycleManager.dispatch(dispatchRequest); + if (pushResult.isSuccess()) { + log.info("已推送等待任务: taskId={}", firstWaiting.getId()); + } else { + // 可能被 dispatch() 里的 FOR UPDATE 拒绝:此处不中断新任务入队流程, + // 但要把"推送失败"清晰落在日志里,避免 "已推送" 说谎误导运维排查。 + log.warn("推送等待任务失败,继续执行新任务入队: taskId={}, orderId={}, error={}", + firstWaiting.getId(), firstWaiting.getOpsOrderId(), pushResult.getMessage()); + } } // 新任务入队 diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java index e9dce59b..4770afa7 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java @@ -230,17 +230,22 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager { public void resumeOrder(Long orderId, Long operatorId) { log.info("开始恢复工单: orderId={}, operatorId={}", orderId, operatorId); - // 构建请求 + // 取出工单自身的 assigneeId 透传给 dispatch,使其 FOR UPDATE 不变量检查生效—— + // 否则 P0 恢复与并发派发竞争时可能再出现"同一 assignee 两条 DISPATCHED"。 + // assigneeId == null 的异常态(工单已卸人)下 dispatch 会跳过该检查,行为退化为原 transition。 + OpsOrderDO order = opsOrderMapper.selectById(orderId); + Long assigneeId = order != null ? order.getAssigneeId() : null; + OrderTransitionRequest request = OrderTransitionRequest.builder() .orderId(orderId) .targetStatus(WorkOrderStatusEnum.DISPATCHED) + .assigneeId(assigneeId) .operatorType(OperatorTypeEnum.CLEANER) .operatorId(operatorId) .reason("恢复工单") .build(); - // 执行状态转换 - OrderTransitionResult result = transition(request); + OrderTransitionResult result = dispatch(request); if (!result.isSuccess()) { throw new IllegalStateException("恢复工单失败: " + result.getMessage()); diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/audit/OrderTransitionAuditListener.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/audit/OrderTransitionAuditListener.java index 4d6d13ca..bb10af99 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/audit/OrderTransitionAuditListener.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/audit/OrderTransitionAuditListener.java @@ -11,8 +11,6 @@ import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionalEventListener; @@ -72,12 +70,13 @@ public class OrderTransitionAuditListener { } /** - * 独立事务写入失败审计。 + * 写入"事务已回滚"的审计记录。 *

- * AFTER_ROLLBACK 触发时主事务已结束,此处即便用 REQUIRES_NEW 也不会遇到嵌套锁; - * 失败本身只是单行 insert,不再对其他表加锁。 + * 不加 @Transactional:AFTER_ROLLBACK 阶段主事务已彻底结束,当前线程无活跃事务; + * 且本方法由 onAfterRollback 自调用,Spring 代理不会拦截,加注解也是死注解。 + * 实际行为:eventLogRecorder.recordSync 的 insert 在 auto-commit 模式下单条提交, + * 失败只丢这一行审计、不影响主业务(主业务早已回滚并报错给调用方)。 */ - @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class) public void writeRollbackAudit(OrderTransitionAttemptedEvent event) { try { eventLogRecorder.recordSync(toRecord(event, /*rolledBack=*/true)); diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java index 2401de09..216a37a2 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java @@ -837,6 +837,21 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { redisQueueService.batchEnqueue(queueDTOs); } + /** + * 在当前事务提交后触发一次等待队列重算。 + *

+ * 事务边界说明:本方法在 afterCommit 阶段(即外层事务已提交)自调用 + * {@link #rebuildWaitingTasksByUserId(Long, Long)},此时: + *

+ * 后果:rebuild 中途抛异常时 MySQL 可能半更新、Redis 可能部分写入, + * 不强一致但最终一致——下一次 enqueue 会再触发一次完整 rebuild 自愈。 + * 对“队列排序”这类可重放数据可以接受;若未来改为影响 MySQL 外表的写入, + * 需要把 rebuild 抽到独立 bean,用代理调用走新事务。 + */ private void triggerQueueRebuildAfterCommit(Long userId, Long fallbackAreaId) { Runnable rebuildAction = () -> { try { diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/dispatch/DispatchEngineIdleCheckTest.java b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/dispatch/DispatchEngineIdleCheckTest.java index ba2d36f9..4c04ac10 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/dispatch/DispatchEngineIdleCheckTest.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/dispatch/DispatchEngineIdleCheckTest.java @@ -1,5 +1,6 @@ package com.viewsh.module.ops.core.dispatch; +import com.viewsh.module.ops.api.queue.OrderQueueDTO; import com.viewsh.module.ops.api.queue.OrderQueueService; import com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation; import com.viewsh.module.ops.core.dispatch.model.DispatchDecision; @@ -224,4 +225,40 @@ class DispatchEngineIdleCheckTest { return OrderTransitionResult.success(orderId, WorkOrderStatusEnum.PENDING, WorkOrderStatusEnum.QUEUED, queueId); } + + @Test + void autoDispatchNext_whenDispatchingFromQueue_shouldGoThroughDispatchNotTransition() { + // 锁死 P1 修复:从队列派发必须走 dispatch(),以继承 Bug #2 的 FOR UPDATE 串行化防线。 + // 如果未来有人改回 transition(),本测试会红:autoDispatchNext 绕过 FOR UPDATE 的漏洞就回来了。 + Long completedOrderId = 700L; + Long waitingOrderId = 701L; + Long queueId = 800L; + + when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, completedOrderId)).thenReturn(List.of()); + when(orderMapper.selectById(completedOrderId)).thenReturn(OpsOrderDO.builder() + .id(completedOrderId).areaId(501L).build()); + OrderQueueDTO waitingDTO = new OrderQueueDTO(); + waitingDTO.setId(queueId); + waitingDTO.setOpsOrderId(waitingOrderId); + waitingDTO.setQueueScore(1000.0); + waitingDTO.setFloorDiff(1); + waitingDTO.setWaitMinutes(2L); + when(orderQueueService.rebuildWaitingTasksByUserId(ASSIGNEE_ID, 501L)) + .thenReturn(List.of(waitingDTO)); + when(orderMapper.selectById(waitingOrderId)).thenReturn(OpsOrderDO.builder() + .id(waitingOrderId) + .status(WorkOrderStatusEnum.QUEUED.getStatus()) + .build()); + when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class))) + .thenReturn(OrderTransitionResult.success(waitingOrderId, + WorkOrderStatusEnum.QUEUED, WorkOrderStatusEnum.DISPATCHED)); + + DispatchResult result = dispatchEngine.autoDispatchNext(completedOrderId, ASSIGNEE_ID); + + assertTrue(result.isSuccess()); + assertEquals("已按队列总分派发下一单", result.getMessage()); + // 关键断言:必须调 dispatch()(带 FOR UPDATE)而不是 transition()(裸责任链) + verify(orderLifecycleManager).dispatch(any(OrderTransitionRequest.class)); + verify(orderLifecycleManager, never()).transition(any(OrderTransitionRequest.class)); + } }