From c24b1eb64137eda5907738cb6a59ef37627a18d3 Mon Sep 17 00:00:00 2001 From: lzh Date: Mon, 20 Apr 2026 11:22:18 +0800 Subject: [PATCH] =?UTF-8?q?fix(ops):=20=E7=9B=B4=E6=8E=A5=E6=B4=BE?= =?UTF-8?q?=E5=8F=91=E5=8A=A0=E7=A9=BA=E9=97=B2=E5=85=9C=E5=BA=95=20+=20?= =?UTF-8?q?=E9=98=9F=E5=88=97=E5=90=8C=E6=AD=A5=E6=8C=89=E6=B4=BB=E8=B7=83?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E8=BF=87=E6=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 直接派发空闲兜底(补 autoDispatchNext 之外的另一条派发入口) DispatchEngineImpl.executeDispatch 在 DIRECT_DISPATCH/PUSH_AND_ENQUEUE 前增加 MySQL 兜底校验:若执行人仍挂活跃工单(Redis 判空闲但 MySQL 不一致的场景),强制降级为 ENQUEUE_ONLY 让任务进队列等待下一轮 autoDispatchNext 接力。避免同一设备再次出现并行多单。 2. 队列同步按活跃状态过滤 syncUserQueueToRedis / getTasksByUserId 的 MySQL 回填路径此前调用 selectListByUserId 不过滤状态,会把历史 REMOVED 记录一并同步到 Redis(线上观察到设备 31 的 Redis ZSet 塞了 206 条、其中 205 条是 REMOVED)。新增 OpsOrderQueueMapper.selectActiveListByUserId,只返 回 WAITING/PROCESSING/PAUSED,两条同步链路改走此方法。原 selectList ByUserId 保留给审计/统计场景。 未清理历史 REMOVED 记录,保留审计追溯。 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ops/core/dispatch/DispatchEngineImpl.java | 17 +++++++++++++++++ .../dal/mysql/queue/OpsOrderQueueMapper.java | 15 ++++++++++++++- .../queue/OrderQueueServiceEnhanced.java | 6 ++++-- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java index 1cf884d8..ba7d98cf 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java @@ -362,6 +362,23 @@ public class DispatchEngineImpl implements DispatchEngine { Long orderId = context.getOrderId(); Long assigneeId = context.getRecommendedAssigneeId(); + // 兜底校验:调度策略基于 Redis 的设备状态判空闲,可能与 MySQL 的 ops_order 实际活跃态不一致 + // (例如设备 Redis 状态被某次 COMPLETED 清回 IDLE 但历史 CONFIRMED/DISPATCHED 单仍残留)。 + // 若分配路径会真正推送工单给设备(DIRECT_DISPATCH / PUSH_AND_ENQUEUE), + // 此处再查一次 MySQL,非空闲时强制降级到 ENQUEUE_ONLY,避免同一设备并行多单的状态错乱。 + if (assigneeId != null + && (decision.getPath() == DispatchPath.DIRECT_DISPATCH + || decision.getPath() == DispatchPath.PUSH_AND_ENQUEUE)) { + List activeOrders = orderMapper.selectActiveByAssignee(assigneeId, orderId); + if (!activeOrders.isEmpty()) { + OpsOrderDO head = activeOrders.get(0); + log.warn("调度决策为 {} 但执行人仍挂活跃工单,降级为仅入队: orderId={}, assigneeId={}, activeCount={}, sampleOrderId={}, sampleStatus={}", + decision.getPath(), orderId, assigneeId, + activeOrders.size(), head.getId(), head.getStatus()); + return executeEnqueueOnly(context, assigneeId); + } + } + switch (decision.getPath()) { case DIRECT_DISPATCH: return executeDirectDispatch(context, assigneeId); diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java index f6ed6f62..a25c207d 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java @@ -54,7 +54,7 @@ public interface OpsOrderQueueMapper extends BaseMapperX { } /** - * 根据用户ID查询队列列表 + * 根据用户ID查询队列列表(含历史 REMOVED 记录,通常用于审计/统计) */ default List selectListByUserId(Long userId) { return selectList(new LambdaQueryWrapperX() @@ -62,6 +62,19 @@ public interface OpsOrderQueueMapper extends BaseMapperX { .orderByDesc(OpsOrderQueueDO::getEnqueueTime)); } + /** + * 根据用户ID查询活跃队列列表(仅 WAITING/PROCESSING/PAUSED,排除 REMOVED/已终态) + *

+ * 同步到 Redis、计算队列长度、查询当前任务等场景应走此方法,避免 + * 将历史 REMOVED 记录同步到 Redis 造成 ZSet / Hash 膨胀。 + */ + default List selectActiveListByUserId(Long userId) { + return selectList(new LambdaQueryWrapperX() + .eq(OpsOrderQueueDO::getUserId, userId) + .in(OpsOrderQueueDO::getQueueStatus, "WAITING", "PROCESSING", "PAUSED") + .orderByDesc(OpsOrderQueueDO::getEnqueueTime)); + } + /** * 根据用户ID和状态查询队列列表 * 用于强制从 MySQL 读取最新数据 diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java index aeb3fc2b..ef723a69 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java @@ -467,7 +467,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { } // 2. Redis 未命中,从 MySQL 获取并同步到 Redis - List mysqlList = orderQueueMapper.selectListByUserId(userId); + // 只同步活跃态(WAITING/PROCESSING/PAUSED),排除 REMOVED 历史记录,避免 Redis 膨胀 + List mysqlList = orderQueueMapper.selectActiveListByUserId(userId); if (mysqlList != null && !mysqlList.isEmpty()) { // 同步到 Redis List dtoList = convertToDTO(mysqlList); @@ -764,7 +765,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { } private void syncUserQueueToRedis(Long userId, List rebuiltWaitingTasks) { - List queues = orderQueueMapper.selectListByUserId(userId); + // 只同步活跃态(WAITING/PROCESSING/PAUSED),避免把历史 REMOVED 记录回写 Redis ZSet/Hash + List queues = orderQueueMapper.selectActiveListByUserId(userId); if (queues == null || queues.isEmpty()) { redisQueueService.clearQueue(userId); return;