From e4fa971fea2e9d5a023208ef5802e86c9287c717 Mon Sep 17 00:00:00 2001 From: lzh Date: Thu, 8 Jan 2026 15:12:39 +0800 Subject: [PATCH] =?UTF-8?q?chore:=20=E3=80=90ops=E3=80=91=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E9=98=9F=E5=88=97=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ops/api/queue/OrderQueueService.java | 8 ++ ...hanced.java => OrderQueueServiceImpl.java} | 74 +++++++++++++------ 2 files changed, 58 insertions(+), 24 deletions(-) rename viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/{OrderQueueServiceEnhanced.java => OrderQueueServiceImpl.java} (89%) diff --git a/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueService.java b/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueService.java index cd015e4..e5dfdcf 100644 --- a/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueService.java +++ b/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueService.java @@ -155,6 +155,14 @@ public interface OrderQueueService { */ List getWaitingTasksByUserId(Long userId); + /** + * 获取用户的暂停任务列表(PAUSED状态,按暂停时间排序) + * + * @param userId 用户ID + * @return 暂停的任务列表(已按暂停时间排序,最早暂停的排在前面) + */ + List getInterruptedTasksByUserId(Long userId); + /** * 查询队列记录 * diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceImpl.java similarity index 89% rename from viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java rename to viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceImpl.java index 404ae24..f2253e7 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceImpl.java @@ -9,7 +9,6 @@ import com.viewsh.module.ops.enums.PriorityEnum; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; -import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -20,8 +19,8 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; /** - * 工单队列管理服务实现(Redis + MySQL 混合架构) - * + * 工单队列管理服务实现 + *

* 架构说明: * 1. 写入:先写 MySQL(持久化),再异步写 Redis(高性能) * 2. 读取:优先读 Redis,未命中则读 MySQL 并同步到 Redis @@ -32,7 +31,7 @@ import java.util.stream.Collectors; */ @Slf4j @Service -public class OrderQueueServiceEnhanced implements OrderQueueService { +public class OrderQueueServiceImpl implements OrderQueueService { /** * Score 计算公式:优先级分数 + 时间戳 @@ -212,8 +211,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { return false; } - // 队列状态流转:WAITING → DISPATCHED - queueDO.setQueueStatus(OrderQueueStatusEnum.DISPATCHED.getStatus()); + // 队列状态流转:WAITING → PROCESSING + queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus()); queueDO.setDequeueTime(LocalDateTime.now()); queueDO.setEventMessage("派单成功,已分配给执行人员"); @@ -223,7 +222,7 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { if (updated > 0) { CompletableFuture.runAsync(() -> { try { - redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.DISPATCHED.getStatus()); + redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus()); } catch (Exception e) { log.error("Redis 更新状态失败: queueId={}", queueId, e); } @@ -245,13 +244,15 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { return false; } - if (!OrderQueueStatusEnum.DISPATCHED.getStatus().equals(queueDO.getQueueStatus())) { - log.warn("只有DISPATCHED状态的任务可以暂停: queueId={}, status={}", - queueId, queueDO.getQueueStatus()); + // 允许 PROCESSING 状态的任务可以暂停 + String currentStatus = queueDO.getQueueStatus(); + if (!OrderQueueStatusEnum.PROCESSING.getStatus().equals(currentStatus)) { + log.warn("只有PROCESSING状态的任务可以暂停: queueId={}, status={}", + queueId, currentStatus); return false; } - // 队列状态流转:DISPATCHED → PAUSED + // 队列状态流转:PROCESSING → PAUSED queueDO.setQueueStatus(OrderQueueStatusEnum.PAUSED.getStatus()); // 计算暂停时长 @@ -292,14 +293,16 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { return false; } - if (!OrderQueueStatusEnum.PAUSED.getStatus().equals(queueDO.getQueueStatus())) { + // 只有 PAUSED 状态的任务可以恢复 + String currentStatus = queueDO.getQueueStatus(); + if (!OrderQueueStatusEnum.PAUSED.getStatus().equals(currentStatus)) { log.warn("只有PAUSED状态的任务可以恢复: queueId={}, status={}", - queueId, queueDO.getQueueStatus()); + queueId, currentStatus); return false; } - // 队列状态流转:PAUSED → DISPATCHED - queueDO.setQueueStatus(OrderQueueStatusEnum.DISPATCHED.getStatus()); + // 队列状态流转:PAUSED → PROCESSING + queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus()); queueDO.setDequeueTime(LocalDateTime.now()); // 重置出队时间 queueDO.setEventMessage("任务已恢复执行"); @@ -309,7 +312,7 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { if (updated > 0) { CompletableFuture.runAsync(() -> { try { - redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.DISPATCHED.getStatus()); + redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus()); } catch (Exception e) { log.error("Redis 更新状态失败: queueId={}", queueId, e); } @@ -488,6 +491,25 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { .collect(Collectors.toList()); } + @Override + public List getInterruptedTasksByUserId(Long userId) { + // 获取所有任务 + List allTasks = getTasksByUserId(userId); + + // 过滤出 PAUSED 状态的任务,并按中断时间排序 + return allTasks.stream() + .filter(task -> OrderQueueStatusEnum.PAUSED.getStatus().equals(task.getQueueStatus())) + .sorted((a, b) -> { + // 按中断时间排序(最早中断的排在前面) + // 如果没有中断时间字段,则按入队时间排序 + if (a.getEnqueueTime() != null && b.getEnqueueTime() != null) { + return a.getEnqueueTime().compareTo(b.getEnqueueTime()); + } + return 0; + }) + .collect(Collectors.toList()); + } + @Override public OrderQueueDTO getById(Long queueId) { // 1. 优先从 Redis Hash 获取 @@ -573,8 +595,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { * 计算队列分数(用于排序) * 公式:优先级分数 + 时间戳 * - * @param priority 优先级(0=P0, 1=P1, 2=P2, 3=P3) - * @param enqueueTime 入队时间 + * @param priority 优先级(0=P0, 1=P1, 2=P2, 3=P3) + * @param enqueueTime 入队时间 * @return 队列分数 */ private double calculateQueueScore(Integer priority, LocalDateTime enqueueTime) { @@ -605,7 +627,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { * 判断是否为终态 */ private boolean isTerminalStatus(String status) { - return "CANCELLED".equals(status) || "COMPLETED".equals(status) || "FAILED".equals(status); + return "CANCELLED".equals(status) || "COMPLETED".equals(status) + || "REMOVED".equals(status) || "FAILED".equals(status); } /** @@ -617,11 +640,14 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { return false; } - // 允许的状态流转(简化版本,根据实际表结构) + // 允许的状态流转(支持新旧状态) return switch (oldStatus.toUpperCase()) { - case "WAITING" -> "DISPATCHED".equals(newStatus) || "CANCELLED".equals(newStatus); - case "DISPATCHED" -> "PAUSED".equals(newStatus) || "CANCELLED".equals(newStatus); - case "PAUSED" -> "DISPATCHED".equals(newStatus) || "CANCELLED".equals(newStatus); + case "PENDING" -> "WAITING".equals(newStatus) || "CANCELLED".equals(newStatus); + case "WAITING" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus) + || "CANCELLED".equals(newStatus); + case "PROCESSING", "DISPATCHED" -> "PAUSED".equals(newStatus) + || "REMOVED".equals(newStatus) || "COMPLETED".equals(newStatus); + case "PAUSED" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus); default -> false; }; } @@ -636,7 +662,7 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { queueDO.setDequeueTime(LocalDateTime.now()); } } - case "CANCELLED", "COMPLETED" -> { + case "REMOVED", "COMPLETED" -> { // 可以记录完成时间(如果表有该字段) } }