feat(ops): 重构派单队列评分逻辑,支持楼层差与等待老化综合排序

- 新增 QueueScoreCalculator/QueueScoreContext/QueueScoreResult,统一按优先级分 + 楼层差分 - 等待老化分计算队列总分,并将 PRIORITY_WEIGHT 调整为 1500
- OrderQueueService 新增 rebuildWaitingTasksByUserId 接口,OrderQueueServiceEnhanced 支持按执行人重算 WAITING 队列、以当前执行工单楼层为基准动态重排,并在事务提交后同步刷新 Redis
- RedisOrderQueueServiceImpl 支持持久化 baseFloorNo、targetFloorNo、floorDiff、waitMinutes、scoreUpdateTime 等评分明细,清队列时同时清理关联 Hash,避免脏数据残留
- DispatchEngineImpl、CleanerPriorityScheduleStrategy、BadgeDeviceScheduleStrategy 调整为非抢占式派单:P0 忙碌时仅入队等待,空闲时直接派发,自动派单前按总分重排并派发下一单
- CleanOrderServiceImpl 取消 P0 自动打断链路,升级到 P0 后仅重算等待队列并发送通知;补充 QueueScoreCalculatorTest、OrderQueueServiceEnhancedTest、CleanerPriorityScheduleStrategyTest、CleanOrderEndToEndTest 覆盖新行为
This commit is contained in:
lzh
2026-03-07 21:12:48 +08:00
parent 26c4ce07eb
commit a9fd9313cc
16 changed files with 674 additions and 261 deletions

View File

@@ -245,13 +245,13 @@ public class CleanOrderServiceImpl implements CleanOrderService {
if (queueDTO != null) {
orderQueueService.adjustPriority(queueDTO.getId(), PriorityEnum.P0, reason);
// 5. 使用新的调度引擎处理P0紧急插队
DispatchResult result = dispatchEngine.urgentInterrupt(orderId, queueDTO.getUserId());
// 5. 重算等待队列P0 不再打断当前任务
orderQueueService.rebuildWaitingTasksByUserId(queueDTO.getUserId(), order.getAreaId());
// 6. 发送优先级升级通知
cleanOrderEventListener.sendPriorityUpgradeNotification(queueDTO.getUserId(), order.getOrderCode(), orderId);
return result.isSuccess();
return true;
}
return true;
@@ -289,11 +289,11 @@ public class CleanOrderServiceImpl implements CleanOrderService {
if (queueDTO != null) {
orderQueueService.adjustPriority(queueDTO.getId(), newPriority, reason);
// 6. 如果升级到 P0触发紧急打断逻辑
// 6. 如果升级到 P0仅重算等待队列,不再触发打断
if (newPriority == PriorityEnum.P0) {
DispatchResult result = dispatchEngine.urgentInterrupt(orderId, queueDTO.getUserId());
orderQueueService.rebuildWaitingTasksByUserId(queueDTO.getUserId(), order.getAreaId());
cleanOrderEventListener.sendPriorityUpgradeNotification(queueDTO.getUserId(), order.getOrderCode(), orderId);
log.warn("客流升级到P0触发紧急打断: orderId={}, success={}", orderId, result.isSuccess());
log.warn("客流升级到P0已重算等待队列: orderId={}", orderId);
}
}

View File

@@ -23,13 +23,13 @@ import java.util.List;
* <p>
* 职责:怎么派单
* <p>
* 策略规则:
* <ul>
* <li>空闲无任务 → DIRECT_DISPATCH直接派单</li>
* <li>空闲有等待 → PUSH_AND_ENQUEUE推送等待+新任务入队)</li>
* <li>忙碌且非P0 → ENQUEUE_ONLY仅入队</li>
* <li>忙碌且P0 → INTERRUPT_AND_DISPATCH打断并派单</li>
* </ul>
* 策略规则:
* <ul>
* <li>空闲无任务 → DIRECT_DISPATCH直接派单</li>
* <li>空闲有等待 → PUSH_AND_ENQUEUE推送等待+新任务入队)</li>
* <li>忙碌且非P0 → ENQUEUE_ONLY仅入队</li>
* <li>忙碌且P0 → ENQUEUE_ONLY不抢断进入等待队列</li>
* </ul>
*
* @author lzh
*/
@@ -106,12 +106,12 @@ public class BadgeDeviceScheduleStrategy implements ScheduleStrategy {
// P0紧急任务需要打断
if (assigneeStatus.getCurrentTaskCount() > 0) {
Long currentOrderId = deviceStatus.getCurrentOpsOrderId();
log.warn("决策: INTERRUPT_AND_DISPATCH - P0紧急任务打断当前任务: currentOrderId={}", currentOrderId);
return DispatchDecision.interruptAndDispatch(currentOrderId);
} else {
log.info("决策: DIRECT_DISPATCH - P0紧急任务直接派");
return DispatchDecision.directDispatch();
}
log.info("决策: ENQUEUE_ONLY - P0工单不再打断当前任务,进入等待队列: currentOrderId={}", currentOrderId);
return DispatchDecision.enqueueOnly();
} else {
log.info("决策: DIRECT_DISPATCH - P0工单在空闲状态下直接派");
return DispatchDecision.directDispatch();
}
} else {
// 非紧急任务,设备忙碌,入队等待
log.info("决策: ENQUEUE_ONLY - 设备忙碌,任务入队等待");
@@ -133,15 +133,14 @@ public class BadgeDeviceScheduleStrategy implements ScheduleStrategy {
// P0任务可以打断任何任务
if (urgentContext.isUrgent()) {
log.warn("允许打断: P0紧急任务可以打断当前任务");
return InterruptDecision.allowByDefault();
}
// P1/P2任务不能打断
log.info("拒绝打断: 非P0任务不能打断当前任务");
return InterruptDecision.deny(
"紧急任务优先级不足",
"建议等待当前任务完成"
);
log.info("拒绝打断: 当前调度已改为非抢占式队列派发");
return InterruptDecision.deny("当前调度不再支持抢断", "工单将按队列总分在下一轮派发");
}
log.info("拒绝打断: 当前调度已改为非抢占式队列派发");
return InterruptDecision.deny(
"当前调度不再支持抢断",
"工单将按队列总分在下一轮派发"
);
}
}

View File

