diff --git a/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueService.java b/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueService.java new file mode 100644 index 0000000..cd015e4 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueService.java @@ -0,0 +1,197 @@ +package com.viewsh.module.ops.api.queue; + +import com.viewsh.module.ops.enums.OrderQueueStatusEnum; +import com.viewsh.module.ops.enums.PriorityEnum; + +import java.util.List; + +/** + * 工单队列管理服务接口 + * 提供工单队列的核心管理功能,包括入队、出队、状态变更、优先级调整等 + * + * @author lzh + */ +public interface OrderQueueService { + + /** + * 工单入队 + * 将工单加入派单队列,等待派单 + * + * @param opsOrderId 工单ID + * @param userId 执行人员ID(保洁员) + * @param priority 优先级 + * @param queueIndex 队列顺序(可选,用于同优先级排序) + * @return 队列记录ID + */ + Long enqueue(Long opsOrderId, Long userId, PriorityEnum priority, Integer queueIndex); + + /** + * 工单出队 + * 将工单从队列中移除(通常用于工单完成或取消) + * + * @param queueId 队列记录ID + * @return 是否成功 + */ + boolean dequeue(Long queueId); + + /** + * 根据工单ID出队 + * + * @param opsOrderId 工单ID + * @return 是否成功 + */ + boolean dequeueByOpsOrderId(Long opsOrderId); + + /** + * 更新队列状态 + * + * @param queueId 队列记录ID + * @param newStatus 新状态 + * @return 是否成功 + */ + boolean updateStatus(Long queueId, OrderQueueStatusEnum newStatus); + + /** + * 开始执行任务(出队) + * 将队列状态从 WAITING 改为 dispatched,并记录 dequeueTime + * + * @param queueId 队列记录ID + * @return 是否成功 + */ + boolean startExecution(Long queueId); + + /** + * 暂停任务 + * 将队列状态从 dispatched 改为 PAUSED,并记录暂停时长 + * + * @param queueId 队列记录ID + * @return 是否成功 + */ + boolean pauseTask(Long queueId); + + /** + * 恢复任务 + * 将队列状态从 PAUSED 改为 dispatched + * + * @param queueId 队列记录ID + * @return 是否成功 + */ + boolean resumeTask(Long queueId); + + /** + * 调整优先级 + * 动态调整工单的优先级(用于紧急插队等场景) + * + * @param queueId 队列记录ID + * @param newPriority 新优先级 + * @param reason 调整原因 + * @return 是否成功 + */ + boolean adjustPriority(Long queueId, PriorityEnum newPriority, String reason); + + /** + * 根据工单ID调整优先级 + * + * @param opsOrderId 工单ID + * @param newPriority 新优先级 + * @param reason 调整原因 + * @return 是否成功 + */ + boolean adjustPriorityByOpsOrderId(Long opsOrderId, PriorityEnum newPriority, String reason); + + /** + * 升级为紧急任务 + * 将工单优先级调整为P0,并触发紧急派单流程 + * + * @param opsOrderId 工单ID + * @param reason 升级原因 + * @return 是否成功 + */ + boolean upgradeToUrgent(Long opsOrderId, String reason); + + /** + * 获取等待中的队列列表 + * + * @return 等待中的工单列表,按优先级排序 + */ + List getWaitingQueue(); + + /** + * 获取P0紧急任务列表 + * + * @return P0紧急任务列表 + */ + List getUrgentOrders(); + + /** + * 获取用户的当前任务 + * + * @param userId 用户ID(保洁员) + * @return 当前任务信息 + */ + OrderQueueDTO getCurrentTaskByUserId(Long userId); + + /** + * 获取用户的任务列表 + * + * @param userId 用户ID + * @return 任务列表 + */ + List getTasksByUserId(Long userId); + + /** + * 获取用户的等待中任务列表(WAITING状态,按队列分数排序) + * + * 队列分数计算公式:优先级分数 + 时间戳 + * - P0: 0 + timestamp + * - P1: 1000000 + timestamp + * - P2: 2000000 + timestamp + * - P3: 3000000 + timestamp + * + * 结果:优先级高的排在前面,同优先级按入队时间排序 + * + * @param userId 用户ID + * @return 等待中的任务列表(已按队列分数排序) + */ + List getWaitingTasksByUserId(Long userId); + + /** + * 查询队列记录 + * + * @param queueId 队列记录ID + * @return 队列记录 + */ + OrderQueueDTO getById(Long queueId); + + /** + * 根据工单ID查询队列记录 + * + * @param opsOrderId 工单ID + * @return 队列记录 + */ + OrderQueueDTO getByOpsOrderId(Long opsOrderId); + + /** + * 统计等待中的任务数量 + * + * @return 任务数量 + */ + Long countWaiting(); + + /** + * 统计指定用户的任务数量 + * + * @param userId 用户ID + * @return 任务数量 + */ + Long countByUserId(Long userId); + + /** + * 更新队列记录的事件消息 + * + * @param queueId 队列记录ID + * @param eventMessage 事件消息 + * @return 是否成功 + */ + boolean updateEventMessage(Long queueId, String eventMessage); +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java new file mode 100644 index 0000000..404ae24 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java @@ -0,0 +1,662 @@ +package com.viewsh.module.ops.service.queue; + +import com.viewsh.module.ops.api.queue.OrderQueueDTO; +import com.viewsh.module.ops.api.queue.OrderQueueService; +import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO; +import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper; +import com.viewsh.module.ops.enums.OrderQueueStatusEnum; +import com.viewsh.module.ops.enums.PriorityEnum; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * 工单队列管理服务实现(Redis + MySQL 混合架构) + * + * 架构说明: + * 1. 写入:先写 MySQL(持久化),再异步写 Redis(高性能) + * 2. 读取:优先读 Redis,未命中则读 MySQL 并同步到 Redis + * 3. 同步:定时任务将 MySQL 数据同步到 Redis + * 4. 容灾:Redis 宕机时降级到纯 MySQL 模式 + * + * @author lzh + */ +@Slf4j +@Service +public class OrderQueueServiceEnhanced implements OrderQueueService { + + /** + * Score 计算公式:优先级分数 + 时间戳 + * 优先级分数:P0=0, P1=1000000, P2=2000000, P3=3000000 + * 时间戳:秒级时间戳 + * 结果:优先级高的排在前面,同优先级按时间排序 + */ + private static final Map PRIORITY_SCORES = Map.of( + 0, 0L, // P0: 0 + 1, 1000000L, // P1: 1,000,000 + 2, 2000000L, // P2: 2,000,000 + 3, 3000000L // P3: 3,000,000 + ); + + @Resource + private OpsOrderQueueMapper orderQueueMapper; + + @Resource + private RedisOrderQueueService redisQueueService; + + @Resource + private QueueSyncService queueSyncService; + + @Override + @Transactional(rollbackFor = Exception.class) + public Long enqueue(Long opsOrderId, Long userId, PriorityEnum priority, Integer queueIndex) { + // 1. 检查工单是否已在队列中(MySQL) + OpsOrderQueueDO existingQueue = orderQueueMapper.selectByOpsOrderId(opsOrderId); + if (existingQueue != null) { + log.warn("工单已在队列中: opsOrderId={}, userId={}", opsOrderId, userId); + return existingQueue.getId(); + } + + // 2. 计算队列分数 + LocalDateTime now = LocalDateTime.now(); + double queueScore = calculateQueueScore(priority.getPriority(), now); + + // 3. 创建队列记录(MySQL) + OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder() + .opsOrderId(opsOrderId) + .userId(userId) + .queueIndex(queueIndex != null ? queueIndex : calculateNextQueueIndex(priority)) + .priority(priority.getPriority()) + .queueScore(queueScore) + .queueStatus(OrderQueueStatusEnum.WAITING.getStatus()) + .enqueueTime(now) + .pausedDuration(0) + .eventMessage("工单入队,等待派单") + .build(); + + orderQueueMapper.insert(queueDO); + + log.info("工单已入队(MySQL): opsOrderId={}, userId={}, priority={}, queueId={}", + opsOrderId, userId, priority, queueDO.getId()); + + // 3. 异步写入 Redis(失败不影响主流程) + OrderQueueDTO dto = convertToDTO(queueDO); + CompletableFuture.runAsync(() -> { + try { + redisQueueService.enqueue(dto); + log.debug("工单已入队(Redis): queueId={}", dto.getId()); + } catch (Exception e) { + log.error("Redis 入队失败,依赖定时同步任务补偿: queueId={}", dto.getId(), e); + } + }); + + // 4. 如果是P0紧急任务,记录警告日志 + if (priority.isUrgent()) { + log.warn("检测到P0紧急任务入队,需立即处理: opsOrderId={}", opsOrderId); + // TODO: 触发紧急派单流程(在派单引擎中实现) + } + + return queueDO.getId(); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean dequeue(Long queueId) { + // 1. MySQL 删除 + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO == null) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } + + // 只允许删除终态的记录 + if (!isTerminalStatus(queueDO.getQueueStatus())) { + log.warn("只能删除终态的队列记录: queueId={}, status={}", queueId, queueDO.getQueueStatus()); + return false; + } + + int deleted = orderQueueMapper.deleteById(queueId); + + // 2. 异步删除 Redis + if (deleted > 0) { + CompletableFuture.runAsync(() -> { + try { + redisQueueService.remove(queueId); + log.debug("Redis 删除队列记录成功: queueId={}", queueId); + } catch (Exception e) { + log.error("Redis 删除队列记录失败: queueId={}", queueId, e); + } + }); + } + + log.info("队列记录已删除: queueId={}, deleted={}", queueId, deleted > 0); + return deleted > 0; + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean dequeueByOpsOrderId(Long opsOrderId) { + OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId); + if (queueDO == null) { + log.warn("工单不在队列中: opsOrderId={}", opsOrderId); + return false; + } + + return dequeue(queueDO.getId()); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean updateStatus(Long queueId, OrderQueueStatusEnum newStatus) { + // 1. MySQL 更新 + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO == null) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } + + String oldStatus = queueDO.getQueueStatus(); + + // 状态流转校验 + if (!validateStatusTransition(oldStatus, newStatus.getStatus())) { + log.warn("非法的状态流转: queueId={}, oldStatus={}, newStatus={}", + queueId, oldStatus, newStatus.getStatus()); + return false; + } + + queueDO.setQueueStatus(newStatus.getStatus().toUpperCase()); + updateStatusTimestamp(queueDO, newStatus); + + int updated = orderQueueMapper.updateById(queueDO); + + // 2. 异步更新 Redis + if (updated > 0) { + OrderQueueDTO dto = convertToDTO(queueDO); + CompletableFuture.runAsync(() -> { + try { + redisQueueService.updateStatus(queueId, newStatus.getStatus()); + log.debug("Redis 更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus()); + } catch (Exception e) { + log.error("Redis 更新状态失败: queueId={}", queueId, e); + } + }); + } + + log.info("队列状态已更新: queueId={}, opsOrderId={}, oldStatus={}, newStatus={}", + queueId, queueDO.getOpsOrderId(), oldStatus, newStatus.getStatus()); + + return updated > 0; + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean startExecution(Long queueId) { + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO == null) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } + + if (!OrderQueueStatusEnum.WAITING.getStatus().equals(queueDO.getQueueStatus())) { + log.warn("只有WAITING状态的任务可以开始执行: queueId={}, status={}", + queueId, queueDO.getQueueStatus()); + return false; + } + + // 队列状态流转:WAITING → DISPATCHED + queueDO.setQueueStatus(OrderQueueStatusEnum.DISPATCHED.getStatus()); + queueDO.setDequeueTime(LocalDateTime.now()); + queueDO.setEventMessage("派单成功,已分配给执行人员"); + + int updated = orderQueueMapper.updateById(queueDO); + + // 异步更新 Redis + if (updated > 0) { + CompletableFuture.runAsync(() -> { + try { + redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.DISPATCHED.getStatus()); + } catch (Exception e) { + log.error("Redis 更新状态失败: queueId={}", queueId, e); + } + }); + } + + log.info("派单成功: queueId={}, opsOrderId={}, assigneeId={}", + queueId, queueDO.getOpsOrderId(), queueDO.getUserId()); + + return updated > 0; + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean pauseTask(Long queueId) { + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO == null) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } + + if (!OrderQueueStatusEnum.DISPATCHED.getStatus().equals(queueDO.getQueueStatus())) { + log.warn("只有DISPATCHED状态的任务可以暂停: queueId={}, status={}", + queueId, queueDO.getQueueStatus()); + return false; + } + + // 队列状态流转:DISPATCHED → PAUSED + queueDO.setQueueStatus(OrderQueueStatusEnum.PAUSED.getStatus()); + + // 计算暂停时长 + if (queueDO.getDequeueTime() != null) { + long pausedSeconds = java.time.Duration.between( + queueDO.getDequeueTime(), + LocalDateTime.now() + ).getSeconds(); + queueDO.addPausedDuration((int) pausedSeconds); + } + + queueDO.setEventMessage("任务已暂停"); + int updated = orderQueueMapper.updateById(queueDO); + + // 异步更新 Redis + if (updated > 0) { + CompletableFuture.runAsync(() -> { + try { + redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PAUSED.getStatus()); + } catch (Exception e) { + log.error("Redis 更新状态失败: queueId={}", queueId, e); + } + }); + } + + log.info("任务已暂停: queueId={}, opsOrderId={}, pausedDuration={}", + queueId, queueDO.getOpsOrderId(), queueDO.getPausedDuration()); + + return updated > 0; + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean resumeTask(Long queueId) { + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO == null) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } + + if (!OrderQueueStatusEnum.PAUSED.getStatus().equals(queueDO.getQueueStatus())) { + log.warn("只有PAUSED状态的任务可以恢复: queueId={}, status={}", + queueId, queueDO.getQueueStatus()); + return false; + } + + // 队列状态流转:PAUSED → DISPATCHED + queueDO.setQueueStatus(OrderQueueStatusEnum.DISPATCHED.getStatus()); + queueDO.setDequeueTime(LocalDateTime.now()); // 重置出队时间 + queueDO.setEventMessage("任务已恢复执行"); + + int updated = orderQueueMapper.updateById(queueDO); + + // 异步更新 Redis + if (updated > 0) { + CompletableFuture.runAsync(() -> { + try { + redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.DISPATCHED.getStatus()); + } catch (Exception e) { + log.error("Redis 更新状态失败: queueId={}", queueId, e); + } + }); + } + + log.info("任务已恢复执行: queueId={}, opsOrderId={}", queueId, queueDO.getOpsOrderId()); + + return updated > 0; + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean adjustPriority(Long queueId, PriorityEnum newPriority, String reason) { + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO == null) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } + + Integer oldPriority = queueDO.getPriority(); + + // 如果优先级没有变化,直接返回 + if (oldPriority.equals(newPriority.getPriority())) { + log.info("优先级未变化: queueId={}, priority={}", queueId, newPriority); + return true; + } + + queueDO.setPriorityEnum(newPriority); + // 重新计算队列顺序 + queueDO.setQueueIndex(calculateNextQueueIndex(newPriority)); + // 重新计算队列分数(使用原入队时间保持时间戳不变) + LocalDateTime enqueueTime = queueDO.getEnqueueTime() != null ? queueDO.getEnqueueTime() : LocalDateTime.now(); + double newQueueScore = calculateQueueScore(newPriority.getPriority(), enqueueTime); + queueDO.setQueueScore(newQueueScore); + queueDO.setEventMessage("优先级调整: " + oldPriority + " -> " + newPriority + ", 原因: " + reason); + + int updated = orderQueueMapper.updateById(queueDO); + + // 异步更新 Redis + if (updated > 0) { + CompletableFuture.runAsync(() -> { + try { + redisQueueService.updatePriority(queueId, newPriority.getPriority()); + } catch (Exception e) { + log.error("Redis 更新优先级失败: queueId={}", queueId, e); + } + }); + } + + log.info("优先级已调整: queueId={}, opsOrderId={}, oldPriority={}, newPriority={}, reason={}, newScore={}", + queueId, queueDO.getOpsOrderId(), oldPriority, newPriority, reason, newQueueScore); + + // 如果升级为P0,触发紧急派单 + if (newPriority.isUrgent()) { + log.warn("任务升级为P0紧急,需立即处理: queueId={}, opsOrderId={}", + queueId, queueDO.getOpsOrderId()); + // TODO: 触发紧急派单流程 + } + + return updated > 0; + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean adjustPriorityByOpsOrderId(Long opsOrderId, PriorityEnum newPriority, String reason) { + OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId); + if (queueDO == null) { + log.warn("工单不在队列中: opsOrderId={}", opsOrderId); + return false; + } + + return adjustPriority(queueDO.getId(), newPriority, reason); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean upgradeToUrgent(Long opsOrderId, String reason) { + return adjustPriorityByOpsOrderId(opsOrderId, PriorityEnum.P0, reason); + } + + @Override + public List getWaitingQueue() { + // 优先从 Redis 获取 + // TODO: 实现全局等待队列(需要聚合所有用户的队列) + // 这里暂时使用 MySQL + List list = orderQueueMapper.selectListWaiting(); + return convertToDTO(list); + } + + @Override + public List getUrgentOrders() { + // TODO: 优先从 Redis 获取 + List list = orderQueueMapper.selectUrgentOrders(); + return convertToDTO(list); + } + + @Override + public OrderQueueDTO getCurrentTaskByUserId(Long userId) { + // 1. 优先从 Redis 获取 + List redisTasks = redisQueueService.peekTasks(userId, 1); + if (redisTasks != null && !redisTasks.isEmpty()) { + return redisTasks.get(0); + } + + // 2. Redis 未命中,从 MySQL 获取并同步到 Redis + OpsOrderQueueDO queueDO = orderQueueMapper.selectCurrentExecutingByUserId(userId); + if (queueDO != null) { + // 同步到 Redis + OrderQueueDTO dto = convertToDTO(queueDO); + CompletableFuture.runAsync(() -> { + try { + redisQueueService.enqueue(dto); + } catch (Exception e) { + log.error("同步到 Redis 失败: queueId={}", dto.getId(), e); + } + }); + return dto; + } + + return null; + } + + @Override + public List getTasksByUserId(Long userId) { + // 1. 优先从 Redis 获取 + List redisTasks = redisQueueService.getTasksByUserId(userId); + if (redisTasks != null && !redisTasks.isEmpty()) { + return redisTasks; + } + + // 2. Redis 未命中,从 MySQL 获取并同步到 Redis + List mysqlList = orderQueueMapper.selectListByUserId(userId); + if (mysqlList != null && !mysqlList.isEmpty()) { + // 同步到 Redis + List dtoList = convertToDTO(mysqlList); + CompletableFuture.runAsync(() -> { + try { + redisQueueService.batchEnqueue(dtoList); + } catch (Exception e) { + log.error("批量同步到 Redis 失败: userId={}", userId, e); + } + }); + return dtoList; + } + + return Collections.emptyList(); + } + + @Override + public List getWaitingTasksByUserId(Long userId) { + // 获取所有任务 + List allTasks = getTasksByUserId(userId); + + // 过滤出 WAITING 状态的任务,并按队列分数排序 + return allTasks.stream() + .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus())) + .sorted((a, b) -> { + // 优先使用队列分数 score 排序(已在入队时计算好) + if (a.getQueueScore() != null && b.getQueueScore() != null) { + return Double.compare(a.getQueueScore(), b.getQueueScore()); + } + + // 兜底:如果 score 为空,则按优先级+时间排序 + // 按优先级排序(P0 > P1 > P2 > P3) + int priorityCompare = Integer.compare( + b.getPriority() != null ? b.getPriority() : 999, + a.getPriority() != null ? a.getPriority() : 999 + ); + if (priorityCompare != 0) { + return priorityCompare; + } + // 优先级相同,按入队时间排序 + return a.getEnqueueTime().compareTo(b.getEnqueueTime()); + }) + .collect(Collectors.toList()); + } + + @Override + public OrderQueueDTO getById(Long queueId) { + // 1. 优先从 Redis Hash 获取 + OrderQueueDTO redisDTO = redisQueueService.getByQueueId(queueId); + if (redisDTO != null) { + return redisDTO; + } + + // 2. Redis 未命中,从 MySQL 获取 + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO != null) { + OrderQueueDTO dto = convertToDTO(queueDO); + // 同步到 Redis + CompletableFuture.runAsync(() -> { + try { + redisQueueService.enqueue(dto); + } catch (Exception e) { + log.error("同步到 Redis 失败: queueId={}", dto.getId(), e); + } + }); + return dto; + } + + return null; + } + + @Override + public OrderQueueDTO getByOpsOrderId(Long opsOrderId) { + // 1. 优先从 Redis 获取(遍历所有用户队列,性能较差,慎用) + OrderQueueDTO redisDTO = redisQueueService.getByOrderId(opsOrderId); + if (redisDTO != null) { + return redisDTO; + } + + // 2. Redis 未命中,从 MySQL 获取 + OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId); + if (queueDO != null) { + return convertToDTO(queueDO); + } + + return null; + } + + @Override + public Long countWaiting() { + // 优先从 Redis 获取(需要聚合所有用户) + // TODO: 实现分布式计数器 + return orderQueueMapper.countWaiting(); + } + + @Override + public Long countByUserId(Long userId) { + // 优先从 Redis 获取 + long redisCount = redisQueueService.getQueueSize(userId); + if (redisCount > 0) { + return redisCount; + } + + // Redis 未命中,从 MySQL 获取 + return orderQueueMapper.countByUserId(userId); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean updateEventMessage(Long queueId, String eventMessage) { + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO == null) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } + + queueDO.setEventMessage(eventMessage); + int updated = orderQueueMapper.updateById(queueDO); + + log.info("事件消息已更新: queueId={}, eventMessage={}", queueId, eventMessage); + + return updated > 0; + } + + // ========== 私有方法 ========== + + /** + * 计算队列分数(用于排序) + * 公式:优先级分数 + 时间戳 + * + * @param priority 优先级(0=P0, 1=P1, 2=P2, 3=P3) + * @param enqueueTime 入队时间 + * @return 队列分数 + */ + private double calculateQueueScore(Integer priority, LocalDateTime enqueueTime) { + // 获取优先级分数 + long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L); + + // 计算时间戳(秒级) + long timestamp; + if (enqueueTime != null) { + timestamp = enqueueTime.atZone(ZoneId.systemDefault()).toEpochSecond(); + } else { + timestamp = System.currentTimeMillis() / 1000; + } + + return priorityScore + timestamp; + } + + /** + * 计算下一个队列顺序号 + */ + private Integer calculateNextQueueIndex(PriorityEnum priority) { + // TODO: 查询当前优先级的最大 queueIndex,然后 +1 + // 这里简化处理,返回默认值 + return priority.getQueueOrder() * 1000; + } + + /** + * 判断是否为终态 + */ + private boolean isTerminalStatus(String status) { + return "CANCELLED".equals(status) || "COMPLETED".equals(status) || "FAILED".equals(status); + } + + /** + * 校验状态流转是否合法 + */ + private boolean validateStatusTransition(String oldStatus, String newStatus) { + // 终态不能再变更 + if (isTerminalStatus(oldStatus)) { + return false; + } + + // 允许的状态流转(简化版本,根据实际表结构) + return switch (oldStatus.toUpperCase()) { + case "WAITING" -> "DISPATCHED".equals(newStatus) || "CANCELLED".equals(newStatus); + case "DISPATCHED" -> "PAUSED".equals(newStatus) || "CANCELLED".equals(newStatus); + case "PAUSED" -> "DISPATCHED".equals(newStatus) || "CANCELLED".equals(newStatus); + default -> false; + }; + } + + /** + * 根据状态更新时间戳 + */ + private void updateStatusTimestamp(OpsOrderQueueDO queueDO, OrderQueueStatusEnum status) { + switch (status.getStatus().toUpperCase()) { + case "DISPATCHED" -> { + if (queueDO.getDequeueTime() == null) { + queueDO.setDequeueTime(LocalDateTime.now()); + } + } + case "CANCELLED", "COMPLETED" -> { + // 可以记录完成时间(如果表有该字段) + } + } + } + + /** + * 转换为 DTO + */ + private OrderQueueDTO convertToDTO(OpsOrderQueueDO queueDO) { + OrderQueueDTO dto = new OrderQueueDTO(); + BeanUtils.copyProperties(queueDO, dto); + return dto; + } + + /** + * 批量转换为 DTO + */ + private List convertToDTO(List list) { + return list.stream() + .map(this::convertToDTO) + .collect(Collectors.toList()); + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java new file mode 100644 index 0000000..6724561 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java @@ -0,0 +1,244 @@ +package com.viewsh.module.ops.service.queue; + +import com.viewsh.module.ops.api.queue.OrderQueueDTO; +import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO; +import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper; +import com.viewsh.module.ops.enums.OrderQueueStatusEnum; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * 队列同步服务 + * 职责: + * 1. 定时将 MySQL 数据同步到 Redis + * 2. Redis 宕机恢复 + * 3. 数据一致性保障 + * + * @author lzh + */ +@Slf4j +@Service +public class QueueSyncService { + + @Resource + private OpsOrderQueueMapper orderQueueMapper; + + @Resource + private RedisOrderQueueService redisQueueService; + + /** + * 定时同步任务:每5分钟执行一次 + * 将最近1小时内变更的数据同步到 Redis + */ + @Scheduled(cron = "0 */5 * * * ?") + public void syncMySQLToRedis() { + try { + log.info("开始定时同步:MySQL -> Redis"); + + // 1. 查询最近1小时内变更的数据 + LocalDateTime startTime = LocalDateTime.now().minusHours(1); + List changedDOList = orderQueueMapper.selectList( + new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper() + .ge(OpsOrderQueueDO::getUpdateTime, startTime) + .in(OpsOrderQueueDO::getQueueStatus, + OrderQueueStatusEnum.WAITING.getStatus(), + OrderQueueStatusEnum.DISPATCHED.getStatus(), + OrderQueueStatusEnum.PAUSED.getStatus()) + ); + + if (changedDOList.isEmpty()) { + log.info("没有需要同步的数据"); + return; + } + + // 2. 转换为 DTO + List changedDTOList = changedDOList.stream() + .map(this::convertToDTO) + .collect(Collectors.toList()); + + // 3. 批量同步到 Redis + long syncCount = redisQueueService.batchEnqueue(changedDTOList); + + log.info("定时同步完成:同步{}条记录", syncCount); + + } catch (Exception e) { + log.error("定时同步失败", e); + } + } + + /** + * 同步指定保洁员的队列到 Redis + * 用于 Redis 宕机恢复或数据不一致修复 + * + * @param cleanerId 保洁员ID + * @return 同步的记录数量 + */ + @Transactional(readOnly = true) + public int syncUserQueueToRedis(Long cleanerId) { + try { + log.info("开始同步保洁员队列到Redis: cleanerId={}", cleanerId); + + // 1. 查询该保洁员的所有活跃队列记录 + List queueDOList = orderQueueMapper.selectList( + new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper() + .eq(OpsOrderQueueDO::getUserId, cleanerId) + .in(OpsOrderQueueDO::getQueueStatus, + OrderQueueStatusEnum.WAITING.getStatus(), + OrderQueueStatusEnum.DISPATCHED.getStatus(), + OrderQueueStatusEnum.PAUSED.getStatus()) + ); + + if (queueDOList.isEmpty()) { + log.info("保洁员队列为空: cleanerId={}", cleanerId); + return 0; + } + + // 2. 清空 Redis 中的旧数据 + redisQueueService.clearQueue(cleanerId); + + // 3. 批量同步到 Redis + List queueDTOList = queueDOList.stream() + .map(this::convertToDTO) + .collect(Collectors.toList()); + + long syncCount = redisQueueService.batchEnqueue(queueDTOList); + + log.info("同步保洁员队列完成: cleanerId={}, count={}", cleanerId, syncCount); + return (int) syncCount; + + } catch (Exception e) { + log.error("同步保洁员队列失败: cleanerId={}", cleanerId, e); + return 0; + } + } + + /** + * 数据一致性校验 + * 对比 Redis 和 MySQL 的数据,返回不一致的记录 + * + * @param cleanerId 保洁员ID + * @return 不一致的记录数量 + */ + public int validateConsistency(Long cleanerId) { + try { + log.info("开始校验数据一致性: cleanerId={}", cleanerId); + + // 1. 从 Redis 获取队列长度 + long redisCount = redisQueueService.getQueueSize(cleanerId); + + // 2. 从 MySQL 获取队列长度 + Long mysqlCount = orderQueueMapper.selectCount( + new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper() + .eq(OpsOrderQueueDO::getUserId, cleanerId) + .in(OpsOrderQueueDO::getQueueStatus, + OrderQueueStatusEnum.WAITING.getStatus(), + OrderQueueStatusEnum.DISPATCHED.getStatus(), + OrderQueueStatusEnum.PAUSED.getStatus()) + ); + + int diff = (int) (redisCount - (mysqlCount != null ? mysqlCount : 0)); + + if (diff != 0) { + log.warn("数据不一致: cleanerId={}, redisCount={}, mysqlCount={}, diff={}", + cleanerId, redisCount, mysqlCount, diff); + + // 自动触发同步 + syncUserQueueToRedis(cleanerId); + } else { + log.info("数据一致性校验通过: cleanerId={}", cleanerId); + } + + return Math.abs(diff); + + } catch (Exception e) { + log.error("数据一致性校验失败: cleanerId={}", cleanerId, e); + return -1; + } + } + + /** + * 强制全量同步 + * 用于数据修复或紧急恢复 + * + * @return 同步的记录数量 + */ + @Transactional(readOnly = true) + public int forceSyncAll() { + try { + log.warn("开始强制全量同步"); + + // 1. 查询所有活跃队列记录 + List allQueueDOList = orderQueueMapper.selectList( + new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper() + .in(OpsOrderQueueDO::getQueueStatus, + OrderQueueStatusEnum.WAITING.getStatus(), + OrderQueueStatusEnum.DISPATCHED.getStatus(), + OrderQueueStatusEnum.PAUSED.getStatus()) + ); + + if (allQueueDOList.isEmpty()) { + log.info("没有需要同步的数据"); + return 0; + } + + // 2. 按保洁员分组 + Map> groupedByUser = allQueueDOList.stream() + .collect(Collectors.groupingBy(OpsOrderQueueDO::getUserId)); + + int totalCount = 0; + + // 3. 逐个保洁员同步 + for (Map.Entry> entry : groupedByUser.entrySet()) { + Long cleanerId = entry.getKey(); + List queueDOList = entry.getValue(); + + // 清空 Redis + redisQueueService.clearQueue(cleanerId); + + // 批量同步 + List queueDTOList = queueDOList.stream() + .map(this::convertToDTO) + .collect(Collectors.toList()); + + long count = redisQueueService.batchEnqueue(queueDTOList); + totalCount += count; + + log.info("同步保洁员队列: cleanerId={}, count={}", cleanerId, count); + } + + log.warn("强制全量同步完成: totalCount={}", totalCount); + return totalCount; + + } catch (Exception e) { + log.error("强制全量同步失败", e); + return 0; + } + } + + /** + * 将 DO 转换为 DTO + */ + private OrderQueueDTO convertToDTO(OpsOrderQueueDO queueDO) { + OrderQueueDTO dto = new OrderQueueDTO(); + dto.setId(queueDO.getId()); + dto.setOpsOrderId(queueDO.getOpsOrderId()); + dto.setUserId(queueDO.getUserId()); + dto.setQueueIndex(queueDO.getQueueIndex()); + dto.setPriority(queueDO.getPriority()); + dto.setQueueStatus(queueDO.getQueueStatus()); + dto.setEnqueueTime(queueDO.getEnqueueTime()); + dto.setDequeueTime(queueDO.getDequeueTime()); + dto.setPausedDuration(queueDO.getPausedDuration()); + dto.setEventMessage(queueDO.getEventMessage()); + return dto; + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueService.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueService.java new file mode 100644 index 0000000..16f3783 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueService.java @@ -0,0 +1,163 @@ +package com.viewsh.module.ops.service.queue; + +import com.viewsh.module.ops.api.queue.OrderQueueDTO; + +import java.util.List; + +/** + * Redis 工单队列服务接口 + * + * @author lzh + */ +public interface RedisOrderQueueService { + + /** + * Redis Key 前缀 + */ + String QUEUE_KEY_PREFIX = "ops:order:queue:"; + String INFO_KEY_PREFIX = "ops:order:queue:info:"; + String LOCK_KEY_PREFIX = "ops:order:queue:lock:"; + + // ========== 队列操作 ========== + + /** + * 入队(添加到 Redis Sorted Set) + * + * @param dto 队列记录 + * @return 是否成功 + */ + boolean enqueue(OrderQueueDTO dto); + + /** + * 批量入队(使用 Pipeline) + * + * @param dtos 队列记录列表 + * @return 成功入队的数量 + */ + long batchEnqueue(List dtos); + + /** + * 出队(移除并返回最高优先级任务) + * 使用 ZPOPMIN 命令,原子性操作 + * + * @param cleanerId 保洁员ID + * @return 队列记录,如果队列为空返回 null + */ + OrderQueueDTO dequeue(Long cleanerId); + + /** + * 查询队列中前 N 个任务(不删除) + * + * @param cleanerId 保洁员ID + * @param count 查询数量 + * @return 任务列表 + */ + List peekTasks(Long cleanerId, int count); + + /** + * 获取队列长度 + * + * @param cleanerId 保洁员ID + * @return 队列长度 + */ + long getQueueSize(Long cleanerId); + + /** + * 清空队列 + * + * @param cleanerId 保洁员ID + * @return 清空的数量 + */ + long clearQueue(Long cleanerId); + + // ========== 状态更新 ========== + + /** + * 更新队列状态 + * + * @param queueId 队列ID + * @param newStatus 新状态 + * @return 是否成功 + */ + boolean updateStatus(Long queueId, String newStatus); + + /** + * 更新优先级 + * + * @param queueId 队列ID + * @param newPriority 新优先级 + * @return 是否成功 + */ + boolean updatePriority(Long queueId, Integer newPriority); + + /** + * 从队列中移除指定任务 + * + * @param queueId 队列ID + * @return 是否成功 + */ + boolean remove(Long queueId); + + // ========== 查询操作 ========== + + /** + * 查询指定保洁员的所有任务 + * + * @param cleanerId 保洁员ID + * @return 任务列表 + */ + List getTasksByUserId(Long cleanerId); + + /** + * 查询指定队列记录的详细信息 + * + * @param queueId 队列ID + * @return 队列记录 + */ + OrderQueueDTO getByQueueId(Long queueId); + + /** + * 查询指定工单的队列记录 + * + * @param orderId 工单ID + * @return 队列记录 + */ + OrderQueueDTO getByOrderId(Long orderId); + + /** + * 检查工单是否已在队列中 + * + * @param orderId 工单ID + * @return 是否存在 + */ + boolean exists(Long orderId); + + // ========== 分布式锁 ========== + + /** + * 尝试获取分布式锁 + * + * @param cleanerId 保洁员ID + * @param lockValue 锁值(UUID) + * @param expireTime 过期时间(毫秒) + * @return 是否获取成功 + */ + boolean tryLock(Long cleanerId, String lockValue, long expireTime); + + /** + * 释放分布式锁 + * + * @param cleanerId 保洁员ID + * @param lockValue 锁值 + * @return 是否释放成功 + */ + boolean unlock(Long cleanerId, String lockValue); + + /** + * 强制释放锁(不管锁值是否匹配) + * + * @param cleanerId 保洁员ID + * @return 是否释放成功 + */ + boolean forceUnlock(Long cleanerId); +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java new file mode 100644 index 0000000..4f832e9 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java @@ -0,0 +1,527 @@ +package com.viewsh.module.ops.service.queue; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.viewsh.module.ops.api.queue.OrderQueueDTO; +import com.viewsh.module.ops.enums.PriorityEnum; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.ZSetOperations; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.stereotype.Service; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Redis 工单队列服务实现 + * 基于 Redis Sorted Set + Hash 实现高性能队列 + * + * @author lzh + */ +@Slf4j +@Service +public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { + + @Resource + private RedisTemplate redisTemplate; + + @Resource + private ObjectMapper objectMapper; + + /** + * Score 计算公式:优先级分数 + 时间戳 + * 优先级分数:P0=0, P1=1000000, P2=2000000, P3=3000000 + * 时间戳:毫秒级时间戳 + * 结果:优先级高的排在前面,同优先级按时间排序 + */ + private static final Map PRIORITY_SCORES = Map.of( + 0, 0L, // P0: 0 + 1, 1000000L, // P1: 1,000,000 + 2, 2000000L, // P2: 2,000,000 + 3, 3000000L // P3: 3,000,000 + ); + + @Override + public boolean enqueue(OrderQueueDTO dto) { + try { + String queueKey = QUEUE_KEY_PREFIX + dto.getUserId(); + String infoKey = INFO_KEY_PREFIX + dto.getId(); + + // 1. 计算分数(优先级 + 时间戳) + double score = calculateScore(dto.getPriority(), dto.getEnqueueTime()); + + // 2. 序列化为 JSON + String json = objectMapper.writeValueAsString(dto); + + // 3. 添加到 Sorted Set + redisTemplate.opsForZSet().add(queueKey, json, score); + + // 4. 存储详细信息到 Hash + Map infoMap = convertToMap(dto); + redisTemplate.opsForHash().putAll(infoKey, infoMap); + redisTemplate.expire(infoKey, 24, TimeUnit.HOURS); + + log.debug("Redis 入队成功: queueId={}, orderId={}, score={}", + dto.getId(), dto.getOpsOrderId(), score); + + return true; + + } catch (Exception e) { + log.error("Redis 入队失败: queueId={}", dto.getId(), e); + return false; + } + } + + @Override + public long batchEnqueue(List dtos) { + if (dtos == null || dtos.isEmpty()) { + return 0; + } + + try { + // 使用 Pipeline 批量操作 + redisTemplate.executePipelined((org.springframework.data.redis.core.RedisCallback) connection -> { + dtos.forEach(dto -> { + try { + byte[] queueKey = (QUEUE_KEY_PREFIX + dto.getUserId()).getBytes(); + byte[] infoKey = (INFO_KEY_PREFIX + dto.getId()).getBytes(); + + // 计算分数 + double score = calculateScore(dto.getPriority(), dto.getEnqueueTime()); + + // 序列化 + byte[] json = objectMapper.writeValueAsBytes(dto); + + // 添加到 Sorted Set + connection.zAdd(queueKey, score, json); + + // 存储详细信息 + Map infoMap = convertToByteMap(dto); + connection.hMSet(infoKey, infoMap); + + // 设置过期时间 + connection.expire(infoKey, 86400); // 24小时 + + } catch (Exception e) { + log.error("批量入队失败: queueId={}", dto.getId(), e); + } + }); + return null; + }); + + log.info("Redis 批量入队成功: count={}", dtos.size()); + return dtos.size(); + + } catch (Exception e) { + log.error("Redis 批量入队失败", e); + return 0; + } + } + + @Override + public OrderQueueDTO dequeue(Long cleanerId) { + try { + String queueKey = QUEUE_KEY_PREFIX + cleanerId; + + // 使用 ZPOPMIN 原子性出队(获取并移除最高优先级任务) + // popMin 返回单个 TypedTuple + org.springframework.data.redis.core.ZSetOperations.TypedTuple typedTuple = + redisTemplate.opsForZSet().popMin(queueKey); + + if (typedTuple == null) { + return null; + } + + // 从 TypedTuple 中获取值 + Object taskObj = typedTuple.getValue(); + String json = (String) taskObj; + + // 反序列化 + OrderQueueDTO dto = objectMapper.readValue(json, OrderQueueDTO.class); + + // 从 Hash 中删除详细信息 + String infoKey = INFO_KEY_PREFIX + dto.getId(); + redisTemplate.delete(infoKey); + + log.debug("Redis 出队成功: queueId={}, orderId={}, cleanerId={}", + dto.getId(), dto.getOpsOrderId(), cleanerId); + + return dto; + + } catch (Exception e) { + log.error("Redis 出队失败: cleanerId={}", cleanerId, e); + return null; + } + } + + @Override + public List peekTasks(Long cleanerId, int count) { + try { + String queueKey = QUEUE_KEY_PREFIX + cleanerId; + + // 查询前 N 个任务(不删除) + Set tasks = redisTemplate.opsForZSet().range(queueKey, 0, count - 1); + + if (tasks == null || tasks.isEmpty()) { + return Collections.emptyList(); + } + + // 反序列化 + return tasks.stream() + .map(task -> { + try { + String json = (String) task; + return objectMapper.readValue(json, OrderQueueDTO.class); + } catch (Exception e) { + log.error("反序列化失败", e); + return null; + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + } catch (Exception e) { + log.error("Redis 查询任务失败: cleanerId={}, count={}", cleanerId, count, e); + return Collections.emptyList(); + } + } + + @Override + public long getQueueSize(Long cleanerId) { + try { + String queueKey = QUEUE_KEY_PREFIX + cleanerId; + Long size = redisTemplate.opsForZSet().size(queueKey); + return size != null ? size : 0; + } catch (Exception e) { + log.error("Redis 查询队列长度失败: cleanerId={}", cleanerId, e); + return 0; + } + } + + @Override + public long clearQueue(Long cleanerId) { + try { + String queueKey = QUEUE_KEY_PREFIX + cleanerId; + redisTemplate.delete(queueKey); + log.info("Redis 清空队列成功: cleanerId={}", cleanerId); + return 0; // Redis 不返回删除数量 + } catch (Exception e) { + log.error("Redis 清空队列失败: cleanerId={}", cleanerId, e); + return 0; + } + } + + @Override + public boolean updateStatus(Long queueId, String newStatus) { + try { + String infoKey = INFO_KEY_PREFIX + queueId; + + // 更新 Hash 中的状态 + redisTemplate.opsForHash().put(infoKey, "queueStatus", newStatus); + + // TODO: 如果需要同步更新 Sorted Set 中的数据,需要先删除再重新添加 + // 这里简化处理,只更新 Hash + + log.debug("Redis 更新状态成功: queueId={}, newStatus={}", queueId, newStatus); + return true; + + } catch (Exception e) { + log.error("Redis 更新状态失败: queueId={}, newStatus={}", queueId, newStatus, e); + return false; + } + } + + @Override + public boolean updatePriority(Long queueId, Integer newPriority) { + try { + String infoKey = INFO_KEY_PREFIX + queueId; + + // 从 Hash 中获取数据 + Map infoMap = redisTemplate.opsForHash().entries(infoKey); + if (infoMap.isEmpty()) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } + + // 更新优先级 + infoMap.put("priority", newPriority); + + // 反序列化回 DTO + OrderQueueDTO dto = mapToDto(infoMap); + if (dto == null) { + return false; + } + + // 更新 Sorted Set(先删除旧的,再添加新的) + String queueKey = QUEUE_KEY_PREFIX + dto.getUserId(); + String oldJson = objectMapper.writeValueAsString(dto); + + dto.setPriority(newPriority); + double newScore = calculateScore(newPriority, dto.getEnqueueTime()); + String newJson = objectMapper.writeValueAsString(dto); + + // 使用 Lua 脚本原子性更新 + String script = + "local old = redis.call('ZREM', KEYS[1], ARGV[1]) " + + "redis.call('ZADD', KEYS[1], ARGV[2], ARGV[3]) " + + "redis.call('HMSET', KEYS[2], 'priority', ARGV[4]) " + + "return old"; + + redisTemplate.execute( + new DefaultRedisScript<>(script, Long.class), + Arrays.asList(queueKey, infoKey), + oldJson, newScore, newJson, newPriority + ); + + log.debug("Redis 更新优先级成功: queueId={}, newPriority={}", queueId, newPriority); + return true; + + } catch (Exception e) { + log.error("Redis 更新优先级失败: queueId={}, newPriority={}", queueId, newPriority, e); + return false; + } + } + + @Override + public boolean remove(Long queueId) { + try { + String infoKey = INFO_KEY_PREFIX + queueId; + + // 从 Hash 中获取数据 + Map infoMap = redisTemplate.opsForHash().entries(infoKey); + if (infoMap.isEmpty()) { + return false; + } + + Object userIdObj = infoMap.get("userId"); + if (userIdObj == null) { + return false; + } + + Long userId = Long.parseLong(userIdObj.toString()); + String queueKey = QUEUE_KEY_PREFIX + userId; + + // 从 Sorted Set 中删除 + String json = objectMapper.writeValueAsString(infoMap); + redisTemplate.opsForZSet().remove(queueKey, json); + + // 删除 Hash + redisTemplate.delete(infoKey); + + log.debug("Redis 删除队列记录成功: queueId={}", queueId); + return true; + + } catch (Exception e) { + log.error("Redis 删除队列记录失败: queueId={}", queueId, e); + return false; + } + } + + @Override + public List getTasksByUserId(Long cleanerId) { + return peekTasks(cleanerId, Integer.MAX_VALUE); + } + + @Override + public OrderQueueDTO getByQueueId(Long queueId) { + try { + String infoKey = INFO_KEY_PREFIX + queueId; + + Map infoMap = redisTemplate.opsForHash().entries(infoKey); + if (infoMap.isEmpty()) { + return null; + } + + return mapToDto(infoMap); + + } catch (Exception e) { + log.error("Redis 查询队列记录失败: queueId={}", queueId, e); + return null; + } + } + + @Override + public OrderQueueDTO getByOrderId(Long orderId) { + try { + // 查询所有保洁员的队列(效率较低,慎用) + Set keys = redisTemplate.keys(QUEUE_KEY_PREFIX + "*"); + if (keys == null || keys.isEmpty()) { + return null; + } + + for (String queueKey : keys) { + Set tasks = redisTemplate.opsForZSet().range(queueKey, 0, -1); + if (tasks != null) { + for (Object task : tasks) { + try { + OrderQueueDTO dto = objectMapper.readValue((String) task, OrderQueueDTO.class); + if (dto.getOpsOrderId().equals(orderId)) { + return dto; + } + } catch (Exception ignored) { + } + } + } + } + + return null; + + } catch (Exception e) { + log.error("Redis 查询工单失败: orderId={}", orderId, e); + return null; + } + } + + @Override + public boolean exists(Long orderId) { + return getByOrderId(orderId) != null; + } + + @Override + public boolean tryLock(Long cleanerId, String lockValue, long expireTime) { + try { + String lockKey = LOCK_KEY_PREFIX + cleanerId; + + Boolean acquired = redisTemplate.opsForValue() + .setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS); + + log.debug("尝试获取锁: cleanerId={}, acquired={}", cleanerId, acquired); + + return Boolean.TRUE.equals(acquired); + + } catch (Exception e) { + log.error("获取分布式锁失败: cleanerId={}", cleanerId, e); + return false; + } + } + + @Override + public boolean unlock(Long cleanerId, String lockValue) { + try { + String lockKey = LOCK_KEY_PREFIX + cleanerId; + + // 使用 Lua 脚本确保只删除自己的锁 + String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; + + Long result = redisTemplate.execute( + new DefaultRedisScript<>(script, Long.class), + Collections.singletonList(lockKey), + lockValue + ); + + boolean unlocked = result != null && result == 1; + log.debug("释放分布式锁: cleanerId={}, unlocked={}", cleanerId, unlocked); + + return unlocked; + + } catch (Exception e) { + log.error("释放分布式锁失败: cleanerId={}", cleanerId, e); + return false; + } + } + + @Override + public boolean forceUnlock(Long cleanerId) { + try { + String lockKey = LOCK_KEY_PREFIX + cleanerId; + redisTemplate.delete(lockKey); + + log.warn("强制释放分布式锁: cleanerId={}", cleanerId); + return true; + + } catch (Exception e) { + log.error("强制释放分布式锁失败: cleanerId={}", cleanerId, e); + return false; + } + } + + // ========== 私有方法 ========== + + /** + * 计算分数(优先级 + 时间戳) + */ + private double calculateScore(Integer priority, LocalDateTime enqueueTime) { + long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L); + long timestamp; + if (enqueueTime != null) { + timestamp = enqueueTime + .atZone(ZoneId.systemDefault()) + .toEpochSecond(); + } else { + timestamp = System.currentTimeMillis() / 1000; + } + + return priorityScore + timestamp; + } + + /** + * 将 DTO 转换为 Map(用于 Hash 存储) + */ + private Map convertToMap(OrderQueueDTO dto) { + Map map = new HashMap<>(); + map.put("id", dto.getId()); + map.put("opsOrderId", dto.getOpsOrderId()); + map.put("userId", dto.getUserId()); + map.put("queueIndex", dto.getQueueIndex()); + map.put("priority", dto.getPriority()); + map.put("queueStatus", dto.getQueueStatus()); + + // LocalDateTime 转换为字符串存储 + if (dto.getEnqueueTime() != null) { + map.put("enqueueTime", dto.getEnqueueTime().toString()); + } + + return map; + } + + /** + * 将 DTO 转换为 Byte Map(用于 Pipeline) + */ + private Map convertToByteMap(OrderQueueDTO dto) throws Exception { + Map map = new HashMap<>(); + map.put("id".getBytes(), String.valueOf(dto.getId()).getBytes()); + map.put("opsOrderId".getBytes(), String.valueOf(dto.getOpsOrderId()).getBytes()); + map.put("userId".getBytes(), String.valueOf(dto.getUserId()).getBytes()); + map.put("queueIndex".getBytes(), String.valueOf(dto.getQueueIndex()).getBytes()); + map.put("priority".getBytes(), String.valueOf(dto.getPriority()).getBytes()); + map.put("queueStatus".getBytes(), dto.getQueueStatus().getBytes()); + + // LocalDateTime 转换为字符串存储 + if (dto.getEnqueueTime() != null) { + map.put("enqueueTime".getBytes(), dto.getEnqueueTime().toString().getBytes()); + } + + return map; + } + + /** + * 将 Map 转换回 DTO + */ + private OrderQueueDTO mapToDto(Map map) { + try { + OrderQueueDTO dto = new OrderQueueDTO(); + dto.setId(Long.parseLong(map.get("id").toString())); + dto.setOpsOrderId(Long.parseLong(map.get("opsOrderId").toString())); + dto.setUserId(Long.parseLong(map.get("userId").toString())); + dto.setQueueIndex((Integer) map.get("queueIndex")); + dto.setPriority((Integer) map.get("priority")); + dto.setQueueStatus((String) map.get("queueStatus")); + + // 字符串转换为 LocalDateTime + Object enqueueTimeObj = map.get("enqueueTime"); + if (enqueueTimeObj != null) { + dto.setEnqueueTime(LocalDateTime.parse(enqueueTimeObj.toString())); + } + + return dto; + } catch (Exception e) { + log.error("Map 转 DTO 失败", e); + return null; + } + } +}