chore: 【ops】工单队列管理

This commit is contained in:
lzh
2026-01-08 15:12:39 +08:00
parent 2a4ca53249
commit e4fa971fea
2 changed files with 58 additions and 24 deletions

View File

@@ -155,6 +155,14 @@ public interface OrderQueueService {
*/ */
List<OrderQueueDTO> getWaitingTasksByUserId(Long userId); List<OrderQueueDTO> getWaitingTasksByUserId(Long userId);
/**
* 获取用户的暂停任务列表PAUSED状态按暂停时间排序
*
* @param userId 用户ID
* @return 暂停的任务列表(已按暂停时间排序,最早暂停的排在前面)
*/
List<OrderQueueDTO> getInterruptedTasksByUserId(Long userId);
/** /**
* 查询队列记录 * 查询队列记录
* *

View File

@@ -9,7 +9,6 @@ import com.viewsh.module.ops.enums.PriorityEnum;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@@ -20,8 +19,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* 工单队列管理服务实现Redis + MySQL 混合架构 * 工单队列管理服务实现
* * <p>
* 架构说明 * 架构说明
* 1. 写入先写 MySQL持久化再异步写 Redis高性能 * 1. 写入先写 MySQL持久化再异步写 Redis高性能
* 2. 读取优先读 Redis未命中则读 MySQL 并同步到 Redis * 2. 读取优先读 Redis未命中则读 MySQL 并同步到 Redis
@@ -32,7 +31,7 @@ import java.util.stream.Collectors;
*/ */
@Slf4j @Slf4j
@Service @Service
public class OrderQueueServiceEnhanced implements OrderQueueService { public class OrderQueueServiceImpl implements OrderQueueService {
/** /**
* Score 计算公式优先级分数 + 时间戳 * Score 计算公式优先级分数 + 时间戳
@@ -212,8 +211,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
return false; return false;
} }
// 队列状态流转WAITING DISPATCHED // 队列状态流转WAITING PROCESSING
queueDO.setQueueStatus(OrderQueueStatusEnum.DISPATCHED.getStatus()); queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus());
queueDO.setDequeueTime(LocalDateTime.now()); queueDO.setDequeueTime(LocalDateTime.now());
queueDO.setEventMessage("派单成功,已分配给执行人员"); queueDO.setEventMessage("派单成功,已分配给执行人员");
@@ -223,7 +222,7 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
if (updated > 0) { if (updated > 0) {
CompletableFuture.runAsync(() -> { CompletableFuture.runAsync(() -> {
try { try {
redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.DISPATCHED.getStatus()); redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus());
} catch (Exception e) { } catch (Exception e) {
log.error("Redis 更新状态失败: queueId={}", queueId, e); log.error("Redis 更新状态失败: queueId={}", queueId, e);
} }
@@ -245,13 +244,15 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
return false; return false;
} }
if (!OrderQueueStatusEnum.DISPATCHED.getStatus().equals(queueDO.getQueueStatus())) { // 允许 PROCESSING 状态的任务可以暂停
log.warn("只有DISPATCHED状态的任务可以暂停: queueId={}, status={}", String currentStatus = queueDO.getQueueStatus();
queueId, queueDO.getQueueStatus()); if (!OrderQueueStatusEnum.PROCESSING.getStatus().equals(currentStatus)) {
log.warn("只有PROCESSING状态的任务可以暂停: queueId={}, status={}",
queueId, currentStatus);
return false; return false;
} }
// 队列状态流转DISPATCHED PAUSED // 队列状态流转PROCESSING PAUSED
queueDO.setQueueStatus(OrderQueueStatusEnum.PAUSED.getStatus()); queueDO.setQueueStatus(OrderQueueStatusEnum.PAUSED.getStatus());
// 计算暂停时长 // 计算暂停时长
@@ -292,14 +293,16 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
return false; return false;
} }
if (!OrderQueueStatusEnum.PAUSED.getStatus().equals(queueDO.getQueueStatus())) { // 只有 PAUSED 状态的任务可以恢复
String currentStatus = queueDO.getQueueStatus();
if (!OrderQueueStatusEnum.PAUSED.getStatus().equals(currentStatus)) {
log.warn("只有PAUSED状态的任务可以恢复: queueId={}, status={}", log.warn("只有PAUSED状态的任务可以恢复: queueId={}, status={}",
queueId, queueDO.getQueueStatus()); queueId, currentStatus);
return false; return false;
} }
// 队列状态流转PAUSED DISPATCHED // 队列状态流转PAUSED PROCESSING
queueDO.setQueueStatus(OrderQueueStatusEnum.DISPATCHED.getStatus()); queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus());
queueDO.setDequeueTime(LocalDateTime.now()); // 重置出队时间 queueDO.setDequeueTime(LocalDateTime.now()); // 重置出队时间
queueDO.setEventMessage("任务已恢复执行"); queueDO.setEventMessage("任务已恢复执行");
@@ -309,7 +312,7 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
if (updated > 0) { if (updated > 0) {
CompletableFuture.runAsync(() -> { CompletableFuture.runAsync(() -> {
try { try {
redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.DISPATCHED.getStatus()); redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus());
} catch (Exception e) { } catch (Exception e) {
log.error("Redis 更新状态失败: queueId={}", queueId, e); log.error("Redis 更新状态失败: queueId={}", queueId, e);
} }
@@ -488,6 +491,25 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@Override
public List<OrderQueueDTO> getInterruptedTasksByUserId(Long userId) {
// 获取所有任务
List<OrderQueueDTO> allTasks = getTasksByUserId(userId);
// 过滤出 PAUSED 状态的任务并按中断时间排序
return allTasks.stream()
.filter(task -> OrderQueueStatusEnum.PAUSED.getStatus().equals(task.getQueueStatus()))
.sorted((a, b) -> {
// 按中断时间排序最早中断的排在前面
// 如果没有中断时间字段则按入队时间排序
if (a.getEnqueueTime() != null && b.getEnqueueTime() != null) {
return a.getEnqueueTime().compareTo(b.getEnqueueTime());
}
return 0;
})
.collect(Collectors.toList());
}
@Override @Override
public OrderQueueDTO getById(Long queueId) { public OrderQueueDTO getById(Long queueId) {
// 1. 优先从 Redis Hash 获取 // 1. 优先从 Redis Hash 获取
@@ -573,8 +595,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
* 计算队列分数用于排序 * 计算队列分数用于排序
* 公式优先级分数 + 时间戳 * 公式优先级分数 + 时间戳
* *
* @param priority 优先级0=P0, 1=P1, 2=P2, 3=P3 * @param priority 优先级0=P0, 1=P1, 2=P2, 3=P3
* @param enqueueTime 入队时间 * @param enqueueTime 入队时间
* @return 队列分数 * @return 队列分数
*/ */
private double calculateQueueScore(Integer priority, LocalDateTime enqueueTime) { private double calculateQueueScore(Integer priority, LocalDateTime enqueueTime) {
@@ -605,7 +627,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
* 判断是否为终态 * 判断是否为终态
*/ */
private boolean isTerminalStatus(String status) { private boolean isTerminalStatus(String status) {
return "CANCELLED".equals(status) || "COMPLETED".equals(status) || "FAILED".equals(status); return "CANCELLED".equals(status) || "COMPLETED".equals(status)
|| "REMOVED".equals(status) || "FAILED".equals(status);
} }
/** /**
@@ -617,11 +640,14 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
return false; return false;
} }
// 允许的状态流转简化版本根据实际表结构 // 允许的状态流转支持新旧状态
return switch (oldStatus.toUpperCase()) { return switch (oldStatus.toUpperCase()) {
case "WAITING" -> "DISPATCHED".equals(newStatus) || "CANCELLED".equals(newStatus); case "PENDING" -> "WAITING".equals(newStatus) || "CANCELLED".equals(newStatus);
case "DISPATCHED" -> "PAUSED".equals(newStatus) || "CANCELLED".equals(newStatus); case "WAITING" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus)
case "PAUSED" -> "DISPATCHED".equals(newStatus) || "CANCELLED".equals(newStatus); || "CANCELLED".equals(newStatus);
case "PROCESSING", "DISPATCHED" -> "PAUSED".equals(newStatus)
|| "REMOVED".equals(newStatus) || "COMPLETED".equals(newStatus);
case "PAUSED" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus);
default -> false; default -> false;
}; };
} }
@@ -636,7 +662,7 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
queueDO.setDequeueTime(LocalDateTime.now()); queueDO.setDequeueTime(LocalDateTime.now());
} }
} }
case "CANCELLED", "COMPLETED" -> { case "REMOVED", "COMPLETED" -> {
// 可以记录完成时间如果表有该字段 // 可以记录完成时间如果表有该字段
} }
} }