fix(ops): review 复盘补齐 FOR UPDATE 覆盖面 + 清理注解/日志死角
今日 review 发现 Bug #2 的 FOR UPDATE 防线只装在 dispatch() 上,但同文件另有两条路径绕过它: 1. P1 — DispatchEngineImpl.autoDispatchNext 调 transition() 派发队列下一单,不走 FOR UPDATE。idle 校验和 transition 之间存在竞争窗口,能再次让同 assignee 挂两条 DISPATCHED。改调 dispatch(),天然继承串行化。 补测 autoDispatchNext_whenDispatchingFromQueue_shouldGoThroughDispatchNotTransition 锁定该不变量。 2. P2 — OrderLifecycleManagerImpl.resumeOrder/resumeInterruptedOrder 同样走 transition(),P0 恢复与并发派发竞争时可能产生两条 DISPATCHED。改为先 selectById 取 assigneeId,改调 dispatch() 让同一检查生效。 顺手清理 3 个误导: - DispatchEngineImpl.executePushAndEnqueue 原先忽略内部 dispatch 的返回值,并发场景下会输出假的“已推送等待任务”日志误导运维,改为按 result.isSuccess() 分支打印。 - OrderTransitionAuditListener.writeRollbackAudit 的 @Transactional(REQUIRES_NEW) 是死注解(由 onAfterRollback 自调用,Spring 代理无法拦截;且 AFTER_ROLLBACK 本就无事务),移除并更新 Javadoc 说明实际行为。 - OrderQueueServiceEnhanced.triggerQueueRebuildAfterCommit 的自调用绕过 @Transactional 是设计意图(最终一致即可),补 Javadoc 解释事务边界,避免后续误判为 bug。 测试:ops-biz 56 个相关用例全部通过,含新增的 P1 锁定测试。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -246,7 +246,9 @@ public class DispatchEngineImpl implements DispatchEngine {
|
||||
.reason("等待队列动态重排后自动派发")
|
||||
.build();
|
||||
|
||||
OrderTransitionResult result = orderLifecycleManager.transition(request);
|
||||
// 走 dispatch() 而不是 transition():dispatch 内部会先做 FOR UPDATE 不变量检查
|
||||
// (Bug #2 防线),避免 autoDispatchNext 在"从队列派发"这一类入口绕过串行化。
|
||||
OrderTransitionResult result = orderLifecycleManager.dispatch(request);
|
||||
|
||||
if (result.isSuccess()) {
|
||||
return DispatchResult.success("已按队列总分派发下一单", assigneeId);
|
||||
@@ -477,8 +479,15 @@ public class DispatchEngineImpl implements DispatchEngine {
|
||||
.reason("自动推送等待任务")
|
||||
.build();
|
||||
|
||||
orderLifecycleManager.dispatch(dispatchRequest);
|
||||
log.info("已推送等待任务: taskId={}", firstWaiting.getId());
|
||||
OrderTransitionResult pushResult = orderLifecycleManager.dispatch(dispatchRequest);
|
||||
if (pushResult.isSuccess()) {
|
||||
log.info("已推送等待任务: taskId={}", firstWaiting.getId());
|
||||
} else {
|
||||
// 可能被 dispatch() 里的 FOR UPDATE 拒绝:此处不中断新任务入队流程,
|
||||
// 但要把"推送失败"清晰落在日志里,避免 "已推送" 说谎误导运维排查。
|
||||
log.warn("推送等待任务失败,继续执行新任务入队: taskId={}, orderId={}, error={}",
|
||||
firstWaiting.getId(), firstWaiting.getOpsOrderId(), pushResult.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
// 新任务入队
|
||||
|
||||
@@ -230,17 +230,22 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
|
||||
public void resumeOrder(Long orderId, Long operatorId) {
|
||||
log.info("开始恢复工单: orderId={}, operatorId={}", orderId, operatorId);
|
||||
|
||||
// 构建请求
|
||||
// 取出工单自身的 assigneeId 透传给 dispatch,使其 FOR UPDATE 不变量检查生效——
|
||||
// 否则 P0 恢复与并发派发竞争时可能再出现"同一 assignee 两条 DISPATCHED"。
|
||||
// assigneeId == null 的异常态(工单已卸人)下 dispatch 会跳过该检查,行为退化为原 transition。
|
||||
OpsOrderDO order = opsOrderMapper.selectById(orderId);
|
||||
Long assigneeId = order != null ? order.getAssigneeId() : null;
|
||||
|
||||
OrderTransitionRequest request = OrderTransitionRequest.builder()
|
||||
.orderId(orderId)
|
||||
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
|
||||
.assigneeId(assigneeId)
|
||||
.operatorType(OperatorTypeEnum.CLEANER)
|
||||
.operatorId(operatorId)
|
||||
.reason("恢复工单")
|
||||
.build();
|
||||
|
||||
// 执行状态转换
|
||||
OrderTransitionResult result = transition(request);
|
||||
OrderTransitionResult result = dispatch(request);
|
||||
|
||||
if (!result.isSuccess()) {
|
||||
throw new IllegalStateException("恢复工单失败: " + result.getMessage());
|
||||
|
||||
@@ -11,8 +11,6 @@ import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Propagation;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.transaction.event.TransactionPhase;
|
||||
import org.springframework.transaction.event.TransactionalEventListener;
|
||||
|
||||
@@ -72,12 +70,13 @@ public class OrderTransitionAuditListener {
|
||||
}
|
||||
|
||||
/**
|
||||
* 独立事务写入失败审计。
|
||||
* 写入"事务已回滚"的审计记录。
|
||||
* <p>
|
||||
* AFTER_ROLLBACK 触发时主事务已结束,此处即便用 REQUIRES_NEW 也不会遇到嵌套锁;
|
||||
* 失败本身只是单行 insert,不再对其他表加锁。
|
||||
* 不加 @Transactional:AFTER_ROLLBACK 阶段主事务已彻底结束,当前线程无活跃事务;
|
||||
* 且本方法由 onAfterRollback 自调用,Spring 代理不会拦截,加注解也是死注解。
|
||||
* 实际行为:eventLogRecorder.recordSync 的 insert 在 auto-commit 模式下单条提交,
|
||||
* 失败只丢这一行审计、不影响主业务(主业务早已回滚并报错给调用方)。
|
||||
*/
|
||||
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
|
||||
public void writeRollbackAudit(OrderTransitionAttemptedEvent event) {
|
||||
try {
|
||||
eventLogRecorder.recordSync(toRecord(event, /*rolledBack=*/true));
|
||||
|
||||
@@ -837,6 +837,21 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
|
||||
redisQueueService.batchEnqueue(queueDTOs);
|
||||
}
|
||||
|
||||
/**
|
||||
* 在当前事务提交后触发一次等待队列重算。
|
||||
* <p>
|
||||
* <b>事务边界说明</b>:本方法在 afterCommit 阶段(即外层事务已提交)自调用
|
||||
* {@link #rebuildWaitingTasksByUserId(Long, Long)},此时:
|
||||
* <ul>
|
||||
* <li>当前线程不在任何事务中(主事务刚提交完)</li>
|
||||
* <li>自调用绕过 Spring 代理,rebuild 方法上的 @Transactional 不生效</li>
|
||||
* <li>实际运行在 auto-commit 模式:每个 updateById 独立提交</li>
|
||||
* </ul>
|
||||
* <b>后果</b>:rebuild 中途抛异常时 MySQL 可能半更新、Redis 可能部分写入,
|
||||
* 不强一致但最终一致——下一次 enqueue 会再触发一次完整 rebuild 自愈。
|
||||
* 对“队列排序”这类可重放数据可以接受;若未来改为影响 MySQL 外表的写入,
|
||||
* 需要把 rebuild 抽到独立 bean,用代理调用走新事务。
|
||||
*/
|
||||
private void triggerQueueRebuildAfterCommit(Long userId, Long fallbackAreaId) {
|
||||
Runnable rebuildAction = () -> {
|
||||
try {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.viewsh.module.ops.core.dispatch;
|
||||
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueService;
|
||||
import com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation;
|
||||
import com.viewsh.module.ops.core.dispatch.model.DispatchDecision;
|
||||
@@ -224,4 +225,40 @@ class DispatchEngineIdleCheckTest {
|
||||
return OrderTransitionResult.success(orderId,
|
||||
WorkOrderStatusEnum.PENDING, WorkOrderStatusEnum.QUEUED, queueId);
|
||||
}
|
||||
|
||||
/**
 * Verifies that {@code autoDispatchNext} dispatches the next queued order through
 * {@code orderLifecycleManager.dispatch()} and never through {@code transition()}.
 */
@Test
|
||||
void autoDispatchNext_whenDispatchingFromQueue_shouldGoThroughDispatchNotTransition() {
|
||||
// Pins the P1 fix: dispatching from the queue must go through dispatch() so that it inherits the Bug #2 FOR UPDATE serialization guard.
|
||||
// If someone later switches back to transition(), this test goes red: the hole where autoDispatchNext bypasses FOR UPDATE would be back.
|
||||
Long completedOrderId = 700L;
|
||||
Long waitingOrderId = 701L;
|
||||
Long queueId = 800L;
|
||||
|
||||
when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, completedOrderId)).thenReturn(List.of());
|
||||
when(orderMapper.selectById(completedOrderId)).thenReturn(OpsOrderDO.builder()
|
||||
.id(completedOrderId).areaId(501L).build());
|
||||
OrderQueueDTO waitingDTO = new OrderQueueDTO();
|
||||
waitingDTO.setId(queueId);
|
||||
|
||||
waitingDTO.setOpsOrderId(waitingOrderId);
|
||||
waitingDTO.setQueueScore(1000.0);
|
||||
waitingDTO.setFloorDiff(1);
|
||||
waitingDTO.setWaitMinutes(2L);
|
||||
when(orderQueueService.rebuildWaitingTasksByUserId(ASSIGNEE_ID, 501L))
|
||||
.thenReturn(List.of(waitingDTO));
|
||||
when(orderMapper.selectById(waitingOrderId)).thenReturn(OpsOrderDO.builder()
|
||||
.id(waitingOrderId)
|
||||
.status(WorkOrderStatusEnum.QUEUED.getStatus())
|
||||
.build());
|
||||
when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class)))
|
||||
.thenReturn(OrderTransitionResult.success(waitingOrderId,
|
||||
WorkOrderStatusEnum.QUEUED, WorkOrderStatusEnum.DISPATCHED));
|
||||
|
||||
DispatchResult result = dispatchEngine.autoDispatchNext(completedOrderId, ASSIGNEE_ID);
|
||||
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals("已按队列总分派发下一单", result.getMessage());
|
||||
// Key assertion: must call dispatch() (with FOR UPDATE) rather than transition() (the bare responsibility chain).
|
||||
verify(orderLifecycleManager).dispatch(any(OrderTransitionRequest.class));
|
||||
verify(orderLifecycleManager, never()).transition(any(OrderTransitionRequest.class));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user