chore: 【ops】工单排队队列实现

This commit is contained in:
lzh
2026-01-06 10:50:20 +08:00
parent 9ef2730fd0
commit 46926e8127
5 changed files with 1793 additions and 0 deletions

View File

@@ -0,0 +1,197 @@
package com.viewsh.module.ops.api.queue;
import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
import com.viewsh.module.ops.enums.PriorityEnum;
import java.util.List;
/**
 * Work-order queue management service.
 * Provides the core queue operations for ops work orders: enqueue, dequeue,
 * status transitions, priority adjustment, etc.
 *
 * @author lzh
 */
public interface OrderQueueService {

    /**
     * Enqueue a work order.
     * Adds the order to the dispatch queue, where it waits to be assigned.
     *
     * @param opsOrderId work-order ID
     * @param userId     assignee (cleaner) user ID
     * @param priority   priority level
     * @param queueIndex optional explicit position used to order records of the same priority
     * @return ID of the created queue record
     */
    Long enqueue(Long opsOrderId, Long userId, PriorityEnum priority, Integer queueIndex);

    /**
     * Dequeue a work order.
     * Removes the record from the queue (typically when the order completes or is cancelled).
     *
     * @param queueId queue record ID
     * @return true on success
     */
    boolean dequeue(Long queueId);

    /**
     * Dequeue by work-order ID.
     *
     * @param opsOrderId work-order ID
     * @return true on success
     */
    boolean dequeueByOpsOrderId(Long opsOrderId);

    /**
     * Update the queue status of a record.
     *
     * @param queueId   queue record ID
     * @param newStatus target status
     * @return true on success
     */
    boolean updateStatus(Long queueId, OrderQueueStatusEnum newStatus);

    /**
     * Start executing a task (logical dequeue).
     * Moves the record from WAITING to DISPATCHED and records dequeueTime.
     *
     * @param queueId queue record ID
     * @return true on success
     */
    boolean startExecution(Long queueId);

    /**
     * Pause a task.
     * Moves the record from DISPATCHED to PAUSED and accumulates paused duration.
     *
     * @param queueId queue record ID
     * @return true on success
     */
    boolean pauseTask(Long queueId);

    /**
     * Resume a task.
     * Moves the record from PAUSED back to DISPATCHED.
     *
     * @param queueId queue record ID
     * @return true on success
     */
    boolean resumeTask(Long queueId);

    /**
     * Adjust priority.
     * Dynamically changes an order's priority (e.g. for urgent queue-jumping).
     *
     * @param queueId     queue record ID
     * @param newPriority new priority
     * @param reason      reason for the adjustment
     * @return true on success
     */
    boolean adjustPriority(Long queueId, PriorityEnum newPriority, String reason);

    /**
     * Adjust priority by work-order ID.
     *
     * @param opsOrderId  work-order ID
     * @param newPriority new priority
     * @param reason      reason for the adjustment
     * @return true on success
     */
    boolean adjustPriorityByOpsOrderId(Long opsOrderId, PriorityEnum newPriority, String reason);

    /**
     * Escalate to urgent.
     * Raises the order's priority to P0 and triggers the urgent dispatch flow.
     *
     * @param opsOrderId work-order ID
     * @param reason     escalation reason
     * @return true on success
     */
    boolean upgradeToUrgent(Long opsOrderId, String reason);

    /**
     * List queue records currently waiting.
     *
     * @return waiting orders, sorted by priority
     */
    List<OrderQueueDTO> getWaitingQueue();

    /**
     * List P0 (urgent) tasks.
     *
     * @return urgent-task records
     */
    List<OrderQueueDTO> getUrgentOrders();

    /**
     * Get the task a user is currently on.
     *
     * @param userId user (cleaner) ID
     * @return current task, or null if none
     */
    OrderQueueDTO getCurrentTaskByUserId(Long userId);

    /**
     * List all tasks assigned to a user.
     *
     * @param userId user ID
     * @return task list
     */
    List<OrderQueueDTO> getTasksByUserId(Long userId);

    /**
     * List a user's WAITING tasks, ordered by queue score.
     *
     * Queue score formula: priority score + timestamp
     * - P0: 0 + timestamp
     * - P1: 1000000 + timestamp
     * - P2: 2000000 + timestamp
     * - P3: 3000000 + timestamp
     *
     * Result: higher priorities come first; within a priority, earlier enqueue comes first.
     *
     * @param userId user ID
     * @return waiting tasks, already sorted by queue score
     */
    List<OrderQueueDTO> getWaitingTasksByUserId(Long userId);

    /**
     * Fetch a queue record by ID.
     *
     * @param queueId queue record ID
     * @return the record, or null if absent
     */
    OrderQueueDTO getById(Long queueId);

    /**
     * Fetch a queue record by work-order ID.
     *
     * @param opsOrderId work-order ID
     * @return the record, or null if absent
     */
    OrderQueueDTO getByOpsOrderId(Long opsOrderId);

    /**
     * Count waiting tasks across all users.
     *
     * @return task count
     */
    Long countWaiting();

    /**
     * Count tasks for one user.
     *
     * @param userId user ID
     * @return task count
     */
    Long countByUserId(Long userId);

    /**
     * Update a queue record's event message.
     *
     * @param queueId      queue record ID
     * @param eventMessage event message text
     * @return true on success
     */
    boolean updateEventMessage(Long queueId, String eventMessage);
}

View File

@@ -0,0 +1,662 @@
package com.viewsh.module.ops.service.queue;
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
import com.viewsh.module.ops.api.queue.OrderQueueService;
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
import com.viewsh.module.ops.enums.PriorityEnum;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
 * Work-order queue management service implementation (Redis + MySQL hybrid).
 *
 * Architecture:
 * 1. Writes: MySQL first (durability), then Redis asynchronously (performance).
 * 2. Reads: Redis first; on miss, fall back to MySQL and back-fill Redis.
 * 3. Sync: a scheduled job reconciles MySQL into Redis.
 * 4. Disaster recovery: if Redis is down, the service degrades to pure MySQL.
 *
 * @author lzh
 */
@Slf4j
@Service
public class OrderQueueServiceEnhanced implements OrderQueueService {

    /**
     * Score formula: priority score + enqueue timestamp (epoch seconds).
     * Priority scores: P0=0, P1=1,000,000, P2=2,000,000, P3=3,000,000.
     * Lower score sorts first, so higher priorities come first and, within a
     * priority, earlier enqueue times come first.
     */
    private static final Map<Integer, Long> PRIORITY_SCORES = Map.of(
            0, 0L,          // P0: 0
            1, 1000000L,    // P1: 1,000,000
            2, 2000000L,    // P2: 2,000,000
            3, 3000000L     // P3: 3,000,000
    );

    @Resource
    private OpsOrderQueueMapper orderQueueMapper;
    @Resource
    private RedisOrderQueueService redisQueueService;
    // NOTE(review): not referenced anywhere in this class — confirm whether it is
    // wired for future use before removing the injection.
    @Resource
    private QueueSyncService queueSyncService;