@@ -28,7 +28,7 @@ import java.util.List;
* <li>空闲无任务 → DIRECT_DISPATCH直接派单</li>
* <li>空闲有等待 → PUSH_AND_ENQUEUE推送等待+新任务入队)</li>
* <li>忙碌且非P0 → ENQUEUE_ONLY仅入队</li>
* <li>忙碌且P0 → INTERRUPT_AND_DISPATCH打断并派单</li>
* <li>忙碌且P0 → ENQUEUE_ONLY不抢断进入等待队列</li>
* </ul>
*
* @author lzh
@@ -106,10 +106,10 @@ public class CleanerPriorityScheduleStrategy implements ScheduleStrategy {
// P0紧急任务需要打断
if (assigneeStatus.getCurrentTaskCount() > 0) {
Long currentOrderId = cleanerStatus.getCurrentOpsOrderId();
log.warn("决策: INTERRUPT_AND_DISPATCH - P0紧急任务打断当前任务: currentOrderId={}", currentOrderId);
return DispatchDecision.interruptAndDispatch(currentOrderId);
log.info("决策: ENQUEUE_ONLY - P0工单不再打断当前任务,进入等待队列: currentOrderId={}", currentOrderId);
return DispatchDecision.enqueueOnly();
} else {
log.info("决策: DIRECT_DISPATCH - P0紧急任务直接派");
log.info("决策: DIRECT_DISPATCH - P0工单在空闲状态下直接派");
return DispatchDecision.directDispatch();
}
} else {
@@ -133,15 +133,14 @@ public class CleanerPriorityScheduleStrategy implements ScheduleStrategy {
// P0任务可以打断任何任务
if (urgentContext.isUrgent()) {
log.warn("允许打断: P0紧急任务可以打断当前任务");
return InterruptDecision.allowByDefault();
log.info("拒绝打断: 当前调度已改为非抢占式队列派发");
return InterruptDecision.deny("当前调度不再支持抢断", "工单将按队列总分在下一轮派发");
}
// P1/P2任务不能打断
log.info("拒绝打断: 非P0任务不能打断当前任务");
log.info("拒绝打断: 当前调度已改为非抢占式队列派发");
return InterruptDecision.deny(
"紧急任务优先级不足",
"建议等待当前任务完成"
"当前调度不再支持抢断",
"工单将按队列总分在下一轮派发"
);
}
}

View File

