fix(ops): 收口队列 Redis 分数来源
- 移除 Redis 队列层基于优先级和时间戳的本地兜底算分逻辑 - 强制 enqueue、batchEnqueue、updatePriority 使用服务层预先计算的 queueScore - 兼容历史缺少 queueScore 的 Redis 记录,按最低优先级处理避免旧模型重新参与排序 - 补齐 QueueSyncService 的 queueScore 映射,确保 MySQL 同步到 Redis 时保留总分 - 新增 QueueSyncServiceTest 覆盖同步链路携带 queueScore 的行为
This commit is contained in:
@@ -232,6 +232,7 @@ public class QueueSyncService {
|
|||||||
dto.setUserId(queueDO.getUserId());
|
dto.setUserId(queueDO.getUserId());
|
||||||
dto.setQueueIndex(queueDO.getQueueIndex());
|
dto.setQueueIndex(queueDO.getQueueIndex());
|
||||||
dto.setPriority(queueDO.getPriority());
|
dto.setPriority(queueDO.getPriority());
|
||||||
|
dto.setQueueScore(queueDO.getQueueScore());
|
||||||
dto.setQueueStatus(queueDO.getQueueStatus());
|
dto.setQueueStatus(queueDO.getQueueStatus());
|
||||||
dto.setEnqueueTime(queueDO.getEnqueueTime());
|
dto.setEnqueueTime(queueDO.getEnqueueTime());
|
||||||
dto.setDequeueTime(queueDO.getDequeueTime());
|
dto.setDequeueTime(queueDO.getDequeueTime());
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ import org.springframework.data.redis.core.script.DefaultRedisScript;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.time.ZoneId;
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@@ -27,29 +26,14 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
|||||||
@Resource
|
@Resource
|
||||||
private StringRedisTemplate stringRedisTemplate;
|
private StringRedisTemplate stringRedisTemplate;
|
||||||
|
|
||||||
/**
|
|
||||||
* Score 计算公式:优先级分数 + 时间戳
|
|
||||||
* 优先级分数:P0=0, P1=1000000, P2=2000000, P3=3000000
|
|
||||||
* 时间戳:毫秒级时间戳
|
|
||||||
* 结果:优先级高的排在前面,同优先级按时间排序
|
|
||||||
*/
|
|
||||||
private static final Map<Integer, Long> PRIORITY_SCORES = Map.of(
|
|
||||||
0, 0L, // P0: 0
|
|
||||||
1, 1000000L, // P1: 1,000,000
|
|
||||||
2, 2000000L, // P2: 2,000,000
|
|
||||||
3, 3000000L // P3: 3,000,000
|
|
||||||
);
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean enqueue(OrderQueueDTO dto) {
|
public boolean enqueue(OrderQueueDTO dto) {
|
||||||
try {
|
try {
|
||||||
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
|
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
|
||||||
String infoKey = INFO_KEY_PREFIX + dto.getId();
|
String infoKey = INFO_KEY_PREFIX + dto.getId();
|
||||||
|
|
||||||
// 1. 计算分数(优先级 + 时间戳)
|
// Redis 仅持久化服务层已计算好的最终总分,不再本地兜底重算
|
||||||
double score = dto.getQueueScore() != null
|
double score = requireQueueScore(dto, "enqueue");
|
||||||
? dto.getQueueScore()
|
|
||||||
: calculateScore(dto.getPriority(), dto.getEnqueueTime());
|
|
||||||
dto.setQueueScore(score);
|
dto.setQueueScore(score);
|
||||||
|
|
||||||
// 2. 添加到 Sorted Set(使用 queueId 作为 member,而非 JSON)
|
// 2. 添加到 Sorted Set(使用 queueId 作为 member,而非 JSON)
|
||||||
@@ -85,10 +69,8 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
|||||||
byte[] queueKey = (QUEUE_KEY_PREFIX + dto.getUserId()).getBytes();
|
byte[] queueKey = (QUEUE_KEY_PREFIX + dto.getUserId()).getBytes();
|
||||||
byte[] infoKey = (INFO_KEY_PREFIX + dto.getId()).getBytes();
|
byte[] infoKey = (INFO_KEY_PREFIX + dto.getId()).getBytes();
|
||||||
|
|
||||||
// 计算分数并设置到 DTO
|
// Redis 仅持久化服务层已计算好的最终总分,不再本地兜底重算
|
||||||
double score = dto.getQueueScore() != null
|
double score = requireQueueScore(dto, "batchEnqueue");
|
||||||
? dto.getQueueScore()
|
|
||||||
: calculateScore(dto.getPriority(), dto.getEnqueueTime());
|
|
||||||
dto.setQueueScore(score);
|
dto.setQueueScore(score);
|
||||||
|
|
||||||
// 添加到 Sorted Set(使用 queueId 作为 member)
|
// 添加到 Sorted Set(使用 queueId 作为 member)
|
||||||
@@ -338,9 +320,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
|||||||
|
|
||||||
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
|
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
|
||||||
dto.setPriority(newPriority);
|
dto.setPriority(newPriority);
|
||||||
double newScore = dto.getQueueScore() != null
|
double newScore = requireQueueScore(dto, "updatePriority");
|
||||||
? dto.getQueueScore()
|
|
||||||
: calculateScore(newPriority, dto.getEnqueueTime());
|
|
||||||
|
|
||||||
// 使用 Lua 脚本原子性更新 Hash 和 Sorted Set
|
// 使用 Lua 脚本原子性更新 Hash 和 Sorted Set
|
||||||
String script =
|
String script =
|
||||||
@@ -554,23 +534,6 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
|||||||
|
|
||||||
// ========== 私有方法 ==========
|
// ========== 私有方法 ==========
|
||||||
|
|
||||||
/**
|
|
||||||
* 计算分数(优先级 + 时间戳)
|
|
||||||
*/
|
|
||||||
private double calculateScore(Integer priority, LocalDateTime enqueueTime) {
|
|
||||||
long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L);
|
|
||||||
long timestamp;
|
|
||||||
if (enqueueTime != null) {
|
|
||||||
timestamp = enqueueTime
|
|
||||||
.atZone(ZoneId.systemDefault())
|
|
||||||
.toEpochSecond();
|
|
||||||
} else {
|
|
||||||
timestamp = System.currentTimeMillis() / 1000;
|
|
||||||
}
|
|
||||||
|
|
||||||
return priorityScore + timestamp;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 将 DTO 转换为 Map(用于 Hash 存储,所有值显式转 String,确保跨路径序列化一致)
|
* 将 DTO 转换为 Map(用于 Hash 存储,所有值显式转 String,确保跨路径序列化一致)
|
||||||
*/
|
*/
|
||||||
@@ -670,14 +633,9 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
|||||||
if (queueScoreObj != null) {
|
if (queueScoreObj != null) {
|
||||||
dto.setQueueScore(Double.parseDouble(queueScoreObj.toString()));
|
dto.setQueueScore(Double.parseDouble(queueScoreObj.toString()));
|
||||||
} else {
|
} else {
|
||||||
// 如果没有存储,则根据 priority 和 enqueueTime 计算
|
// 历史数据兼容:没有总分时将其视为最低优先级,避免旧模型再次参与排序
|
||||||
Integer priority = dto.getPriority();
|
log.warn("Redis 队列记录缺少 queueScore,按最低优先级处理: queueId={}", dto.getId());
|
||||||
LocalDateTime enqueueTime = null;
|
dto.setQueueScore(Double.MAX_VALUE);
|
||||||
Object enqueueTimeObj = map.get("enqueueTime");
|
|
||||||
if (enqueueTimeObj != null) {
|
|
||||||
enqueueTime = LocalDateTime.parse(enqueueTimeObj.toString());
|
|
||||||
}
|
|
||||||
dto.setQueueScore(calculateScore(priority, enqueueTime));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 字符串转换为 LocalDateTime
|
// 字符串转换为 LocalDateTime
|
||||||
@@ -712,4 +670,11 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private double requireQueueScore(OrderQueueDTO dto, String operation) {
|
||||||
|
if (dto == null || dto.getQueueScore() == null) {
|
||||||
|
throw new IllegalArgumentException("queueScore is required for Redis " + operation);
|
||||||
|
}
|
||||||
|
return dto.getQueueScore();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,61 @@
|
|||||||
|
package com.viewsh.module.ops.service.queue;
|
||||||
|
|
||||||
|
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
|
||||||
|
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
|
||||||
|
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
|
||||||
|
import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
import org.mockito.InjectMocks;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
class QueueSyncServiceTest {
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private OpsOrderQueueMapper orderQueueMapper;
|
||||||
|
@Mock
|
||||||
|
private RedisOrderQueueService redisQueueService;
|
||||||
|
|
||||||
|
@InjectMocks
|
||||||
|
private QueueSyncService queueSyncService;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void shouldCarryPersistedQueueScoreWhenSyncUserQueueToRedis() {
|
||||||
|
Long userId = 1001L;
|
||||||
|
OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder()
|
||||||
|
.id(11L)
|
||||||
|
.opsOrderId(22L)
|
||||||
|
.userId(userId)
|
||||||
|
.queueIndex(1)
|
||||||
|
.priority(0)
|
||||||
|
.queueScore(-120D)
|
||||||
|
.queueStatus(OrderQueueStatusEnum.WAITING.getStatus())
|
||||||
|
.enqueueTime(LocalDateTime.now().minusMinutes(10))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
when(orderQueueMapper.selectList(any())).thenReturn(List.of(queueDO));
|
||||||
|
when(redisQueueService.clearQueue(userId)).thenReturn(1L);
|
||||||
|
when(redisQueueService.batchEnqueue(any())).thenReturn(1L);
|
||||||
|
|
||||||
|
queueSyncService.syncUserQueueToRedis(userId);
|
||||||
|
|
||||||
|
ArgumentCaptor<List<OrderQueueDTO>> captor = ArgumentCaptor.forClass(List.class);
|
||||||
|
verify(redisQueueService).batchEnqueue(captor.capture());
|
||||||
|
List<OrderQueueDTO> syncedTasks = captor.getValue();
|
||||||
|
assertNotNull(syncedTasks);
|
||||||
|
assertEquals(1, syncedTasks.size());
|
||||||
|
assertEquals(-120D, syncedTasks.get(0).getQueueScore());
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user