    /**
     * Enqueue a work order: persist to MySQL, then asynchronously mirror to Redis.
     * Idempotent: if the order is already queued, the existing record ID is returned.
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Long enqueue(Long opsOrderId, Long userId, PriorityEnum priority, Integer queueIndex) {
        // 1. Idempotency guard (MySQL is the source of truth).
        OpsOrderQueueDO existingQueue = orderQueueMapper.selectByOpsOrderId(opsOrderId);
        if (existingQueue != null) {
            log.warn("工单已在队列中: opsOrderId={}, userId={}", opsOrderId, userId);
            return existingQueue.getId();
        }
        // 2. Compute the ordering score (priority bucket + enqueue timestamp).
        LocalDateTime now = LocalDateTime.now();
        double queueScore = calculateQueueScore(priority.getPriority(), now);
        // 3. Persist the queue record to MySQL.
        OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder()
                .opsOrderId(opsOrderId)
                .userId(userId)
                .queueIndex(queueIndex != null ? queueIndex : calculateNextQueueIndex(priority))
                .priority(priority.getPriority())
                .queueScore(queueScore)
                .queueStatus(OrderQueueStatusEnum.WAITING.getStatus())
                .enqueueTime(now)
                .pausedDuration(0)
                .eventMessage("工单入队,等待派单")
                .build();
        orderQueueMapper.insert(queueDO);
        log.info("工单已入队(MySQL): opsOrderId={}, userId={}, priority={}, queueId={}",
                opsOrderId, userId, priority, queueDO.getId());
        // 4. Best-effort async write to Redis; failures are compensated by the scheduled sync.
        // NOTE(review): runAsync may execute before the surrounding transaction commits, so
        // Redis can briefly hold an uncommitted record — consider moving this into a
        // TransactionSynchronization.afterCommit callback.
        OrderQueueDTO dto = convertToDTO(queueDO);
        CompletableFuture.runAsync(() -> {
            try {
                redisQueueService.enqueue(dto);
                log.debug("工单已入队(Redis): queueId={}", dto.getId());
            } catch (Exception e) {
                log.error("Redis 入队失败,依赖定时同步任务补偿: queueId={}", dto.getId(), e);
            }
        });
        // 5. Flag P0 (urgent) arrivals loudly; dispatch itself lives in the dispatch engine.
        if (priority.isUrgent()) {
            log.warn("检测到P0紧急任务入队需立即处理: opsOrderId={}", opsOrderId);
            // TODO: trigger the urgent dispatch flow (implemented in the dispatch engine)
        }
        return queueDO.getId();
    }

    /**
     * Delete a queue record. Only terminal-state records may be deleted.
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean dequeue(Long queueId) {
        // 1. Delete from MySQL.
        OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
        if (queueDO == null) {
            log.warn("队列记录不存在: queueId={}", queueId);
            return false;
        }
        // Only terminal-state records are deletable.
        if (!isTerminalStatus(queueDO.getQueueStatus())) {
            log.warn("只能删除终态的队列记录: queueId={}, status={}", queueId, queueDO.getQueueStatus());
            return false;
        }
        int deleted = orderQueueMapper.deleteById(queueId);
        // 2. Mirror the deletion to Redis asynchronously.
        if (deleted > 0) {
            CompletableFuture.runAsync(() -> {
                try {
                    redisQueueService.remove(queueId);
                    log.debug("Redis 删除队列记录成功: queueId={}", queueId);
                } catch (Exception e) {
                    log.error("Redis 删除队列记录失败: queueId={}", queueId, e);
                }
            });
        }
        log.info("队列记录已删除: queueId={}, deleted={}", queueId, deleted > 0);
        return deleted > 0;
    }

    /**
     * Delete a queue record by work-order ID. Delegates to {@link #dequeue(Long)}.
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean dequeueByOpsOrderId(Long opsOrderId) {
        OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId);
        if (queueDO == null) {
            log.warn("工单不在队列中: opsOrderId={}", opsOrderId);
            return false;
        }
        return dequeue(queueDO.getId());
    }

    /**
     * Generic status transition with validation, then async Redis mirror.
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean updateStatus(Long queueId, OrderQueueStatusEnum newStatus) {
        // 1. Update MySQL.
        OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
        if (queueDO == null) {
            log.warn("队列记录不存在: queueId={}", queueId);
            return false;
        }
        String oldStatus = queueDO.getQueueStatus();
        // Reject illegal transitions.
        if (!validateStatusTransition(oldStatus, newStatus.getStatus())) {
            log.warn("非法的状态流转: queueId={}, oldStatus={}, newStatus={}",
                    queueId, oldStatus, newStatus.getStatus());
            return false;
        }
        // NOTE(review): toUpperCase() here is inconsistent with the other transition
        // methods, which store getStatus() verbatim — confirm the enum's casing.
        queueDO.setQueueStatus(newStatus.getStatus().toUpperCase());
        updateStatusTimestamp(queueDO, newStatus);
        int updated = orderQueueMapper.updateById(queueDO);
        // 2. Mirror to Redis asynchronously.
        if (updated > 0) {
            CompletableFuture.runAsync(() -> {
                try {
                    redisQueueService.updateStatus(queueId, newStatus.getStatus());
                    log.debug("Redis 更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus());
                } catch (Exception e) {
                    log.error("Redis 更新状态失败: queueId={}", queueId, e);
                }
            });
        }
        log.info("队列状态已更新: queueId={}, opsOrderId={}, oldStatus={}, newStatus={}",
                queueId, queueDO.getOpsOrderId(), oldStatus, newStatus.getStatus());
        return updated > 0;
    }

    /**
     * Dispatch a task: WAITING -> DISPATCHED, stamping dequeueTime.
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean startExecution(Long queueId) {
        OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
        if (queueDO == null) {
            log.warn("队列记录不存在: queueId={}", queueId);
            return false;
        }
        if (!OrderQueueStatusEnum.WAITING.getStatus().equals(queueDO.getQueueStatus())) {
            log.warn("只有WAITING状态的任务可以开始执行: queueId={}, status={}",
                    queueId, queueDO.getQueueStatus());
            return false;
        }
        // Transition: WAITING -> DISPATCHED.
        queueDO.setQueueStatus(OrderQueueStatusEnum.DISPATCHED.getStatus());
        queueDO.setDequeueTime(LocalDateTime.now());
        queueDO.setEventMessage("派单成功,已分配给执行人员");
        int updated = orderQueueMapper.updateById(queueDO);
        // Mirror to Redis asynchronously.
        if (updated > 0) {
            CompletableFuture.runAsync(() -> {
                try {
                    redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.DISPATCHED.getStatus());
                } catch (Exception e) {
                    log.error("Redis 更新状态失败: queueId={}", queueId, e);
                }
            });
        }
        log.info("派单成功: queueId={}, opsOrderId={}, assigneeId={}",
                queueId, queueDO.getOpsOrderId(), queueDO.getUserId());
        return updated > 0;
    }

    /**
     * Pause a task: DISPATCHED -> PAUSED.
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean pauseTask(Long queueId) {
        OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
        if (queueDO == null) {
            log.warn("队列记录不存在: queueId={}", queueId);
            return false;
        }
        if (!OrderQueueStatusEnum.DISPATCHED.getStatus().equals(queueDO.getQueueStatus())) {
            log.warn("只有DISPATCHED状态的任务可以暂停: queueId={}, status={}",
                    queueId, queueDO.getQueueStatus());
            return false;
        }
        // Transition: DISPATCHED -> PAUSED.
        queueDO.setQueueStatus(OrderQueueStatusEnum.PAUSED.getStatus());
        // NOTE(review): this accumulates the time since dispatch (dequeueTime -> now),
        // i.e. the running time before the pause, into "pausedDuration" — if the field
        // is meant to be time spent paused, the measurement should instead run from
        // pause to resume. Confirm the intended semantics of pausedDuration.
        if (queueDO.getDequeueTime() != null) {
            long pausedSeconds = java.time.Duration.between(
                    queueDO.getDequeueTime(),
                    LocalDateTime.now()
            ).getSeconds();
            queueDO.addPausedDuration((int) pausedSeconds);
        }
        queueDO.setEventMessage("任务已暂停");
        int updated = orderQueueMapper.updateById(queueDO);
        // Mirror to Redis asynchronously.
        if (updated > 0) {
            CompletableFuture.runAsync(() -> {
                try {
                    redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PAUSED.getStatus());
                } catch (Exception e) {
                    log.error("Redis 更新状态失败: queueId={}", queueId, e);
                }
            });
        }
        log.info("任务已暂停: queueId={}, opsOrderId={}, pausedDuration={}",
                queueId, queueDO.getOpsOrderId(), queueDO.getPausedDuration());
        return updated > 0;
    }

    /**
     * Resume a task: PAUSED -> DISPATCHED, resetting dequeueTime.
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean resumeTask(Long queueId) {
        OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
        if (queueDO == null) {
            log.warn("队列记录不存在: queueId={}", queueId);
            return false;
        }
        if (!OrderQueueStatusEnum.PAUSED.getStatus().equals(queueDO.getQueueStatus())) {
            log.warn("只有PAUSED状态的任务可以恢复: queueId={}, status={}",
                    queueId, queueDO.getQueueStatus());
            return false;
        }
        // Transition: PAUSED -> DISPATCHED.
        queueDO.setQueueStatus(OrderQueueStatusEnum.DISPATCHED.getStatus());
        queueDO.setDequeueTime(LocalDateTime.now()); // reset the dispatch timestamp
        queueDO.setEventMessage("任务已恢复执行");
        int updated = orderQueueMapper.updateById(queueDO);
        // Mirror to Redis asynchronously.
        if (updated > 0) {
            CompletableFuture.runAsync(() -> {
                try {
                    redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.DISPATCHED.getStatus());
                } catch (Exception e) {
                    log.error("Redis 更新状态失败: queueId={}", queueId, e);
                }
            });
        }
        log.info("任务已恢复执行: queueId={}, opsOrderId={}", queueId, queueDO.getOpsOrderId());
        return updated > 0;
    }

    /**
     * Adjust a record's priority, recomputing its queue index and score.
     * The original enqueue time is kept so the time component of the score is stable.
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean adjustPriority(Long queueId, PriorityEnum newPriority, String reason) {
        OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
        if (queueDO == null) {
            log.warn("队列记录不存在: queueId={}", queueId);
            return false;
        }
        Integer oldPriority = queueDO.getPriority();
        // No-op when the priority is unchanged.
        if (oldPriority.equals(newPriority.getPriority())) {
            log.info("优先级未变化: queueId={}, priority={}", queueId, newPriority);
            return true;
        }
        queueDO.setPriorityEnum(newPriority);
        // Re-derive the queue index for the new priority bucket.
        queueDO.setQueueIndex(calculateNextQueueIndex(newPriority));
        // Recompute the score, keeping the original enqueue timestamp.
        LocalDateTime enqueueTime = queueDO.getEnqueueTime() != null ? queueDO.getEnqueueTime() : LocalDateTime.now();
        double newQueueScore = calculateQueueScore(newPriority.getPriority(), enqueueTime);
        queueDO.setQueueScore(newQueueScore);
        queueDO.setEventMessage("优先级调整: " + oldPriority + " -> " + newPriority + ", 原因: " + reason);
        int updated = orderQueueMapper.updateById(queueDO);
        // Mirror to Redis asynchronously.
        if (updated > 0) {
            CompletableFuture.runAsync(() -> {
                try {
                    redisQueueService.updatePriority(queueId, newPriority.getPriority());
                } catch (Exception e) {
                    log.error("Redis 更新优先级失败: queueId={}", queueId, e);
                }
            });
        }
        log.info("优先级已调整: queueId={}, opsOrderId={}, oldPriority={}, newPriority={}, reason={}, newScore={}",
                queueId, queueDO.getOpsOrderId(), oldPriority, newPriority, reason, newQueueScore);
        // Escalation to P0 triggers the urgent dispatch flow.
        if (newPriority.isUrgent()) {
            log.warn("任务升级为P0紧急需立即处理: queueId={}, opsOrderId={}",
                    queueId, queueDO.getOpsOrderId());
            // TODO: trigger the urgent dispatch flow
        }
        return updated > 0;
    }

    /**
     * Adjust priority by work-order ID. Delegates to {@link #adjustPriority}.
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean adjustPriorityByOpsOrderId(Long opsOrderId, PriorityEnum newPriority, String reason) {
        OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId);
        if (queueDO == null) {
            log.warn("工单不在队列中: opsOrderId={}", opsOrderId);
            return false;
        }
        return adjustPriority(queueDO.getId(), newPriority, reason);
    }

    /**
     * Escalate an order to P0 (urgent).
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean upgradeToUrgent(Long opsOrderId, String reason) {
        return adjustPriorityByOpsOrderId(opsOrderId, PriorityEnum.P0, reason);
    }

    /**
     * List all waiting records. Currently MySQL-only.
     */
    @Override
    public List<OrderQueueDTO> getWaitingQueue() {
        // Would prefer Redis here.
        // TODO: implement a global waiting queue (requires aggregating all users' queues).
        // MySQL is used for now.
        List<OpsOrderQueueDO> list = orderQueueMapper.selectListWaiting();
        return convertToDTO(list);
    }

