refactor(ops): 删除重复的OrderQueueServiceImpl实现类
保留 OrderQueueServiceEnhanced 作为唯一实现,避免Bean冲突 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -464,34 +464,48 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OrderQueueDTO> getWaitingTasksByUserId(Long userId) {
|
||||
// 获取所有任务
|
||||
List<OrderQueueDTO> allTasks = getTasksByUserId(userId);
|
||||
|
||||
// 过滤出 WAITING 状态的任务,并按队列分数排序
|
||||
return allTasks.stream()
|
||||
.filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus()))
|
||||
.sorted((a, b) -> {
|
||||
// 优先使用队列分数 score 排序(已在入队时计算好)
|
||||
if (a.getQueueScore() != null && b.getQueueScore() != null) {
|
||||
return Double.compare(a.getQueueScore(), b.getQueueScore());
|
||||
}
|
||||
|
||||
// 兜底:如果 score 为空,则按优先级+时间排序
|
||||
// 按优先级排序(P0 > P1 > P2 > P3)
|
||||
int priorityCompare = Integer.compare(
|
||||
b.getPriority() != null ? b.getPriority() : 999,
|
||||
a.getPriority() != null ? a.getPriority() : 999
|
||||
);
|
||||
if (priorityCompare != 0) {
|
||||
return priorityCompare;
|
||||
}
|
||||
// 优先级相同,按入队时间排序
|
||||
return a.getEnqueueTime().compareTo(b.getEnqueueTime());
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
@Override
|
||||
public List<OrderQueueDTO> getWaitingTasksByUserId(Long userId) {
|
||||
// 获取所有任务
|
||||
List<OrderQueueDTO> allTasks = getTasksByUserId(userId);
|
||||
|
||||
// 过滤出 WAITING 状态的任务,并按队列分数排序
|
||||
return filterAndSortWaitingTasks(allTasks);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OrderQueueDTO> getWaitingTasksByUserIdFromDb(Long userId) {
|
||||
// 直接从 MySQL 获取所有任务
|
||||
List<OpsOrderQueueDO> mysqlList = orderQueueMapper.selectListByUserId(userId);
|
||||
List<OrderQueueDTO> allTasks = convertToDTO(mysqlList);
|
||||
|
||||
// 过滤出 WAITING 状态的任务,并按队列分数排序
|
||||
return filterAndSortWaitingTasks(allTasks);
|
||||
}
|
||||
|
||||
private List<OrderQueueDTO> filterAndSortWaitingTasks(List<OrderQueueDTO> allTasks) {
|
||||
return allTasks.stream()
|
||||
.filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus()))
|
||||
.sorted((a, b) -> {
|
||||
// 优先使用队列分数 score 排序(已在入队时计算好)
|
||||
if (a.getQueueScore() != null && b.getQueueScore() != null) {
|
||||
return Double.compare(a.getQueueScore(), b.getQueueScore());
|
||||
}
|
||||
|
||||
// 兜底:如果 score 为空,则按优先级+时间排序
|
||||
// 按优先级排序(P0 > P1 > P2 > P3)
|
||||
int priorityCompare = Integer.compare(
|
||||
b.getPriority() != null ? b.getPriority() : 999,
|
||||
a.getPriority() != null ? a.getPriority() : 999
|
||||
);
|
||||
if (priorityCompare != 0) {
|
||||
return priorityCompare;
|
||||
}
|
||||
// 优先级相同,按入队时间排序
|
||||
return a.getEnqueueTime().compareTo(b.getEnqueueTime());
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OrderQueueDTO> getInterruptedTasksByUserId(Long userId) {
|
||||
|
||||
@@ -1,732 +0,0 @@
|
||||
package com.viewsh.module.ops.service.queue;
|
||||
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueService;
|
||||
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
|
||||
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
|
||||
import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
|
||||
import com.viewsh.module.ops.enums.PriorityEnum;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 工单队列管理服务实现
|
||||
* <p>
|
||||
* 架构说明:
|
||||
* 1. 写入:先写 MySQL(持久化),再异步写 Redis(高性能)
|
||||
* 2. 读取:优先读 Redis,未命中则读 MySQL 并同步到 Redis
|
||||
* 3. 同步:定时任务将 MySQL 数据同步到 Redis
|
||||
* 4. 容灾:Redis 宕机时降级到纯 MySQL 模式
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class OrderQueueServiceImpl implements OrderQueueService {
|
||||
|
||||
/**
|
||||
* Score 计算公式:优先级分数 + 时间戳
|
||||
* 优先级分数:P0=0, P1=1000000, P2=2000000, P3=3000000
|
||||
* 时间戳:秒级时间戳
|
||||
* 结果:优先级高的排在前面,同优先级按时间排序
|
||||
*/
|
||||
private static final Map<Integer, Long> PRIORITY_SCORES = Map.of(
|
||||
0, 0L, // P0: 0
|
||||
1, 1000000L, // P1: 1,000,000
|
||||
2, 2000000L, // P2: 2,000,000
|
||||
3, 3000000L // P3: 3,000,000
|
||||
);
|
||||
|
||||
@Resource
|
||||
private OpsOrderQueueMapper orderQueueMapper;
|
||||
|
||||
@Resource
|
||||
private RedisOrderQueueService redisQueueService;
|
||||
|
||||
@Resource
|
||||
private QueueSyncService queueSyncService;
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public Long enqueue(Long opsOrderId, Long userId, PriorityEnum priority, Integer queueIndex) {
|
||||
// 1. 检查工单是否已在队列中(MySQL)
|
||||
OpsOrderQueueDO existingQueue = orderQueueMapper.selectByOpsOrderId(opsOrderId);
|
||||
if (existingQueue != null) {
|
||||
log.warn("工单已在队列中: opsOrderId={}, userId={}, existingQueueId={}", opsOrderId, userId,
|
||||
existingQueue.getId());
|
||||
return existingQueue.getId();
|
||||
}
|
||||
|
||||
// 2. 计算队列分数
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
double queueScore = calculateQueueScore(priority.getPriority(), now);
|
||||
|
||||
// 3. 创建队列记录(MySQL)
|
||||
OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder()
|
||||
.opsOrderId(opsOrderId)
|
||||
.userId(userId)
|
||||
.queueIndex(queueIndex != null ? queueIndex : calculateNextQueueIndex(priority))
|
||||
.priority(priority.getPriority())
|
||||
.queueScore(queueScore)
|
||||
.queueStatus(OrderQueueStatusEnum.WAITING.getStatus())
|
||||
.enqueueTime(now)
|
||||
.pausedDuration(0)
|
||||
.eventMessage("工单入队,等待派单")
|
||||
.build();
|
||||
|
||||
orderQueueMapper.insert(queueDO);
|
||||
|
||||
log.info("工单已入队(MySQL): opsOrderId={}, userId={}, priority={}, queueId={}",
|
||||
opsOrderId, userId, priority, queueDO.getId());
|
||||
|
||||
// 3. 异步写入 Redis(失败不影响主流程)
|
||||
OrderQueueDTO dto = convertToDTO(queueDO);
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
redisQueueService.enqueue(dto);
|
||||
log.debug("工单已入队(Redis): queueId={}", dto.getId());
|
||||
} catch (Exception e) {
|
||||
log.error("Redis 入队失败,依赖定时同步任务补偿: queueId={}", dto.getId(), e);
|
||||
}
|
||||
});
|
||||
|
||||
// 4. 如果是P0紧急任务,记录警告日志
|
||||
if (priority.isUrgent()) {
|
||||
log.warn("检测到P0紧急任务入队,需立即处理: opsOrderId={}", opsOrderId);
|
||||
// TODO: 触发紧急派单流程(在派单引擎中实现)
|
||||
}
|
||||
|
||||
return queueDO.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean dequeue(Long queueId) {
|
||||
// 1. MySQL 删除
|
||||
OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
|
||||
if (queueDO == null) {
|
||||
log.warn("队列记录不存在: queueId={}", queueId);
|
||||
return false;
|
||||
}
|
||||
|
||||
// 只允许删除终态的记录
|
||||
if (!isTerminalStatus(queueDO.getQueueStatus())) {
|
||||
log.warn("只能删除终态的队列记录: queueId={}, status={}", queueId, queueDO.getQueueStatus());
|
||||
return false;
|
||||
}
|
||||
|
||||
int deleted = orderQueueMapper.deleteById(queueId);
|
||||
|
||||
// 2. 异步删除 Redis
|
||||
if (deleted > 0) {
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
redisQueueService.remove(queueId);
|
||||
log.debug("Redis 删除队列记录成功: queueId={}", queueId);
|
||||
} catch (Exception e) {
|
||||
log.error("Redis 删除队列记录失败: queueId={}", queueId, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
log.info("队列记录已删除: queueId={}, deleted={}", queueId, deleted > 0);
|
||||
return deleted > 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean dequeueByOpsOrderId(Long opsOrderId) {
|
||||
OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId);
|
||||
if (queueDO == null) {
|
||||
log.warn("工单不在队列中: opsOrderId={}", opsOrderId);
|
||||
return false;
|
||||
}
|
||||
|
||||
return dequeue(queueDO.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean updateStatus(Long queueId, OrderQueueStatusEnum newStatus) {
|
||||
// 1. MySQL 更新
|
||||
OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
|
||||
if (queueDO == null) {
|
||||
log.warn("队列记录不存在: queueId={}", queueId);
|
||||
return false;
|
||||
}
|
||||
|
||||
String oldStatus = queueDO.getQueueStatus();
|
||||
|
||||
// 状态流转校验
|
||||
if (!validateStatusTransition(oldStatus, newStatus.getStatus())) {
|
||||
log.warn("非法的状态流转: queueId={}, oldStatus={}, newStatus={}",
|
||||
queueId, oldStatus, newStatus.getStatus());
|
||||
return false;
|
||||
}
|
||||
|
||||
queueDO.setQueueStatus(newStatus.getStatus().toUpperCase());
|
||||
updateStatusTimestamp(queueDO, newStatus);
|
||||
|
||||
int updated = orderQueueMapper.updateById(queueDO);
|
||||
|
||||
// 2. 更新 Redis
|
||||
if (updated > 0) {
|
||||
OrderQueueDTO dto = convertToDTO(queueDO);
|
||||
|
||||
// REMOVED 状态需要同步更新,确保任务完成后立即从 Redis 队列移除
|
||||
// 避免自动派单下一个时查询到已完成的任务
|
||||
if (OrderQueueStatusEnum.REMOVED == newStatus) {
|
||||
try {
|
||||
redisQueueService.updateStatus(queueId, newStatus.getStatus());
|
||||
log.debug("Redis 同步更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus());
|
||||
} catch (Exception e) {
|
||||
log.error("Redis 同步更新状态失败: queueId={}", queueId, e);
|
||||
}
|
||||
} else {
|
||||
// 其他状态异步更新
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
redisQueueService.updateStatus(queueId, newStatus.getStatus());
|
||||
log.debug("Redis 异步更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus());
|
||||
} catch (Exception e) {
|
||||
log.error("Redis 异步更新状态失败: queueId={}", queueId, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
log.info("队列状态已更新: queueId={}, opsOrderId={}, oldStatus={}, newStatus={}",
|
||||
queueId, queueDO.getOpsOrderId(), oldStatus, newStatus.getStatus());
|
||||
|
||||
return updated > 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean startExecution(Long queueId) {
|
||||
OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
|
||||
if (queueDO == null) {
|
||||
log.warn("队列记录不存在: queueId={}", queueId);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!OrderQueueStatusEnum.WAITING.getStatus().equals(queueDO.getQueueStatus())) {
|
||||
log.warn("只有WAITING状态的任务可以开始执行: queueId={}, status={}",
|
||||
queueId, queueDO.getQueueStatus());
|
||||
return false;
|
||||
}
|
||||
|
||||
// 队列状态流转:WAITING → PROCESSING
|
||||
queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus());
|
||||
queueDO.setDequeueTime(LocalDateTime.now());
|
||||
queueDO.setEventMessage("派单成功,已分配给执行人员");
|
||||
|
||||
int updated = orderQueueMapper.updateById(queueDO);
|
||||
|
||||
// 异步更新 Redis
|
||||
if (updated > 0) {
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus());
|
||||
} catch (Exception e) {
|
||||
log.error("Redis 更新状态失败: queueId={}", queueId, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
log.info("派单成功: queueId={}, opsOrderId={}, assigneeId={}",
|
||||
queueId, queueDO.getOpsOrderId(), queueDO.getUserId());
|
||||
|
||||
return updated > 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean pauseTask(Long queueId) {
|
||||
OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
|
||||
if (queueDO == null) {
|
||||
log.warn("队列记录不存在: queueId={}", queueId);
|
||||
return false;
|
||||
}
|
||||
|
||||
// 允许 PROCESSING 状态的任务可以暂停
|
||||
String currentStatus = queueDO.getQueueStatus();
|
||||
if (!OrderQueueStatusEnum.PROCESSING.getStatus().equals(currentStatus)) {
|
||||
log.warn("只有PROCESSING状态的任务可以暂停: queueId={}, status={}",
|
||||
queueId, currentStatus);
|
||||
return false;
|
||||
}
|
||||
|
||||
// 队列状态流转:PROCESSING → PAUSED
|
||||
queueDO.setQueueStatus(OrderQueueStatusEnum.PAUSED.getStatus());
|
||||
|
||||
// 计算暂停时长
|
||||
if (queueDO.getDequeueTime() != null) {
|
||||
long pausedSeconds = java.time.Duration.between(
|
||||
queueDO.getDequeueTime(),
|
||||
LocalDateTime.now()).getSeconds();
|
||||
queueDO.addPausedDuration((int) pausedSeconds);
|
||||
}
|
||||
|
||||
queueDO.setEventMessage("任务已暂停");
|
||||
int updated = orderQueueMapper.updateById(queueDO);
|
||||
|
||||
// 异步更新 Redis
|
||||
if (updated > 0) {
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PAUSED.getStatus());
|
||||
} catch (Exception e) {
|
||||
log.error("Redis 更新状态失败: queueId={}", queueId, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
log.info("任务已暂停: queueId={}, opsOrderId={}, pausedDuration={}",
|
||||
queueId, queueDO.getOpsOrderId(), queueDO.getPausedDuration());
|
||||
|
||||
return updated > 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean resumeTask(Long queueId) {
|
||||
OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
|
||||
if (queueDO == null) {
|
||||
log.warn("队列记录不存在: queueId={}", queueId);
|
||||
return false;
|
||||
}
|
||||
|
||||
// 只有 PAUSED 状态的任务可以恢复
|
||||
String currentStatus = queueDO.getQueueStatus();
|
||||
if (!OrderQueueStatusEnum.PAUSED.getStatus().equals(currentStatus)) {
|
||||
log.warn("只有PAUSED状态的任务可以恢复: queueId={}, status={}",
|
||||
queueId, currentStatus);
|
||||
return false;
|
||||
}
|
||||
|
||||
// 队列状态流转:PAUSED → PROCESSING
|
||||
queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus());
|
||||
queueDO.setDequeueTime(LocalDateTime.now()); // 重置出队时间
|
||||
queueDO.setEventMessage("任务已恢复执行");
|
||||
|
||||
int updated = orderQueueMapper.updateById(queueDO);
|
||||
|
||||
// 异步更新 Redis
|
||||
if (updated > 0) {
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus());
|
||||
} catch (Exception e) {
|
||||
log.error("Redis 更新状态失败: queueId={}", queueId, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
log.info("任务已恢复执行: queueId={}, opsOrderId={}", queueId, queueDO.getOpsOrderId());
|
||||
|
||||
return updated > 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean adjustPriority(Long queueId, PriorityEnum newPriority, String reason) {
|
||||
OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
|
||||
if (queueDO == null) {
|
||||
log.warn("队列记录不存在: queueId={}", queueId);
|
||||
return false;
|
||||
}
|
||||
|
||||
Integer oldPriority = queueDO.getPriority();
|
||||
|
||||
// 如果优先级没有变化,直接返回
|
||||
if (oldPriority.equals(newPriority.getPriority())) {
|
||||
log.info("优先级未变化: queueId={}, priority={}", queueId, newPriority);
|
||||
return true;
|
||||
}
|
||||
|
||||
queueDO.setPriorityEnum(newPriority);
|
||||
// 重新计算队列顺序
|
||||
queueDO.setQueueIndex(calculateNextQueueIndex(newPriority));
|
||||
// 重新计算队列分数(使用原入队时间保持时间戳不变)
|
||||
LocalDateTime enqueueTime = queueDO.getEnqueueTime() != null ? queueDO.getEnqueueTime() : LocalDateTime.now();
|
||||
double newQueueScore = calculateQueueScore(newPriority.getPriority(), enqueueTime);
|
||||
queueDO.setQueueScore(newQueueScore);
|
||||
queueDO.setEventMessage("优先级调整: " + oldPriority + " -> " + newPriority + ", 原因: " + reason);
|
||||
|
||||
int updated = orderQueueMapper.updateById(queueDO);
|
||||
|
||||
// 异步更新 Redis
|
||||
if (updated > 0) {
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
redisQueueService.updatePriority(queueId, newPriority.getPriority());
|
||||
} catch (Exception e) {
|
||||
log.error("Redis 更新优先级失败: queueId={}", queueId, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
log.info("优先级已调整: queueId={}, opsOrderId={}, oldPriority={}, newPriority={}, reason={}, newScore={}",
|
||||
queueId, queueDO.getOpsOrderId(), oldPriority, newPriority, reason, newQueueScore);
|
||||
|
||||
// 如果升级为P0,触发紧急派单
|
||||
if (newPriority.isUrgent()) {
|
||||
log.warn("任务升级为P0紧急,需立即处理: queueId={}, opsOrderId={}",
|
||||
queueId, queueDO.getOpsOrderId());
|
||||
// TODO: 触发紧急派单流程
|
||||
}
|
||||
|
||||
return updated > 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean adjustPriorityByOpsOrderId(Long opsOrderId, PriorityEnum newPriority, String reason) {
|
||||
OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId);
|
||||
if (queueDO == null) {
|
||||
log.warn("工单不在队列中: opsOrderId={}", opsOrderId);
|
||||
return false;
|
||||
}
|
||||
|
||||
return adjustPriority(queueDO.getId(), newPriority, reason);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean upgradeToUrgent(Long opsOrderId, String reason) {
|
||||
return adjustPriorityByOpsOrderId(opsOrderId, PriorityEnum.P0, reason);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OrderQueueDTO> getWaitingQueue() {
|
||||
// 优先从 Redis 获取
|
||||
// TODO: 实现全局等待队列(需要聚合所有用户的队列)
|
||||
// 这里暂时使用 MySQL
|
||||
List<OpsOrderQueueDO> list = orderQueueMapper.selectListWaiting();
|
||||
return convertToDTO(list);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OrderQueueDTO> getUrgentOrders() {
|
||||
// TODO: 优先从 Redis 获取
|
||||
List<OpsOrderQueueDO> list = orderQueueMapper.selectUrgentOrders();
|
||||
return convertToDTO(list);
|
||||
}
|
||||
|
||||
@Override
|
||||
public OrderQueueDTO getCurrentTaskByUserId(Long userId) {
|
||||
// 1. 优先从 Redis 获取
|
||||
List<OrderQueueDTO> redisTasks = redisQueueService.peekTasks(userId, 1);
|
||||
if (redisTasks != null && !redisTasks.isEmpty()) {
|
||||
return redisTasks.get(0);
|
||||
}
|
||||
|
||||
// 2. Redis 未命中,从 MySQL 获取并同步到 Redis
|
||||
OpsOrderQueueDO queueDO = orderQueueMapper.selectCurrentExecutingByUserId(userId);
|
||||
if (queueDO != null) {
|
||||
// 同步到 Redis
|
||||
OrderQueueDTO dto = convertToDTO(queueDO);
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
redisQueueService.enqueue(dto);
|
||||
} catch (Exception e) {
|
||||
log.error("同步到 Redis 失败: queueId={}", dto.getId(), e);
|
||||
}
|
||||
});
|
||||
return dto;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OrderQueueDTO> getTasksByUserId(Long userId) {
|
||||
// 1. 优先从 Redis 获取
|
||||
List<OrderQueueDTO> redisTasks = redisQueueService.getTasksByUserId(userId);
|
||||
if (redisTasks != null && !redisTasks.isEmpty()) {
|
||||
return redisTasks;
|
||||
}
|
||||
|
||||
// 2. Redis 未命中,从 MySQL 获取并同步到 Redis
|
||||
List<OpsOrderQueueDO> mysqlList = orderQueueMapper.selectListByUserId(userId);
|
||||
if (mysqlList != null && !mysqlList.isEmpty()) {
|
||||
// 同步到 Redis
|
||||
List<OrderQueueDTO> dtoList = convertToDTO(mysqlList);
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
redisQueueService.batchEnqueue(dtoList);
|
||||
} catch (Exception e) {
|
||||
log.error("批量同步到 Redis 失败: userId={}", userId, e);
|
||||
}
|
||||
});
|
||||
return dtoList;
|
||||
}
|
||||
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OrderQueueDTO> getWaitingTasksByUserId(Long userId) {
|
||||
// 获取所有任务
|
||||
List<OrderQueueDTO> allTasks = getTasksByUserId(userId);
|
||||
|
||||
// 过滤出 WAITING 状态的任务,并按队列分数排序
|
||||
return allTasks.stream()
|
||||
.filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equalsIgnoreCase(task.getQueueStatus()))
|
||||
.sorted((a, b) -> {
|
||||
// 优先使用队列分数 score 排序(已在入队时计算好)
|
||||
if (a.getQueueScore() != null && b.getQueueScore() != null) {
|
||||
return Double.compare(a.getQueueScore(), b.getQueueScore());
|
||||
}
|
||||
|
||||
// 兜底:如果 score 为空,则按优先级+时间排序
|
||||
// 按优先级排序(P0 > P1 > P2 > P3)
|
||||
int priorityCompare = Integer.compare(
|
||||
b.getPriority() != null ? b.getPriority() : 999,
|
||||
a.getPriority() != null ? a.getPriority() : 999);
|
||||
if (priorityCompare != 0) {
|
||||
return priorityCompare;
|
||||
}
|
||||
// 优先级相同,按入队时间排序
|
||||
return a.getEnqueueTime().compareTo(b.getEnqueueTime());
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OrderQueueDTO> getWaitingTasksByUserIdFromDb(Long userId) {
|
||||
// 直接从 MySQL 获取等待中的任务(忽略 Redis 缓存)
|
||||
// 用于确保获取最新数据,例如在任务完成后自动派单下一个时
|
||||
List<OpsOrderQueueDO> mysqlList = orderQueueMapper.selectListByUserIdAndStatus(
|
||||
userId, OrderQueueStatusEnum.WAITING.getStatus());
|
||||
|
||||
if (mysqlList == null || mysqlList.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// 转换为 DTO 并按队列分数排序
|
||||
return mysqlList.stream()
|
||||
.map(this::convertToDTO)
|
||||
.filter(Objects::nonNull)
|
||||
.sorted((a, b) -> {
|
||||
// 优先使用队列分数 score 排序
|
||||
if (a.getQueueScore() != null && b.getQueueScore() != null) {
|
||||
return Double.compare(a.getQueueScore(), b.getQueueScore());
|
||||
}
|
||||
// 兜底:如果 score 为空,则按优先级+时间排序
|
||||
int priorityCompare = Integer.compare(
|
||||
b.getPriority() != null ? b.getPriority() : 999,
|
||||
a.getPriority() != null ? a.getPriority() : 999);
|
||||
if (priorityCompare != 0) {
|
||||
return priorityCompare;
|
||||
}
|
||||
return a.getEnqueueTime().compareTo(b.getEnqueueTime());
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OrderQueueDTO> getInterruptedTasksByUserId(Long userId) {
|
||||
// 获取所有任务
|
||||
List<OrderQueueDTO> allTasks = getTasksByUserId(userId);
|
||||
|
||||
// 过滤出 PAUSED 状态的任务,并按中断时间排序
|
||||
return allTasks.stream()
|
||||
.filter(task -> OrderQueueStatusEnum.PAUSED.getStatus().equals(task.getQueueStatus()))
|
||||
.sorted((a, b) -> {
|
||||
// 按中断时间排序(最早中断的排在前面)
|
||||
// 如果没有中断时间字段,则按入队时间排序
|
||||
if (a.getEnqueueTime() != null && b.getEnqueueTime() != null) {
|
||||
return a.getEnqueueTime().compareTo(b.getEnqueueTime());
|
||||
}
|
||||
return 0;
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public OrderQueueDTO getById(Long queueId) {
|
||||
// 1. 优先从 Redis Hash 获取
|
||||
OrderQueueDTO redisDTO = redisQueueService.getByQueueId(queueId);
|
||||
if (redisDTO != null) {
|
||||
return redisDTO;
|
||||
}
|
||||
|
||||
// 2. Redis 未命中,从 MySQL 获取
|
||||
OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
|
||||
if (queueDO != null) {
|
||||
OrderQueueDTO dto = convertToDTO(queueDO);
|
||||
// 同步到 Redis
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
redisQueueService.enqueue(dto);
|
||||
} catch (Exception e) {
|
||||
log.error("同步到 Redis 失败: queueId={}", dto.getId(), e);
|
||||
}
|
||||
});
|
||||
return dto;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OrderQueueDTO getByOpsOrderId(Long opsOrderId) {
|
||||
// 1. 优先从 Redis 获取(遍历所有用户队列,性能较差,慎用)
|
||||
OrderQueueDTO redisDTO = redisQueueService.getByOrderId(opsOrderId);
|
||||
if (redisDTO != null) {
|
||||
return redisDTO;
|
||||
}
|
||||
|
||||
// 2. Redis 未命中,从 MySQL 获取
|
||||
OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId);
|
||||
if (queueDO != null) {
|
||||
return convertToDTO(queueDO);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long countWaiting() {
|
||||
// 优先从 Redis 获取(需要聚合所有用户)
|
||||
// TODO: 实现分布式计数器
|
||||
return orderQueueMapper.countWaiting();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long countByUserId(Long userId) {
|
||||
// 优先从 Redis 获取
|
||||
long redisCount = redisQueueService.getQueueSize(userId);
|
||||
if (redisCount > 0) {
|
||||
return redisCount;
|
||||
}
|
||||
|
||||
// Redis 未命中,从 MySQL 获取
|
||||
return orderQueueMapper.countByUserId(userId);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean updateEventMessage(Long queueId, String eventMessage) {
|
||||
OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
|
||||
if (queueDO == null) {
|
||||
log.warn("队列记录不存在: queueId={}", queueId);
|
||||
return false;
|
||||
}
|
||||
|
||||
queueDO.setEventMessage(eventMessage);
|
||||
int updated = orderQueueMapper.updateById(queueDO);
|
||||
|
||||
log.info("事件消息已更新: queueId={}, eventMessage={}", queueId, eventMessage);
|
||||
|
||||
return updated > 0;
|
||||
}
|
||||
|
||||
// ========== 私有方法 ==========
|
||||
|
||||
/**
|
||||
* 计算队列分数(用于排序)
|
||||
* 公式:优先级分数 + 时间戳
|
||||
*
|
||||
* @param priority 优先级(0=P0, 1=P1, 2=P2, 3=P3)
|
||||
* @param enqueueTime 入队时间
|
||||
* @return 队列分数
|
||||
*/
|
||||
private double calculateQueueScore(Integer priority, LocalDateTime enqueueTime) {
|
||||
// 获取优先级分数
|
||||
long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L);
|
||||
|
||||
// 计算时间戳(秒级)
|
||||
long timestamp;
|
||||
if (enqueueTime != null) {
|
||||
timestamp = enqueueTime.atZone(ZoneId.systemDefault()).toEpochSecond();
|
||||
} else {
|
||||
timestamp = System.currentTimeMillis() / 1000;
|
||||
}
|
||||
|
||||
return priorityScore + timestamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* 计算下一个队列顺序号
|
||||
*/
|
||||
private Integer calculateNextQueueIndex(PriorityEnum priority) {
|
||||
// TODO: 查询当前优先级的最大 queueIndex,然后 +1
|
||||
// 这里简化处理,返回默认值
|
||||
return priority.getQueueOrder() * 1000;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为终态
|
||||
*/
|
||||
private boolean isTerminalStatus(String status) {
|
||||
return "CANCELLED".equals(status) || "COMPLETED".equals(status)
|
||||
|| "REMOVED".equals(status) || "FAILED".equals(status);
|
||||
}
|
||||
|
||||
/**
|
||||
* 校验状态流转是否合法
|
||||
*/
|
||||
private boolean validateStatusTransition(String oldStatus, String newStatus) {
|
||||
// 终态不能再变更
|
||||
if (isTerminalStatus(oldStatus)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 允许的状态流转(支持新旧状态)
|
||||
return switch (oldStatus.toUpperCase()) {
|
||||
case "PENDING" -> "WAITING".equals(newStatus) || "CANCELLED".equals(newStatus);
|
||||
case "WAITING" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus)
|
||||
|| "CANCELLED".equals(newStatus);
|
||||
case "PROCESSING", "DISPATCHED" -> "PAUSED".equals(newStatus)
|
||||
|| "REMOVED".equals(newStatus) || "COMPLETED".equals(newStatus);
|
||||
case "PAUSED" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus);
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据状态更新时间戳
|
||||
*/
|
||||
private void updateStatusTimestamp(OpsOrderQueueDO queueDO, OrderQueueStatusEnum status) {
|
||||
switch (status.getStatus().toUpperCase()) {
|
||||
case "DISPATCHED" -> {
|
||||
if (queueDO.getDequeueTime() == null) {
|
||||
queueDO.setDequeueTime(LocalDateTime.now());
|
||||
}
|
||||
}
|
||||
case "REMOVED", "COMPLETED" -> {
|
||||
// 可以记录完成时间(如果表有该字段)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为 DTO
|
||||
*/
|
||||
private OrderQueueDTO convertToDTO(OpsOrderQueueDO queueDO) {
|
||||
OrderQueueDTO dto = new OrderQueueDTO();
|
||||
BeanUtils.copyProperties(queueDO, dto);
|
||||
return dto;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量转换为 DTO
|
||||
*/
|
||||
private List<OrderQueueDTO> convertToDTO(List<OpsOrderQueueDO> list) {
|
||||
return list.stream()
|
||||
.map(this::convertToDTO)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user