diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderServiceImpl.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderServiceImpl.java index 1dd0195..e0a8308 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderServiceImpl.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderServiceImpl.java @@ -245,13 +245,13 @@ public class CleanOrderServiceImpl implements CleanOrderService { if (queueDTO != null) { orderQueueService.adjustPriority(queueDTO.getId(), PriorityEnum.P0, reason); - // 5. 使用新的调度引擎处理P0紧急插队 - DispatchResult result = dispatchEngine.urgentInterrupt(orderId, queueDTO.getUserId()); + // 5. 重算等待队列,P0 不再打断当前任务 + orderQueueService.rebuildWaitingTasksByUserId(queueDTO.getUserId(), order.getAreaId()); // 6. 发送优先级升级通知 cleanOrderEventListener.sendPriorityUpgradeNotification(queueDTO.getUserId(), order.getOrderCode(), orderId); - return result.isSuccess(); + return true; } return true; @@ -289,11 +289,11 @@ public class CleanOrderServiceImpl implements CleanOrderService { if (queueDTO != null) { orderQueueService.adjustPriority(queueDTO.getId(), newPriority, reason); - // 6. 如果升级到 P0,触发紧急打断逻辑 + // 6. 如果升级到 P0,仅重算等待队列,不再触发打断 if (newPriority == PriorityEnum.P0) { - DispatchResult result = dispatchEngine.urgentInterrupt(orderId, queueDTO.getUserId()); + orderQueueService.rebuildWaitingTasksByUserId(queueDTO.getUserId(), order.getAreaId()); cleanOrderEventListener.sendPriorityUpgradeNotification(queueDTO.getUserId(), order.getOrderCode(), orderId); - log.warn("客流升级到P0,触发紧急打断: orderId={}, success={}", orderId, result.isSuccess()); + log.warn("客流升级到P0,已重算等待队列: orderId={}", orderId); } } diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/dispatch/BadgeDeviceScheduleStrategy.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/dispatch/BadgeDeviceScheduleStrategy.java index 0dec654..4e7a3ee 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/dispatch/BadgeDeviceScheduleStrategy.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/dispatch/BadgeDeviceScheduleStrategy.java @@ -23,13 +23,13 @@ import java.util.List; *

* 职责:怎么派单 *

- * 策略规则: - *

+ * 策略规则: + * * * @author lzh */ @@ -106,12 +106,12 @@ public class BadgeDeviceScheduleStrategy implements ScheduleStrategy { // P0紧急任务,需要打断 if (assigneeStatus.getCurrentTaskCount() > 0) { Long currentOrderId = deviceStatus.getCurrentOpsOrderId(); - log.warn("决策: INTERRUPT_AND_DISPATCH - P0紧急任务打断当前任务: currentOrderId={}", currentOrderId); - return DispatchDecision.interruptAndDispatch(currentOrderId); - } else { - log.info("决策: DIRECT_DISPATCH - P0紧急任务直接派单"); - return DispatchDecision.directDispatch(); - } + log.info("决策: ENQUEUE_ONLY - P0工单不再打断当前任务,进入等待队列: currentOrderId={}", currentOrderId); + return DispatchDecision.enqueueOnly(); + } else { + log.info("决策: DIRECT_DISPATCH - P0工单在空闲状态下直接派发"); + return DispatchDecision.directDispatch(); + } } else { // 非紧急任务,设备忙碌,入队等待 log.info("决策: ENQUEUE_ONLY - 设备忙碌,任务入队等待"); @@ -133,15 +133,14 @@ public class BadgeDeviceScheduleStrategy implements ScheduleStrategy { // P0任务可以打断任何任务 if (urgentContext.isUrgent()) { - log.warn("允许打断: P0紧急任务可以打断当前任务"); - return InterruptDecision.allowByDefault(); - } - - // P1/P2任务不能打断 - log.info("拒绝打断: 非P0任务不能打断当前任务"); - return InterruptDecision.deny( - "紧急任务优先级不足", - "建议等待当前任务完成" - ); + log.info("拒绝打断: 当前调度已改为非抢占式队列派发"); + return InterruptDecision.deny("当前调度不再支持抢断", "工单将按队列总分在下一轮派发"); + } + + log.info("拒绝打断: 当前调度已改为非抢占式队列派发"); + return InterruptDecision.deny( + "当前调度不再支持抢断", + "工单将按队列总分在下一轮派发" + ); } } diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/dispatch/CleanerPriorityScheduleStrategy.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/dispatch/CleanerPriorityScheduleStrategy.java index 09a9077..d54f9d0 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/dispatch/CleanerPriorityScheduleStrategy.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/dispatch/CleanerPriorityScheduleStrategy.java @@ -28,7 +28,7 @@ import java.util.List; *
  • 空闲无任务 → DIRECT_DISPATCH(直接派单)
  • *
  • 空闲有等待 → PUSH_AND_ENQUEUE(推送等待+新任务入队)
  • *
  • 忙碌且非P0 → ENQUEUE_ONLY(仅入队)
  • - *
  • 忙碌且P0 → INTERRUPT_AND_DISPATCH(打断并派单)
  • + *
  • 忙碌且P0 → ENQUEUE_ONLY(不抢断,进入等待队列)
  • * * * @author lzh @@ -106,10 +106,10 @@ public class CleanerPriorityScheduleStrategy implements ScheduleStrategy { // P0紧急任务,需要打断 if (assigneeStatus.getCurrentTaskCount() > 0) { Long currentOrderId = cleanerStatus.getCurrentOpsOrderId(); - log.warn("决策: INTERRUPT_AND_DISPATCH - P0紧急任务打断当前任务: currentOrderId={}", currentOrderId); - return DispatchDecision.interruptAndDispatch(currentOrderId); + log.info("决策: ENQUEUE_ONLY - P0工单不再打断当前任务,进入等待队列: currentOrderId={}", currentOrderId); + return DispatchDecision.enqueueOnly(); } else { - log.info("决策: DIRECT_DISPATCH - P0紧急任务直接派单"); + log.info("决策: DIRECT_DISPATCH - P0工单在空闲状态下直接派发"); return DispatchDecision.directDispatch(); } } else { @@ -133,15 +133,14 @@ public class CleanerPriorityScheduleStrategy implements ScheduleStrategy { // P0任务可以打断任何任务 if (urgentContext.isUrgent()) { - log.warn("允许打断: P0紧急任务可以打断当前任务"); - return InterruptDecision.allowByDefault(); + log.info("拒绝打断: 当前调度已改为非抢占式队列派发"); + return InterruptDecision.deny("当前调度不再支持抢断", "工单将按队列总分在下一轮派发"); } - // P1/P2任务不能打断 - log.info("拒绝打断: 非P0任务不能打断当前任务"); + log.info("拒绝打断: 当前调度已改为非抢占式队列派发"); return InterruptDecision.deny( - "紧急任务优先级不足", - "建议等待当前任务完成" + "当前调度不再支持抢断", + "工单将按队列总分在下一轮派发" ); } } diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/test/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderEndToEndTest.java b/viewsh-module-ops/viewsh-module-environment-biz/src/test/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderEndToEndTest.java index 2722bea..de9bda5 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/test/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderEndToEndTest.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/test/java/com/viewsh/module/ops/environment/service/cleanorder/CleanOrderEndToEndTest.java @@ -9,10 +9,13 @@ import com.viewsh.module.ops.core.event.OrderEventPublisher; import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO; +import com.viewsh.module.ops.dal.mysql.area.OpsBusAreaMapper; import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; import com.viewsh.module.ops.enums.PriorityEnum; import com.viewsh.module.ops.enums.WorkOrderStatusEnum; import com.viewsh.module.ops.environment.dal.dataobject.workorder.OpsOrderCleanExtDO; +import com.viewsh.module.ops.environment.dal.redis.TrafficActiveOrderRedisDAO; import com.viewsh.module.ops.environment.dal.mysql.workorder.OpsOrderCleanExtMapper; import com.viewsh.module.ops.environment.integration.consumer.*; import com.viewsh.framework.common.pojo.CommonResult; @@ -73,6 +76,8 @@ public class CleanOrderEndToEndTest { @Mock private OpsOrderMapper opsOrderMapper; @Mock + private OpsBusAreaMapper opsBusAreaMapper; + @Mock private OpsOrderCleanExtMapper cleanExtMapper; @Mock private OrderIdGenerator orderIdGenerator; @@ -94,6 +99,8 @@ public class CleanOrderEndToEndTest { private ValueOperations valueOperations; @Mock private VoiceBroadcastService voiceBroadcastService; + @Mock + private TrafficActiveOrderRedisDAO trafficActiveOrderRedisDAO; @Mock private BadgeDeviceStatusService badgeDeviceStatusService; @@ -149,6 +156,7 @@ public class CleanOrderEndToEndTest { // 注入 CleanOrderEventListener injectField(cleanOrderService, "cleanOrderEventListener", cleanOrderEventListener); + injectField(cleanOrderService, "opsBusAreaMapper", opsBusAreaMapper); // 注入 CleanOrderAuditEventHandler 依赖 injectField(auditEventHandler, "eventLogRecorder", eventLogRecorder); @@ -156,10 +164,18 @@ public class CleanOrderEndToEndTest { injectField(auditEventHandler, "opsOrderMapper", opsOrderMapper); injectField(auditEventHandler, "stringRedisTemplate", stringRedisTemplate); injectField(auditEventHandler, "objectMapper", objectMapper); + injectField(createEventHandler, "trafficActiveOrderRedisDAO", trafficActiveOrderRedisDAO); // Stub IotDeviceControlApi for resetTrafficCounter lenient().when(iotDeviceControlApi.resetTrafficCounter(any())) .thenReturn(CommonResult.success(true)); + lenient().when(opsBusAreaMapper.selectById(anyLong())) + .thenAnswer(i -> OpsBusAreaDO.builder() + .id(i.getArgument(0)) + .areaName("测试区域") + .parentPath(null) + .floorNo(1) + .build()); } // ========================================== @@ -338,7 +354,7 @@ public class CleanOrderEndToEndTest { verify(eventLogRecorder).record(any()); // 2. TTS sent (orderId can be null for TTS_REQUEST events) - verify(voiceBroadcastService).broadcastInOrder(eq(5001L), contains("请回到作业区域"), eq((Long) null)); + verify(voiceBroadcastService).broadcastDirect(eq(5001L), contains("请回到作业区域"), eq(9), eq((Long) null)); } // ========================================== @@ -378,9 +394,6 @@ public class CleanOrderEndToEndTest { when(opsOrderMapper.selectById(orderId)).thenReturn(order); when(orderQueueService.getByOpsOrderId(orderId)).thenReturn(queueDTO); - when(dispatchEngine.urgentInterrupt(eq(orderId), eq(2001L))) - .thenReturn(DispatchResult.success("Success", 2001L)); - // Execute boolean result = cleanOrderService.upgradePriorityToP0(orderId, "Manual Upgrade"); @@ -392,7 +405,7 @@ public class CleanOrderEndToEndTest { assertEquals(PriorityEnum.P0.getPriority(), orderCaptor.getValue().getPriority()); verify(orderQueueService).adjustPriority(eq(500L), eq(PriorityEnum.P0), anyString()); - verify(dispatchEngine).urgentInterrupt(orderId, 2001L); + verify(orderQueueService).rebuildWaitingTasksByUserId(2001L, order.getAreaId()); verify(cleanOrderEventListener).sendPriorityUpgradeNotification(eq(2001L), eq("WO-P2"), eq(orderId)); } diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/test/java/com/viewsh/module/ops/environment/service/dispatch/CleanerPriorityScheduleStrategyTest.java b/viewsh-module-ops/viewsh-module-environment-biz/src/test/java/com/viewsh/module/ops/environment/service/dispatch/CleanerPriorityScheduleStrategyTest.java index 9ebb1d2..763b100 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/test/java/com/viewsh/module/ops/environment/service/dispatch/CleanerPriorityScheduleStrategyTest.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/test/java/com/viewsh/module/ops/environment/service/dispatch/CleanerPriorityScheduleStrategyTest.java @@ -36,7 +36,7 @@ class CleanerPriorityScheduleStrategyTest { private com.viewsh.module.ops.core.dispatch.DispatchEngine dispatchEngine; @Test - void testDecide_P0_Interrupt() { + void testDecide_P0_EnqueueOnlyWhenBusy() { // Setup OpsCleanerStatusDO c1 = new OpsCleanerStatusDO(); c1.setUserId(1L); @@ -54,8 +54,7 @@ class CleanerPriorityScheduleStrategyTest { DispatchDecision decision = strategy.decide(context); // Verify - assertEquals(DispatchPath.INTERRUPT_AND_DISPATCH, decision.getPath()); - assertEquals(500L, decision.getInterruptedOrderId()); + assertEquals(DispatchPath.ENQUEUE_ONLY, decision.getPath()); } @Test diff --git a/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueDTO.java b/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueDTO.java index d1f8f39..7d0f98f 100644 --- a/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueDTO.java +++ b/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueDTO.java @@ -40,13 +40,8 @@ public class OrderQueueDTO { /** * 队列分数(用于排序) - * 计算公式:优先级分数 + 时间戳 - * - P0: 0 + timestamp - * - P1: 1000000 + timestamp - * - P2: 2000000 + timestamp - * - P3: 3000000 + timestamp - * - * 用于数据库层面的排序,优先级高的排在前面,同优先级按时间排序 + * 计算公式:优先级分 + 楼层差分 - 等待老化分 + * 分数越小越靠前,用于等待队列的动态重排 */ private Double queueScore; @@ -95,6 +90,31 @@ public class OrderQueueDTO { */ private LocalDateTime updateTime; + /** + * 评分基准楼层 + */ + private Integer baseFloorNo; + + /** + * 目标工单楼层 + */ + private Integer targetFloorNo; + + /** + * 楼层差 + */ + private Integer floorDiff; + + /** + * 等待分钟数 + */ + private Long waitMinutes; + + /** + * 分数更新时间 + */ + private LocalDateTime scoreUpdateTime; + // ========== 兼容旧字段名的getter方法 ========== /** diff --git a/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueService.java b/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueService.java index 81c0171..2024ac2 100644 --- a/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueService.java +++ b/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/queue/OrderQueueService.java @@ -164,6 +164,15 @@ public interface OrderQueueService { */ List getWaitingTasksByUserIdFromDb(Long userId); + /** + * 按当前上下文重算指定执行人等待队列的总分并返回最新排序结果 + * + * @param userId 执行人ID + * @param fallbackAreaId 当没有执行中工单时可使用的楼层基准区域ID + * @return 已按最新总分排序的 WAITING 工单列表 + */ + List rebuildWaitingTasksByUserId(Long userId, Long fallbackAreaId); + /** * 获取用户的暂停任务列表(PAUSED状态,按暂停时间排序) * diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java index 7367798..2365319 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java @@ -134,7 +134,7 @@ public class DispatchEngineImpl implements DispatchEngine { @Override @Transactional(rollbackFor = Exception.class) public DispatchResult urgentInterrupt(Long urgentOrderId, Long assigneeId) { - log.warn("开始P0紧急插队: urgentOrderId={}, assigneeId={}", urgentOrderId, assigneeId); + log.warn("处理P0工单派发请求: urgentOrderId={}, assigneeId={}", urgentOrderId, assigneeId); // 查询紧急工单 OpsOrderDO urgentOrder = orderMapper.selectById(urgentOrderId); @@ -156,23 +156,15 @@ public class DispatchEngineImpl implements DispatchEngine { // 查询执行人当前任务 List currentTasks = orderQueueService.getTasksByUserId(assigneeId); - // 如果有正在执行的任务,需要打断 + // 如果有正在执行的任务,不再打断,重排等待队列后等待下一轮派发 OrderQueueDTO currentTask = currentTasks.stream() .filter(t -> OrderQueueStatusEnum.PROCESSING.getStatus().equals(t.getQueueStatus())) .findFirst() .orElse(null); if (currentTask != null && !currentTask.getOpsOrderId().equals(urgentOrderId)) { - // 需要打断当前任务 - log.info("打断当前任务: currentOrderId={}, urgentOrderId={}", - currentTask.getOpsOrderId(), urgentOrderId); - - try { - orderLifecycleManager.interruptOrder( - currentTask.getOpsOrderId(), urgentOrderId, assigneeId); - } catch (Exception e) { - log.warn("打断任务失败: currentOrderId={}", currentTask.getOpsOrderId(), e); - } + orderQueueService.rebuildWaitingTasksByUserId(assigneeId, null); + return DispatchResult.success("P0工单已入队等待,不再打断当前任务", assigneeId); } // 派发紧急任务 @@ -182,63 +174,42 @@ public class DispatchEngineImpl implements DispatchEngine { .assigneeId(assigneeId) .operatorType(OperatorTypeEnum.SYSTEM) .operatorId(assigneeId) - .reason("P0紧急任务派单") + .reason("P0工单直接派发") .build(); OrderTransitionResult result = orderLifecycleManager.dispatch(request); if (result.isSuccess()) { - return DispatchResult.success("P0紧急任务已派单", assigneeId); + return DispatchResult.success("P0工单已直接派发", assigneeId); } else { - return DispatchResult.fail("P0紧急任务派单失败: " + result.getMessage()); + return DispatchResult.fail("P0工单派发失败: " + result.getMessage()); } } @Override @Transactional(rollbackFor = Exception.class) public DispatchResult autoDispatchNext(Long completedOrderId, Long assigneeId) { - log.info("任务完成后自动调度下一个: completedOrderId={}, assigneeId={}", completedOrderId, assigneeId); + log.info("任务完成后自动派发下一单: completedOrderId={}, assigneeId={}", completedOrderId, assigneeId); - // 1. 优先检查是否有被中断的任务 - List interruptedTasks = orderQueueService.getInterruptedTasksByUserId(assigneeId); - - if (!interruptedTasks.isEmpty()) { - // 恢复第一个中断的任务 - OrderQueueDTO interruptedTask = interruptedTasks.get(0); - log.info("恢复中断任务: orderId={}", interruptedTask.getOpsOrderId()); - - OrderTransitionRequest request = OrderTransitionRequest.builder() - .orderId(interruptedTask.getOpsOrderId()) - .targetStatus(WorkOrderStatusEnum.DISPATCHED) - .queueId(interruptedTask.getId()) - .assigneeId(assigneeId) - .operatorType(OperatorTypeEnum.SYSTEM) - .operatorId(assigneeId) - .reason("恢复中断任务") - .build(); - - OrderTransitionResult result = orderLifecycleManager.transition(request); - - if (result.isSuccess()) { - return DispatchResult.success("已恢复中断任务", assigneeId); - } else { - return DispatchResult.fail("恢复中断任务失败: " + result.getMessage()); - } + Long fallbackAreaId = null; + OpsOrderDO completedOrder = orderMapper.selectById(completedOrderId); + if (completedOrder != null) { + fallbackAreaId = completedOrder.getAreaId(); } - // 2. 如果没有中断任务,推送队列中的下一个任务 - // 注意:这里必须从 MySQL 读取最新数据,确保刚完成的任务不在等待列表中 - List waitingTasks = orderQueueService.getWaitingTasksByUserIdFromDb(assigneeId); + List waitingTasks = orderQueueService.rebuildWaitingTasksByUserId(assigneeId, fallbackAreaId); if (waitingTasks.isEmpty()) { log.info("无等待任务,执行人变为空闲: assigneeId={}", assigneeId); // 发布事件,由业务层更新执行人状态 - return DispatchResult.success("无等待任务,任务完成", assigneeId); + return DispatchResult.success("无等待工单,执行人保持空闲", assigneeId); } - // ��送第一个等待任务 + // 动态总分重排后,派发得分最低的等待工单 OrderQueueDTO nextTask = waitingTasks.get(0); - log.info("推送下一个等待任务: taskId={}, orderId={}", nextTask.getId(), nextTask.getOpsOrderId()); + log.info("派发下一单: queueId={}, orderId={}, score={}, floorDiff={}, waitMinutes={}", + nextTask.getId(), nextTask.getOpsOrderId(), nextTask.getQueueScore(), + nextTask.getFloorDiff(), nextTask.getWaitMinutes()); OrderTransitionRequest request = OrderTransitionRequest.builder() .orderId(nextTask.getOpsOrderId()) @@ -247,15 +218,15 @@ public class DispatchEngineImpl implements DispatchEngine { .assigneeId(assigneeId) .operatorType(OperatorTypeEnum.SYSTEM) .operatorId(assigneeId) - .reason("自动推送下一个任务") + .reason("等待队列动态重排后自动派发") .build(); OrderTransitionResult result = orderLifecycleManager.transition(request); if (result.isSuccess()) { - return DispatchResult.success("已推送下一个任务", assigneeId); + return DispatchResult.success("已按队列总分派发下一单", assigneeId); } else { - return DispatchResult.fail("推送下一个任务失败: " + result.getMessage()); + return DispatchResult.fail("按队列总分派发下一单失败: " + result.getMessage()); } } @@ -330,7 +301,7 @@ public class DispatchEngineImpl implements DispatchEngine { OrderDispatchContext urgentContext) { ScheduleStrategy strategy = scheduleStrategyRegistry.get(urgentContext.getBusinessType()); if (strategy == null) { - log.warn("未找到调度策略,使用默认打断规则: businessType={}", urgentContext.getBusinessType()); + log.warn("未找到调度策略,使用默认非抢断规则: businessType={}", urgentContext.getBusinessType()); return defaultInterruptDecision(urgentContext); } @@ -343,14 +314,13 @@ public class DispatchEngineImpl implements DispatchEngine { } /** - * 默认打断决策 - * P0任务可以打断任何非P0任务 + * 默认非抢断决策 */ private InterruptDecision defaultInterruptDecision(OrderDispatchContext urgentContext) { if (urgentContext.isUrgent()) { - return InterruptDecision.allowByDefault(); + return InterruptDecision.deny("当前调度不再支持抢断", "工单将按队列总分在下一轮派发"); } - return InterruptDecision.deny("紧急任务优先级不足", "建议等待当前任务完成"); + return InterruptDecision.deny("当前调度不再支持抢断", "建议等待当前任务完成后按队列总分派发"); } // ==================== 私有方法 ==================== @@ -502,41 +472,12 @@ public class DispatchEngineImpl implements DispatchEngine { } /** - * 打断并派单 + * 历史抢断入口已废弃,统一降级为仅入队等待下一轮动态派发 */ private DispatchResult executeInterruptAndDispatch(OrderDispatchContext context, Long assigneeId, Long interruptedOrderId) { - log.warn("执行打断并派单: orderId={}, assigneeId={}, interruptedOrderId={}", + log.warn("检测到过期抢断路径,降级为仅入队: orderId={}, assigneeId={}, interruptedOrderId={}", context.getOrderId(), assigneeId, interruptedOrderId); - - // 先打断当前任务 - if (interruptedOrderId != null) { - orderLifecycleManager.interruptOrder(interruptedOrderId, context.getOrderId(), assigneeId); - } - - // 派发紧急任务 - OrderTransitionRequest request = OrderTransitionRequest.builder() - .orderId(context.getOrderId()) - .targetStatus(WorkOrderStatusEnum.DISPATCHED) - .assigneeId(assigneeId) - .assigneeName(context.getRecommendedAssigneeName()) - .operatorType(OperatorTypeEnum.SYSTEM) - .operatorId(assigneeId) - .reason("P0紧急任务派单") - .build(); - - OrderTransitionResult result = orderLifecycleManager.dispatch(request); - - if (result.isSuccess()) { - return DispatchResult.success( - "P0紧急任务已派单", - assigneeId, - null, - DispatchPath.INTERRUPT_AND_DISPATCH, - result.getQueueId() - ); - } else { - return DispatchResult.fail("P0紧急任务派单失败: " + result.getMessage()); - } + return executeEnqueueOnly(context, assigneeId); } } diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/queue/OpsOrderQueueDO.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/queue/OpsOrderQueueDO.java index ab5c54e..7ccc413 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/queue/OpsOrderQueueDO.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/queue/OpsOrderQueueDO.java @@ -62,13 +62,8 @@ public class OpsOrderQueueDO extends BaseDO { /** * 队列分数(用于排序) - * 计算公式:优先级分数 + 时间戳 - * - P0: 0 + timestamp - * - P1: 1000000 + timestamp - * - P2: 2000000 + timestamp - * - P3: 3000000 + timestamp - * - * 用于数据库层面的排序,优先级高的排在前面,同优先级按时间排序 + * 计算公式:优先级分 + 楼层差分 - 等待老化分 + * 分数越小越靠前,用于数据库与 Redis 的一致排序 */ @TableField("queue_score") private Double queueScore; diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java index bf2dca6..aeb3fc2 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java @@ -1,21 +1,26 @@ package com.viewsh.module.ops.service.queue; -import com.viewsh.module.ops.api.queue.OrderQueueDTO; -import com.viewsh.module.ops.api.queue.OrderQueueService; -import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO; -import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper; +import com.viewsh.module.ops.api.queue.OrderQueueDTO; +import com.viewsh.module.ops.api.queue.OrderQueueService; +import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO; +import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.area.OpsBusAreaMapper; +import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; import com.viewsh.module.ops.enums.OrderQueueStatusEnum; import com.viewsh.module.ops.enums.PriorityEnum; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeanUtils; -import org.springframework.context.annotation.Primary; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; +import org.springframework.beans.BeanUtils; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Service; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; +import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; -import java.time.ZoneId; -import java.util.*; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -35,27 +40,27 @@ import java.util.stream.Collectors; @Primary public class OrderQueueServiceEnhanced implements OrderQueueService { - /** - * Score 计算公式:优先级分数 + 时间戳 - * 优先级分数:P0=0, P1=1000000, P2=2000000, P3=3000000 - * 时间戳:秒级时间戳 - * 结果:优先级高的排在前面,同优先级按时间排序 - */ - private static final Map PRIORITY_SCORES = Map.of( - 0, 0L, // P0: 0 - 1, 1000000L, // P1: 1,000,000 - 2, 2000000L, // P2: 2,000,000 - 3, 3000000L // P3: 3,000,000 - ); - - @Resource - private OpsOrderQueueMapper orderQueueMapper; - - @Resource - private RedisOrderQueueService redisQueueService; - - @Resource - private QueueSyncService queueSyncService; + /** + * 队列总分由 QueueScoreCalculator 统一计算: + * 优先级分 + 楼层差分 - 等待老化分。 + */ + @Resource + private OpsOrderQueueMapper orderQueueMapper; + + @Resource + private OpsOrderMapper orderMapper; + + @Resource + private OpsBusAreaMapper areaMapper; + + @Resource + private RedisOrderQueueService redisQueueService; + + @Resource + private QueueSyncService queueSyncService; + + @Resource + private QueueScoreCalculator queueScoreCalculator; @Override @Transactional(rollbackFor = Exception.class) @@ -69,7 +74,11 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { // 2. 计算队列分数 LocalDateTime now = LocalDateTime.now(); - double queueScore = calculateQueueScore(priority.getPriority(), now); + double queueScore = queueScoreCalculator.calculate(QueueScoreContext.builder() + .priority(priority.getPriority()) + .enqueueTime(now) + .now(now) + .build()).getTotalScore(); // 3. 创建队列记录(MySQL) OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder() @@ -361,22 +370,20 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { queueDO.setQueueIndex(calculateNextQueueIndex(newPriority)); // 重新计算队列分数(使用原入队时间保持时间戳不变) LocalDateTime enqueueTime = queueDO.getEnqueueTime() != null ? queueDO.getEnqueueTime() : LocalDateTime.now(); - double newQueueScore = calculateQueueScore(newPriority.getPriority(), enqueueTime); + double newQueueScore = queueScoreCalculator.calculate(QueueScoreContext.builder() + .priority(newPriority.getPriority()) + .enqueueTime(enqueueTime) + .now(LocalDateTime.now()) + .build()).getTotalScore(); queueDO.setQueueScore(newQueueScore); queueDO.setEventMessage("优先级调整: " + oldPriority + " -> " + newPriority + ", 原因: " + reason); int updated = orderQueueMapper.updateById(queueDO); - // 异步更新 Redis - if (updated > 0) { - CompletableFuture.runAsync(() -> { - try { - redisQueueService.updatePriority(queueId, newPriority.getPriority()); - } catch (Exception e) { - log.error("Redis 更新优先级失败: queueId={}", queueId, e); - } - }); - } + // 事务提交后重算并同步 Redis,避免在事务未提交前读到旧数据 + if (updated > 0) { + triggerQueueRebuildAfterCommit(queueDO.getUserId(), null); + } log.info("优先级已调整: queueId={}, opsOrderId={}, oldPriority={}, newPriority={}, reason={}, newScore={}", queueId, queueDO.getOpsOrderId(), oldPriority, newPriority, reason, newQueueScore); @@ -487,38 +494,58 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { } @Override - public List getWaitingTasksByUserIdFromDb(Long userId) { - // 直接从 MySQL 获取所有任务 - List mysqlList = orderQueueMapper.selectListByUserId(userId); - List allTasks = convertToDTO(mysqlList); - - // 过滤出 WAITING 状态的任务,并按队列分数排序 - return filterAndSortWaitingTasks(allTasks); - } - - private List filterAndSortWaitingTasks(List allTasks) { - return allTasks.stream() - .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus())) - .sorted((a, b) -> { - // 优先使用队列分数 score 排序(已在入队时计算好) - if (a.getQueueScore() != null && b.getQueueScore() != null) { - return Double.compare(a.getQueueScore(), b.getQueueScore()); - } - - // 兜底:如果 score 为空,则按优先级+时间排序 - // 按优先级排序(P0 > P1 > P2 > P3) - int priorityCompare = Integer.compare( - b.getPriority() != null ? b.getPriority() : 999, - a.getPriority() != null ? a.getPriority() : 999 - ); - if (priorityCompare != 0) { - return priorityCompare; - } - // 优先级相同,按入队时间排序 - return a.getEnqueueTime().compareTo(b.getEnqueueTime()); - }) - .collect(Collectors.toList()); - } + public List getWaitingTasksByUserIdFromDb(Long userId) { + return rebuildWaitingTasksByUserId(userId, null); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public List rebuildWaitingTasksByUserId(Long userId, Long fallbackAreaId) { + List waitingQueues = orderQueueMapper.selectListByUserIdAndStatus( + userId, OrderQueueStatusEnum.WAITING.getStatus()); + if (waitingQueues == null || waitingQueues.isEmpty()) { + return Collections.emptyList(); + } + + Long baselineAreaId = resolveBaselineAreaId(userId, fallbackAreaId); + Integer baseFloorNo = resolveFloorNo(baselineAreaId); + LocalDateTime now = LocalDateTime.now(); + + List rebuiltTasks = new ArrayList<>(waitingQueues.size()); + for (OpsOrderQueueDO queueDO : waitingQueues) { + OrderQueueDTO dto = convertToDTO(queueDO); + Integer targetFloorNo = resolveFloorNo(resolveOrderAreaId(queueDO.getOpsOrderId())); + QueueScoreResult result = queueScoreCalculator.calculate(QueueScoreContext.builder() + .priority(queueDO.getPriority()) + .baseFloorNo(baseFloorNo) + .targetFloorNo(targetFloorNo) + .enqueueTime(queueDO.getEnqueueTime()) + .now(now) + .build()); + + queueDO.setQueueScore(result.getTotalScore()); + orderQueueMapper.updateById(queueDO); + + dto.setQueueScore(result.getTotalScore()); + dto.setBaseFloorNo(result.getBaseFloorNo()); + dto.setTargetFloorNo(result.getTargetFloorNo()); + dto.setFloorDiff(result.getFloorDiff()); + dto.setWaitMinutes(result.getWaitMinutes()); + dto.setScoreUpdateTime(now); + rebuiltTasks.add(dto); + } + + rebuiltTasks.sort(this::compareByDynamicScore); + syncUserQueueToRedis(userId, rebuiltTasks); + return rebuiltTasks; + } + + private List filterAndSortWaitingTasks(List allTasks) { + return allTasks.stream() + .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus())) + .sorted(this::compareByDynamicScore) + .collect(Collectors.toList()); + } @Override public List getInterruptedTasksByUserId(Long userId) { @@ -620,32 +647,9 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { // ========== 私有方法 ========== - /** - * 计算队列分数(用于排序) - * 公式:优先级分数 + 时间戳 - * - * @param priority 优先级(0=P0, 1=P1, 2=P2, 3=P3) - * @param enqueueTime 入队时间 - * @return 队列分数 - */ - private double calculateQueueScore(Integer priority, LocalDateTime enqueueTime) { - // 获取优先级分数 - long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L); - - // 计算时间戳(秒级) - long timestamp; - if (enqueueTime != null) { - timestamp = enqueueTime.atZone(ZoneId.systemDefault()).toEpochSecond(); - } else { - timestamp = System.currentTimeMillis() / 1000; - } - - return priorityScore + timestamp; - } - - /** - * 计算下一个队列顺序号 - */ + /** + * 计算下一个队列顺序号 + */ private Integer calculateNextQueueIndex(PriorityEnum priority) { // TODO: 查询当前优先级的最大 queueIndex,然后 +1 // 这里简化处理,返回默认值 @@ -709,9 +713,106 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { /** * 批量转换为 DTO */ - private List convertToDTO(List list) { - return list.stream() - .map(this::convertToDTO) - .collect(Collectors.toList()); - } -} + private List convertToDTO(List list) { + return list.stream() + .map(this::convertToDTO) + .collect(Collectors.toList()); + } + + private int compareByDynamicScore(OrderQueueDTO a, OrderQueueDTO b) { + int scoreCompare = Double.compare( + a.getQueueScore() != null ? a.getQueueScore() : Double.MAX_VALUE, + b.getQueueScore() != null ? b.getQueueScore() : Double.MAX_VALUE + ); + if (scoreCompare != 0) { + return scoreCompare; + } + if (a.getEnqueueTime() != null && b.getEnqueueTime() != null) { + int enqueueCompare = a.getEnqueueTime().compareTo(b.getEnqueueTime()); + if (enqueueCompare != 0) { + return enqueueCompare; + } + } + return Long.compare( + a.getId() != null ? a.getId() : Long.MAX_VALUE, + b.getId() != null ? b.getId() : Long.MAX_VALUE + ); + } + + private Long resolveBaselineAreaId(Long userId, Long fallbackAreaId) { + OpsOrderQueueDO processingQueue = orderQueueMapper.selectCurrentExecutingByUserId(userId); + if (processingQueue != null) { + Long processingAreaId = resolveOrderAreaId(processingQueue.getOpsOrderId()); + if (processingAreaId != null) { + return processingAreaId; + } + } + return fallbackAreaId; + } + + private Long resolveOrderAreaId(Long orderId) { + OpsOrderDO order = orderMapper.selectById(orderId); + return order != null ? order.getAreaId() : null; + } + + private Integer resolveFloorNo(Long areaId) { + if (areaId == null) { + return null; + } + OpsBusAreaDO area = areaMapper.selectById(areaId); + return area != null ? area.getFloorNo() : null; + } + + private void syncUserQueueToRedis(Long userId, List rebuiltWaitingTasks) { + List queues = orderQueueMapper.selectListByUserId(userId); + if (queues == null || queues.isEmpty()) { + redisQueueService.clearQueue(userId); + return; + } + + Map rebuiltTaskMap = rebuiltWaitingTasks == null + ? Collections.emptyMap() + : rebuiltWaitingTasks.stream() + .filter(dto -> dto.getId() != null) + .collect(Collectors.toMap(OrderQueueDTO::getId, dto -> dto)); + + List queueDTOs = queues.stream() + .map(this::convertToDTO) + .map(dto -> rebuiltTaskMap.getOrDefault(dto.getId(), dto)) + .sorted((a, b) -> { + if (Objects.equals(a.getUserId(), b.getUserId())) { + return compareByDynamicScore(a, b); + } + return Long.compare( + a.getUserId() != null ? a.getUserId() : Long.MAX_VALUE, + b.getUserId() != null ? b.getUserId() : Long.MAX_VALUE + ); + }) + .collect(Collectors.toList()); + + redisQueueService.clearQueue(userId); + redisQueueService.batchEnqueue(queueDTOs); + } + + private void triggerQueueRebuildAfterCommit(Long userId, Long fallbackAreaId) { + Runnable rebuildAction = () -> { + try { + rebuildWaitingTasksByUserId(userId, fallbackAreaId); + } catch (Exception e) { + log.error("等待队列重算失败: userId={}", userId, e); + } + }; + + if (!TransactionSynchronizationManager.isSynchronizationActive()) { + rebuildAction.run(); + return; + } + + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + rebuildAction.run(); + } + }); + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueScoreCalculator.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueScoreCalculator.java new file mode 100644 index 0000000..dd37022 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueScoreCalculator.java @@ -0,0 +1,49 @@ +package com.viewsh.module.ops.service.queue; + +import org.springframework.stereotype.Component; + +import java.time.Duration; +import java.time.LocalDateTime; + +@Component +public class QueueScoreCalculator { + + static final int PRIORITY_WEIGHT = 1500; + static final int FLOOR_WEIGHT = 20; + static final int AGING_WEIGHT = 5; + static final int MAX_FLOOR_DIFF = 10; + static final int MAX_AGING_MINUTES = 240; + + public QueueScoreResult calculate(QueueScoreContext context) { + LocalDateTime now = context.getNow() != null ? context.getNow() : LocalDateTime.now(); + int priorityRank = context.getPriority() != null ? context.getPriority() : 3; + + Integer baseFloorNo = context.getBaseFloorNo(); + Integer targetFloorNo = context.getTargetFloorNo(); + Integer floorDiff = null; + int floorDiffScore = 0; + if (baseFloorNo != null && targetFloorNo != null) { + floorDiff = Math.abs(targetFloorNo - baseFloorNo); + floorDiffScore = Math.min(floorDiff, MAX_FLOOR_DIFF) * FLOOR_WEIGHT; + } else if (baseFloorNo != null) { + floorDiffScore = MAX_FLOOR_DIFF * FLOOR_WEIGHT; + } + + long waitMinutes = 0; + if (context.getEnqueueTime() != null) { + waitMinutes = Math.max(0, Duration.between(context.getEnqueueTime(), now).toMinutes()); + } + long agingScore = (long) Math.min(waitMinutes, MAX_AGING_MINUTES) * AGING_WEIGHT; + + long priorityScore = (long) priorityRank * PRIORITY_WEIGHT; + double totalScore = priorityScore + floorDiffScore - agingScore; + + return QueueScoreResult.builder() + .totalScore(totalScore) + .baseFloorNo(baseFloorNo) + .targetFloorNo(targetFloorNo) + .floorDiff(floorDiff) + .waitMinutes(waitMinutes) + .build(); + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueScoreContext.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueScoreContext.java new file mode 100644 index 0000000..d55d86a --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueScoreContext.java @@ -0,0 +1,21 @@ +package com.viewsh.module.ops.service.queue; + +import lombok.Builder; +import lombok.Data; + +import java.time.LocalDateTime; + +@Data +@Builder +public class QueueScoreContext { + + private Integer priority; + + private Integer baseFloorNo; + + private Integer targetFloorNo; + + private LocalDateTime enqueueTime; + + private LocalDateTime now; +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueScoreResult.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueScoreResult.java new file mode 100644 index 0000000..8263ec3 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueScoreResult.java @@ -0,0 +1,19 @@ +package com.viewsh.module.ops.service.queue; + +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class QueueScoreResult { + + private double totalScore; + + private Integer baseFloorNo; + + private Integer targetFloorNo; + + private Integer floorDiff; + + private long waitMinutes; +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java index b9a2537..c368e3c 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/RedisOrderQueueServiceImpl.java @@ -47,7 +47,9 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { String infoKey = INFO_KEY_PREFIX + dto.getId(); // 1. 计算分数(优先级 + 时间戳) - double score = calculateScore(dto.getPriority(), dto.getEnqueueTime()); + double score = dto.getQueueScore() != null + ? dto.getQueueScore() + : calculateScore(dto.getPriority(), dto.getEnqueueTime()); dto.setQueueScore(score); // 2. 添加到 Sorted Set(使用 queueId 作为 member,而非 JSON) @@ -84,7 +86,9 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { byte[] infoKey = (INFO_KEY_PREFIX + dto.getId()).getBytes(); // 计算分数并设置到 DTO - double score = calculateScore(dto.getPriority(), dto.getEnqueueTime()); + double score = dto.getQueueScore() != null + ? dto.getQueueScore() + : calculateScore(dto.getPriority(), dto.getEnqueueTime()); dto.setQueueScore(score); // 添加到 Sorted Set(使用 queueId 作为 member) @@ -214,9 +218,17 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { public long clearQueue(Long cleanerId) { try { String queueKey = QUEUE_KEY_PREFIX + cleanerId; + Set queueIds = stringRedisTemplate.opsForZSet().range(queueKey, 0, -1); + if (queueIds != null && !queueIds.isEmpty()) { + List infoKeys = queueIds.stream() + .map(queueId -> INFO_KEY_PREFIX + queueId) + .collect(Collectors.toList()); + stringRedisTemplate.delete(infoKeys); + } stringRedisTemplate.delete(queueKey); - log.info("Redis 清空队列成功: cleanerId={}", cleanerId); - return 0; // Redis 不返回删除数量 + long removedCount = queueIds != null ? queueIds.size() : 0; + log.info("Redis 清空队列成功: cleanerId={}, removedCount={}", cleanerId, removedCount); + return removedCount; } catch (Exception e) { log.error("Redis 清空队列失败: cleanerId={}", cleanerId, e); return 0; @@ -326,7 +338,9 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { String queueKey = QUEUE_KEY_PREFIX + dto.getUserId(); dto.setPriority(newPriority); - double newScore = calculateScore(newPriority, dto.getEnqueueTime()); + double newScore = dto.getQueueScore() != null + ? dto.getQueueScore() + : calculateScore(newPriority, dto.getEnqueueTime()); // 使用 Lua 脚本原子性更新 Hash 和 Sorted Set String script = @@ -451,7 +465,15 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { if (opsOrderIdObj != null) { Long opsOrderId = Long.parseLong(opsOrderIdObj.toString()); if (opsOrderId.equals(orderId)) { - return mapToDto(infoMap); + OrderQueueDTO dto = mapToDto(infoMap); + if (dto == null || dto.getUserId() == null || dto.getId() == null) { + continue; + } + String queueKey = QUEUE_KEY_PREFIX + dto.getUserId(); + Double score = stringRedisTemplate.opsForZSet().score(queueKey, dto.getId().toString()); + if (score != null) { + return dto; + } } } } @@ -566,6 +588,21 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { if (dto.getEnqueueTime() != null) { map.put("enqueueTime", dto.getEnqueueTime().toString()); } + if (dto.getBaseFloorNo() != null) { + map.put("baseFloorNo", String.valueOf(dto.getBaseFloorNo())); + } + if (dto.getTargetFloorNo() != null) { + map.put("targetFloorNo", String.valueOf(dto.getTargetFloorNo())); + } + if (dto.getFloorDiff() != null) { + map.put("floorDiff", String.valueOf(dto.getFloorDiff())); + } + if (dto.getWaitMinutes() != null) { + map.put("waitMinutes", String.valueOf(dto.getWaitMinutes())); + } + if (dto.getScoreUpdateTime() != null) { + map.put("scoreUpdateTime", dto.getScoreUpdateTime().toString()); + } return map; } @@ -587,6 +624,21 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { if (dto.getEnqueueTime() != null) { map.put("enqueueTime".getBytes(), dto.getEnqueueTime().toString().getBytes()); } + if (dto.getBaseFloorNo() != null) { + map.put("baseFloorNo".getBytes(), String.valueOf(dto.getBaseFloorNo()).getBytes()); + } + if (dto.getTargetFloorNo() != null) { + map.put("targetFloorNo".getBytes(), String.valueOf(dto.getTargetFloorNo()).getBytes()); + } + if (dto.getFloorDiff() != null) { + map.put("floorDiff".getBytes(), String.valueOf(dto.getFloorDiff()).getBytes()); + } + if (dto.getWaitMinutes() != null) { + map.put("waitMinutes".getBytes(), String.valueOf(dto.getWaitMinutes()).getBytes()); + } + if (dto.getScoreUpdateTime() != null) { + map.put("scoreUpdateTime".getBytes(), dto.getScoreUpdateTime().toString().getBytes()); + } return map; } @@ -633,6 +685,26 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService { if (enqueueTimeObj != null) { dto.setEnqueueTime(LocalDateTime.parse(enqueueTimeObj.toString())); } + Object baseFloorNoObj = map.get("baseFloorNo"); + if (baseFloorNoObj != null) { + dto.setBaseFloorNo(Integer.parseInt(baseFloorNoObj.toString())); + } + Object targetFloorNoObj = map.get("targetFloorNo"); + if (targetFloorNoObj != null) { + dto.setTargetFloorNo(Integer.parseInt(targetFloorNoObj.toString())); + } + Object floorDiffObj = map.get("floorDiff"); + if (floorDiffObj != null) { + dto.setFloorDiff(Integer.parseInt(floorDiffObj.toString())); + } + Object waitMinutesObj = map.get("waitMinutes"); + if (waitMinutesObj != null) { + dto.setWaitMinutes(Long.parseLong(waitMinutesObj.toString())); + } + Object scoreUpdateTimeObj = map.get("scoreUpdateTime"); + if (scoreUpdateTimeObj != null) { + dto.setScoreUpdateTime(LocalDateTime.parse(scoreUpdateTimeObj.toString())); + } return dto; } catch (Exception e) { diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhancedTest.java b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhancedTest.java new file mode 100644 index 0000000..19f5885 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhancedTest.java @@ -0,0 +1,103 @@ +package com.viewsh.module.ops.service.queue; + +import com.viewsh.module.ops.api.queue.OrderQueueDTO; +import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO; +import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.area.OpsBusAreaMapper; +import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; +import com.viewsh.module.ops.enums.OrderQueueStatusEnum; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.LocalDateTime; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class OrderQueueServiceEnhancedTest { + + @Mock + private OpsOrderQueueMapper orderQueueMapper; + @Mock + private OpsOrderMapper orderMapper; + @Mock + private OpsBusAreaMapper areaMapper; + @Mock + private RedisOrderQueueService redisQueueService; + @Mock + private QueueSyncService queueSyncService; + @Spy + private QueueScoreCalculator queueScoreCalculator = new QueueScoreCalculator(); + + @InjectMocks + private OrderQueueServiceEnhanced orderQueueService; + + @Test + void shouldRebuildWaitingTasksAndAvoidStarvation() { + LocalDateTime now = LocalDateTime.now(); + Long userId = 2001L; + + OpsOrderQueueDO olderFarTask = OpsOrderQueueDO.builder() + .id(11L) + .userId(userId) + .opsOrderId(101L) + .priority(1) + .queueScore(0D) + .queueStatus(OrderQueueStatusEnum.WAITING.getStatus()) + .enqueueTime(now.minusMinutes(80)) + .build(); + OpsOrderQueueDO newerNearTask = OpsOrderQueueDO.builder() + .id(12L) + .userId(userId) + .opsOrderId(102L) + .priority(1) + .queueScore(0D) + .queueStatus(OrderQueueStatusEnum.WAITING.getStatus()) + .enqueueTime(now.minusMinutes(5)) + .build(); + OpsOrderQueueDO currentTask = OpsOrderQueueDO.builder() + .id(13L) + .userId(userId) + .opsOrderId(900L) + .queueStatus(OrderQueueStatusEnum.PROCESSING.getStatus()) + .build(); + + when(orderQueueMapper.selectListByUserIdAndStatus(userId, OrderQueueStatusEnum.WAITING.getStatus())) + .thenReturn(List.of(olderFarTask, newerNearTask)); + when(orderQueueMapper.selectCurrentExecutingByUserId(userId)).thenReturn(currentTask); + when(orderQueueMapper.selectListByUserId(userId)).thenReturn(List.of(olderFarTask, newerNearTask, currentTask)); + when(orderMapper.selectById(900L)).thenReturn(OpsOrderDO.builder().id(900L).areaId(501L).build()); + when(orderMapper.selectById(101L)).thenReturn(OpsOrderDO.builder().id(101L).areaId(503L).build()); + when(orderMapper.selectById(102L)).thenReturn(OpsOrderDO.builder().id(102L).areaId(502L).build()); + when(areaMapper.selectById(501L)).thenReturn(OpsBusAreaDO.builder().id(501L).floorNo(5).build()); + when(areaMapper.selectById(502L)).thenReturn(OpsBusAreaDO.builder().id(502L).floorNo(6).build()); + when(areaMapper.selectById(503L)).thenReturn(OpsBusAreaDO.builder().id(503L).floorNo(8).build()); + when(orderQueueMapper.updateById(any(OpsOrderQueueDO.class))).thenReturn(1); + + List rebuiltTasks = orderQueueService.rebuildWaitingTasksByUserId(userId, null); + + assertEquals(2, rebuiltTasks.size()); + assertEquals(101L, rebuiltTasks.get(0).getOpsOrderId()); + assertEquals(3, rebuiltTasks.get(0).getFloorDiff()); + assertTrue(rebuiltTasks.get(0).getWaitMinutes() >= 79); + assertEquals(102L, rebuiltTasks.get(1).getOpsOrderId()); + assertEquals(1, rebuiltTasks.get(1).getFloorDiff()); + assertTrue(rebuiltTasks.get(0).getQueueScore() < rebuiltTasks.get(1).getQueueScore()); + + verify(orderQueueMapper, times(2)).updateById(any(OpsOrderQueueDO.class)); + verify(redisQueueService).clearQueue(userId); + verify(redisQueueService).batchEnqueue(any()); + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/QueueScoreCalculatorTest.java b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/QueueScoreCalculatorTest.java new file mode 100644 index 0000000..2f44a4d --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/QueueScoreCalculatorTest.java @@ -0,0 +1,73 @@ +package com.viewsh.module.ops.service.queue; + +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +class QueueScoreCalculatorTest { + + private final QueueScoreCalculator calculator = new QueueScoreCalculator(); + + @Test + void shouldPreferSmallerFloorDiffWhenPrioritySame() { + LocalDateTime now = LocalDateTime.now(); + + QueueScoreResult near = calculator.calculate(QueueScoreContext.builder() + .priority(1) + .baseFloorNo(5) + .targetFloorNo(6) + .enqueueTime(now.minusMinutes(5)) + .now(now) + .build()); + + QueueScoreResult far = calculator.calculate(QueueScoreContext.builder() + .priority(1) + .baseFloorNo(5) + .targetFloorNo(9) + .enqueueTime(now.minusMinutes(5)) + .now(now) + .build()); + + assertTrue(near.getTotalScore() < far.getTotalScore()); + } + + @Test + void shouldAllowOlderOrderToGainAgingAdvantage() { + LocalDateTime now = LocalDateTime.now(); + + QueueScoreResult older = calculator.calculate(QueueScoreContext.builder() + .priority(1) + .baseFloorNo(5) + .targetFloorNo(8) + .enqueueTime(now.minusMinutes(80)) + .now(now) + .build()); + + QueueScoreResult newer = calculator.calculate(QueueScoreContext.builder() + .priority(1) + .baseFloorNo(5) + .targetFloorNo(6) + .enqueueTime(now.minusMinutes(5)) + .now(now) + .build()); + + assertTrue(older.getTotalScore() < newer.getTotalScore()); + } + + @Test + void shouldDegradeGracefullyWhenBaseFloorMissing() { + LocalDateTime now = LocalDateTime.now(); + + QueueScoreResult result = calculator.calculate(QueueScoreContext.builder() + .priority(2) + .baseFloorNo(null) + .targetFloorNo(6) + .enqueueTime(now.minusMinutes(10)) + .now(now) + .build()); + + assertTrue(result.getTotalScore() < QueueScoreCalculator.PRIORITY_WEIGHT * 2); + } +}