diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java index e233e02c..b65f4582 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java @@ -246,7 +246,9 @@ public class DispatchEngineImpl implements DispatchEngine { .reason("等待队列动态重排后自动派发") .build(); - OrderTransitionResult result = orderLifecycleManager.transition(request); + // 走 dispatch() 而不是 transition():dispatch 内部会先做 FOR UPDATE 不变量检查 + // (Bug #2 防线),避免 autoDispatchNext 在"从队列派发"这一类入口绕过串行化。 + OrderTransitionResult result = orderLifecycleManager.dispatch(request); if (result.isSuccess()) { return DispatchResult.success("已按队列总分派发下一单", assigneeId); @@ -477,8 +479,15 @@ public class DispatchEngineImpl implements DispatchEngine { .reason("自动推送等待任务") .build(); - orderLifecycleManager.dispatch(dispatchRequest); - log.info("已推送等待任务: taskId={}", firstWaiting.getId()); + OrderTransitionResult pushResult = orderLifecycleManager.dispatch(dispatchRequest); + if (pushResult.isSuccess()) { + log.info("已推送等待任务: taskId={}", firstWaiting.getId()); + } else { + // 可能被 dispatch() 里的 FOR UPDATE 拒绝:此处不中断新任务入队流程, + // 但要把"推送失败"清晰落在日志里,避免 "已推送" 说谎误导运维排查。 + log.warn("推送等待任务失败,继续执行新任务入队: taskId={}, orderId={}, error={}", + firstWaiting.getId(), firstWaiting.getOpsOrderId(), pushResult.getMessage()); + } } // 新任务入队 diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java index e9dce59b..4770afa7 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java @@ -230,17 +230,22 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager { public void resumeOrder(Long orderId, Long operatorId) { log.info("开始恢复工单: orderId={}, operatorId={}", orderId, operatorId); - // 构建请求 + // 取出工单自身的 assigneeId 透传给 dispatch,使其 FOR UPDATE 不变量检查生效—— + // 否则 P0 恢复与并发派发竞争时可能再出现"同一 assignee 两条 DISPATCHED"。 + // assigneeId == null 的异常态(工单已卸人)下 dispatch 会跳过该检查,行为退化为原 transition。 + OpsOrderDO order = opsOrderMapper.selectById(orderId); + Long assigneeId = order != null ? order.getAssigneeId() : null; + OrderTransitionRequest request = OrderTransitionRequest.builder() .orderId(orderId) .targetStatus(WorkOrderStatusEnum.DISPATCHED) + .assigneeId(assigneeId) .operatorType(OperatorTypeEnum.CLEANER) .operatorId(operatorId) .reason("恢复工单") .build(); - // 执行状态转换 - OrderTransitionResult result = transition(request); + OrderTransitionResult result = dispatch(request); if (!result.isSuccess()) { throw new IllegalStateException("恢复工单失败: " + result.getMessage()); diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/audit/OrderTransitionAuditListener.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/audit/OrderTransitionAuditListener.java index 4d6d13ca..bb10af99 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/audit/OrderTransitionAuditListener.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/audit/OrderTransitionAuditListener.java @@ -11,8 +11,6 @@ import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Propagation; -import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionalEventListener; @@ -72,12 +70,13 @@ public class OrderTransitionAuditListener { } /** - * 独立事务写入失败审计。 + * 写入"事务已回滚"的审计记录。 *

- * AFTER_ROLLBACK 触发时主事务已结束,此处即便用 REQUIRES_NEW 也不会遇到嵌套锁; - * 失败本身只是单行 insert,不再对其他表加锁。 + * 不加 @Transactional:AFTER_ROLLBACK 阶段主事务已彻底结束,当前线程无活跃事务; + * 且本方法由 onAfterRollback 自调用,Spring 代理不会拦截,加注解也是死注解。 + * 实际行为:eventLogRecorder.recordSync 的 insert 在 auto-commit 模式下单条提交, + * 失败只丢这一行审计、不影响主业务(主业务早已回滚并报错给调用方)。 */ - @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class) public void writeRollbackAudit(OrderTransitionAttemptedEvent event) { try { eventLogRecorder.recordSync(toRecord(event, /*rolledBack=*/true)); diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java index 2401de09..216a37a2 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java @@ -837,6 +837,21 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { redisQueueService.batchEnqueue(queueDTOs); } + /** + * 在当前事务提交后触发一次等待队列重算。 + *

+ * 事务边界说明:本方法在 afterCommit 阶段(即外层事务已提交)自调用 + * {@link #rebuildWaitingTasksByUserId(Long, Long)},此时: + *

+ * 后果:rebuild 中途抛异常时 MySQL 可能半更新、Redis 可能部分写入, + * 不强一致但最终一致——下一次 enqueue 会再触发一次完整 rebuild 自愈。 + * 对“队列排序”这类可重放数据可以接受;若未来改为影响 MySQL 外表的写入, + * 需要把 rebuild 抽到独立 bean,用代理调用走新事务。 + */ private void triggerQueueRebuildAfterCommit(Long userId, Long fallbackAreaId) { Runnable rebuildAction = () -> { try { diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/dispatch/DispatchEngineIdleCheckTest.java b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/dispatch/DispatchEngineIdleCheckTest.java index ba2d36f9..4c04ac10 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/dispatch/DispatchEngineIdleCheckTest.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/dispatch/DispatchEngineIdleCheckTest.java @@ -1,5 +1,6 @@ package com.viewsh.module.ops.core.dispatch; +import com.viewsh.module.ops.api.queue.OrderQueueDTO; import com.viewsh.module.ops.api.queue.OrderQueueService; import com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation; import com.viewsh.module.ops.core.dispatch.model.DispatchDecision; @@ -224,4 +225,40 @@ class DispatchEngineIdleCheckTest { return OrderTransitionResult.success(orderId, WorkOrderStatusEnum.PENDING, WorkOrderStatusEnum.QUEUED, queueId); } + + @Test + void autoDispatchNext_whenDispatchingFromQueue_shouldGoThroughDispatchNotTransition() { + // 锁死 P1 修复:从队列派发必须走 dispatch(),以继承 Bug #2 的 FOR UPDATE 串行化防线。 + // 如果未来有人改回 transition(),本测试会红:autoDispatchNext 绕过 FOR UPDATE 的漏洞就回来了。 + Long completedOrderId = 700L; + Long waitingOrderId = 701L; + Long queueId = 800L; + + when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, completedOrderId)).thenReturn(List.of()); + when(orderMapper.selectById(completedOrderId)).thenReturn(OpsOrderDO.builder() + .id(completedOrderId).areaId(501L).build()); + OrderQueueDTO waitingDTO = new OrderQueueDTO(); + waitingDTO.setId(queueId); + waitingDTO.setOpsOrderId(waitingOrderId); + waitingDTO.setQueueScore(1000.0); + waitingDTO.setFloorDiff(1); + waitingDTO.setWaitMinutes(2L); + when(orderQueueService.rebuildWaitingTasksByUserId(ASSIGNEE_ID, 501L)) + .thenReturn(List.of(waitingDTO)); + when(orderMapper.selectById(waitingOrderId)).thenReturn(OpsOrderDO.builder() + .id(waitingOrderId) + .status(WorkOrderStatusEnum.QUEUED.getStatus()) + .build()); + when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class))) + .thenReturn(OrderTransitionResult.success(waitingOrderId, + WorkOrderStatusEnum.QUEUED, WorkOrderStatusEnum.DISPATCHED)); + + DispatchResult result = dispatchEngine.autoDispatchNext(completedOrderId, ASSIGNEE_ID); + + assertTrue(result.isSuccess()); + assertEquals("已按队列总分派发下一单", result.getMessage()); + // 关键断言:必须调 dispatch()(带 FOR UPDATE)而不是 transition()(裸责任链) + verify(orderLifecycleManager).dispatch(any(OrderTransitionRequest.class)); + verify(orderLifecycleManager, never()).transition(any(OrderTransitionRequest.class)); + } }