fix(ops): 迁移updateStatus修复到Enhanced实现
Some checks failed
Java CI with Maven / build (11) (push) Has been cancelled
Java CI with Maven / build (17) (push) Has been cancelled
Java CI with Maven / build (8) (push) Has been cancelled

REMOVED 状态改为同步更新 Redis,避免自动派单查询到已完成任务

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
lzh
2026-01-31 19:05:26 +08:00
parent 009fab543f
commit b73ef4f39f

View File

@@ -31,9 +31,9 @@ import java.util.stream.Collectors;
* @author lzh * @author lzh
*/ */
@Slf4j @Slf4j
@Service @Service
@Primary @Primary
public class OrderQueueServiceEnhanced implements OrderQueueService { public class OrderQueueServiceEnhanced implements OrderQueueService {
/** /**
* Score 计算公式:优先级分数 + 时间戳 * Score 计算公式:优先级分数 + 时间戳
@@ -179,17 +179,30 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
int updated = orderQueueMapper.updateById(queueDO); int updated = orderQueueMapper.updateById(queueDO);
// 2. 异步更新 Redis // 2. 更新 Redis
if (updated > 0) { if (updated > 0) {
OrderQueueDTO dto = convertToDTO(queueDO); OrderQueueDTO dto = convertToDTO(queueDO);
CompletableFuture.runAsync(() -> {
// REMOVED 状态需要同步更新,确保任务完成后立即从 Redis 队列移除
// 避免自动派单下一个时查询到已完成的任务
if (OrderQueueStatusEnum.REMOVED == newStatus) {
try { try {
redisQueueService.updateStatus(queueId, newStatus.getStatus()); redisQueueService.updateStatus(queueId, newStatus.getStatus());
log.debug("Redis 更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus()); log.debug("Redis 同步更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus());
} catch (Exception e) { } catch (Exception e) {
log.error("Redis 更新状态失败: queueId={}", queueId, e); log.error("Redis 同步更新状态失败: queueId={}", queueId, e);
} }
}); } else {
// 其他状态异步更新
CompletableFuture.runAsync(() -> {
try {
redisQueueService.updateStatus(queueId, newStatus.getStatus());
log.debug("Redis 异步更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus());
} catch (Exception e) {
log.error("Redis 异步更新状态失败: queueId={}", queueId, e);
}
});
}
} }
log.info("队列状态已更新: queueId={}, opsOrderId={}, oldStatus={}, newStatus={}", log.info("队列状态已更新: queueId={}, opsOrderId={}, oldStatus={}, newStatus={}",
@@ -464,48 +477,48 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
return Collections.emptyList(); return Collections.emptyList();
} }
@Override @Override
public List<OrderQueueDTO> getWaitingTasksByUserId(Long userId) { public List<OrderQueueDTO> getWaitingTasksByUserId(Long userId) {
// 获取所有任务 // 获取所有任务
List<OrderQueueDTO> allTasks = getTasksByUserId(userId); List<OrderQueueDTO> allTasks = getTasksByUserId(userId);
// 过滤出 WAITING 状态的任务,并按队列分数排序 // 过滤出 WAITING 状态的任务,并按队列分数排序
return filterAndSortWaitingTasks(allTasks); return filterAndSortWaitingTasks(allTasks);
} }
@Override @Override
public List<OrderQueueDTO> getWaitingTasksByUserIdFromDb(Long userId) { public List<OrderQueueDTO> getWaitingTasksByUserIdFromDb(Long userId) {
// 直接从 MySQL 获取所有任务 // 直接从 MySQL 获取所有任务
List<OpsOrderQueueDO> mysqlList = orderQueueMapper.selectListByUserId(userId); List<OpsOrderQueueDO> mysqlList = orderQueueMapper.selectListByUserId(userId);
List<OrderQueueDTO> allTasks = convertToDTO(mysqlList); List<OrderQueueDTO> allTasks = convertToDTO(mysqlList);
// 过滤出 WAITING 状态的任务,并按队列分数排序 // 过滤出 WAITING 状态的任务,并按队列分数排序
return filterAndSortWaitingTasks(allTasks); return filterAndSortWaitingTasks(allTasks);
} }
private List<OrderQueueDTO> filterAndSortWaitingTasks(List<OrderQueueDTO> allTasks) { private List<OrderQueueDTO> filterAndSortWaitingTasks(List<OrderQueueDTO> allTasks) {
return allTasks.stream() return allTasks.stream()
.filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus())) .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus()))
.sorted((a, b) -> { .sorted((a, b) -> {
// 优先使用队列分数 score 排序(已在入队时计算好) // 优先使用队列分数 score 排序(已在入队时计算好)
if (a.getQueueScore() != null && b.getQueueScore() != null) { if (a.getQueueScore() != null && b.getQueueScore() != null) {
return Double.compare(a.getQueueScore(), b.getQueueScore()); return Double.compare(a.getQueueScore(), b.getQueueScore());
} }
// 兜底:如果 score 为空,则按优先级+时间排序 // 兜底:如果 score 为空,则按优先级+时间排序
// 按优先级排序P0 > P1 > P2 > P3 // 按优先级排序P0 > P1 > P2 > P3
int priorityCompare = Integer.compare( int priorityCompare = Integer.compare(
b.getPriority() != null ? b.getPriority() : 999, b.getPriority() != null ? b.getPriority() : 999,
a.getPriority() != null ? a.getPriority() : 999 a.getPriority() != null ? a.getPriority() : 999
); );
if (priorityCompare != 0) { if (priorityCompare != 0) {
return priorityCompare; return priorityCompare;
} }
// 优先级相同,按入队时间排序 // 优先级相同,按入队时间排序
return a.getEnqueueTime().compareTo(b.getEnqueueTime()); return a.getEnqueueTime().compareTo(b.getEnqueueTime());
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
@Override @Override
public List<OrderQueueDTO> getInterruptedTasksByUserId(Long userId) { public List<OrderQueueDTO> getInterruptedTasksByUserId(Long userId) {