diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java index 1cf884d8..ba7d98cf 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java @@ -362,6 +362,23 @@ public class DispatchEngineImpl implements DispatchEngine { Long orderId = context.getOrderId(); Long assigneeId = context.getRecommendedAssigneeId(); + // 兜底校验:调度策略基于 Redis 的设备状态判空闲,可能与 MySQL 的 ops_order 实际活跃态不一致 + // (例如设备 Redis 状态被某次 COMPLETED 清回 IDLE 但历史 CONFIRMED/DISPATCHED 单仍残留)。 + // 若分配路径会真正推送工单给设备(DIRECT_DISPATCH / PUSH_AND_ENQUEUE), + // 此处再查一次 MySQL,非空闲时强制降级到 ENQUEUE_ONLY,避免同一设备并行多单的状态错乱。 + if (assigneeId != null + && (decision.getPath() == DispatchPath.DIRECT_DISPATCH + || decision.getPath() == DispatchPath.PUSH_AND_ENQUEUE)) { + List activeOrders = orderMapper.selectActiveByAssignee(assigneeId, orderId); + if (!activeOrders.isEmpty()) { + OpsOrderDO head = activeOrders.get(0); + log.warn("调度决策为 {} 但执行人仍挂活跃工单,降级为仅入队: orderId={}, assigneeId={}, activeCount={}, sampleOrderId={}, sampleStatus={}", + decision.getPath(), orderId, assigneeId, + activeOrders.size(), head.getId(), head.getStatus()); + return executeEnqueueOnly(context, assigneeId); + } + } + switch (decision.getPath()) { case DIRECT_DISPATCH: return executeDirectDispatch(context, assigneeId); diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java index f6ed6f62..a25c207d 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java @@ -54,7 +54,7 @@ public interface OpsOrderQueueMapper extends BaseMapperX { } /** - * 根据用户ID查询队列列表 + * 根据用户ID查询队列列表(含历史 REMOVED 记录,通常用于审计/统计) */ default List selectListByUserId(Long userId) { return selectList(new LambdaQueryWrapperX() @@ -62,6 +62,19 @@ public interface OpsOrderQueueMapper extends BaseMapperX { .orderByDesc(OpsOrderQueueDO::getEnqueueTime)); } + /** + * 根据用户ID查询活跃队列列表(仅 WAITING/PROCESSING/PAUSED,排除 REMOVED/已终态) + *

+ * 同步到 Redis、计算队列长度、查询当前任务等场景应走此方法,避免 + * 将历史 REMOVED 记录同步到 Redis 造成 ZSet / Hash 膨胀。 + */ + default List selectActiveListByUserId(Long userId) { + return selectList(new LambdaQueryWrapperX() + .eq(OpsOrderQueueDO::getUserId, userId) + .in(OpsOrderQueueDO::getQueueStatus, "WAITING", "PROCESSING", "PAUSED") + .orderByDesc(OpsOrderQueueDO::getEnqueueTime)); + } + /** * 根据用户ID和状态查询队列列表 * 用于强制从 MySQL 读取最新数据 diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java index aeb3fc2b..ef723a69 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java @@ -467,7 +467,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { } // 2. Redis 未命中,从 MySQL 获取并同步到 Redis - List mysqlList = orderQueueMapper.selectListByUserId(userId); + // 只同步活跃态(WAITING/PROCESSING/PAUSED),排除 REMOVED 历史记录,避免 Redis 膨胀 + List mysqlList = orderQueueMapper.selectActiveListByUserId(userId); if (mysqlList != null && !mysqlList.isEmpty()) { // 同步到 Redis List dtoList = convertToDTO(mysqlList); @@ -764,7 +765,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { } private void syncUserQueueToRedis(Long userId, List rebuiltWaitingTasks) { - List queues = orderQueueMapper.selectListByUserId(userId); + // 只同步活跃态(WAITING/PROCESSING/PAUSED),避免把历史 REMOVED 记录回写 Redis ZSet/Hash + List queues = orderQueueMapper.selectActiveListByUserId(userId); if (queues == null || queues.isEmpty()) { redisQueueService.clearQueue(userId); return;