    /**
     * List P0 (urgent) records. Currently MySQL-only.
     */
    @Override
    public List<OrderQueueDTO> getUrgentOrders() {
        // TODO: read from Redis first
        List<OpsOrderQueueDO> list = orderQueueMapper.selectUrgentOrders();
        return convertToDTO(list);
    }

    /**
     * Get a user's current task: Redis first, MySQL fallback with Redis back-fill.
     */
    @Override
    public OrderQueueDTO getCurrentTaskByUserId(Long userId) {
        // 1. Redis first.
        List<OrderQueueDTO> redisTasks = redisQueueService.peekTasks(userId, 1);
        if (redisTasks != null && !redisTasks.isEmpty()) {
            return redisTasks.get(0);
        }
        // 2. Redis miss: read MySQL and back-fill Redis.
        OpsOrderQueueDO queueDO = orderQueueMapper.selectCurrentExecutingByUserId(userId);
        if (queueDO != null) {
            OrderQueueDTO dto = convertToDTO(queueDO);
            CompletableFuture.runAsync(() -> {
                try {
                    redisQueueService.enqueue(dto);
                } catch (Exception e) {
                    log.error("同步到 Redis 失败: queueId={}", dto.getId(), e);
                }
            });
            return dto;
        }
        return null;
    }

