diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java index 5951d25..8e35335 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java @@ -464,34 +464,48 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { return Collections.emptyList(); } - @Override - public List getWaitingTasksByUserId(Long userId) { - // 获取所有任务 - List allTasks = getTasksByUserId(userId); - - // 过滤出 WAITING 状态的任务,并按队列分数排序 - return allTasks.stream() - .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus())) - .sorted((a, b) -> { - // 优先使用队列分数 score 排序(已在入队时计算好) - if (a.getQueueScore() != null && b.getQueueScore() != null) { - return Double.compare(a.getQueueScore(), b.getQueueScore()); - } - - // 兜底:如果 score 为空,则按优先级+时间排序 - // 按优先级排序(P0 > P1 > P2 > P3) - int priorityCompare = Integer.compare( - b.getPriority() != null ? b.getPriority() : 999, - a.getPriority() != null ? a.getPriority() : 999 - ); - if (priorityCompare != 0) { - return priorityCompare; - } - // 优先级相同,按入队时间排序 - return a.getEnqueueTime().compareTo(b.getEnqueueTime()); - }) - .collect(Collectors.toList()); - } + @Override + public List getWaitingTasksByUserId(Long userId) { + // 获取所有任务 + List allTasks = getTasksByUserId(userId); + + // 过滤出 WAITING 状态的任务,并按队列分数排序 + return filterAndSortWaitingTasks(allTasks); + } + + @Override + public List getWaitingTasksByUserIdFromDb(Long userId) { + // 直接从 MySQL 获取所有任务 + List mysqlList = orderQueueMapper.selectListByUserId(userId); + List allTasks = convertToDTO(mysqlList); + + // 过滤出 WAITING 状态的任务,并按队列分数排序 + return filterAndSortWaitingTasks(allTasks); + } + + private List filterAndSortWaitingTasks(List allTasks) { + return allTasks.stream() + .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus())) + .sorted((a, b) -> { + // 优先使用队列分数 score 排序(已在入队时计算好) + if (a.getQueueScore() != null && b.getQueueScore() != null) { + return Double.compare(a.getQueueScore(), b.getQueueScore()); + } + + // 兜底:如果 score 为空,则按优先级+时间排序 + // 按优先级排序(P0 > P1 > P2 > P3) + int priorityCompare = Integer.compare( + b.getPriority() != null ? b.getPriority() : 999, + a.getPriority() != null ? a.getPriority() : 999 + ); + if (priorityCompare != 0) { + return priorityCompare; + } + // 优先级相同,按入队时间排序 + return a.getEnqueueTime().compareTo(b.getEnqueueTime()); + }) + .collect(Collectors.toList()); + } @Override public List getInterruptedTasksByUserId(Long userId) { diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceImpl.java deleted file mode 100644 index ffcd7be..0000000 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceImpl.java +++ /dev/null @@ -1,732 +0,0 @@ -package com.viewsh.module.ops.service.queue; - -import com.viewsh.module.ops.api.queue.OrderQueueDTO; -import com.viewsh.module.ops.api.queue.OrderQueueService; -import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO; -import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper; -import com.viewsh.module.ops.enums.OrderQueueStatusEnum; -import com.viewsh.module.ops.enums.PriorityEnum; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeanUtils; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; - -/** - * 工单队列管理服务实现 - *

- * 架构说明: - * 1. 写入:先写 MySQL(持久化),再异步写 Redis(高性能) - * 2. 读取:优先读 Redis,未命中则读 MySQL 并同步到 Redis - * 3. 同步:定时任务将 MySQL 数据同步到 Redis - * 4. 容灾:Redis 宕机时降级到纯 MySQL 模式 - * - * @author lzh - */ -@Slf4j -@Service -public class OrderQueueServiceImpl implements OrderQueueService { - - /** - * Score 计算公式:优先级分数 + 时间戳 - * 优先级分数:P0=0, P1=1000000, P2=2000000, P3=3000000 - * 时间戳:秒级时间戳 - * 结果:优先级高的排在前面,同优先级按时间排序 - */ - private static final Map PRIORITY_SCORES = Map.of( - 0, 0L, // P0: 0 - 1, 1000000L, // P1: 1,000,000 - 2, 2000000L, // P2: 2,000,000 - 3, 3000000L // P3: 3,000,000 - ); - - @Resource - private OpsOrderQueueMapper orderQueueMapper; - - @Resource - private RedisOrderQueueService redisQueueService; - - @Resource - private QueueSyncService queueSyncService; - - @Override - @Transactional(rollbackFor = Exception.class) - public Long enqueue(Long opsOrderId, Long userId, PriorityEnum priority, Integer queueIndex) { - // 1. 检查工单是否已在队列中(MySQL) - OpsOrderQueueDO existingQueue = orderQueueMapper.selectByOpsOrderId(opsOrderId); - if (existingQueue != null) { - log.warn("工单已在队列中: opsOrderId={}, userId={}, existingQueueId={}", opsOrderId, userId, - existingQueue.getId()); - return existingQueue.getId(); - } - - // 2. 计算队列分数 - LocalDateTime now = LocalDateTime.now(); - double queueScore = calculateQueueScore(priority.getPriority(), now); - - // 3. 创建队列记录(MySQL) - OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder() - .opsOrderId(opsOrderId) - .userId(userId) - .queueIndex(queueIndex != null ? queueIndex : calculateNextQueueIndex(priority)) - .priority(priority.getPriority()) - .queueScore(queueScore) - .queueStatus(OrderQueueStatusEnum.WAITING.getStatus()) - .enqueueTime(now) - .pausedDuration(0) - .eventMessage("工单入队,等待派单") - .build(); - - orderQueueMapper.insert(queueDO); - - log.info("工单已入队(MySQL): opsOrderId={}, userId={}, priority={}, queueId={}", - opsOrderId, userId, priority, queueDO.getId()); - - // 3. 异步写入 Redis(失败不影响主流程) - OrderQueueDTO dto = convertToDTO(queueDO); - CompletableFuture.runAsync(() -> { - try { - redisQueueService.enqueue(dto); - log.debug("工单已入队(Redis): queueId={}", dto.getId()); - } catch (Exception e) { - log.error("Redis 入队失败,依赖定时同步任务补偿: queueId={}", dto.getId(), e); - } - }); - - // 4. 如果是P0紧急任务,记录警告日志 - if (priority.isUrgent()) { - log.warn("检测到P0紧急任务入队,需立即处理: opsOrderId={}", opsOrderId); - // TODO: 触发紧急派单流程(在派单引擎中实现) - } - - return queueDO.getId(); - } - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean dequeue(Long queueId) { - // 1. MySQL 删除 - OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); - if (queueDO == null) { - log.warn("队列记录不存在: queueId={}", queueId); - return false; - } - - // 只允许删除终态的记录 - if (!isTerminalStatus(queueDO.getQueueStatus())) { - log.warn("只能删除终态的队列记录: queueId={}, status={}", queueId, queueDO.getQueueStatus()); - return false; - } - - int deleted = orderQueueMapper.deleteById(queueId); - - // 2. 异步删除 Redis - if (deleted > 0) { - CompletableFuture.runAsync(() -> { - try { - redisQueueService.remove(queueId); - log.debug("Redis 删除队列记录成功: queueId={}", queueId); - } catch (Exception e) { - log.error("Redis 删除队列记录失败: queueId={}", queueId, e); - } - }); - } - - log.info("队列记录已删除: queueId={}, deleted={}", queueId, deleted > 0); - return deleted > 0; - } - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean dequeueByOpsOrderId(Long opsOrderId) { - OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId); - if (queueDO == null) { - log.warn("工单不在队列中: opsOrderId={}", opsOrderId); - return false; - } - - return dequeue(queueDO.getId()); - } - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean updateStatus(Long queueId, OrderQueueStatusEnum newStatus) { - // 1. MySQL 更新 - OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); - if (queueDO == null) { - log.warn("队列记录不存在: queueId={}", queueId); - return false; - } - - String oldStatus = queueDO.getQueueStatus(); - - // 状态流转校验 - if (!validateStatusTransition(oldStatus, newStatus.getStatus())) { - log.warn("非法的状态流转: queueId={}, oldStatus={}, newStatus={}", - queueId, oldStatus, newStatus.getStatus()); - return false; - } - - queueDO.setQueueStatus(newStatus.getStatus().toUpperCase()); - updateStatusTimestamp(queueDO, newStatus); - - int updated = orderQueueMapper.updateById(queueDO); - - // 2. 更新 Redis - if (updated > 0) { - OrderQueueDTO dto = convertToDTO(queueDO); - - // REMOVED 状态需要同步更新,确保任务完成后立即从 Redis 队列移除 - // 避免自动派单下一个时查询到已完成的任务 - if (OrderQueueStatusEnum.REMOVED == newStatus) { - try { - redisQueueService.updateStatus(queueId, newStatus.getStatus()); - log.debug("Redis 同步更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus()); - } catch (Exception e) { - log.error("Redis 同步更新状态失败: queueId={}", queueId, e); - } - } else { - // 其他状态异步更新 - CompletableFuture.runAsync(() -> { - try { - redisQueueService.updateStatus(queueId, newStatus.getStatus()); - log.debug("Redis 异步更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus()); - } catch (Exception e) { - log.error("Redis 异步更新状态失败: queueId={}", queueId, e); - } - }); - } - } - - log.info("队列状态已更新: queueId={}, opsOrderId={}, oldStatus={}, newStatus={}", - queueId, queueDO.getOpsOrderId(), oldStatus, newStatus.getStatus()); - - return updated > 0; - } - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean startExecution(Long queueId) { - OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); - if (queueDO == null) { - log.warn("队列记录不存在: queueId={}", queueId); - return false; - } - - if (!OrderQueueStatusEnum.WAITING.getStatus().equals(queueDO.getQueueStatus())) { - log.warn("只有WAITING状态的任务可以开始执行: queueId={}, status={}", - queueId, queueDO.getQueueStatus()); - return false; - } - - // 队列状态流转:WAITING → PROCESSING - queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus()); - queueDO.setDequeueTime(LocalDateTime.now()); - queueDO.setEventMessage("派单成功,已分配给执行人员"); - - int updated = orderQueueMapper.updateById(queueDO); - - // 异步更新 Redis - if (updated > 0) { - CompletableFuture.runAsync(() -> { - try { - redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus()); - } catch (Exception e) { - log.error("Redis 更新状态失败: queueId={}", queueId, e); - } - }); - } - - log.info("派单成功: queueId={}, opsOrderId={}, assigneeId={}", - queueId, queueDO.getOpsOrderId(), queueDO.getUserId()); - - return updated > 0; - } - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean pauseTask(Long queueId) { - OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); - if (queueDO == null) { - log.warn("队列记录不存在: queueId={}", queueId); - return false; - } - - // 允许 PROCESSING 状态的任务可以暂停 - String currentStatus = queueDO.getQueueStatus(); - if (!OrderQueueStatusEnum.PROCESSING.getStatus().equals(currentStatus)) { - log.warn("只有PROCESSING状态的任务可以暂停: queueId={}, status={}", - queueId, currentStatus); - return false; - } - - // 队列状态流转:PROCESSING → PAUSED - queueDO.setQueueStatus(OrderQueueStatusEnum.PAUSED.getStatus()); - - // 计算暂停时长 - if (queueDO.getDequeueTime() != null) { - long pausedSeconds = java.time.Duration.between( - queueDO.getDequeueTime(), - LocalDateTime.now()).getSeconds(); - queueDO.addPausedDuration((int) pausedSeconds); - } - - queueDO.setEventMessage("任务已暂停"); - int updated = orderQueueMapper.updateById(queueDO); - - // 异步更新 Redis - if (updated > 0) { - CompletableFuture.runAsync(() -> { - try { - redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PAUSED.getStatus()); - } catch (Exception e) { - log.error("Redis 更新状态失败: queueId={}", queueId, e); - } - }); - } - - log.info("任务已暂停: queueId={}, opsOrderId={}, pausedDuration={}", - queueId, queueDO.getOpsOrderId(), queueDO.getPausedDuration()); - - return updated > 0; - } - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean resumeTask(Long queueId) { - OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); - if (queueDO == null) { - log.warn("队列记录不存在: queueId={}", queueId); - return false; - } - - // 只有 PAUSED 状态的任务可以恢复 - String currentStatus = queueDO.getQueueStatus(); - if (!OrderQueueStatusEnum.PAUSED.getStatus().equals(currentStatus)) { - log.warn("只有PAUSED状态的任务可以恢复: queueId={}, status={}", - queueId, currentStatus); - return false; - } - - // 队列状态流转:PAUSED → PROCESSING - queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus()); - queueDO.setDequeueTime(LocalDateTime.now()); // 重置出队时间 - queueDO.setEventMessage("任务已恢复执行"); - - int updated = orderQueueMapper.updateById(queueDO); - - // 异步更新 Redis - if (updated > 0) { - CompletableFuture.runAsync(() -> { - try { - redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus()); - } catch (Exception e) { - log.error("Redis 更新状态失败: queueId={}", queueId, e); - } - }); - } - - log.info("任务已恢复执行: queueId={}, opsOrderId={}", queueId, queueDO.getOpsOrderId()); - - return updated > 0; - } - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean adjustPriority(Long queueId, PriorityEnum newPriority, String reason) { - OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); - if (queueDO == null) { - log.warn("队列记录不存在: queueId={}", queueId); - return false; - } - - Integer oldPriority = queueDO.getPriority(); - - // 如果优先级没有变化,直接返回 - if (oldPriority.equals(newPriority.getPriority())) { - log.info("优先级未变化: queueId={}, priority={}", queueId, newPriority); - return true; - } - - queueDO.setPriorityEnum(newPriority); - // 重新计算队列顺序 - queueDO.setQueueIndex(calculateNextQueueIndex(newPriority)); - // 重新计算队列分数(使用原入队时间保持时间戳不变) - LocalDateTime enqueueTime = queueDO.getEnqueueTime() != null ? queueDO.getEnqueueTime() : LocalDateTime.now(); - double newQueueScore = calculateQueueScore(newPriority.getPriority(), enqueueTime); - queueDO.setQueueScore(newQueueScore); - queueDO.setEventMessage("优先级调整: " + oldPriority + " -> " + newPriority + ", 原因: " + reason); - - int updated = orderQueueMapper.updateById(queueDO); - - // 异步更新 Redis - if (updated > 0) { - CompletableFuture.runAsync(() -> { - try { - redisQueueService.updatePriority(queueId, newPriority.getPriority()); - } catch (Exception e) { - log.error("Redis 更新优先级失败: queueId={}", queueId, e); - } - }); - } - - log.info("优先级已调整: queueId={}, opsOrderId={}, oldPriority={}, newPriority={}, reason={}, newScore={}", - queueId, queueDO.getOpsOrderId(), oldPriority, newPriority, reason, newQueueScore); - - // 如果升级为P0,触发紧急派单 - if (newPriority.isUrgent()) { - log.warn("任务升级为P0紧急,需立即处理: queueId={}, opsOrderId={}", - queueId, queueDO.getOpsOrderId()); - // TODO: 触发紧急派单流程 - } - - return updated > 0; - } - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean adjustPriorityByOpsOrderId(Long opsOrderId, PriorityEnum newPriority, String reason) { - OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId); - if (queueDO == null) { - log.warn("工单不在队列中: opsOrderId={}", opsOrderId); - return false; - } - - return adjustPriority(queueDO.getId(), newPriority, reason); - } - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean upgradeToUrgent(Long opsOrderId, String reason) { - return adjustPriorityByOpsOrderId(opsOrderId, PriorityEnum.P0, reason); - } - - @Override - public List getWaitingQueue() { - // 优先从 Redis 获取 - // TODO: 实现全局等待队列(需要聚合所有用户的队列) - // 这里暂时使用 MySQL - List list = orderQueueMapper.selectListWaiting(); - return convertToDTO(list); - } - - @Override - public List getUrgentOrders() { - // TODO: 优先从 Redis 获取 - List list = orderQueueMapper.selectUrgentOrders(); - return convertToDTO(list); - } - - @Override - public OrderQueueDTO getCurrentTaskByUserId(Long userId) { - // 1. 优先从 Redis 获取 - List redisTasks = redisQueueService.peekTasks(userId, 1); - if (redisTasks != null && !redisTasks.isEmpty()) { - return redisTasks.get(0); - } - - // 2. Redis 未命中,从 MySQL 获取并同步到 Redis - OpsOrderQueueDO queueDO = orderQueueMapper.selectCurrentExecutingByUserId(userId); - if (queueDO != null) { - // 同步到 Redis - OrderQueueDTO dto = convertToDTO(queueDO); - CompletableFuture.runAsync(() -> { - try { - redisQueueService.enqueue(dto); - } catch (Exception e) { - log.error("同步到 Redis 失败: queueId={}", dto.getId(), e); - } - }); - return dto; - } - - return null; - } - - @Override - public List getTasksByUserId(Long userId) { - // 1. 优先从 Redis 获取 - List redisTasks = redisQueueService.getTasksByUserId(userId); - if (redisTasks != null && !redisTasks.isEmpty()) { - return redisTasks; - } - - // 2. Redis 未命中,从 MySQL 获取并同步到 Redis - List mysqlList = orderQueueMapper.selectListByUserId(userId); - if (mysqlList != null && !mysqlList.isEmpty()) { - // 同步到 Redis - List dtoList = convertToDTO(mysqlList); - CompletableFuture.runAsync(() -> { - try { - redisQueueService.batchEnqueue(dtoList); - } catch (Exception e) { - log.error("批量同步到 Redis 失败: userId={}", userId, e); - } - }); - return dtoList; - } - - return Collections.emptyList(); - } - - @Override - public List getWaitingTasksByUserId(Long userId) { - // 获取所有任务 - List allTasks = getTasksByUserId(userId); - - // 过滤出 WAITING 状态的任务,并按队列分数排序 - return allTasks.stream() - .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equalsIgnoreCase(task.getQueueStatus())) - .sorted((a, b) -> { - // 优先使用队列分数 score 排序(已在入队时计算好) - if (a.getQueueScore() != null && b.getQueueScore() != null) { - return Double.compare(a.getQueueScore(), b.getQueueScore()); - } - - // 兜底:如果 score 为空,则按优先级+时间排序 - // 按优先级排序(P0 > P1 > P2 > P3) - int priorityCompare = Integer.compare( - b.getPriority() != null ? b.getPriority() : 999, - a.getPriority() != null ? a.getPriority() : 999); - if (priorityCompare != 0) { - return priorityCompare; - } - // 优先级相同,按入队时间排序 - return a.getEnqueueTime().compareTo(b.getEnqueueTime()); - }) - .collect(Collectors.toList()); - } - - @Override - public List getWaitingTasksByUserIdFromDb(Long userId) { - // 直接从 MySQL 获取等待中的任务(忽略 Redis 缓存) - // 用于确保获取最新数据,例如在任务完成后自动派单下一个时 - List mysqlList = orderQueueMapper.selectListByUserIdAndStatus( - userId, OrderQueueStatusEnum.WAITING.getStatus()); - - if (mysqlList == null || mysqlList.isEmpty()) { - return Collections.emptyList(); - } - - // 转换为 DTO 并按队列分数排序 - return mysqlList.stream() - .map(this::convertToDTO) - .filter(Objects::nonNull) - .sorted((a, b) -> { - // 优先使用队列分数 score 排序 - if (a.getQueueScore() != null && b.getQueueScore() != null) { - return Double.compare(a.getQueueScore(), b.getQueueScore()); - } - // 兜底:如果 score 为空,则按优先级+时间排序 - int priorityCompare = Integer.compare( - b.getPriority() != null ? b.getPriority() : 999, - a.getPriority() != null ? a.getPriority() : 999); - if (priorityCompare != 0) { - return priorityCompare; - } - return a.getEnqueueTime().compareTo(b.getEnqueueTime()); - }) - .collect(Collectors.toList()); - } - - @Override - public List getInterruptedTasksByUserId(Long userId) { - // 获取所有任务 - List allTasks = getTasksByUserId(userId); - - // 过滤出 PAUSED 状态的任务,并按中断时间排序 - return allTasks.stream() - .filter(task -> OrderQueueStatusEnum.PAUSED.getStatus().equals(task.getQueueStatus())) - .sorted((a, b) -> { - // 按中断时间排序(最早中断的排在前面) - // 如果没有中断时间字段,则按入队时间排序 - if (a.getEnqueueTime() != null && b.getEnqueueTime() != null) { - return a.getEnqueueTime().compareTo(b.getEnqueueTime()); - } - return 0; - }) - .collect(Collectors.toList()); - } - - @Override - public OrderQueueDTO getById(Long queueId) { - // 1. 优先从 Redis Hash 获取 - OrderQueueDTO redisDTO = redisQueueService.getByQueueId(queueId); - if (redisDTO != null) { - return redisDTO; - } - - // 2. Redis 未命中,从 MySQL 获取 - OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); - if (queueDO != null) { - OrderQueueDTO dto = convertToDTO(queueDO); - // 同步到 Redis - CompletableFuture.runAsync(() -> { - try { - redisQueueService.enqueue(dto); - } catch (Exception e) { - log.error("同步到 Redis 失败: queueId={}", dto.getId(), e); - } - }); - return dto; - } - - return null; - } - - @Override - public OrderQueueDTO getByOpsOrderId(Long opsOrderId) { - // 1. 优先从 Redis 获取(遍历所有用户队列,性能较差,慎用) - OrderQueueDTO redisDTO = redisQueueService.getByOrderId(opsOrderId); - if (redisDTO != null) { - return redisDTO; - } - - // 2. Redis 未命中,从 MySQL 获取 - OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId); - if (queueDO != null) { - return convertToDTO(queueDO); - } - - return null; - } - - @Override - public Long countWaiting() { - // 优先从 Redis 获取(需要聚合所有用户) - // TODO: 实现分布式计数器 - return orderQueueMapper.countWaiting(); - } - - @Override - public Long countByUserId(Long userId) { - // 优先从 Redis 获取 - long redisCount = redisQueueService.getQueueSize(userId); - if (redisCount > 0) { - return redisCount; - } - - // Redis 未命中,从 MySQL 获取 - return orderQueueMapper.countByUserId(userId); - } - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean updateEventMessage(Long queueId, String eventMessage) { - OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); - if (queueDO == null) { - log.warn("队列记录不存在: queueId={}", queueId); - return false; - } - - queueDO.setEventMessage(eventMessage); - int updated = orderQueueMapper.updateById(queueDO); - - log.info("事件消息已更新: queueId={}, eventMessage={}", queueId, eventMessage); - - return updated > 0; - } - - // ========== 私有方法 ========== - - /** - * 计算队列分数(用于排序) - * 公式:优先级分数 + 时间戳 - * - * @param priority 优先级(0=P0, 1=P1, 2=P2, 3=P3) - * @param enqueueTime 入队时间 - * @return 队列分数 - */ - private double calculateQueueScore(Integer priority, LocalDateTime enqueueTime) { - // 获取优先级分数 - long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L); - - // 计算时间戳(秒级) - long timestamp; - if (enqueueTime != null) { - timestamp = enqueueTime.atZone(ZoneId.systemDefault()).toEpochSecond(); - } else { - timestamp = System.currentTimeMillis() / 1000; - } - - return priorityScore + timestamp; - } - - /** - * 计算下一个队列顺序号 - */ - private Integer calculateNextQueueIndex(PriorityEnum priority) { - // TODO: 查询当前优先级的最大 queueIndex,然后 +1 - // 这里简化处理,返回默认值 - return priority.getQueueOrder() * 1000; - } - - /** - * 判断是否为终态 - */ - private boolean isTerminalStatus(String status) { - return "CANCELLED".equals(status) || "COMPLETED".equals(status) - || "REMOVED".equals(status) || "FAILED".equals(status); - } - - /** - * 校验状态流转是否合法 - */ - private boolean validateStatusTransition(String oldStatus, String newStatus) { - // 终态不能再变更 - if (isTerminalStatus(oldStatus)) { - return false; - } - - // 允许的状态流转(支持新旧状态) - return switch (oldStatus.toUpperCase()) { - case "PENDING" -> "WAITING".equals(newStatus) || "CANCELLED".equals(newStatus); - case "WAITING" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus) - || "CANCELLED".equals(newStatus); - case "PROCESSING", "DISPATCHED" -> "PAUSED".equals(newStatus) - || "REMOVED".equals(newStatus) || "COMPLETED".equals(newStatus); - case "PAUSED" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus); - default -> false; - }; - } - - /** - * 根据状态更新时间戳 - */ - private void updateStatusTimestamp(OpsOrderQueueDO queueDO, OrderQueueStatusEnum status) { - switch (status.getStatus().toUpperCase()) { - case "DISPATCHED" -> { - if (queueDO.getDequeueTime() == null) { - queueDO.setDequeueTime(LocalDateTime.now()); - } - } - case "REMOVED", "COMPLETED" -> { - // 可以记录完成时间(如果表有该字段) - } - } - } - - /** - * 转换为 DTO - */ - private OrderQueueDTO convertToDTO(OpsOrderQueueDO queueDO) { - OrderQueueDTO dto = new OrderQueueDTO(); - BeanUtils.copyProperties(queueDO, dto); - return dto; - } - - /** - * 批量转换为 DTO - */ - private List convertToDTO(List list) { - return list.stream() - .map(this::convertToDTO) - .collect(Collectors.toList()); - } -}