fix(ops): 直接派发加空闲兜底 + 队列同步按活跃状态过滤

1. 直接派发空闲兜底(补 autoDispatchNext 之外的另一条派发入口)
   DispatchEngineImpl.executeDispatch 在 DIRECT_DISPATCH/PUSH_AND_ENQUEUE
   前增加 MySQL 兜底校验:若执行人仍挂活跃工单(Redis 判空闲但 MySQL
   不一致的场景),强制降级为 ENQUEUE_ONLY 让任务进队列等待下一轮
   autoDispatchNext 接力。避免同一设备再次出现并行多单。

2. 队列同步按活跃状态过滤
   syncUserQueueToRedis / getTasksByUserId 的 MySQL 回填路径此前调用
   selectListByUserId 不过滤状态,会把历史 REMOVED 记录一并同步到
   Redis(线上观察到设备 31 的 Redis ZSet 塞了 206 条、其中 205 条是
   REMOVED)。新增 OpsOrderQueueMapper.selectActiveListByUserId,只返
   回 WAITING/PROCESSING/PAUSED,两条同步链路改走此方法。原 selectList
   ByUserId 保留给审计/统计场景。

未清理历史 REMOVED 记录,保留审计追溯。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
lzh
2026-04-20 11:22:18 +08:00
parent 4d85659277
commit c24b1eb641
3 changed files with 35 additions and 3 deletions

View File

@@ -362,6 +362,23 @@ public class DispatchEngineImpl implements DispatchEngine {
Long orderId = context.getOrderId();
Long assigneeId = context.getRecommendedAssigneeId();
// 兜底校验:调度策略基于 Redis 的设备状态判空闲,可能与 MySQL 的 ops_order 实际活跃态不一致
// (例如设备 Redis 状态被某次 COMPLETED 清回 IDLE 但历史 CONFIRMED/DISPATCHED 单仍残留)。
// 若分配路径会真正推送工单给设备DIRECT_DISPATCH / PUSH_AND_ENQUEUE
// 此处再查一次 MySQL非空闲时强制降级到 ENQUEUE_ONLY避免同一设备并行多单的状态错乱。
if (assigneeId != null
&& (decision.getPath() == DispatchPath.DIRECT_DISPATCH
|| decision.getPath() == DispatchPath.PUSH_AND_ENQUEUE)) {
List<OpsOrderDO> activeOrders = orderMapper.selectActiveByAssignee(assigneeId, orderId);
if (!activeOrders.isEmpty()) {
OpsOrderDO head = activeOrders.get(0);
log.warn("调度决策为 {} 但执行人仍挂活跃工单,降级为仅入队: orderId={}, assigneeId={}, activeCount={}, sampleOrderId={}, sampleStatus={}",
decision.getPath(), orderId, assigneeId,
activeOrders.size(), head.getId(), head.getStatus());
return executeEnqueueOnly(context, assigneeId);
}
}
switch (decision.getPath()) {
case DIRECT_DISPATCH:
return executeDirectDispatch(context, assigneeId);

View File

@@ -54,7 +54,7 @@ public interface OpsOrderQueueMapper extends BaseMapperX<OpsOrderQueueDO> {
}
/**
* 根据用户ID查询队列列表
* 根据用户ID查询队列列表(含历史 REMOVED 记录,通常用于审计/统计)
*/
default List<OpsOrderQueueDO> selectListByUserId(Long userId) {
return selectList(new LambdaQueryWrapperX<OpsOrderQueueDO>()
@@ -62,6 +62,19 @@ public interface OpsOrderQueueMapper extends BaseMapperX<OpsOrderQueueDO> {
.orderByDesc(OpsOrderQueueDO::getEnqueueTime));
}
/**
* 根据用户ID查询活跃队列列表仅 WAITING/PROCESSING/PAUSED排除 REMOVED/已终态)
* <p>
* 同步到 Redis、计算队列长度、查询当前任务等场景应走此方法避免
* 将历史 REMOVED 记录同步到 Redis 造成 ZSet / Hash 膨胀。
*/
default List<OpsOrderQueueDO> selectActiveListByUserId(Long userId) {
return selectList(new LambdaQueryWrapperX<OpsOrderQueueDO>()
.eq(OpsOrderQueueDO::getUserId, userId)
.in(OpsOrderQueueDO::getQueueStatus, "WAITING", "PROCESSING", "PAUSED")
.orderByDesc(OpsOrderQueueDO::getEnqueueTime));
}
/**
* 根据用户ID和状态查询队列列表
* 用于强制从 MySQL 读取最新数据

View File

@@ -467,7 +467,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
}
// 2. Redis 未命中,从 MySQL 获取并同步到 Redis
List<OpsOrderQueueDO> mysqlList = orderQueueMapper.selectListByUserId(userId);
// 只同步活跃态WAITING/PROCESSING/PAUSED排除 REMOVED 历史记录,避免 Redis 膨胀
List<OpsOrderQueueDO> mysqlList = orderQueueMapper.selectActiveListByUserId(userId);
if (mysqlList != null && !mysqlList.isEmpty()) {
// 同步到 Redis
List<OrderQueueDTO> dtoList = convertToDTO(mysqlList);
@@ -764,7 +765,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
}
private void syncUserQueueToRedis(Long userId, List<OrderQueueDTO> rebuiltWaitingTasks) {
List<OpsOrderQueueDO> queues = orderQueueMapper.selectListByUserId(userId);
// 只同步活跃态WAITING/PROCESSING/PAUSED避免把历史 REMOVED 记录回写 Redis ZSet/Hash
List<OpsOrderQueueDO> queues = orderQueueMapper.selectActiveListByUserId(userId);
if (queues == null || queues.isEmpty()) {
redisQueueService.clearQueue(userId);
return;