    /**
     * List a user's tasks: Redis first, MySQL fallback with batch Redis back-fill.
     */
    @Override
    public List<OrderQueueDTO> getTasksByUserId(Long userId) {
        // 1. Redis first.
        List<OrderQueueDTO> redisTasks = redisQueueService.getTasksByUserId(userId);
        if (redisTasks != null && !redisTasks.isEmpty()) {
            return redisTasks;
        }
        // 2. Redis miss: read MySQL and back-fill Redis.
        List<OpsOrderQueueDO> mysqlList = orderQueueMapper.selectListByUserId(userId);
        if (mysqlList != null && !mysqlList.isEmpty()) {
            List<OrderQueueDTO> dtoList = convertToDTO(mysqlList);
            CompletableFuture.runAsync(() -> {
                try {
                    redisQueueService.batchEnqueue(dtoList);
                } catch (Exception e) {
                    log.error("批量同步到 Redis 失败: userId={}", userId, e);
                }
            });
            return dtoList;
        }
        return Collections.emptyList();
    }

    /**
     * List a user's WAITING tasks ordered by queue score (ascending — lower score first).
     *
     * Fix: the no-score fallback previously compared priorities in descending numeric
     * order (P3 before P0), contradicting both this method's contract and the
     * score-based path; it now sorts ascending so P0 (priority 0) comes first.
     */
    @Override
    public List<OrderQueueDTO> getWaitingTasksByUserId(Long userId) {
        // Fetch everything, then filter and sort locally.
        List<OrderQueueDTO> allTasks = getTasksByUserId(userId);
        return allTasks.stream()
                .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus()))
                .sorted((a, b) -> {
                    // Primary: the precomputed queue score (lower = higher priority / earlier).
                    if (a.getQueueScore() != null && b.getQueueScore() != null) {
                        return Double.compare(a.getQueueScore(), b.getQueueScore());
                    }
                    // Fallback when a score is missing: priority ascending (P0=0 first),
                    // missing priorities sort last.
                    int priorityCompare = Integer.compare(
                            a.getPriority() != null ? a.getPriority() : 999,
                            b.getPriority() != null ? b.getPriority() : 999
                    );
                    if (priorityCompare != 0) {
                        return priorityCompare;
                    }
                    // Same priority: earlier enqueue first.
                    return a.getEnqueueTime().compareTo(b.getEnqueueTime());
                })
                .collect(Collectors.toList());
    }

    /**
     * Fetch a queue record by ID: Redis first, MySQL fallback with Redis back-fill.
     */
    @Override
    public OrderQueueDTO getById(Long queueId) {
        // 1. Redis hash first.
        OrderQueueDTO redisDTO = redisQueueService.getByQueueId(queueId);
        if (redisDTO != null) {
            return redisDTO;
        }
        // 2. Redis miss: read MySQL.
        OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
        if (queueDO != null) {
            OrderQueueDTO dto = convertToDTO(queueDO);
            // Back-fill Redis.
            CompletableFuture.runAsync(() -> {
                try {
                    redisQueueService.enqueue(dto);
                } catch (Exception e) {
                    log.error("同步到 Redis 失败: queueId={}", dto.getId(), e);
                }
            });
            return dto;
        }
        return null;
    }

    /**
     * Fetch a queue record by work-order ID (the Redis lookup scans all user queues
     * and is expensive — use sparingly).
     */
    @Override
    public OrderQueueDTO getByOpsOrderId(Long opsOrderId) {
        // 1. Redis first (scans every user queue — slow path).
        OrderQueueDTO redisDTO = redisQueueService.getByOrderId(opsOrderId);
        if (redisDTO != null) {
            return redisDTO;
        }
        // 2. Redis miss: read MySQL.
        OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId);
        if (queueDO != null) {
            return convertToDTO(queueDO);
        }
        return null;
    }

    /**
     * Count waiting tasks. Currently MySQL-only.
     */
    @Override
    public Long countWaiting() {
        // Would prefer Redis (requires aggregating all users).
        // TODO: implement a distributed counter
        return orderQueueMapper.countWaiting();
    }

    /**
     * Count one user's tasks: Redis first, MySQL fallback when Redis reports zero.
     * NOTE(review): a Redis count of 0 is ambiguous (empty vs. not yet synced), and
     * the MySQL fallback's status filter is defined in the mapper — verify the two
     * counts cover the same statuses.
     */
    @Override
    public Long countByUserId(Long userId) {
        // Redis first.
        long redisCount = redisQueueService.getQueueSize(userId);
        if (redisCount > 0) {
            return redisCount;
        }
        // Redis miss/zero: fall back to MySQL.
        return orderQueueMapper.countByUserId(userId);
    }

    /**
     * Update a record's event message (MySQL only; not mirrored to Redis).
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean updateEventMessage(Long queueId, String eventMessage) {
        OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
        if (queueDO == null) {
            log.warn("队列记录不存在: queueId={}", queueId);
            return false;
        }
        queueDO.setEventMessage(eventMessage);
        int updated = orderQueueMapper.updateById(queueDO);
        log.info("事件消息已更新: queueId={}, eventMessage={}", queueId, eventMessage);
        return updated > 0;
    }

    // ========== Private helpers ==========

    /**
     * Compute the ordering score: priority bucket + enqueue timestamp (epoch seconds).
     *
     * @param priority    priority level (0=P0, 1=P1, 2=P2, 3=P3)
     * @param enqueueTime enqueue time; falls back to "now" when null
     * @return queue score (lower sorts first)
     */
    private double calculateQueueScore(Integer priority, LocalDateTime enqueueTime) {
        // Unknown priorities fall into the lowest (P3) bucket.
        long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L);
        // Epoch seconds in the system default zone.
        long timestamp;
        if (enqueueTime != null) {
            timestamp = enqueueTime.atZone(ZoneId.systemDefault()).toEpochSecond();
        } else {
            timestamp = System.currentTimeMillis() / 1000;
        }
        return priorityScore + timestamp;
    }

    /**
     * Compute the next queue index for a priority bucket.
     */
    private Integer calculateNextQueueIndex(PriorityEnum priority) {
        // TODO: query the current max queueIndex for this priority and add 1.
        // Simplified placeholder for now.
        return priority.getQueueOrder() * 1000;
    }

    /**
     * Whether a status is terminal (no further transitions allowed).
     */
    private boolean isTerminalStatus(String status) {
        return "CANCELLED".equals(status) || "COMPLETED".equals(status) || "FAILED".equals(status);
    }

    /**
     * Validate a status transition against the allowed state machine.
     */
    private boolean validateStatusTransition(String oldStatus, String newStatus) {
        // Terminal states are frozen.
        if (isTerminalStatus(oldStatus)) {
            return false;
        }
        // Allowed transitions (simplified, per the current table schema).
        return switch (oldStatus.toUpperCase()) {
            case "WAITING" -> "DISPATCHED".equals(newStatus) || "CANCELLED".equals(newStatus);
            case "DISPATCHED" -> "PAUSED".equals(newStatus) || "CANCELLED".equals(newStatus);
            case "PAUSED" -> "DISPATCHED".equals(newStatus) || "CANCELLED".equals(newStatus);
            default -> false;
        };
    }

    /**
     * Stamp status-specific timestamps on the record.
     */
    private void updateStatusTimestamp(OpsOrderQueueDO queueDO, OrderQueueStatusEnum status) {
        switch (status.getStatus().toUpperCase()) {
            case "DISPATCHED" -> {
                // Only set the dispatch time once.
                if (queueDO.getDequeueTime() == null) {
                    queueDO.setDequeueTime(LocalDateTime.now());
                }
            }
            case "CANCELLED", "COMPLETED" -> {
                // Completion time could be recorded here if the table gains such a column.
            }
        }
    }

    /**
     * DO -> DTO via property copy.
     */
    private OrderQueueDTO convertToDTO(OpsOrderQueueDO queueDO) {
        OrderQueueDTO dto = new OrderQueueDTO();
        BeanUtils.copyProperties(queueDO, dto);
        return dto;
    }

    /**
     * Batch DO -> DTO conversion.
     */
    private List<OrderQueueDTO> convertToDTO(List<OpsOrderQueueDO> list) {
        return list.stream()
                .map(this::convertToDTO)
                .collect(Collectors.toList());
    }
}

