From 57f32e56a97cf1ae2ff437affa6fe69cede68689 Mon Sep 17 00:00:00 2001 From: lzh Date: Sat, 7 Mar 2026 22:44:09 +0800 Subject: [PATCH] =?UTF-8?q?fix(ops):=20=E6=94=B6=E5=8F=A3=E9=98=9F?= =?UTF-8?q?=E5=88=97=20Redis=20=E5=88=86=E6=95=B0=E6=9D=A5=E6=BA=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 移除 Redis 队列层基于优先级和时间戳的本地兜底算分逻辑 - 强制 enqueue、batchEnqueue、updatePriority 使用服务层预先计算的 queueScore - 兼容历史缺少 queueScore 的 Redis 记录,按最低优先级处理避免旧模型重新参与排序 - 补齐 QueueSyncService 的 queueScore 映射,确保 MySQL 同步到 Redis 时保留总分 - 新增 QueueSyncServiceTest 覆盖同步链路携带 queueScore 的行为 --- .../ops/service/queue/QueueSyncService.java | 1 + .../queue/RedisOrderQueueServiceImpl.java | 65 +++++-------------- .../service/queue/QueueSyncServiceTest.java | 61 +++++++++++++++++ 3 files changed, 77 insertions(+), 50 deletions(-) create mode 100644 viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/QueueSyncServiceTest.java diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java index 57168a0..2ec000a 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java @@ -232,6 +232,7 @@ public class QueueSyncService { dto.setUserId(queueDO.getUserId()); dto.setQueueIndex(queueDO.getQueueIndex()); dto.setPriority(queueDO.getPriority()); + dto.setQueueScore(queueDO.getQueueScore()); dto.setQueueStatus(queueDO.getQueueStatus()); dto.setEnqueueTime(queueDO.getEnqueueTime()); dto.setDequeueTime(queueDO.getDequeueTime()); diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java index c368e3c..24a7ad7 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java @@ -9,7 +9,6 @@ import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Service; import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -27,29 +26,14 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { @Resource private StringRedisTemplate stringRedisTemplate; - /** - * Score 计算公式:优先级分数 + 时间戳 - * 优先级分数:P0=0, P1=1000000, P2=2000000, P3=3000000 - * 时间戳:毫秒级时间戳 - * 结果:优先级高的排在前面,同优先级按时间排序 - */ - private static final Map PRIORITY_SCORES = Map.of( - 0, 0L, // P0: 0 - 1, 1000000L, // P1: 1,000,000 - 2, 2000000L, // P2: 2,000,000 - 3, 3000000L // P3: 3,000,000 - ); - @Override public boolean enqueue(OrderQueueDTO dto) { try { String queueKey = QUEUE_KEY_PREFIX + dto.getUserId(); String infoKey = INFO_KEY_PREFIX + dto.getId(); - // 1. 计算分数(优先级 + 时间戳) - double score = dto.getQueueScore() != null - ? dto.getQueueScore() - : calculateScore(dto.getPriority(), dto.getEnqueueTime()); + // Redis 仅持久化服务层已计算好的最终总分,不再本地兜底重算 + double score = requireQueueScore(dto, "enqueue"); dto.setQueueScore(score); // 2. 添加到 Sorted Set(使用 queueId 作为 member,而非 JSON) @@ -85,10 +69,8 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { byte[] queueKey = (QUEUE_KEY_PREFIX + dto.getUserId()).getBytes(); byte[] infoKey = (INFO_KEY_PREFIX + dto.getId()).getBytes(); - // 计算分数并设置到 DTO - double score = dto.getQueueScore() != null - ? dto.getQueueScore() - : calculateScore(dto.getPriority(), dto.getEnqueueTime()); + // Redis 仅持久化服务层已计算好的最终总分,不再本地兜底重算 + double score = requireQueueScore(dto, "batchEnqueue"); dto.setQueueScore(score); // 添加到 Sorted Set(使用 queueId 作为 member) @@ -338,9 +320,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { String queueKey = QUEUE_KEY_PREFIX + dto.getUserId(); dto.setPriority(newPriority); - double newScore = dto.getQueueScore() != null - ? dto.getQueueScore() - : calculateScore(newPriority, dto.getEnqueueTime()); + double newScore = requireQueueScore(dto, "updatePriority"); // 使用 Lua 脚本原子性更新 Hash 和 Sorted Set String script = @@ -554,23 +534,6 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { // ========== 私有方法 ========== - /** - * 计算分数(优先级 + 时间戳) - */ - private double calculateScore(Integer priority, LocalDateTime enqueueTime) { - long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L); - long timestamp; - if (enqueueTime != null) { - timestamp = enqueueTime - .atZone(ZoneId.systemDefault()) - .toEpochSecond(); - } else { - timestamp = System.currentTimeMillis() / 1000; - } - - return priorityScore + timestamp; - } - /** * 将 DTO 转换为 Map(用于 Hash 存储,所有值显式转 String,确保跨路径序列化一致) */ @@ -670,14 +633,9 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { if (queueScoreObj != null) { dto.setQueueScore(Double.parseDouble(queueScoreObj.toString())); } else { - // 如果没有存储,则根据 priority 和 enqueueTime 计算 - Integer priority = dto.getPriority(); - LocalDateTime enqueueTime = null; - Object enqueueTimeObj = map.get("enqueueTime"); - if (enqueueTimeObj != null) { - enqueueTime = LocalDateTime.parse(enqueueTimeObj.toString()); - } - dto.setQueueScore(calculateScore(priority, enqueueTime)); + // 历史数据兼容:没有总分时将其视为最低优先级,避免旧模型再次参与排序 + log.warn("Redis 队列记录缺少 queueScore,按最低优先级处理: queueId={}", dto.getId()); + dto.setQueueScore(Double.MAX_VALUE); } // 字符串转换为 LocalDateTime @@ -712,4 +670,11 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { return null; } } + + private double requireQueueScore(OrderQueueDTO dto, String operation) { + if (dto == null || dto.getQueueScore() == null) { + throw new IllegalArgumentException("queueScore is required for Redis " + operation); + } + return dto.getQueueScore(); + } } diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/QueueSyncServiceTest.java b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/QueueSyncServiceTest.java new file mode 100644 index 0000000..84e4878 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/QueueSyncServiceTest.java @@ -0,0 +1,61 @@ +package com.viewsh.module.ops.service.queue; + +import com.viewsh.module.ops.api.queue.OrderQueueDTO; +import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO; +import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper; +import com.viewsh.module.ops.enums.OrderQueueStatusEnum; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.LocalDateTime; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class QueueSyncServiceTest { + + @Mock + private OpsOrderQueueMapper orderQueueMapper; + @Mock + private RedisOrderQueueService redisQueueService; + + @InjectMocks + private QueueSyncService queueSyncService; + + @Test + void shouldCarryPersistedQueueScoreWhenSyncUserQueueToRedis() { + Long userId = 1001L; + OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder() + .id(11L) + .opsOrderId(22L) + .userId(userId) + .queueIndex(1) + .priority(0) + .queueScore(-120D) + .queueStatus(OrderQueueStatusEnum.WAITING.getStatus()) + .enqueueTime(LocalDateTime.now().minusMinutes(10)) + .build(); + + when(orderQueueMapper.selectList(any())).thenReturn(List.of(queueDO)); + when(redisQueueService.clearQueue(userId)).thenReturn(1L); + when(redisQueueService.batchEnqueue(any())).thenReturn(1L); + + queueSyncService.syncUserQueueToRedis(userId); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + verify(redisQueueService).batchEnqueue(captor.capture()); + List syncedTasks = captor.getValue(); + assertNotNull(syncedTasks); + assertEquals(1, syncedTasks.size()); + assertEquals(-120D, syncedTasks.get(0).getQueueScore()); + } +}