@@ -9,10 +9,13 @@ import com.viewsh.module.ops.core.event.OrderEventPublisher;
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO;
import com.viewsh.module.ops.dal.mysql.area.OpsBusAreaMapper;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.PriorityEnum;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import com.viewsh.module.ops.environment.dal.dataobject.workorder.OpsOrderCleanExtDO;
import com.viewsh.module.ops.environment.dal.redis.TrafficActiveOrderRedisDAO;
import com.viewsh.module.ops.environment.dal.mysql.workorder.OpsOrderCleanExtMapper;
import com.viewsh.module.ops.environment.integration.consumer.*;
import com.viewsh.framework.common.pojo.CommonResult;
@@ -73,6 +76,8 @@ public class CleanOrderEndToEndTest {
@Mock
private OpsOrderMapper opsOrderMapper;
@Mock
private OpsBusAreaMapper opsBusAreaMapper;
@Mock
private OpsOrderCleanExtMapper cleanExtMapper;
@Mock
private OrderIdGenerator orderIdGenerator;
@@ -94,6 +99,8 @@ public class CleanOrderEndToEndTest {
private ValueOperations<String, String> valueOperations;
@Mock
private VoiceBroadcastService voiceBroadcastService;
@Mock
private TrafficActiveOrderRedisDAO trafficActiveOrderRedisDAO;
@Mock
private BadgeDeviceStatusService badgeDeviceStatusService;
@@ -149,6 +156,7 @@ public class CleanOrderEndToEndTest {
// 注入 CleanOrderEventListener
injectField(cleanOrderService, "cleanOrderEventListener", cleanOrderEventListener);
injectField(cleanOrderService, "opsBusAreaMapper", opsBusAreaMapper);
// 注入 CleanOrderAuditEventHandler 依赖
injectField(auditEventHandler, "eventLogRecorder", eventLogRecorder);
@@ -156,10 +164,18 @@ public class CleanOrderEndToEndTest {
injectField(auditEventHandler, "opsOrderMapper", opsOrderMapper);
injectField(auditEventHandler, "stringRedisTemplate", stringRedisTemplate);
injectField(auditEventHandler, "objectMapper", objectMapper);
injectField(createEventHandler, "trafficActiveOrderRedisDAO", trafficActiveOrderRedisDAO);
// Stub IotDeviceControlApi for resetTrafficCounter
lenient().when(iotDeviceControlApi.resetTrafficCounter(any()))
.thenReturn(CommonResult.success(true));
lenient().when(opsBusAreaMapper.selectById(anyLong()))
.thenAnswer(i -> OpsBusAreaDO.builder()
.id(i.getArgument(0))
.areaName("测试区域")
.parentPath(null)
.floorNo(1)
.build());
}
// ==========================================
@@ -338,7 +354,7 @@ public class CleanOrderEndToEndTest {
verify(eventLogRecorder).record(any());
// 2. TTS sent (orderId can be null for TTS_REQUEST events)
verify(voiceBroadcastService).broadcastInOrder(eq(5001L), contains("请回到作业区域"), eq((Long) null));
verify(voiceBroadcastService).broadcastDirect(eq(5001L), contains("请回到作业区域"), eq(9), eq((Long) null));
}
// ==========================================
@@ -378,9 +394,6 @@ public class CleanOrderEndToEndTest {
when(opsOrderMapper.selectById(orderId)).thenReturn(order);
when(orderQueueService.getByOpsOrderId(orderId)).thenReturn(queueDTO);
when(dispatchEngine.urgentInterrupt(eq(orderId), eq(2001L)))
.thenReturn(DispatchResult.success("Success", 2001L));
// Execute
boolean result = cleanOrderService.upgradePriorityToP0(orderId, "Manual Upgrade");
@@ -392,7 +405,7 @@ public class CleanOrderEndToEndTest {
assertEquals(PriorityEnum.P0.getPriority(), orderCaptor.getValue().getPriority());
verify(orderQueueService).adjustPriority(eq(500L), eq(PriorityEnum.P0), anyString());
verify(dispatchEngine).urgentInterrupt(orderId, 2001L);
verify(orderQueueService).rebuildWaitingTasksByUserId(2001L, order.getAreaId());
verify(cleanOrderEventListener).sendPriorityUpgradeNotification(eq(2001L), eq("WO-P2"), eq(orderId));
}

View File

@@ -36,7 +36,7 @@ class CleanerPriorityScheduleStrategyTest {
private com.viewsh.module.ops.core.dispatch.DispatchEngine dispatchEngine;
@Test
void testDecide_P0_Interrupt() {
void testDecide_P0_EnqueueOnlyWhenBusy() {
// Setup
OpsCleanerStatusDO c1 = new OpsCleanerStatusDO();
c1.setUserId(1L);
@@ -54,8 +54,7 @@ class CleanerPriorityScheduleStrategyTest {
DispatchDecision decision = strategy.decide(context);
// Verify
assertEquals(DispatchPath.INTERRUPT_AND_DISPATCH, decision.getPath());
assertEquals(500L, decision.getInterruptedOrderId());
assertEquals(DispatchPath.ENQUEUE_ONLY, decision.getPath());
}
@Test

View File

@@ -40,13 +40,8 @@ public class OrderQueueDTO {
/**
* 队列分数(用于排序)
* 计算公式:优先级分 + 时间戳
* - P0: 0 + timestamp
* - P1: 1000000 + timestamp
* - P2: 2000000 + timestamp
* - P3: 3000000 + timestamp
*
* 用于数据库层面的排序,优先级高的排在前面,同优先级按时间排序
* 计算公式:优先级分 + 楼层差分 - 等待老化分
* 分数越小越靠前,用于等待队列的动态重排
*/
private Double queueScore;
@@ -95,6 +90,31 @@ public class OrderQueueDTO {
*/
private LocalDateTime updateTime;
/**
* 评分基准楼层
*/
private Integer baseFloorNo;
/**
* 目标工单楼层
*/
private Integer targetFloorNo;
/**
* 楼层差
*/
private Integer floorDiff;
/**
* 等待分钟数
*/
private Long waitMinutes;
/**
* 分数更新时间
*/
private LocalDateTime scoreUpdateTime;
// ========== 兼容旧字段名的getter方法 ==========
/**

View File

@@ -164,6 +164,15 @@ public interface OrderQueueService {
*/
List<OrderQueueDTO> getWaitingTasksByUserIdFromDb(Long userId);
/**
* 按当前上下文重算指定执行人等待队列的总分并返回最新排序结果
*
* @param userId 执行人ID
* @param fallbackAreaId 当没有执行中工单时可使用的楼层基准区域ID
* @return 已按最新总分排序的 WAITING 工单列表
*/
List<OrderQueueDTO> rebuildWaitingTasksByUserId(Long userId, Long fallbackAreaId);
/**
* 获取用户的暂停任务列表PAUSED状态按暂停时间排序
*

View File

@@ -134,7 +134,7 @@ public class DispatchEngineImpl implements DispatchEngine {
@Override
@Transactional(rollbackFor = Exception.class)
public DispatchResult urgentInterrupt(Long urgentOrderId, Long assigneeId) {
log.warn("开始P0紧急插队: urgentOrderId={}, assigneeId={}", urgentOrderId, assigneeId);
log.warn("处理P0工单派发请求: urgentOrderId={}, assigneeId={}", urgentOrderId, assigneeId);
// 查询紧急工单
OpsOrderDO urgentOrder = orderMapper.selectById(urgentOrderId);
@@ -156,23 +156,15 @@ public class DispatchEngineImpl implements DispatchEngine {
// 查询执行人当前任务
List<OrderQueueDTO> currentTasks = orderQueueService.getTasksByUserId(assigneeId);
// 如果有正在执行的任务,需要打断
// 如果有正在执行的任务,不再打断,重排等待队列后等待下一轮派发
OrderQueueDTO currentTask = currentTasks.stream()
.filter(t -> OrderQueueStatusEnum.PROCESSING.getStatus().equals(t.getQueueStatus()))
.findFirst()
.orElse(null);
if (currentTask != null && !currentTask.getOpsOrderId().equals(urgentOrderId)) {
// 需要打断当前任务
log.info("打断当前任务: currentOrderId={}, urgentOrderId={}",
currentTask.getOpsOrderId(), urgentOrderId);
try {
orderLifecycleManager.interruptOrder(
currentTask.getOpsOrderId(), urgentOrderId, assigneeId);
} catch (Exception e) {
log.warn("打断任务失败: currentOrderId={}", currentTask.getOpsOrderId(), e);
}
orderQueueService.rebuildWaitingTasksByUserId(assigneeId, null);
return DispatchResult.success("P0工单已入队等待不再打断当前任务", assigneeId);
}
// 派发紧急任务
@@ -182,63 +174,42 @@ public class DispatchEngineImpl implements DispatchEngine {
.assigneeId(assigneeId)
.operatorType(OperatorTypeEnum.SYSTEM)
.operatorId(assigneeId)
.reason("P0紧急任务派单")
.reason("P0工单直接派发")
.build();
OrderTransitionResult result = orderLifecycleManager.dispatch(request);
if (result.isSuccess()) {
return DispatchResult.success("P0紧急任务已派单", assigneeId);
return DispatchResult.success("P0工单已直接派发", assigneeId);
} else {
return DispatchResult.fail("P0紧急任务派单失败: " + result.getMessage());
return DispatchResult.fail("P0工单派发失败: " + result.getMessage());
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public DispatchResult autoDispatchNext(Long completedOrderId, Long assigneeId) {
log.info("任务完成后自动调度下一: completedOrderId={}, assigneeId={}", completedOrderId, assigneeId);
log.info("任务完成后自动派发下一: completedOrderId={}, assigneeId={}", completedOrderId, assigneeId);
// 1. 优先检查是否有被中断的任务
List<OrderQueueDTO> interruptedTasks = orderQueueService.getInterruptedTasksByUserId(assigneeId);
if (!interruptedTasks.isEmpty()) {
// 恢复第一个中断的任务
OrderQueueDTO interruptedTask = interruptedTasks.get(0);
log.info("恢复中断任务: orderId={}", interruptedTask.getOpsOrderId());
OrderTransitionRequest request = OrderTransitionRequest.builder()
.orderId(interruptedTask.getOpsOrderId())
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
.queueId(interruptedTask.getId())
.assigneeId(assigneeId)
.operatorType(OperatorTypeEnum.SYSTEM)
.operatorId(assigneeId)
.reason("恢复中断任务")
.build();
OrderTransitionResult result = orderLifecycleManager.transition(request);
if (result.isSuccess()) {
return DispatchResult.success("已恢复中断任务", assigneeId);
} else {
return DispatchResult.fail("恢复中断任务失败: " + result.getMessage());
}
Long fallbackAreaId = null;
OpsOrderDO completedOrder = orderMapper.selectById(completedOrderId);
if (completedOrder != null) {
fallbackAreaId = completedOrder.getAreaId();
}
// 2. 如果没有中断任务,推送队列中的下一个任务
// 注意:这里必须从 MySQL 读取最新数据,确保刚完成的任务不在等待列表中
List<OrderQueueDTO> waitingTasks = orderQueueService.getWaitingTasksByUserIdFromDb(assigneeId);
List<OrderQueueDTO> waitingTasks = orderQueueService.rebuildWaitingTasksByUserId(assigneeId, fallbackAreaId);
if (waitingTasks.isEmpty()) {
log.info("无等待任务,执行人变为空闲: assigneeId={}", assigneeId);
// 发布事件,由业务层更新执行人状态
return DispatchResult.success("无等待任务,任务完成", assigneeId);
return DispatchResult.success("无等待工单,执行人保持空闲", assigneeId);
}
// <EFBFBD><EFBFBD>送第一个等待任务
// 动态总分重排后,派发得分最低的等待工单
OrderQueueDTO nextTask = waitingTasks.get(0);
log.info("推送下一个等待任务: taskId={}, orderId={}", nextTask.getId(), nextTask.getOpsOrderId());
log.info("派发下一单: queueId={}, orderId={}, score={}, floorDiff={}, waitMinutes={}",
nextTask.getId(), nextTask.getOpsOrderId(), nextTask.getQueueScore(),
nextTask.getFloorDiff(), nextTask.getWaitMinutes());
OrderTransitionRequest request = OrderTransitionRequest.builder()
.orderId(nextTask.getOpsOrderId())
@@ -247,15 +218,15 @@ public class DispatchEngineImpl implements DispatchEngine {
.assigneeId(assigneeId)
.operatorType(OperatorTypeEnum.SYSTEM)
.operatorId(assigneeId)
.reason("自动推送下一个任务")
.reason("等待队列动态重排后自动派发")
.build();
OrderTransitionResult result = orderLifecycleManager.transition(request);
if (result.isSuccess()) {
return DispatchResult.success("推送下一个任务", assigneeId);
return DispatchResult.success("按队列总分派发下一单", assigneeId);
} else {
return DispatchResult.fail("推送下一个任务失败: " + result.getMessage());
return DispatchResult.fail("按队列总分派发下一单失败: " + result.getMessage());
}
}
@@ -330,7 +301,7 @@ public class DispatchEngineImpl implements DispatchEngine {
OrderDispatchContext urgentContext) {
ScheduleStrategy strategy = scheduleStrategyRegistry.get(urgentContext.getBusinessType());
if (strategy == null) {
log.warn("未找到调度策略,使用默认断规则: businessType={}", urgentContext.getBusinessType());
log.warn("未找到调度策略,使用默认非抢断规则: businessType={}", urgentContext.getBusinessType());
return defaultInterruptDecision(urgentContext);
}
@@ -343,14 +314,13 @@ public class DispatchEngineImpl implements DispatchEngine {
}
/**
* 默认断决策
* P0任务可以打断任何非P0任务
* 默认非抢断决策
*/
private InterruptDecision defaultInterruptDecision(OrderDispatchContext urgentContext) {
if (urgentContext.isUrgent()) {
return InterruptDecision.allowByDefault();
return InterruptDecision.deny("当前调度不再支持抢断", "工单将按队列总分在下一轮派发");
}
return InterruptDecision.deny("紧急任务优先级不足", "建议等待当前任务完成");
return InterruptDecision.deny("当前调度不再支持抢断", "建议等待当前任务完成后按队列总分派发");
}
// ==================== 私有方法 ====================
@@ -502,41 +472,12 @@ public class DispatchEngineImpl implements DispatchEngine {
}
/**
* 打断并派单
* 历史抢断入口已废弃,统一降级为仅入队等待下一轮动态派发
*/
private DispatchResult executeInterruptAndDispatch(OrderDispatchContext context, Long assigneeId,
Long interruptedOrderId) {
log.warn("执行打断并派单: orderId={}, assigneeId={}, interruptedOrderId={}",
log.warn("检测到过期抢断路径,降级为仅入队: orderId={}, assigneeId={}, interruptedOrderId={}",
context.getOrderId(), assigneeId, interruptedOrderId);
// 先打断当前任务
if (interruptedOrderId != null) {
orderLifecycleManager.interruptOrder(interruptedOrderId, context.getOrderId(), assigneeId);
}
// 派发紧急任务
OrderTransitionRequest request = OrderTransitionRequest.builder()
.orderId(context.getOrderId())
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
.assigneeId(assigneeId)
.assigneeName(context.getRecommendedAssigneeName())
.operatorType(OperatorTypeEnum.SYSTEM)
.operatorId(assigneeId)
.reason("P0紧急任务派单")
.build();
OrderTransitionResult result = orderLifecycleManager.dispatch(request);
if (result.isSuccess()) {
return DispatchResult.success(
"P0紧急任务已派单",
assigneeId,
null,
DispatchPath.INTERRUPT_AND_DISPATCH,
result.getQueueId()
);
} else {
return DispatchResult.fail("P0紧急任务派单失败: " + result.getMessage());
}
return executeEnqueueOnly(context, assigneeId);
}
}

View File

@@ -62,13 +62,8 @@ public class OpsOrderQueueDO extends BaseDO {
/**
* 队列分数(用于排序)
* 计算公式:优先级分 + 时间戳
* - P0: 0 + timestamp
* - P1: 1000000 + timestamp
* - P2: 2000000 + timestamp
* - P3: 3000000 + timestamp
*
* 用于数据库层面的排序,优先级高的排在前面,同优先级按时间排序
* 计算公式:优先级分 + 楼层差分 - 等待老化分
* 分数越小越靠前,用于数据库与 Redis 的一致排序
*/
@TableField("queue_score")
private Double queueScore;

View File

@@ -1,21 +1,26 @@
package com.viewsh.module.ops.service.queue;
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
import com.viewsh.module.ops.api.queue.OrderQueueService;
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
import com.viewsh.module.ops.api.queue.OrderQueueService;
import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO;
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.area.OpsBusAreaMapper;
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
import com.viewsh.module.ops.enums.PriorityEnum;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.beans.BeanUtils;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -35,27 +40,27 @@ import java.util.stream.Collectors;
@Primary
public class OrderQueueServiceEnhanced implements OrderQueueService {
/**
* Score 计算公式:优先级分数 + 时间戳
* 优先级分P0=0, P1=1000000, P2=2000000, P3=3000000
* 时间戳:秒级时间戳
* 结果:优先级高的排在前面,同优先级按时间排序
*/
private static final Map<Integer, Long> PRIORITY_SCORES = Map.of(
0, 0L, // P0: 0
1, 1000000L, // P1: 1,000,000
2, 2000000L, // P2: 2,000,000
3, 3000000L // P3: 3,000,000
);
@Resource
private OpsOrderQueueMapper orderQueueMapper;
@Resource
private RedisOrderQueueService redisQueueService;
@Resource
private QueueSyncService queueSyncService;
/**
* 队列总分由 QueueScoreCalculator 统一计算:
* 优先级分 + 楼层差分 - 等待老化分。
*/
@Resource
private OpsOrderQueueMapper orderQueueMapper;
@Resource
private OpsOrderMapper orderMapper;
@Resource
private OpsBusAreaMapper areaMapper;
@Resource
private RedisOrderQueueService redisQueueService;
@Resource
private QueueSyncService queueSyncService;
@Resource
private QueueScoreCalculator queueScoreCalculator;
@Override
@Transactional(rollbackFor = Exception.class)
@@ -69,7 +74,11 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
// 2. 计算队列分数
LocalDateTime now = LocalDateTime.now();
double queueScore = calculateQueueScore(priority.getPriority(), now);
double queueScore = queueScoreCalculator.calculate(QueueScoreContext.builder()
.priority(priority.getPriority())
.enqueueTime(now)
.now(now)
.build()).getTotalScore();
// 3. 创建队列记录MySQL
OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder()
@@ -361,22 +370,20 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
queueDO.setQueueIndex(calculateNextQueueIndex(newPriority));
// 重新计算队列分数(使用原入队时间保持时间戳不变)
LocalDateTime enqueueTime = queueDO.getEnqueueTime() != null ? queueDO.getEnqueueTime() : LocalDateTime.now();
double newQueueScore = calculateQueueScore(newPriority.getPriority(), enqueueTime);
double newQueueScore = queueScoreCalculator.calculate(QueueScoreContext.builder()
.priority(newPriority.getPriority())
.enqueueTime(enqueueTime)
.now(LocalDateTime.now())
.build()).getTotalScore();
queueDO.setQueueScore(newQueueScore);
queueDO.setEventMessage("优先级调整: " + oldPriority + " -> " + newPriority + ", 原因: " + reason);
int updated = orderQueueMapper.updateById(queueDO);
// 异步更新 Redis
if (updated > 0) {
CompletableFuture.runAsync(() -> {
try {
redisQueueService.updatePriority(queueId, newPriority.getPriority());
} catch (Exception e) {
log.error("Redis 更新优先级失败: queueId={}", queueId, e);
}
});
}
// 事务提交后重算并同步 Redis避免在事务未提交前读到旧数据
if (updated > 0) {
triggerQueueRebuildAfterCommit(queueDO.getUserId(), null);
}
log.info("优先级已调整: queueId={}, opsOrderId={}, oldPriority={}, newPriority={}, reason={}, newScore={}",
queueId, queueDO.getOpsOrderId(), oldPriority, newPriority, reason, newQueueScore);
@@ -487,38 +494,58 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
}
@Override
public List<OrderQueueDTO> getWaitingTasksByUserIdFromDb(Long userId) {
// 直接从 MySQL 获取所有任务
List<OpsOrderQueueDO> mysqlList = orderQueueMapper.selectListByUserId(userId);
List<OrderQueueDTO> allTasks = convertToDTO(mysqlList);
// 过滤出 WAITING 状态的任务,并按队列分数排序
return filterAndSortWaitingTasks(allTasks);
}
private List<OrderQueueDTO> filterAndSortWaitingTasks(List<OrderQueueDTO> allTasks) {
return allTasks.stream()
.filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus()))
.sorted((a, b) -> {
// 优先使用队列分数 score 排序(已在入队时计算好)
if (a.getQueueScore() != null && b.getQueueScore() != null) {
return Double.compare(a.getQueueScore(), b.getQueueScore());
}
// 兜底:如果 score 为空,则按优先级+时间排序
// 按优先级排序P0 > P1 > P2 > P3
int priorityCompare = Integer.compare(
b.getPriority() != null ? b.getPriority() : 999,
a.getPriority() != null ? a.getPriority() : 999
);
if (priorityCompare != 0) {
return priorityCompare;
}
// 优先级相同,按入队时间排序
return a.getEnqueueTime().compareTo(b.getEnqueueTime());
})
.collect(Collectors.toList());
}
public List<OrderQueueDTO> getWaitingTasksByUserIdFromDb(Long userId) {
return rebuildWaitingTasksByUserId(userId, null);
}
@Override
@Transactional(rollbackFor = Exception.class)
public List<OrderQueueDTO> rebuildWaitingTasksByUserId(Long userId, Long fallbackAreaId) {
List<OpsOrderQueueDO> waitingQueues = orderQueueMapper.selectListByUserIdAndStatus(
userId, OrderQueueStatusEnum.WAITING.getStatus());
if (waitingQueues == null || waitingQueues.isEmpty()) {
return Collections.emptyList();
}
Long baselineAreaId = resolveBaselineAreaId(userId, fallbackAreaId);
Integer baseFloorNo = resolveFloorNo(baselineAreaId);
LocalDateTime now = LocalDateTime.now();
List<OrderQueueDTO> rebuiltTasks = new ArrayList<>(waitingQueues.size());
for (OpsOrderQueueDO queueDO : waitingQueues) {
OrderQueueDTO dto = convertToDTO(queueDO);
Integer targetFloorNo = resolveFloorNo(resolveOrderAreaId(queueDO.getOpsOrderId()));
QueueScoreResult result = queueScoreCalculator.calculate(QueueScoreContext.builder()
.priority(queueDO.getPriority())
.baseFloorNo(baseFloorNo)
.targetFloorNo(targetFloorNo)
.enqueueTime(queueDO.getEnqueueTime())
.now(now)
.build());
queueDO.setQueueScore(result.getTotalScore());
orderQueueMapper.updateById(queueDO);
dto.setQueueScore(result.getTotalScore());
dto.setBaseFloorNo(result.getBaseFloorNo());
dto.setTargetFloorNo(result.getTargetFloorNo());
dto.setFloorDiff(result.getFloorDiff());
dto.setWaitMinutes(result.getWaitMinutes());
dto.setScoreUpdateTime(now);
rebuiltTasks.add(dto);
}
rebuiltTasks.sort(this::compareByDynamicScore);
syncUserQueueToRedis(userId, rebuiltTasks);
return rebuiltTasks;
}
private List<OrderQueueDTO> filterAndSortWaitingTasks(List<OrderQueueDTO> allTasks) {
return allTasks.stream()
.filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus()))
.sorted(this::compareByDynamicScore)
.collect(Collectors.toList());
}
@Override
public List<OrderQueueDTO> getInterruptedTasksByUserId(Long userId) {
@@ -620,32 +647,9 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
// ========== 私有方法 ==========
/**
* 计算队列分数(用于排序)
* 公式:优先级分数 + 时间戳
*
* @param priority 优先级0=P0, 1=P1, 2=P2, 3=P3
* @param enqueueTime 入队时间
* @return 队列分数
*/
private double calculateQueueScore(Integer priority, LocalDateTime enqueueTime) {
// 获取优先级分数
long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L);
// 计算时间戳(秒级)
long timestamp;
if (enqueueTime != null) {
timestamp = enqueueTime.atZone(ZoneId.systemDefault()).toEpochSecond();
} else {
timestamp = System.currentTimeMillis() / 1000;
}
return priorityScore + timestamp;
}
/**
* 计算下一个队列顺序号
*/
/**
* 计算下一个队列顺序号
*/
private Integer calculateNextQueueIndex(PriorityEnum priority) {
// TODO: 查询当前优先级的最大 queueIndex然后 +1
// 这里简化处理,返回默认值
@@ -709,9 +713,106 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
/**
* 批量转换为 DTO
*/
private List<OrderQueueDTO> convertToDTO(List<OpsOrderQueueDO> list) {
return list.stream()
.map(this::convertToDTO)
.collect(Collectors.toList());
}
}
private List<OrderQueueDTO> convertToDTO(List<OpsOrderQueueDO> list) {
return list.stream()
.map(this::convertToDTO)
.collect(Collectors.toList());
}
private int compareByDynamicScore(OrderQueueDTO a, OrderQueueDTO b) {
int scoreCompare = Double.compare(
a.getQueueScore() != null ? a.getQueueScore() : Double.MAX_VALUE,
b.getQueueScore() != null ? b.getQueueScore() : Double.MAX_VALUE
);
if (scoreCompare != 0) {
return scoreCompare;
}
if (a.getEnqueueTime() != null && b.getEnqueueTime() != null) {
int enqueueCompare = a.getEnqueueTime().compareTo(b.getEnqueueTime());
if (enqueueCompare != 0) {
return enqueueCompare;
}
}
return Long.compare(
a.getId() != null ? a.getId() : Long.MAX_VALUE,
b.getId() != null ? b.getId() : Long.MAX_VALUE
);
}
private Long resolveBaselineAreaId(Long userId, Long fallbackAreaId) {
OpsOrderQueueDO processingQueue = orderQueueMapper.selectCurrentExecutingByUserId(userId);
if (processingQueue != null) {
Long processingAreaId = resolveOrderAreaId(processingQueue.getOpsOrderId());
if (processingAreaId != null) {
return processingAreaId;
}
}
return fallbackAreaId;
}
private Long resolveOrderAreaId(Long orderId) {
OpsOrderDO order = orderMapper.selectById(orderId);
return order != null ? order.getAreaId() : null;
}
private Integer resolveFloorNo(Long areaId) {
if (areaId == null) {
return null;
}
OpsBusAreaDO area = areaMapper.selectById(areaId);
return area != null ? area.getFloorNo() : null;
}
private void syncUserQueueToRedis(Long userId, List<OrderQueueDTO> rebuiltWaitingTasks) {
List<OpsOrderQueueDO> queues = orderQueueMapper.selectListByUserId(userId);
if (queues == null || queues.isEmpty()) {
redisQueueService.clearQueue(userId);
return;
}
Map<Long, OrderQueueDTO> rebuiltTaskMap = rebuiltWaitingTasks == null
? Collections.emptyMap()
: rebuiltWaitingTasks.stream()
.filter(dto -> dto.getId() != null)
.collect(Collectors.toMap(OrderQueueDTO::getId, dto -> dto));
List<OrderQueueDTO> queueDTOs = queues.stream()
.map(this::convertToDTO)
.map(dto -> rebuiltTaskMap.getOrDefault(dto.getId(), dto))
.sorted((a, b) -> {
if (Objects.equals(a.getUserId(), b.getUserId())) {
return compareByDynamicScore(a, b);
}
return Long.compare(
a.getUserId() != null ? a.getUserId() : Long.MAX_VALUE,
b.getUserId() != null ? b.getUserId() : Long.MAX_VALUE
);
})
.collect(Collectors.toList());
redisQueueService.clearQueue(userId);
redisQueueService.batchEnqueue(queueDTOs);
}
private void triggerQueueRebuildAfterCommit(Long userId, Long fallbackAreaId) {
Runnable rebuildAction = () -> {
try {
rebuildWaitingTasksByUserId(userId, fallbackAreaId);
} catch (Exception e) {
log.error("等待队列重算失败: userId={}", userId, e);
}
};
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
rebuildAction.run();
return;
}
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
rebuildAction.run();
}
});
}
}

View File

@@ -0,0 +1,49 @@
package com.viewsh.module.ops.service.queue;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.LocalDateTime;
@Component
public class QueueScoreCalculator {
static final int PRIORITY_WEIGHT = 1500;
static final int FLOOR_WEIGHT = 20;
static final int AGING_WEIGHT = 5;
static final int MAX_FLOOR_DIFF = 10;
static final int MAX_AGING_MINUTES = 240;
public QueueScoreResult calculate(QueueScoreContext context) {
LocalDateTime now = context.getNow() != null ? context.getNow() : LocalDateTime.now();
int priorityRank = context.getPriority() != null ? context.getPriority() : 3;
Integer baseFloorNo = context.getBaseFloorNo();
Integer targetFloorNo = context.getTargetFloorNo();
Integer floorDiff = null;
int floorDiffScore = 0;
if (baseFloorNo != null && targetFloorNo != null) {
floorDiff = Math.abs(targetFloorNo - baseFloorNo);
floorDiffScore = Math.min(floorDiff, MAX_FLOOR_DIFF) * FLOOR_WEIGHT;
} else if (baseFloorNo != null) {
floorDiffScore = MAX_FLOOR_DIFF * FLOOR_WEIGHT;
}
long waitMinutes = 0;
if (context.getEnqueueTime() != null) {
waitMinutes = Math.max(0, Duration.between(context.getEnqueueTime(), now).toMinutes());
}
long agingScore = (long) Math.min(waitMinutes, MAX_AGING_MINUTES) * AGING_WEIGHT;
long priorityScore = (long) priorityRank * PRIORITY_WEIGHT;
double totalScore = priorityScore + floorDiffScore - agingScore;
return QueueScoreResult.builder()
.totalScore(totalScore)
.baseFloorNo(baseFloorNo)
.targetFloorNo(targetFloorNo)
.floorDiff(floorDiff)
.waitMinutes(waitMinutes)
.build();
}
}

View File

@@ -0,0 +1,21 @@
package com.viewsh.module.ops.service.queue;
import lombok.Builder;
import lombok.Data;
import java.time.LocalDateTime;
@Data
@Builder
public class QueueScoreContext {
private Integer priority;
private Integer baseFloorNo;
private Integer targetFloorNo;
private LocalDateTime enqueueTime;
private LocalDateTime now;
}

View File

@@ -0,0 +1,19 @@
package com.viewsh.module.ops.service.queue;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
public class QueueScoreResult {
private double totalScore;
private Integer baseFloorNo;
private Integer targetFloorNo;
private Integer floorDiff;
private long waitMinutes;
}

View File

@@ -47,7 +47,9 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
String infoKey = INFO_KEY_PREFIX + dto.getId();
// 1. 计算分数(优先级 + 时间戳)
double score = calculateScore(dto.getPriority(), dto.getEnqueueTime());
double score = dto.getQueueScore() != null
? dto.getQueueScore()
: calculateScore(dto.getPriority(), dto.getEnqueueTime());
dto.setQueueScore(score);
// 2. 添加到 Sorted Set使用 queueId 作为 member而非 JSON
@@ -84,7 +86,9 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
byte[] infoKey = (INFO_KEY_PREFIX + dto.getId()).getBytes();
// 计算分数并设置到 DTO
double score = calculateScore(dto.getPriority(), dto.getEnqueueTime());
double score = dto.getQueueScore() != null
? dto.getQueueScore()
: calculateScore(dto.getPriority(), dto.getEnqueueTime());
dto.setQueueScore(score);
// 添加到 Sorted Set使用 queueId 作为 member
@@ -214,9 +218,17 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
public long clearQueue(Long cleanerId) {
try {
String queueKey = QUEUE_KEY_PREFIX + cleanerId;
Set<String> queueIds = stringRedisTemplate.opsForZSet().range(queueKey, 0, -1);
if (queueIds != null && !queueIds.isEmpty()) {
List<String> infoKeys = queueIds.stream()
.map(queueId -> INFO_KEY_PREFIX + queueId)
.collect(Collectors.toList());
stringRedisTemplate.delete(infoKeys);
}
stringRedisTemplate.delete(queueKey);
log.info("Redis 清空队列成功: cleanerId={}", cleanerId);
return 0; // Redis 不返回删除数量
long removedCount = queueIds != null ? queueIds.size() : 0;
log.info("Redis 清空队列成功: cleanerId={}, removedCount={}", cleanerId, removedCount);
return removedCount;
} catch (Exception e) {
log.error("Redis 清空队列失败: cleanerId={}", cleanerId, e);
return 0;
@@ -326,7 +338,9 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
dto.setPriority(newPriority);
double newScore = calculateScore(newPriority, dto.getEnqueueTime());
double newScore = dto.getQueueScore() != null
? dto.getQueueScore()
: calculateScore(newPriority, dto.getEnqueueTime());
// 使用 Lua 脚本原子性更新 Hash 和 Sorted Set
String script =
@@ -451,7 +465,15 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
if (opsOrderIdObj != null) {
Long opsOrderId = Long.parseLong(opsOrderIdObj.toString());
if (opsOrderId.equals(orderId)) {
return mapToDto(infoMap);
OrderQueueDTO dto = mapToDto(infoMap);
if (dto == null || dto.getUserId() == null || dto.getId() == null) {
continue;
}
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
Double score = stringRedisTemplate.opsForZSet().score(queueKey, dto.getId().toString());
if (score != null) {
return dto;
}
}
}
}
@@ -566,6 +588,21 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
if (dto.getEnqueueTime() != null) {
map.put("enqueueTime", dto.getEnqueueTime().toString());
}
if (dto.getBaseFloorNo() != null) {
map.put("baseFloorNo", String.valueOf(dto.getBaseFloorNo()));
}
if (dto.getTargetFloorNo() != null) {
map.put("targetFloorNo", String.valueOf(dto.getTargetFloorNo()));
}
if (dto.getFloorDiff() != null) {
map.put("floorDiff", String.valueOf(dto.getFloorDiff()));
}
if (dto.getWaitMinutes() != null) {
map.put("waitMinutes", String.valueOf(dto.getWaitMinutes()));
}
if (dto.getScoreUpdateTime() != null) {
map.put("scoreUpdateTime", dto.getScoreUpdateTime().toString());
}
return map;
}
@@ -587,6 +624,21 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
if (dto.getEnqueueTime() != null) {
map.put("enqueueTime".getBytes(), dto.getEnqueueTime().toString().getBytes());
}
if (dto.getBaseFloorNo() != null) {
map.put("baseFloorNo".getBytes(), String.valueOf(dto.getBaseFloorNo()).getBytes());
}
if (dto.getTargetFloorNo() != null) {
map.put("targetFloorNo".getBytes(), String.valueOf(dto.getTargetFloorNo()).getBytes());
}
if (dto.getFloorDiff() != null) {
map.put("floorDiff".getBytes(), String.valueOf(dto.getFloorDiff()).getBytes());
}
if (dto.getWaitMinutes() != null) {
map.put("waitMinutes".getBytes(), String.valueOf(dto.getWaitMinutes()).getBytes());
}
if (dto.getScoreUpdateTime() != null) {
map.put("scoreUpdateTime".getBytes(), dto.getScoreUpdateTime().toString().getBytes());
}
return map;
}
@@ -633,6 +685,26 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
if (enqueueTimeObj != null) {
dto.setEnqueueTime(LocalDateTime.parse(enqueueTimeObj.toString()));
}
Object baseFloorNoObj = map.get("baseFloorNo");
if (baseFloorNoObj != null) {
dto.setBaseFloorNo(Integer.parseInt(baseFloorNoObj.toString()));
}
Object targetFloorNoObj = map.get("targetFloorNo");
if (targetFloorNoObj != null) {
dto.setTargetFloorNo(Integer.parseInt(targetFloorNoObj.toString()));
}
Object floorDiffObj = map.get("floorDiff");
if (floorDiffObj != null) {
dto.setFloorDiff(Integer.parseInt(floorDiffObj.toString()));
}
Object waitMinutesObj = map.get("waitMinutes");
if (waitMinutesObj != null) {
dto.setWaitMinutes(Long.parseLong(waitMinutesObj.toString()));
}
Object scoreUpdateTimeObj = map.get("scoreUpdateTime");
if (scoreUpdateTimeObj != null) {
dto.setScoreUpdateTime(LocalDateTime.parse(scoreUpdateTimeObj.toString()));
}
return dto;
} catch (Exception e) {

View File

@@ -0,0 +1,103 @@
package com.viewsh.module.ops.service.queue;
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO;
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.area.OpsBusAreaMapper;
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import java.time.LocalDateTime;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class OrderQueueServiceEnhancedTest {
@Mock
private OpsOrderQueueMapper orderQueueMapper;
@Mock
private OpsOrderMapper orderMapper;
@Mock
private OpsBusAreaMapper areaMapper;
@Mock
private RedisOrderQueueService redisQueueService;
@Mock
private QueueSyncService queueSyncService;
@Spy
private QueueScoreCalculator queueScoreCalculator = new QueueScoreCalculator();
@InjectMocks
private OrderQueueServiceEnhanced orderQueueService;
@Test
void shouldRebuildWaitingTasksAndAvoidStarvation() {
LocalDateTime now = LocalDateTime.now();
Long userId = 2001L;
OpsOrderQueueDO olderFarTask = OpsOrderQueueDO.builder()
.id(11L)
.userId(userId)
.opsOrderId(101L)
.priority(1)
.queueScore(0D)
.queueStatus(OrderQueueStatusEnum.WAITING.getStatus())
.enqueueTime(now.minusMinutes(80))
.build();
OpsOrderQueueDO newerNearTask = OpsOrderQueueDO.builder()
.id(12L)
.userId(userId)
.opsOrderId(102L)
.priority(1)
.queueScore(0D)
.queueStatus(OrderQueueStatusEnum.WAITING.getStatus())
.enqueueTime(now.minusMinutes(5))
.build();
OpsOrderQueueDO currentTask = OpsOrderQueueDO.builder()
.id(13L)
.userId(userId)
.opsOrderId(900L)
.queueStatus(OrderQueueStatusEnum.PROCESSING.getStatus())
.build();
when(orderQueueMapper.selectListByUserIdAndStatus(userId, OrderQueueStatusEnum.WAITING.getStatus()))
.thenReturn(List.of(olderFarTask, newerNearTask));
when(orderQueueMapper.selectCurrentExecutingByUserId(userId)).thenReturn(currentTask);
when(orderQueueMapper.selectListByUserId(userId)).thenReturn(List.of(olderFarTask, newerNearTask, currentTask));
when(orderMapper.selectById(900L)).thenReturn(OpsOrderDO.builder().id(900L).areaId(501L).build());
when(orderMapper.selectById(101L)).thenReturn(OpsOrderDO.builder().id(101L).areaId(503L).build());
when(orderMapper.selectById(102L)).thenReturn(OpsOrderDO.builder().id(102L).areaId(502L).build());
when(areaMapper.selectById(501L)).thenReturn(OpsBusAreaDO.builder().id(501L).floorNo(5).build());
when(areaMapper.selectById(502L)).thenReturn(OpsBusAreaDO.builder().id(502L).floorNo(6).build());
when(areaMapper.selectById(503L)).thenReturn(OpsBusAreaDO.builder().id(503L).floorNo(8).build());
when(orderQueueMapper.updateById(any(OpsOrderQueueDO.class))).thenReturn(1);
List<OrderQueueDTO> rebuiltTasks = orderQueueService.rebuildWaitingTasksByUserId(userId, null);
assertEquals(2, rebuiltTasks.size());
assertEquals(101L, rebuiltTasks.get(0).getOpsOrderId());
assertEquals(3, rebuiltTasks.get(0).getFloorDiff());
assertTrue(rebuiltTasks.get(0).getWaitMinutes() >= 79);
assertEquals(102L, rebuiltTasks.get(1).getOpsOrderId());
assertEquals(1, rebuiltTasks.get(1).getFloorDiff());
assertTrue(rebuiltTasks.get(0).getQueueScore() < rebuiltTasks.get(1).getQueueScore());
verify(orderQueueMapper, times(2)).updateById(any(OpsOrderQueueDO.class));
verify(redisQueueService).clearQueue(userId);
verify(redisQueueService).batchEnqueue(any());
}
}

View File

@@ -0,0 +1,73 @@
package com.viewsh.module.ops.service.queue;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import static org.junit.jupiter.api.Assertions.assertTrue;
class QueueScoreCalculatorTest {
private final QueueScoreCalculator calculator = new QueueScoreCalculator();
@Test
void shouldPreferSmallerFloorDiffWhenPrioritySame() {
LocalDateTime now = LocalDateTime.now();
QueueScoreResult near = calculator.calculate(QueueScoreContext.builder()
.priority(1)
.baseFloorNo(5)
.targetFloorNo(6)
.enqueueTime(now.minusMinutes(5))
.now(now)
.build());
QueueScoreResult far = calculator.calculate(QueueScoreContext.builder()
.priority(1)
.baseFloorNo(5)
.targetFloorNo(9)
.enqueueTime(now.minusMinutes(5))
.now(now)
.build());
assertTrue(near.getTotalScore() < far.getTotalScore());
}
@Test
void shouldAllowOlderOrderToGainAgingAdvantage() {
LocalDateTime now = LocalDateTime.now();
QueueScoreResult older = calculator.calculate(QueueScoreContext.builder()
.priority(1)
.baseFloorNo(5)
.targetFloorNo(8)
.enqueueTime(now.minusMinutes(80))
.now(now)
.build());
QueueScoreResult newer = calculator.calculate(QueueScoreContext.builder()
.priority(1)
.baseFloorNo(5)
.targetFloorNo(6)
.enqueueTime(now.minusMinutes(5))
.now(now)
.build());
assertTrue(older.getTotalScore() < newer.getTotalScore());
}
@Test
void shouldDegradeGracefullyWhenBaseFloorMissing() {
LocalDateTime now = LocalDateTime.now();
QueueScoreResult result = calculator.calculate(QueueScoreContext.builder()
.priority(2)
.baseFloorNo(null)
.targetFloorNo(6)
.enqueueTime(now.minusMinutes(10))
.now(now)
.build());
assertTrue(result.getTotalScore() < QueueScoreCalculator.PRIORITY_WEIGHT * 2);
}
}