View File

@@ -0,0 +1,244 @@
package com.viewsh.module.ops.service.queue;
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Queue synchronization service.
 * Responsibilities:
 * 1. Periodically sync MySQL queue records into Redis.
 * 2. Recover Redis after an outage.
 * 3. Guard MySQL/Redis data consistency.
 *
 * @author lzh
 */
@Slf4j
@Service
public class QueueSyncService {

    @Resource
    private OpsOrderQueueMapper orderQueueMapper;
    @Resource
    private RedisOrderQueueService redisQueueService;

    /**
     * Scheduled sync (every 5 minutes): pushes records changed within the last
     * hour — active statuses only — from MySQL into Redis.
     */
    @Scheduled(cron = "0 */5 * * * ?")
    public void syncMySQLToRedis() {
        try {
            log.info("开始定时同步MySQL -> Redis");
            // 1. Active records updated within the last hour.
            LocalDateTime startTime = LocalDateTime.now().minusHours(1);
            List<OpsOrderQueueDO> changedDOList = orderQueueMapper.selectList(
                    activeStatusWrapper().ge(OpsOrderQueueDO::getUpdateTime, startTime)
            );
            if (changedDOList.isEmpty()) {
                log.info("没有需要同步的数据");
                return;
            }
            // 2. DO -> DTO.
            List<OrderQueueDTO> changedDTOList = changedDOList.stream()
                    .map(this::convertToDTO)
                    .collect(Collectors.toList());
            // 3. Batch push to Redis.
            long syncCount = redisQueueService.batchEnqueue(changedDTOList);
            log.info("定时同步完成:同步{}条记录", syncCount);
        } catch (Exception e) {
            log.error("定时同步失败", e);
        }
    }

    /**
     * Sync one cleaner's queue into Redis.
     * Used for Redis outage recovery or to repair detected inconsistencies.
     *
     * @param cleanerId cleaner user ID
     * @return number of records synced
     */
    @Transactional(readOnly = true)
    public int syncUserQueueToRedis(Long cleanerId) {
        try {
            log.info("开始同步保洁员队列到Redis: cleanerId={}", cleanerId);
            // 1. All active queue records for this cleaner.
            List<OpsOrderQueueDO> queueDOList = orderQueueMapper.selectList(
                    activeStatusWrapper().eq(OpsOrderQueueDO::getUserId, cleanerId)
            );
            if (queueDOList.isEmpty()) {
                log.info("保洁员队列为空: cleanerId={}", cleanerId);
                return 0;
            }
            // 2. Drop the stale Redis queue before repopulating.
            redisQueueService.clearQueue(cleanerId);
            // 3. Batch push to Redis.
            List<OrderQueueDTO> queueDTOList = queueDOList.stream()
                    .map(this::convertToDTO)
                    .collect(Collectors.toList());
            long syncCount = redisQueueService.batchEnqueue(queueDTOList);
            log.info("同步保洁员队列完成: cleanerId={}, count={}", cleanerId, syncCount);
            return (int) syncCount;
        } catch (Exception e) {
            log.error("同步保洁员队列失败: cleanerId={}", cleanerId, e);
            return 0;
        }
    }

    /**
     * Consistency check: compares Redis vs. MySQL queue lengths for one cleaner
     * and auto-resyncs on mismatch.
     * NOTE(review): this only compares counts, not record contents — equal counts
     * with differing records pass undetected.
     *
     * @param cleanerId cleaner user ID
     * @return absolute count difference, or -1 when the check itself failed
     */
    public int validateConsistency(Long cleanerId) {
        try {
            log.info("开始校验数据一致性: cleanerId={}", cleanerId);
            // 1. Queue length from Redis.
            long redisCount = redisQueueService.getQueueSize(cleanerId);
            // 2. Active-record count from MySQL.
            Long mysqlCount = orderQueueMapper.selectCount(
                    activeStatusWrapper().eq(OpsOrderQueueDO::getUserId, cleanerId)
            );
            int diff = (int) (redisCount - (mysqlCount != null ? mysqlCount : 0));
            if (diff != 0) {
                log.warn("数据不一致: cleanerId={}, redisCount={}, mysqlCount={}, diff={}",
                        cleanerId, redisCount, mysqlCount, diff);
                // Auto-heal by resyncing this cleaner's queue.
                syncUserQueueToRedis(cleanerId);
            } else {
                log.info("数据一致性校验通过: cleanerId={}", cleanerId);
            }
            return Math.abs(diff);
        } catch (Exception e) {
            log.error("数据一致性校验失败: cleanerId={}", cleanerId, e);
            return -1;
        }
    }

    /**
     * Forced full resync of every active queue record, grouped per cleaner.
     * For data repair or emergency recovery.
     *
     * @return total number of records synced
     */
    @Transactional(readOnly = true)
    public int forceSyncAll() {
        try {
            log.warn("开始强制全量同步");
            // 1. All active queue records.
            List<OpsOrderQueueDO> allQueueDOList = orderQueueMapper.selectList(activeStatusWrapper());
            if (allQueueDOList.isEmpty()) {
                log.info("没有需要同步的数据");
                return 0;
            }
            // 2. Group by cleaner.
            Map<Long, List<OpsOrderQueueDO>> groupedByUser = allQueueDOList.stream()
                    .collect(Collectors.groupingBy(OpsOrderQueueDO::getUserId));
            int totalCount = 0;
            // 3. Sync each cleaner's queue in turn.
            for (Map.Entry<Long, List<OpsOrderQueueDO>> entry : groupedByUser.entrySet()) {
                Long cleanerId = entry.getKey();
                List<OpsOrderQueueDO> queueDOList = entry.getValue();
                // Drop stale Redis data.
                redisQueueService.clearQueue(cleanerId);
                // Batch push.
                List<OrderQueueDTO> queueDTOList = queueDOList.stream()
                        .map(this::convertToDTO)
                        .collect(Collectors.toList());
                long count = redisQueueService.batchEnqueue(queueDTOList);
                totalCount += count;
                log.info("同步保洁员队列: cleanerId={}, count={}", cleanerId, count);
            }
            log.warn("强制全量同步完成: totalCount={}", totalCount);
            return totalCount;
        } catch (Exception e) {
            log.error("强制全量同步失败", e);
            return 0;
        }
    }

