diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/BadgeDeviceStatusEventListener.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/BadgeDeviceStatusEventListener.java index 921bbdb..3eed733 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/BadgeDeviceStatusEventListener.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/BadgeDeviceStatusEventListener.java @@ -129,7 +129,7 @@ public class BadgeDeviceStatusEventListener { */ private void handleOrderStatusTransition(Long deviceId, Long orderId, WorkOrderStatusEnum newStatus, OrderStateChangedEvent event) { - var waitingTasks = orderQueueService.getWaitingTasksByUserId(deviceId); + var waitingTasks = orderQueueService.getWaitingTasksByUserIdFromDb(deviceId); switch (newStatus) { case DISPATCHED: // 工单已推送到工牌,设置工单关联,设备状态转为 BUSY diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java index 7e128f7..62ea445 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java @@ -92,7 +92,11 @@ public class CleanOrderEventListener { /** * 监听工单创建事件,触发自动调度 */ - @EventListener + /** + * 监听工单创建事件,触发自动调度 + * 使用 @TransactionalEventListener 确保在事务提交后执行,防止异步线程查不到因为事务未提交而"不存在"的工单 + */ + @org.springframework.transaction.event.TransactionalEventListener(phase = org.springframework.transaction.event.TransactionPhase.AFTER_COMMIT) public void onOrderCreated(OrderCreatedEvent event) { if (!"CLEAN".equals(event.getOrderType())) { return; @@ -286,8 +290,8 @@ public class CleanOrderEventListener { if (deviceId != null) { try { - // 获取等待中的任务列表 - var waitingTasks = orderQueueService.getWaitingTasksByUserId(deviceId); + // 获取等待中的任务列表 - 从 MySQL 读取确保包含刚入队的任务 + var waitingTasks = orderQueueService.getWaitingTasksByUserIdFromDb(deviceId); int queueCount = waitingTasks != null ? waitingTasks.size() : 0; // 发送待办增加通知 diff --git a/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueService.java b/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueService.java index b4d80e3..81c0171 100644 --- a/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueService.java +++ b/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueService.java @@ -155,6 +155,15 @@ public interface OrderQueueService { */ List getWaitingTasksByUserId(Long userId); + /** + * 从数据库直接获取用户的等待中任务列表(忽略 Redis 缓存) + * 用于确保获取最新数据,例如在任务完成后自动派单下一个时 + * + * @param userId 用户ID + * @return 等待中的任务列表(已按队列分数排序) + */ + List getWaitingTasksByUserIdFromDb(Long userId); + /** * 获取用户的暂停任务列表(PAUSED状态,按暂停时间排序) * diff --git a/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/enums/OrderQueueStatusEnum.java b/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/enums/OrderQueueStatusEnum.java index 7b726ae..87604e8 100644 --- a/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/enums/OrderQueueStatusEnum.java +++ b/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/enums/OrderQueueStatusEnum.java @@ -28,28 +28,32 @@ public enum OrderQueueStatusEnum implements ArrayValuable { * 等待派单 - 工单已进入队列,等待派单引擎分配 * 对应工单状态: QUEUED */ - WAITING("waiting", "等待中"), + /** + * 等待派单 - 工单已进入队列,等待派单引擎分配 + * 对应工单状态: QUEUED + */ + WAITING("WAITING", "等待中"), /** * 处理中 - 任务已下发,正在处理(包括:已下发、已确认、已到岗) * 对应工单状态: DISPATCHED / CONFIRMED / ARRIVED */ - PROCESSING("processing", "处理中"), - + PROCESSING("PROCESSING", "处理中"), /** * 暂停中 - 任务暂停(如P0任务打断、保洁员临时离线等) * 对应工单状态: PAUSED */ - PAUSED("paused", "暂停中"), + PAUSED("PAUSED", "暂停中"), /** * 已移除 - 任务已完成或已取消 * 对应工单状态: COMPLETED / CANCELLED */ - REMOVED("removed", "已移除"); + REMOVED("REMOVED", "已移除"); - public static final String[] ARRAYS = Arrays.stream(values()).map(OrderQueueStatusEnum::getStatus).toArray(String[]::new); + public static final String[] ARRAYS = Arrays.stream(values()).map(OrderQueueStatusEnum::getStatus) + .toArray(String[]::new); /** * 状态代码 @@ -76,7 +80,7 @@ public enum OrderQueueStatusEnum implements ArrayValuable { if (status == null) { return WAITING; } - + // 兼容旧状态值 switch (status) { case "dispatched": @@ -86,11 +90,11 @@ public enum OrderQueueStatusEnum implements ArrayValuable { default: break; } - + return Arrays.stream(values()) - .filter(e -> e.getStatus().equals(status)) - .findFirst() - .orElse(WAITING); + .filter(e -> e.getStatus().equals(status)) + .findFirst() + .orElse(WAITING); } // ========== 业务判断方法 ========== diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java index acdf652..066c912 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java @@ -224,7 +224,8 @@ public class DispatchEngineImpl implements DispatchEngine { } // 2. 如果没有中断任务,推送队列中的下一个任务 - List waitingTasks = orderQueueService.getWaitingTasksByUserId(assigneeId); + // 注意:这里必须从 MySQL 读取最新数据,确保刚完成的任务不在等待列表中 + List waitingTasks = orderQueueService.getWaitingTasksByUserIdFromDb(assigneeId); if (waitingTasks.isEmpty()) { log.info("无等待任务,执行人变为空闲: assigneeId={}", assigneeId); @@ -423,8 +424,8 @@ public class DispatchEngineImpl implements DispatchEngine { private DispatchResult executePushAndEnqueue(OrderDispatchContext context, Long assigneeId) { log.info("执行推送等待+新任务入队: orderId={}, assigneeId={}", context.getOrderId(), assigneeId); - // 先推送第一个等待任务(如果有的话) - List waitingTasks = orderQueueService.getWaitingTasksByUserId(assigneeId); + // 先推送第一个等待任务(如果有的话)- 从 MySQL 读取确保数据最新 + List waitingTasks = orderQueueService.getWaitingTasksByUserIdFromDb(assigneeId); if (!waitingTasks.isEmpty()) { OrderQueueDTO firstWaiting = waitingTasks.get(0); OrderTransitionRequest dispatchRequest = OrderTransitionRequest.builder() diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/handler/QueueSyncHandler.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/handler/QueueSyncHandler.java index a173208..a442c35 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/handler/QueueSyncHandler.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/handler/QueueSyncHandler.java @@ -103,7 +103,8 @@ public class QueueSyncHandler extends TransitionHandler { Long queueId = request.getQueueId(); if (queueId == null) { - // 直接派单场景(无队列记录),跳过队列同步,继续执行后续处理器 + // 直接派单场景(无队列记录),用户不希望此类工单进入队列表 + // 只有 WAITING 和 PAUSED 的才需要在队列表中 log.debug("直接派单无队列记录,跳过队列同步: orderId={}", context.getOrder().getId()); return; } @@ -147,18 +148,47 @@ public class QueueSyncHandler extends TransitionHandler { OrderTransitionRequest request = context.getRequest(); Long queueId = request.getQueueId(); + // 1. 尝试查找现有记录 if (queueId == null) { - // 尝试通过工单ID查找队列记录 OrderQueueDTO queueDTO = orderQueueService.getByOpsOrderId(context.getOrder().getId()); if (queueDTO != null) { queueId = queueDTO.getId(); } } + // 2. 如果存在,更新状态 if (queueId != null) { - // 队列状态变更为 PAUSED - orderQueueService.updateStatus(queueId, OrderQueueStatusEnum.PAUSED); + orderQueueService.pauseTask(queueId); // 使用 pauseTask 方法,会自动计算暂停时长 log.debug("队列状态已更新为PAUSED: queueId={}", queueId); + return; + } + + // 3. 如果不存在(例如直接派单后被暂停),则必须创建一条 PAUSED 记录 + // 因为用户要求"暂停的工单"必须在队列中 + try { + Long assigneeId = request.getAssigneeId(); + if (assigneeId == null) { + assigneeId = context.getOrder().getAssigneeId(); + } + + if (assigneeId != null) { + log.info("工单暂停且无队列记录,正在补充创建: orderId={}", context.getOrder().getId()); + queueId = orderQueueService.enqueue( + context.getOrder().getId(), + assigneeId, + context.getOrder().getPriorityEnum(), + null); + + // 立即更新为 PAUSED + orderQueueService.updateStatus(queueId, OrderQueueStatusEnum.PAUSED); + log.info("已补充创建PAUSED队列记录: queueId={}", queueId); + } else { + log.warn("工单暂停但无法创建队列记录(缺少执行人): orderId={}", context.getOrder().getId()); + } + } catch (Exception e) { + log.error("补充创建PAUSED队列记录失败: orderId={}", context.getOrder().getId(), e); + // 抛出异常以回滚状态变更,保证一致性 + throw new IllegalStateException("同步队列暂停状态失败", e); } } } diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java index 8977940..f6ed6f6 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java @@ -62,13 +62,25 @@ public interface OpsOrderQueueMapper extends BaseMapperX { .orderByDesc(OpsOrderQueueDO::getEnqueueTime)); } + /** + * 根据用户ID和状态查询队列列表 + * 用于强制从 MySQL 读取最新数据 + */ + default List selectListByUserIdAndStatus(Long userId, String queueStatus) { + return selectList(new LambdaQueryWrapperX() + .eq(OpsOrderQueueDO::getUserId, userId) + .eq(OpsOrderQueueDO::getQueueStatus, queueStatus) + .orderByAsc(OpsOrderQueueDO::getQueueScore) + .orderByAsc(OpsOrderQueueDO::getEnqueueTime)); + } + /** * 获取用户当前正在执行的任务 */ default OpsOrderQueueDO selectCurrentExecutingByUserId(Long userId) { return selectOne(new LambdaQueryWrapperX() .eq(OpsOrderQueueDO::getUserId, userId) - .eq(OpsOrderQueueDO::getQueueStatus, "EXECUTING") + .eq(OpsOrderQueueDO::getQueueStatus, "PROCESSING") .orderByDesc(OpsOrderQueueDO::getDequeueTime) .last("LIMIT 1")); } @@ -90,8 +102,8 @@ public interface OpsOrderQueueMapper extends BaseMapperX { */ default List selectListExecuting() { return selectList(new LambdaQueryWrapperX() - .eq(OpsOrderQueueDO::getQueueStatus, "EXECUTING") - .orderByAsc(OpsOrderQueueDO::getPriority) // 从低优先级开始 + .eq(OpsOrderQueueDO::getQueueStatus, "PROCESSING") + .orderByAsc(OpsOrderQueueDO::getPriority) // 从低优先级开始 .orderByAsc(OpsOrderQueueDO::getDequeueTime)); } @@ -101,7 +113,7 @@ public interface OpsOrderQueueMapper extends BaseMapperX { default Long countByUserId(Long userId) { return selectCount(new LambdaQueryWrapperX() .eq(OpsOrderQueueDO::getUserId, userId) - .in(OpsOrderQueueDO::getQueueStatus, "WAITING", "EXECUTING")); + .in(OpsOrderQueueDO::getQueueStatus, "WAITING", "PROCESSING")); } /** diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceImpl.java index f2253e7..ffcd7be 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceImpl.java @@ -40,10 +40,10 @@ public class OrderQueueServiceImpl implements OrderQueueService { * 结果:优先级高的排在前面,同优先级按时间排序 */ private static final Map PRIORITY_SCORES = Map.of( - 0, 0L, // P0: 0 - 1, 1000000L, // P1: 1,000,000 - 2, 2000000L, // P2: 2,000,000 - 3, 3000000L // P3: 3,000,000 + 0, 0L, // P0: 0 + 1, 1000000L, // P1: 1,000,000 + 2, 2000000L, // P2: 2,000,000 + 3, 3000000L // P3: 3,000,000 ); @Resource @@ -61,7 +61,8 @@ public class OrderQueueServiceImpl implements OrderQueueService { // 1. 检查工单是否已在队列中(MySQL) OpsOrderQueueDO existingQueue = orderQueueMapper.selectByOpsOrderId(opsOrderId); if (existingQueue != null) { - log.warn("工单已在队列中: opsOrderId={}, userId={}", opsOrderId, userId); + log.warn("工单已在队列中: opsOrderId={}, userId={}, existingQueueId={}", opsOrderId, userId, + existingQueue.getId()); return existingQueue.getId(); } @@ -177,17 +178,30 @@ public class OrderQueueServiceImpl implements OrderQueueService { int updated = orderQueueMapper.updateById(queueDO); - // 2. 异步更新 Redis + // 2. 更新 Redis if (updated > 0) { OrderQueueDTO dto = convertToDTO(queueDO); - CompletableFuture.runAsync(() -> { + + // REMOVED 状态需要同步更新,确保任务完成后立即从 Redis 队列移除 + // 避免自动派单下一个时查询到已完成的任务 + if (OrderQueueStatusEnum.REMOVED == newStatus) { try { redisQueueService.updateStatus(queueId, newStatus.getStatus()); - log.debug("Redis 更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus()); + log.debug("Redis 同步更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus()); } catch (Exception e) { - log.error("Redis 更新状态失败: queueId={}", queueId, e); + log.error("Redis 同步更新状态失败: queueId={}", queueId, e); } - }); + } else { + // 其他状态异步更新 + CompletableFuture.runAsync(() -> { + try { + redisQueueService.updateStatus(queueId, newStatus.getStatus()); + log.debug("Redis 异步更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus()); + } catch (Exception e) { + log.error("Redis 异步更新状态失败: queueId={}", queueId, e); + } + }); + } } log.info("队列状态已更新: queueId={}, opsOrderId={}, oldStatus={}, newStatus={}", @@ -259,8 +273,7 @@ public class OrderQueueServiceImpl implements OrderQueueService { if (queueDO.getDequeueTime() != null) { long pausedSeconds = java.time.Duration.between( queueDO.getDequeueTime(), - LocalDateTime.now() - ).getSeconds(); + LocalDateTime.now()).getSeconds(); queueDO.addPausedDuration((int) pausedSeconds); } @@ -469,7 +482,7 @@ public class OrderQueueServiceImpl implements OrderQueueService { // 过滤出 WAITING 状态的任务,并按队列分数排序 return allTasks.stream() - .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus())) + .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equalsIgnoreCase(task.getQueueStatus())) .sorted((a, b) -> { // 优先使用队列分数 score 排序(已在入队时计算好) if (a.getQueueScore() != null && b.getQueueScore() != null) { @@ -480,8 +493,7 @@ public class OrderQueueServiceImpl implements OrderQueueService { // 按优先级排序(P0 > P1 > P2 > P3) int priorityCompare = Integer.compare( b.getPriority() != null ? b.getPriority() : 999, - a.getPriority() != null ? a.getPriority() : 999 - ); + a.getPriority() != null ? a.getPriority() : 999); if (priorityCompare != 0) { return priorityCompare; } @@ -491,6 +503,38 @@ public class OrderQueueServiceImpl implements OrderQueueService { .collect(Collectors.toList()); } + @Override + public List getWaitingTasksByUserIdFromDb(Long userId) { + // 直接从 MySQL 获取等待中的任务(忽略 Redis 缓存) + // 用于确保获取最新数据,例如在任务完成后自动派单下一个时 + List mysqlList = orderQueueMapper.selectListByUserIdAndStatus( + userId, OrderQueueStatusEnum.WAITING.getStatus()); + + if (mysqlList == null || mysqlList.isEmpty()) { + return Collections.emptyList(); + } + + // 转换为 DTO 并按队列分数排序 + return mysqlList.stream() + .map(this::convertToDTO) + .filter(Objects::nonNull) + .sorted((a, b) -> { + // 优先使用队列分数 score 排序 + if (a.getQueueScore() != null && b.getQueueScore() != null) { + return Double.compare(a.getQueueScore(), b.getQueueScore()); + } + // 兜底:如果 score 为空,则按优先级+时间排序 + int priorityCompare = Integer.compare( + b.getPriority() != null ? b.getPriority() : 999, + a.getPriority() != null ? a.getPriority() : 999); + if (priorityCompare != 0) { + return priorityCompare; + } + return a.getEnqueueTime().compareTo(b.getEnqueueTime()); + }) + .collect(Collectors.toList()); + } + @Override public List getInterruptedTasksByUserId(Long userId) { // 获取所有任务 diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java index 4f832e9..4ec6f92 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java @@ -54,14 +54,12 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { // 1. 计算分数(优先级 + 时间戳) double score = calculateScore(dto.getPriority(), dto.getEnqueueTime()); + dto.setQueueScore(score); - // 2. 序列化为 JSON - String json = objectMapper.writeValueAsString(dto); + // 2. 添加到 Sorted Set(使用 queueId 作为 member,而非 JSON) + redisTemplate.opsForZSet().add(queueKey, dto.getId().toString(), score); - // 3. 添加到 Sorted Set - redisTemplate.opsForZSet().add(queueKey, json, score); - - // 4. 存储详细信息到 Hash + // 3. 存储详细信息到 Hash Map infoMap = convertToMap(dto); redisTemplate.opsForHash().putAll(infoKey, infoMap); redisTemplate.expire(infoKey, 24, TimeUnit.HOURS); @@ -91,16 +89,14 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { byte[] queueKey = (QUEUE_KEY_PREFIX + dto.getUserId()).getBytes(); byte[] infoKey = (INFO_KEY_PREFIX + dto.getId()).getBytes(); - // 计算分数 + // 计算分数并设置到 DTO double score = calculateScore(dto.getPriority(), dto.getEnqueueTime()); + dto.setQueueScore(score); - // 序列化 - byte[] json = objectMapper.writeValueAsBytes(dto); + // 添加到 Sorted Set(使用 queueId 作为 member) + connection.zAdd(queueKey, score, dto.getId().toString().getBytes()); - // 添加到 Sorted Set - connection.zAdd(queueKey, score, json); - - // 存储详细信息 + // 存储详细信息到 Hash Map infoMap = convertToByteMap(dto); connection.hMSet(infoKey, infoMap); @@ -129,7 +125,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { String queueKey = QUEUE_KEY_PREFIX + cleanerId; // 使用 ZPOPMIN 原子性出队(获取并移除最高优先级任务) - // popMin 返回单个 TypedTuple + // popMin 返回单个 TypedTuple,值为 queueId org.springframework.data.redis.core.ZSetOperations.TypedTuple typedTuple = redisTemplate.opsForZSet().popMin(queueKey); @@ -137,19 +133,30 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { return null; } - // 从 TypedTuple 中获取值 - Object taskObj = typedTuple.getValue(); - String json = (String) taskObj; + // 从 TypedTuple 中获取 queueId + Object queueIdObj = typedTuple.getValue(); + if (queueIdObj == null) { + return null; + } - // 反序列化 - OrderQueueDTO dto = objectMapper.readValue(json, OrderQueueDTO.class); + Long queueId = Long.parseLong(queueIdObj.toString()); - // 从 Hash 中删除详细信息 - String infoKey = INFO_KEY_PREFIX + dto.getId(); + // 从 Hash 获取详细信息 + String infoKey = INFO_KEY_PREFIX + queueId; + Map infoMap = redisTemplate.opsForHash().entries(infoKey); + + if (infoMap == null || infoMap.isEmpty()) { + log.warn("Redis 出队时队列记录不存在: queueId={}", queueId); + return null; + } + + OrderQueueDTO dto = mapToDto(infoMap); + + // 删除 Hash 中的详细信息 redisTemplate.delete(infoKey); log.debug("Redis 出队成功: queueId={}, orderId={}, cleanerId={}", - dto.getId(), dto.getOpsOrderId(), cleanerId); + queueId, dto != null ? dto.getOpsOrderId() : null, cleanerId); return dto; @@ -164,25 +171,31 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { try { String queueKey = QUEUE_KEY_PREFIX + cleanerId; - // 查询前 N 个任务(不删除) - Set tasks = redisTemplate.opsForZSet().range(queueKey, 0, count - 1); + // 1. 查询前 N 个 queueId(按 score 排序) + Set queueIds = redisTemplate.opsForZSet().range(queueKey, 0, count - 1); - if (tasks == null || tasks.isEmpty()) { + if (queueIds == null || queueIds.isEmpty()) { return Collections.emptyList(); } - // 反序列化 - return tasks.stream() - .map(task -> { + // 2. 批量从 Hash 获取详细信息 + return queueIds.stream() + .map(queueId -> { try { - String json = (String) task; - return objectMapper.readValue(json, OrderQueueDTO.class); + String idStr = queueId.toString(); + String infoKey = INFO_KEY_PREFIX + idStr; + Map infoMap = redisTemplate.opsForHash().entries(infoKey); + if (infoMap != null && !infoMap.isEmpty()) { + return mapToDto(infoMap); + } + return null; } catch (Exception e) { - log.error("反序列化失败", e); + log.error("查询队列任务失败: queueId={}", queueId, e); return null; } }) .filter(Objects::nonNull) + .sorted((a, b) -> Double.compare(a.getQueueScore(), b.getQueueScore())) .collect(Collectors.toList()); } catch (Exception e) { @@ -221,14 +234,77 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { try { String infoKey = INFO_KEY_PREFIX + queueId; - // 更新 Hash 中的状态 - redisTemplate.opsForHash().put(infoKey, "queueStatus", newStatus); + // 先从 Hash 获取 userId + Map infoMap = redisTemplate.opsForHash().entries(infoKey); + if (infoMap == null || infoMap.isEmpty()) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } - // TODO: 如果需要同步更新 Sorted Set 中的数据,需要先删除再重新添加 - // 这里简化处理,只更新 Hash + Object userIdObj = infoMap.get("userId"); + if (userIdObj == null) { + log.warn("队列记录缺少 userId: queueId={}", queueId); + return false; + } - log.debug("Redis 更新状态成功: queueId={}, newStatus={}", queueId, newStatus); - return true; + Long userId = Long.parseLong(userIdObj.toString()); + String queueKey = QUEUE_KEY_PREFIX + userId; + + // 如果状态是 REMOVED,从 Sorted Set 中移除并删除 Hash + if ("REMOVED".equalsIgnoreCase(newStatus)) { + String removeScript = + "local infoKey = KEYS[1] " + + "local queueKey = KEYS[2] " + + "local queueIdStr = ARGV[1] " + + "redis.call('ZREM', queueKey, queueIdStr) " + + "redis.call('DEL', infoKey) " + + "return 1"; + + redisTemplate.execute( + new DefaultRedisScript<>(removeScript, Long.class), + Arrays.asList(infoKey, queueKey), + queueId.toString() + ); + + log.debug("Redis 已移除队列记录: queueId={}, status=REMOVED", queueId); + return true; + } + + // 其他状态:使用 Lua 脚本原子性更新 Hash 和刷新 Sorted Set + String script = + "local infoKey = KEYS[1] " + + "local queueKey = KEYS[2] " + + "local newStatus = ARGV[1] " + + "local queueIdStr = ARGV[2] " + + + // 1. 更新 Hash 中的状态 + "redis.call('HSET', infoKey, 'queueStatus', newStatus) " + + + // 2. 刷新 Sorted Set 中的 member(保持原 score,先删除再添加) + "local score = redis.call('ZSCORE', queueKey, queueIdStr) " + + "if score then " + + " redis.call('ZREM', queueKey, queueIdStr) " + + " redis.call('ZADD', queueKey, score, queueIdStr) " + + "end " + + + // 3. 刷新 Hash 过期时间 + "redis.call('EXPIRE', infoKey, 86400) " + + + "return 1"; + + Long result = redisTemplate.execute( + new DefaultRedisScript<>(script, Long.class), + Arrays.asList(infoKey, queueKey), + newStatus, queueId.toString() + ); + + if (result != null && result > 0) { + log.debug("Redis 更新状态成功: queueId={}, newStatus={}", queueId, newStatus); + return true; + } else { + log.warn("Redis 更新状态失败: queueId={}, newStatus={}", queueId, newStatus); + return false; + } } catch (Exception e) { log.error("Redis 更新状态失败: queueId={}, newStatus={}", queueId, newStatus, e); @@ -248,38 +324,48 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { return false; } - // 更新优先级 - infoMap.put("priority", newPriority); - // 反序列化回 DTO OrderQueueDTO dto = mapToDto(infoMap); if (dto == null) { return false; } - // 更新 Sorted Set(先删除旧的,再添加新的) String queueKey = QUEUE_KEY_PREFIX + dto.getUserId(); - String oldJson = objectMapper.writeValueAsString(dto); - dto.setPriority(newPriority); double newScore = calculateScore(newPriority, dto.getEnqueueTime()); - String newJson = objectMapper.writeValueAsString(dto); - // 使用 Lua 脚本原子性更新 + // 使用 Lua 脚本原子性更新 Hash 和 Sorted Set String script = - "local old = redis.call('ZREM', KEYS[1], ARGV[1]) " + - "redis.call('ZADD', KEYS[1], ARGV[2], ARGV[3]) " + - "redis.call('HMSET', KEYS[2], 'priority', ARGV[4]) " + - "return old"; + "local infoKey = KEYS[1] " + + "local queueKey = KEYS[2] " + + "local queueIdStr = ARGV[1] " + + "local newScore = tonumber(ARGV[2]) " + + "local newPriority = ARGV[3] " + + // 更新 Hash 中的优先级 + "redis.call('HSET', infoKey, 'priority', newPriority) " + + // 更新 Sorted Set 中的 score(先删除再添加) + "local oldScore = redis.call('ZSCORE', queueKey, queueIdStr) " + + "if oldScore then " + + " redis.call('ZREM', queueKey, queueIdStr) " + + " redis.call('ZADD', queueKey, newScore, queueIdStr) " + + "end " + + // 刷新 Hash 过期时间 + "redis.call('EXPIRE', infoKey, 86400) " + + "return 1"; - redisTemplate.execute( + Long result = redisTemplate.execute( new DefaultRedisScript<>(script, Long.class), - Arrays.asList(queueKey, infoKey), - oldJson, newScore, newJson, newPriority + Arrays.asList(infoKey, queueKey), + queueId.toString(), String.valueOf(newScore), String.valueOf(newPriority) ); - log.debug("Redis 更新优先级成功: queueId={}, newPriority={}", queueId, newPriority); - return true; + if (result != null && result > 0) { + log.debug("Redis 更新优先级成功: queueId={}, newPriority={}", queueId, newPriority); + return true; + } else { + log.warn("Redis 更新优先级失败: queueId={}, newPriority={}", queueId, newPriority); + return false; + } } catch (Exception e) { log.error("Redis 更新优先级失败: queueId={}, newPriority={}", queueId, newPriority, e); @@ -292,7 +378,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { try { String infoKey = INFO_KEY_PREFIX + queueId; - // 从 Hash 中获取数据 + // 从 Hash 中获取 userId Map infoMap = redisTemplate.opsForHash().entries(infoKey); if (infoMap.isEmpty()) { return false; @@ -306,15 +392,24 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { Long userId = Long.parseLong(userIdObj.toString()); String queueKey = QUEUE_KEY_PREFIX + userId; - // 从 Sorted Set 中删除 - String json = objectMapper.writeValueAsString(infoMap); - redisTemplate.opsForZSet().remove(queueKey, json); + // 使用 Lua 脚本原子性删除 Sorted Set 和 Hash + String script = + "local queueKey = KEYS[1] " + + "local infoKey = KEYS[2] " + + "local queueIdStr = ARGV[1] " + + "local removedFromZSet = redis.call('ZREM', queueKey, queueIdStr) " + + "local deletedHash = redis.call('DEL', infoKey) " + + "return (removedFromZSet > 0 or deletedHash > 0) and 1 or 0"; - // 删除 Hash - redisTemplate.delete(infoKey); + Long result = redisTemplate.execute( + new DefaultRedisScript<>(script, Long.class), + Arrays.asList(queueKey, infoKey), + queueId.toString() + ); - log.debug("Redis 删除队列记录成功: queueId={}", queueId); - return true; + boolean removed = result != null && result == 1; + log.debug("Redis 删除队列记录: queueId={}, removed={}", queueId, removed); + return removed; } catch (Exception e) { log.error("Redis 删除队列记录失败: queueId={}", queueId, e); @@ -348,24 +443,25 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { @Override public OrderQueueDTO getByOrderId(Long orderId) { try { - // 查询所有保洁员的队列(效率较低,慎用) - Set keys = redisTemplate.keys(QUEUE_KEY_PREFIX + "*"); + // 查询所有 Hash 信息键(效率较低,慎用) + Set keys = redisTemplate.keys(INFO_KEY_PREFIX + "*"); if (keys == null || keys.isEmpty()) { return null; } - for (String queueKey : keys) { - Set tasks = redisTemplate.opsForZSet().range(queueKey, 0, -1); - if (tasks != null) { - for (Object task : tasks) { - try { - OrderQueueDTO dto = objectMapper.readValue((String) task, OrderQueueDTO.class); - if (dto.getOpsOrderId().equals(orderId)) { - return dto; + for (String infoKey : keys) { + try { + Map infoMap = redisTemplate.opsForHash().entries(infoKey); + if (infoMap != null && !infoMap.isEmpty()) { + Object opsOrderIdObj = infoMap.get("opsOrderId"); + if (opsOrderIdObj != null) { + Long opsOrderId = Long.parseLong(opsOrderIdObj.toString()); + if (opsOrderId.equals(orderId)) { + return mapToDto(infoMap); } - } catch (Exception ignored) { } } + } catch (Exception ignored) { } } @@ -470,6 +566,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { map.put("queueIndex", dto.getQueueIndex()); map.put("priority", dto.getPriority()); map.put("queueStatus", dto.getQueueStatus()); + map.put("queueScore", dto.getQueueScore()); // LocalDateTime 转换为字符串存储 if (dto.getEnqueueTime() != null) { @@ -490,6 +587,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { map.put("queueIndex".getBytes(), String.valueOf(dto.getQueueIndex()).getBytes()); map.put("priority".getBytes(), String.valueOf(dto.getPriority()).getBytes()); map.put("queueStatus".getBytes(), dto.getQueueStatus().getBytes()); + map.put("queueScore".getBytes(), String.valueOf(dto.getQueueScore()).getBytes()); // LocalDateTime 转换为字符串存储 if (dto.getEnqueueTime() != null) { @@ -512,6 +610,21 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { dto.setPriority((Integer) map.get("priority")); dto.setQueueStatus((String) map.get("queueStatus")); + // 读取 queueScore + Object queueScoreObj = map.get("queueScore"); + if (queueScoreObj != null) { + dto.setQueueScore(Double.parseDouble(queueScoreObj.toString())); + } else { + // 如果没有存储,则根据 priority 和 enqueueTime 计算 + Integer priority = dto.getPriority(); + LocalDateTime enqueueTime = null; + Object enqueueTimeObj = map.get("enqueueTime"); + if (enqueueTimeObj != null) { + enqueueTime = LocalDateTime.parse(enqueueTimeObj.toString()); + } + dto.setQueueScore(calculateScore(priority, enqueueTime)); + } + // 字符串转换为 LocalDateTime Object enqueueTimeObj = map.get("enqueueTime"); if (enqueueTimeObj != null) {