diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java index 57168a0..2ec000a 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java @@ -232,6 +232,7 @@ public class QueueSyncService { dto.setUserId(queueDO.getUserId()); dto.setQueueIndex(queueDO.getQueueIndex()); dto.setPriority(queueDO.getPriority()); + dto.setQueueScore(queueDO.getQueueScore()); dto.setQueueStatus(queueDO.getQueueStatus()); dto.setEnqueueTime(queueDO.getEnqueueTime()); dto.setDequeueTime(queueDO.getDequeueTime()); diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java index c368e3c..24a7ad7 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java @@ -9,7 +9,6 @@ import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Service; import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -27,29 +26,14 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { @Resource private StringRedisTemplate stringRedisTemplate; - /** - * Score 计算公式:优先级分数 + 时间戳 - * 优先级分数:P0=0, P1=1000000, P2=2000000, P3=3000000 - * 时间戳:毫秒级时间戳 - * 结果:优先级高的排在前面,同优先级按时间排序 - */ - private static final Map PRIORITY_SCORES = Map.of( - 0, 0L, // P0: 0 - 1, 1000000L, // P1: 1,000,000 - 2, 2000000L, // P2: 2,000,000 - 3, 3000000L // P3: 3,000,000 - ); - @Override public boolean enqueue(OrderQueueDTO dto) { try { String queueKey = QUEUE_KEY_PREFIX + dto.getUserId(); String infoKey = INFO_KEY_PREFIX + dto.getId(); - // 1. 计算分数(优先级 + 时间戳) - double score = dto.getQueueScore() != null - ? dto.getQueueScore() - : calculateScore(dto.getPriority(), dto.getEnqueueTime()); + // Redis 仅持久化服务层已计算好的最终总分,不再本地兜底重算 + double score = requireQueueScore(dto, "enqueue"); dto.setQueueScore(score); // 2. 添加到 Sorted Set(使用 queueId 作为 member,而非 JSON) @@ -85,10 +69,8 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { byte[] queueKey = (QUEUE_KEY_PREFIX + dto.getUserId()).getBytes(); byte[] infoKey = (INFO_KEY_PREFIX + dto.getId()).getBytes(); - // 计算分数并设置到 DTO - double score = dto.getQueueScore() != null - ? dto.getQueueScore() - : calculateScore(dto.getPriority(), dto.getEnqueueTime()); + // Redis 仅持久化服务层已计算好的最终总分,不再本地兜底重算 + double score = requireQueueScore(dto, "batchEnqueue"); dto.setQueueScore(score); // 添加到 Sorted Set(使用 queueId 作为 member) @@ -338,9 +320,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { String queueKey = QUEUE_KEY_PREFIX + dto.getUserId(); dto.setPriority(newPriority); - double newScore = dto.getQueueScore() != null - ? dto.getQueueScore() - : calculateScore(newPriority, dto.getEnqueueTime()); + double newScore = requireQueueScore(dto, "updatePriority"); // 使用 Lua 脚本原子性更新 Hash 和 Sorted Set String script = @@ -554,23 +534,6 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { // ========== 私有方法 ========== - /** - * 计算分数(优先级 + 时间戳) - */ - private double calculateScore(Integer priority, LocalDateTime enqueueTime) { - long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L); - long timestamp; - if (enqueueTime != null) { - timestamp = enqueueTime - .atZone(ZoneId.systemDefault()) - .toEpochSecond(); - } else { - timestamp = System.currentTimeMillis() / 1000; - } - - return priorityScore + timestamp; - } - /** * 将 DTO 转换为 Map(用于 Hash 存储,所有值显式转 String,确保跨路径序列化一致) */ @@ -670,14 +633,9 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { if (queueScoreObj != null) { dto.setQueueScore(Double.parseDouble(queueScoreObj.toString())); } else { - // 如果没有存储,则根据 priority 和 enqueueTime 计算 - Integer priority = dto.getPriority(); - LocalDateTime enqueueTime = null; - Object enqueueTimeObj = map.get("enqueueTime"); - if (enqueueTimeObj != null) { - enqueueTime = LocalDateTime.parse(enqueueTimeObj.toString()); - } - dto.setQueueScore(calculateScore(priority, enqueueTime)); + // 历史数据兼容:没有总分时将其视为最低优先级,避免旧模型再次参与排序 + log.warn("Redis 队列记录缺少 queueScore,按最低优先级处理: queueId={}", dto.getId()); + dto.setQueueScore(Double.MAX_VALUE); } // 字符串转换为 LocalDateTime @@ -712,4 +670,11 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { return null; } } + + private double requireQueueScore(OrderQueueDTO dto, String operation) { + if (dto == null || dto.getQueueScore() == null) { + throw new IllegalArgumentException("queueScore is required for Redis " + operation); + } + return dto.getQueueScore(); + } } diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/QueueSyncServiceTest.java b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/QueueSyncServiceTest.java new file mode 100644 index 0000000..84e4878 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/QueueSyncServiceTest.java @@ -0,0 +1,61 @@ +package com.viewsh.module.ops.service.queue; + +import com.viewsh.module.ops.api.queue.OrderQueueDTO; +import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO; +import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper; +import com.viewsh.module.ops.enums.OrderQueueStatusEnum; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.LocalDateTime; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class QueueSyncServiceTest { + + @Mock + private OpsOrderQueueMapper orderQueueMapper; + @Mock + private RedisOrderQueueService redisQueueService; + + @InjectMocks + private QueueSyncService queueSyncService; + + @Test + void shouldCarryPersistedQueueScoreWhenSyncUserQueueToRedis() { + Long userId = 1001L; + OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder() + .id(11L) + .opsOrderId(22L) + .userId(userId) + .queueIndex(1) + .priority(0) + .queueScore(-120D) + .queueStatus(OrderQueueStatusEnum.WAITING.getStatus()) + .enqueueTime(LocalDateTime.now().minusMinutes(10)) + .build(); + + when(orderQueueMapper.selectList(any())).thenReturn(List.of(queueDO)); + when(redisQueueService.clearQueue(userId)).thenReturn(1L); + when(redisQueueService.batchEnqueue(any())).thenReturn(1L); + + queueSyncService.syncUserQueueToRedis(userId); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + verify(redisQueueService).batchEnqueue(captor.capture()); + List syncedTasks = captor.getValue(); + assertNotNull(syncedTasks); + assertEquals(1, syncedTasks.size()); + assertEquals(-120D, syncedTasks.get(0).getQueueScore()); + } +}