    /**
     * Builds the shared query filter for the three "active" queue statuses
     * (WAITING / DISPATCHED / PAUSED). Callers chain additional conditions.
     */
    private com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<OpsOrderQueueDO> activeStatusWrapper() {
        return new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<OpsOrderQueueDO>()
                .in(OpsOrderQueueDO::getQueueStatus,
                        OrderQueueStatusEnum.WAITING.getStatus(),
                        OrderQueueStatusEnum.DISPATCHED.getStatus(),
                        OrderQueueStatusEnum.PAUSED.getStatus());
    }

    /**
     * DO -> DTO field-by-field copy.
     */
    private OrderQueueDTO convertToDTO(OpsOrderQueueDO queueDO) {
        OrderQueueDTO dto = new OrderQueueDTO();
        dto.setId(queueDO.getId());
        dto.setOpsOrderId(queueDO.getOpsOrderId());
        dto.setUserId(queueDO.getUserId());
        dto.setQueueIndex(queueDO.getQueueIndex());
        dto.setPriority(queueDO.getPriority());
        dto.setQueueStatus(queueDO.getQueueStatus());
        dto.setEnqueueTime(queueDO.getEnqueueTime());
        dto.setDequeueTime(queueDO.getDequeueTime());
        dto.setPausedDuration(queueDO.getPausedDuration());
        dto.setEventMessage(queueDO.getEventMessage());
        return dto;
    }
}

View File

@@ -0,0 +1,163 @@
package com.viewsh.module.ops.service.queue;
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
import java.util.List;
/**
 * Redis work-order queue service interface.
 *
 * @author lzh
 */
public interface RedisOrderQueueService {

    /**
     * Redis key prefixes: per-user sorted-set queue, per-record detail hash, per-user lock.
     */
    String QUEUE_KEY_PREFIX = "ops:order:queue:";
    String INFO_KEY_PREFIX = "ops:order:queue:info:";
    String LOCK_KEY_PREFIX = "ops:order:queue:lock:";

    // ========== Queue operations ==========

    /**
     * Enqueue (add to the Redis Sorted Set).
     *
     * @param dto queue record
     * @return true on success
     */
    boolean enqueue(OrderQueueDTO dto);

    /**
     * Batch enqueue (pipelined).
     *
     * @param dtos queue records
     * @return number of records successfully enqueued
     */
    long batchEnqueue(List<OrderQueueDTO> dtos);

    /**
     * Dequeue (remove and return the highest-priority task).
     * Backed by ZPOPMIN, which is atomic.
     *
     * @param cleanerId cleaner user ID
     * @return the record, or null when the queue is empty
     */
    OrderQueueDTO dequeue(Long cleanerId);

    /**
     * Peek at the first N tasks without removing them.
     *
     * @param cleanerId cleaner user ID
     * @param count     number of tasks to fetch
     * @return task list
     */
    List<OrderQueueDTO> peekTasks(Long cleanerId, int count);

    /**
     * Queue length for one cleaner.
     *
     * @param cleanerId cleaner user ID
     * @return queue length
     */
    long getQueueSize(Long cleanerId);

    /**
     * Clear one cleaner's queue.
     *
     * @param cleanerId cleaner user ID
     * @return number of entries cleared
     */
    long clearQueue(Long cleanerId);

    // ========== Status updates ==========

    /**
     * Update a record's queue status.
     *
     * @param queueId   queue record ID
     * @param newStatus target status
     * @return true on success
     */
    boolean updateStatus(Long queueId, String newStatus);

    /**
     * Update a record's priority.
     *
     * @param queueId     queue record ID
     * @param newPriority new priority
     * @return true on success
     */
    boolean updatePriority(Long queueId, Integer newPriority);

    /**
     * Remove one task from its queue.
     *
     * @param queueId queue record ID
     * @return true on success
     */
    boolean remove(Long queueId);

    // ========== Queries ==========

    /**
     * All tasks for one cleaner.
     *
     * @param cleanerId cleaner user ID
     * @return task list
     */
    List<OrderQueueDTO> getTasksByUserId(Long cleanerId);

    /**
     * Detail record for one queue entry.
     *
     * @param queueId queue record ID
     * @return the record, or null
     */
    OrderQueueDTO getByQueueId(Long queueId);

    /**
     * Queue record for one work order.
     *
     * @param orderId work-order ID
     * @return the record, or null
     */
    OrderQueueDTO getByOrderId(Long orderId);

    /**
     * Whether a work order is already queued.
     *
     * @param orderId work-order ID
     * @return true if present
     */
    boolean exists(Long orderId);

    // ========== Distributed lock ==========

    /**
     * Try to acquire the per-cleaner distributed lock.
     *
     * @param cleanerId  cleaner user ID
     * @param lockValue  lock token (UUID)
     * @param expireTime lock TTL in milliseconds
     * @return true if acquired
     */
    boolean tryLock(Long cleanerId, String lockValue, long expireTime);

    /**
     * Release the lock (only if the token matches).
     *
     * @param cleanerId cleaner user ID
     * @param lockValue lock token
     * @return true if released
     */
    boolean unlock(Long cleanerId, String lockValue);

    /**
     * Force-release the lock regardless of token.
     *
     * @param cleanerId cleaner user ID
     * @return true if released
     */
    boolean forceUnlock(Long cleanerId);
}

View File

@@ -0,0 +1,527 @@
package com.viewsh.module.ops.service.queue;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
import com.viewsh.module.ops.enums.PriorityEnum;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * Redis work-order queue service implementation.
 * High-performance queue built on a Redis Sorted Set (ordering) plus a Hash
 * (per-record detail payload).
 *
 * @author lzh
 */
@Slf4j
@Service
public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    @Resource
    private ObjectMapper objectMapper;
    /**
     * Score formula: priority score + timestamp.
     * Priority scores: P0=0, P1=1,000,000, P2=2,000,000, P3=3,000,000.
     * Result: higher priorities sort first; within a priority, earlier enqueue first.
     * NOTE(review): the original comment claimed a millisecond timestamp, but the
     * MySQL-side score in OrderQueueServiceEnhanced uses epoch seconds — confirm
     * calculateScore's unit so both stores rank records identically.
     */
    private static final Map<Integer, Long> PRIORITY_SCORES = Map.of(
            0, 0L,          // P0: 0
            1, 1000000L,    // P1: 1,000,000
            2, 2000000L,    // P2: 2,000,000
            3, 3000000L     // P3: 3,000,000
    );
