From b73ef4f39f7bed5208fd5dce7c68a3da0d5b6ba5 Mon Sep 17 00:00:00 2001 From: lzh Date: Sat, 31 Jan 2026 19:05:26 +0800 Subject: [PATCH] =?UTF-8?q?fix(ops):=20=E8=BF=81=E7=A7=BBupdateStatus?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=88=B0Enhanced=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit REMOVED 状态改为同步更新 Redis,避免自动派单查询到已完成任务 Co-Authored-By: Claude Opus 4.5 --- .../queue/OrderQueueServiceEnhanced.java | 113 ++++++++++-------- 1 file changed, 63 insertions(+), 50 deletions(-) diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java index 8e35335..d3bd080 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java @@ -31,9 +31,9 @@ import java.util.stream.Collectors; * @author lzh */ @Slf4j -@Service -@Primary -public class OrderQueueServiceEnhanced implements OrderQueueService { +@Service +@Primary +public class OrderQueueServiceEnhanced implements OrderQueueService { /** * Score 计算公式:优先级分数 + 时间戳 @@ -179,17 +179,30 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { int updated = orderQueueMapper.updateById(queueDO); - // 2. 异步更新 Redis + // 2. 更新 Redis if (updated > 0) { OrderQueueDTO dto = convertToDTO(queueDO); - CompletableFuture.runAsync(() -> { + + // REMOVED 状态需要同步更新,确保任务完成后立即从 Redis 队列移除 + // 避免自动派单下一个时查询到已完成的任务 + if (OrderQueueStatusEnum.REMOVED == newStatus) { try { redisQueueService.updateStatus(queueId, newStatus.getStatus()); - log.debug("Redis 更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus()); + log.debug("Redis 同步更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus()); } catch (Exception e) { - log.error("Redis 更新状态失败: queueId={}", queueId, e); + log.error("Redis 同步更新状态失败: queueId={}", queueId, e); } - }); + } else { + // 其他状态异步更新 + CompletableFuture.runAsync(() -> { + try { + redisQueueService.updateStatus(queueId, newStatus.getStatus()); + log.debug("Redis 异步更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus()); + } catch (Exception e) { + log.error("Redis 异步更新状态失败: queueId={}", queueId, e); + } + }); + } } log.info("队列状态已更新: queueId={}, opsOrderId={}, oldStatus={}, newStatus={}", @@ -464,48 +477,48 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { return Collections.emptyList(); } - @Override - public List getWaitingTasksByUserId(Long userId) { - // 获取所有任务 - List allTasks = getTasksByUserId(userId); - - // 过滤出 WAITING 状态的任务,并按队列分数排序 - return filterAndSortWaitingTasks(allTasks); - } - - @Override - public List getWaitingTasksByUserIdFromDb(Long userId) { - // 直接从 MySQL 获取所有任务 - List mysqlList = orderQueueMapper.selectListByUserId(userId); - List allTasks = convertToDTO(mysqlList); - - // 过滤出 WAITING 状态的任务,并按队列分数排序 - return filterAndSortWaitingTasks(allTasks); - } - - private List filterAndSortWaitingTasks(List allTasks) { - return allTasks.stream() - .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus())) - .sorted((a, b) -> { - // 优先使用队列分数 score 排序(已在入队时计算好) - if (a.getQueueScore() != null && b.getQueueScore() != null) { - return Double.compare(a.getQueueScore(), b.getQueueScore()); - } - - // 兜底:如果 score 为空,则按优先级+时间排序 - // 按优先级排序(P0 > P1 > P2 > P3) - int priorityCompare = Integer.compare( - b.getPriority() != null ? b.getPriority() : 999, - a.getPriority() != null ? a.getPriority() : 999 - ); - if (priorityCompare != 0) { - return priorityCompare; - } - // 优先级相同,按入队时间排序 - return a.getEnqueueTime().compareTo(b.getEnqueueTime()); - }) - .collect(Collectors.toList()); - } + @Override + public List getWaitingTasksByUserId(Long userId) { + // 获取所有任务 + List allTasks = getTasksByUserId(userId); + + // 过滤出 WAITING 状态的任务,并按队列分数排序 + return filterAndSortWaitingTasks(allTasks); + } + + @Override + public List getWaitingTasksByUserIdFromDb(Long userId) { + // 直接从 MySQL 获取所有任务 + List mysqlList = orderQueueMapper.selectListByUserId(userId); + List allTasks = convertToDTO(mysqlList); + + // 过滤出 WAITING 状态的任务,并按队列分数排序 + return filterAndSortWaitingTasks(allTasks); + } + + private List filterAndSortWaitingTasks(List allTasks) { + return allTasks.stream() + .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus())) + .sorted((a, b) -> { + // 优先使用队列分数 score 排序(已在入队时计算好) + if (a.getQueueScore() != null && b.getQueueScore() != null) { + return Double.compare(a.getQueueScore(), b.getQueueScore()); + } + + // 兜底:如果 score 为空,则按优先级+时间排序 + // 按优先级排序(P0 > P1 > P2 > P3) + int priorityCompare = Integer.compare( + b.getPriority() != null ? b.getPriority() : 999, + a.getPriority() != null ? a.getPriority() : 999 + ); + if (priorityCompare != 0) { + return priorityCompare; + } + // 优先级相同,按入队时间排序 + return a.getEnqueueTime().compareTo(b.getEnqueueTime()); + }) + .collect(Collectors.toList()); + } @Override public List getInterruptedTasksByUserId(Long userId) {