@Override
public boolean enqueue(OrderQueueDTO dto) {
try {
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
String infoKey = INFO_KEY_PREFIX + dto.getId();
// 1. 计算分数(优先级 + 时间戳)
double score = calculateScore(dto.getPriority(), dto.getEnqueueTime());
// 2. 序列化为 JSON
String json = objectMapper.writeValueAsString(dto);
// 3. 添加到 Sorted Set
redisTemplate.opsForZSet().add(queueKey, json, score);
// 4. 存储详细信息到 Hash
Map<String, Object> infoMap = convertToMap(dto);
redisTemplate.opsForHash().putAll(infoKey, infoMap);
redisTemplate.expire(infoKey, 24, TimeUnit.HOURS);
log.debug("Redis 入队成功: queueId={}, orderId={}, score={}",
dto.getId(), dto.getOpsOrderId(), score);
return true;
} catch (Exception e) {
log.error("Redis 入队失败: queueId={}", dto.getId(), e);
return false;
}
}
@Override
public long batchEnqueue(List<OrderQueueDTO> dtos) {
if (dtos == null || dtos.isEmpty()) {
return 0;
}
try {
// 使用 Pipeline 批量操作
redisTemplate.executePipelined((org.springframework.data.redis.core.RedisCallback<Object>) connection -> {
dtos.forEach(dto -> {
try {
byte[] queueKey = (QUEUE_KEY_PREFIX + dto.getUserId()).getBytes();
byte[] infoKey = (INFO_KEY_PREFIX + dto.getId()).getBytes();
// 计算分数
double score = calculateScore(dto.getPriority(), dto.getEnqueueTime());
// 序列化
byte[] json = objectMapper.writeValueAsBytes(dto);
// 添加到 Sorted Set
connection.zAdd(queueKey, score, json);
// 存储详细信息
Map<byte[], byte[]> infoMap = convertToByteMap(dto);
connection.hMSet(infoKey, infoMap);
// 设置过期时间
connection.expire(infoKey, 86400); // 24小时
} catch (Exception e) {
log.error("批量入队失败: queueId={}", dto.getId(), e);
}
});
return null;
});
log.info("Redis 批量入队成功: count={}", dtos.size());
return dtos.size();
} catch (Exception e) {
log.error("Redis 批量入队失败", e);
return 0;
}
}
@Override
public OrderQueueDTO dequeue(Long cleanerId) {
try {
String queueKey = QUEUE_KEY_PREFIX + cleanerId;
// 使用 ZPOPMIN 原子性出队(获取并移除最高优先级任务)
// popMin 返回单个 TypedTuple<Object>
org.springframework.data.redis.core.ZSetOperations.TypedTuple<Object> typedTuple =
redisTemplate.opsForZSet().popMin(queueKey);
if (typedTuple == null) {
return null;
}
// 从 TypedTuple 中获取值
Object taskObj = typedTuple.getValue();
String json = (String) taskObj;
// 反序列化
OrderQueueDTO dto = objectMapper.readValue(json, OrderQueueDTO.class);
// 从 Hash 中删除详细信息
String infoKey = INFO_KEY_PREFIX + dto.getId();
redisTemplate.delete(infoKey);
log.debug("Redis 出队成功: queueId={}, orderId={}, cleanerId={}",
dto.getId(), dto.getOpsOrderId(), cleanerId);
return dto;
} catch (Exception e) {
log.error("Redis 出队失败: cleanerId={}", cleanerId, e);
return null;
}
}
@Override
public List<OrderQueueDTO> peekTasks(Long cleanerId, int count) {
try {
String queueKey = QUEUE_KEY_PREFIX + cleanerId;
// 查询前 N 个任务(不删除)
Set<Object> tasks = redisTemplate.opsForZSet().range(queueKey, 0, count - 1);
if (tasks == null || tasks.isEmpty()) {
return Collections.emptyList();
}
// 反序列化
return tasks.stream()
.map(task -> {
try {
String json = (String) task;
return objectMapper.readValue(json, OrderQueueDTO.class);
} catch (Exception e) {
log.error("反序列化失败", e);
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
} catch (Exception e) {
log.error("Redis 查询任务失败: cleanerId={}, count={}", cleanerId, count, e);
return Collections.emptyList();
}
}
@Override
public long getQueueSize(Long cleanerId) {
try {
String queueKey = QUEUE_KEY_PREFIX + cleanerId;
Long size = redisTemplate.opsForZSet().size(queueKey);
return size != null ? size : 0;
} catch (Exception e) {
log.error("Redis 查询队列长度失败: cleanerId={}", cleanerId, e);
return 0;
}
}
@Override
public long clearQueue(Long cleanerId) {
try {
String queueKey = QUEUE_KEY_PREFIX + cleanerId;
redisTemplate.delete(queueKey);
log.info("Redis 清空队列成功: cleanerId={}", cleanerId);
return 0; // Redis 不返回删除数量
} catch (Exception e) {
log.error("Redis 清空队列失败: cleanerId={}", cleanerId, e);
return 0;
}
}
    /**
     * Updates the {@code queueStatus} field in the detail hash only.
     *
     * NOTE(review): the JSON member stored in the sorted set still carries the
     * old status after this call (it would need a ZREM + re-add to stay in
     * sync) — confirm readers of queue members tolerate a stale status.
     * NOTE(review): HSET creates the hash if absent, so this returns true even
     * for a queueId that was never enqueued — verify callers expect that.
     *
     * @param queueId   queue record id
     * @param newStatus new status value to store
     * @return true unless the Redis write throws
     */
    @Override
    public boolean updateStatus(Long queueId, String newStatus) {
        try {
            String infoKey = INFO_KEY_PREFIX + queueId;
            // Write only the single hash field.
            redisTemplate.opsForHash().put(infoKey, "queueStatus", newStatus);
            // TODO: syncing the Sorted Set member requires removing and re-adding it;
            // simplified here — only the hash is updated.
            log.debug("Redis 更新状态成功: queueId={}, newStatus={}", queueId, newStatus);
            return true;
        } catch (Exception e) {
            log.error("Redis 更新状态失败: queueId={}, newStatus={}", queueId, newStatus, e);
            return false;
        }
    }
@Override
public boolean updatePriority(Long queueId, Integer newPriority) {
try {
String infoKey = INFO_KEY_PREFIX + queueId;
// 从 Hash 中获取数据
Map<Object, Object> infoMap = redisTemplate.opsForHash().entries(infoKey);
if (infoMap.isEmpty()) {
log.warn("队列记录不存在: queueId={}", queueId);
return false;
}
// 更新优先级
infoMap.put("priority", newPriority);
// 反序列化回 DTO
OrderQueueDTO dto = mapToDto(infoMap);
if (dto == null) {
return false;
}
// 更新 Sorted Set先删除旧的再添加新的
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
String oldJson = objectMapper.writeValueAsString(dto);
dto.setPriority(newPriority);
double newScore = calculateScore(newPriority, dto.getEnqueueTime());
String newJson = objectMapper.writeValueAsString(dto);
// 使用 Lua 脚本原子性更新
String script =
"local old = redis.call('ZREM', KEYS[1], ARGV[1]) " +
"redis.call('ZADD', KEYS[1], ARGV[2], ARGV[3]) " +
"redis.call('HMSET', KEYS[2], 'priority', ARGV[4]) " +
"return old";
redisTemplate.execute(
new DefaultRedisScript<>(script, Long.class),
Arrays.asList(queueKey, infoKey),
oldJson, newScore, newJson, newPriority
);
log.debug("Redis 更新优先级成功: queueId={}, newPriority={}", queueId, newPriority);
return true;
} catch (Exception e) {
log.error("Redis 更新优先级失败: queueId={}, newPriority={}", queueId, newPriority, e);
return false;
}
}
@Override
public boolean remove(Long queueId) {
try {
String infoKey = INFO_KEY_PREFIX + queueId;
// 从 Hash 中获取数据
Map<Object, Object> infoMap = redisTemplate.opsForHash().entries(infoKey);
if (infoMap.isEmpty()) {
return false;
}
Object userIdObj = infoMap.get("userId");
if (userIdObj == null) {
return false;
}
Long userId = Long.parseLong(userIdObj.toString());
String queueKey = QUEUE_KEY_PREFIX + userId;
// 从 Sorted Set 中删除
String json = objectMapper.writeValueAsString(infoMap);
redisTemplate.opsForZSet().remove(queueKey, json);
// 删除 Hash
redisTemplate.delete(infoKey);
log.debug("Redis 删除队列记录成功: queueId={}", queueId);
return true;
} catch (Exception e) {
log.error("Redis 删除队列记录失败: queueId={}", queueId, e);
return false;
}
}
@Override
public List<OrderQueueDTO> getTasksByUserId(Long cleanerId) {
return peekTasks(cleanerId, Integer.MAX_VALUE);
}
@Override
public OrderQueueDTO getByQueueId(Long queueId) {
try {
String infoKey = INFO_KEY_PREFIX + queueId;
Map<Object, Object> infoMap = redisTemplate.opsForHash().entries(infoKey);
if (infoMap.isEmpty()) {
return null;
}
return mapToDto(infoMap);
} catch (Exception e) {
log.error("Redis 查询队列记录失败: queueId={}", queueId, e);
return null;
}
}
@Override
public OrderQueueDTO getByOrderId(Long orderId) {
try {
// 查询所有保洁员的队列(效率较低,慎用)
Set<String> keys = redisTemplate.keys(QUEUE_KEY_PREFIX + "*");
if (keys == null || keys.isEmpty()) {
return null;
}
for (String queueKey : keys) {
Set<Object> tasks = redisTemplate.opsForZSet().range(queueKey, 0, -1);
if (tasks != null) {
for (Object task : tasks) {
try {
OrderQueueDTO dto = objectMapper.readValue((String) task, OrderQueueDTO.class);
if (dto.getOpsOrderId().equals(orderId)) {
return dto;
}
} catch (Exception ignored) {
}
}
}
}
return null;
} catch (Exception e) {
log.error("Redis 查询工单失败: orderId={}", orderId, e);
return null;
}
}
@Override
public boolean exists(Long orderId) {
return getByOrderId(orderId) != null;
}
@Override
public boolean tryLock(Long cleanerId, String lockValue, long expireTime) {
try {
String lockKey = LOCK_KEY_PREFIX + cleanerId;
Boolean acquired = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS);
log.debug("尝试获取锁: cleanerId={}, acquired={}", cleanerId, acquired);
return Boolean.TRUE.equals(acquired);
} catch (Exception e) {
log.error("获取分布式锁失败: cleanerId={}", cleanerId, e);
return false;
}
}
@Override
public boolean unlock(Long cleanerId, String lockValue) {
try {
String lockKey = LOCK_KEY_PREFIX + cleanerId;
// 使用 Lua 脚本确保只删除自己的锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Long result = redisTemplate.execute(
new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(lockKey),
lockValue
);
boolean unlocked = result != null && result == 1;
log.debug("释放分布式锁: cleanerId={}, unlocked={}", cleanerId, unlocked);
return unlocked;
} catch (Exception e) {
log.error("释放分布式锁失败: cleanerId={}", cleanerId, e);
return false;
}
}
@Override
public boolean forceUnlock(Long cleanerId) {
try {
String lockKey = LOCK_KEY_PREFIX + cleanerId;
redisTemplate.delete(lockKey);
log.warn("强制释放分布式锁: cleanerId={}", cleanerId);
return true;
} catch (Exception e) {
log.error("强制释放分布式锁失败: cleanerId={}", cleanerId, e);
return false;
}
}
// ========== 私有方法 ==========
/**
* 计算分数(优先级 + 时间戳)
*/
private double calculateScore(Integer priority, LocalDateTime enqueueTime) {
long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L);
long timestamp;
if (enqueueTime != null) {
timestamp = enqueueTime
.atZone(ZoneId.systemDefault())
.toEpochSecond();
} else {
timestamp = System.currentTimeMillis() / 1000;
}
return priorityScore + timestamp;
}
/**
* 将 DTO 转换为 Map用于 Hash 存储)
*/
private Map<String, Object> convertToMap(OrderQueueDTO dto) {
Map<String, Object> map = new HashMap<>();
map.put("id", dto.getId());
map.put("opsOrderId", dto.getOpsOrderId());
map.put("userId", dto.getUserId());
map.put("queueIndex", dto.getQueueIndex());
map.put("priority", dto.getPriority());
map.put("queueStatus", dto.getQueueStatus());
// LocalDateTime 转换为字符串存储
if (dto.getEnqueueTime() != null) {
map.put("enqueueTime", dto.getEnqueueTime().toString());
}
return map;
}
    /**
     * Flattens a DTO into raw byte key/value pairs for the pipelined HMSET in
     * batchEnqueue.
     *
     * NOTE(review): getBytes() with no charset uses the platform default
     * encoding — consider StandardCharsets.UTF_8 for determinism. Also confirm
     * these raw bytes match what the template-based path (convertToMap +
     * opsForHash) produces under its configured hash serializer, or reads via
     * mapToDto may not round-trip.
     * NOTE(review): unlike convertToMap, queueStatus is NOT null-guarded here;
     * a null status would NPE — verify callers always set it.
     *
     * @param dto record to flatten
     * @return field-name bytes -> value bytes (enqueueTime as ISO-8601 text)
     * @throws Exception declared for caller convenience in the pipeline lambda
     */
    private Map<byte[], byte[]> convertToByteMap(OrderQueueDTO dto) throws Exception {
        Map<byte[], byte[]> map = new HashMap<>();
        map.put("id".getBytes(), String.valueOf(dto.getId()).getBytes());
        map.put("opsOrderId".getBytes(), String.valueOf(dto.getOpsOrderId()).getBytes());
        map.put("userId".getBytes(), String.valueOf(dto.getUserId()).getBytes());
        map.put("queueIndex".getBytes(), String.valueOf(dto.getQueueIndex()).getBytes());
        map.put("priority".getBytes(), String.valueOf(dto.getPriority()).getBytes());
        map.put("queueStatus".getBytes(), dto.getQueueStatus().getBytes());
        // LocalDateTime serialized as ISO-8601 text.
        if (dto.getEnqueueTime() != null) {
            map.put("enqueueTime".getBytes(), dto.getEnqueueTime().toString().getBytes());
        }
        return map;
    }
/**
* 将 Map 转换回 DTO
*/
private OrderQueueDTO mapToDto(Map<Object, Object> map) {
try {
OrderQueueDTO dto = new OrderQueueDTO();
dto.setId(Long.parseLong(map.get("id").toString()));
dto.setOpsOrderId(Long.parseLong(map.get("opsOrderId").toString()));
dto.setUserId(Long.parseLong(map.get("userId").toString()));
dto.setQueueIndex((Integer) map.get("queueIndex"));
dto.setPriority((Integer) map.get("priority"));
dto.setQueueStatus((String) map.get("queueStatus"));
// 字符串转换为 LocalDateTime
Object enqueueTimeObj = map.get("enqueueTime");
if (enqueueTimeObj != null) {
dto.setEnqueueTime(LocalDateTime.parse(enqueueTimeObj.toString()));
}
return dto;
} catch (Exception e) {
log.error("Map 转 DTO 失败", e);
return null;
}
}
}