From 5974c767d5d060d356f764e682986211b1df1501 Mon Sep 17 00:00:00 2001 From: lzh Date: Fri, 9 Jan 2026 17:38:01 +0800 Subject: [PATCH] =?UTF-8?q?chore:=20=E3=80=90ops=E3=80=91=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E5=9F=BA=E7=A1=80=E6=93=8D=E4=BD=9C=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../viewsh-module-environment-biz/pom.xml | 13 + .../DispatchEngineServiceAdapter.java | 396 ++++++++++ .../dispatch/DispatchEngineServiceImpl.java | 324 -------- .../service/dispatch/DispatchStrategy.java | 40 - .../ops/service/fsm/OrderStateMachine.java | 17 +- .../service/order/OpsOrderServiceImpl.java | 56 +- .../queue/OrderQueueServiceEnhanced.java | 689 ++++++++++++++++++ .../ops/service/queue/QueueSyncService.java | 14 +- 8 files changed, 1140 insertions(+), 409 deletions(-) create mode 100644 viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchEngineServiceAdapter.java delete mode 100644 viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchEngineServiceImpl.java delete mode 100644 viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchStrategy.java create mode 100644 viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java diff --git a/viewsh-module-ops/viewsh-module-environment-biz/pom.xml b/viewsh-module-ops/viewsh-module-environment-biz/pom.xml index 3816476..9e94a54 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/pom.xml +++ b/viewsh-module-ops/viewsh-module-environment-biz/pom.xml @@ -48,5 +48,18 @@ com.viewsh viewsh-spring-boot-starter-biz-tenant + + + + com.viewsh + viewsh-spring-boot-starter-mq + + + + + org.apache.rocketmq + rocketmq-spring-boot-starter + true + diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchEngineServiceAdapter.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchEngineServiceAdapter.java new file mode 100644 index 0000000..16ee5e3 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchEngineServiceAdapter.java @@ -0,0 +1,396 @@ +package com.viewsh.module.ops.service.dispatch; + +import com.viewsh.module.ops.api.dispatch.DispatchEngineService; +import com.viewsh.module.ops.api.queue.OrderQueueDTO; +import com.viewsh.module.ops.api.queue.OrderQueueService; +import com.viewsh.module.ops.core.dispatch.DispatchEngine; +import com.viewsh.module.ops.core.dispatch.model.DispatchResult; +import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext; +import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; +import com.viewsh.module.ops.enums.OperatorTypeEnum; +import com.viewsh.module.ops.enums.PriorityEnum; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * 派单引擎服务适配器 + *

+ * 职责: + * 1. 适配 {@link DispatchEngineService} API 到 {@link DispatchEngine} 实现 + * 2. 使用新的调度引擎处理派单逻辑 + *

+ * 设计模式:适配器模式 + * - 将旧的 API 调用转换为新的调度引擎调用 + * - 新架构:策略模式 + 责任链模式 + * + * @author lzh + */ +@Slf4j +@Service +public class DispatchEngineServiceAdapter implements DispatchEngineService { + + @Resource + private DispatchEngine dispatchEngine; + + @Resource + private OrderLifecycleManager orderLifecycleManager; + + @Resource + private OrderQueueService orderQueueService; + + @Resource + private OpsOrderMapper orderMapper; + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean autoDispatch(Long queueId) { + log.info("自动派单: queueId={}", queueId); + + // 1. 查询队列记录 + OrderQueueDTO queueDTO = orderQueueService.getById(queueId); + if (queueDTO == null) { + log.error("队列记录不存在: queueId={}", queueId); + return false; + } + + // 2. 查询工单 + OpsOrderDO order = orderMapper.selectById(queueDTO.getOpsOrderId()); + if (order == null) { + log.error("工单不存在: orderId={}", queueDTO.getOpsOrderId()); + return false; + } + + // 3. 构建调度上下文 + OrderDispatchContext context = OrderDispatchContext.builder() + .orderId(order.getId()) + .orderCode(order.getOrderCode()) + .orderTitle(order.getTitle()) + .businessType(order.getOrderType()) + .areaId(order.getAreaId()) + .priority(PriorityEnum.fromPriority(order.getPriority())) + .recommendedAssigneeId(queueDTO.getUserId()) + .build(); + + // 4. 执行调度(使用新引擎) + DispatchResult result = dispatchEngine.dispatch(context); + + if (result.isSuccess()) { + log.info("自动派单成功: queueId={}, assigneeId={}, path={}", + queueId, result.getAssigneeId(), result.getPath()); + return true; + } else { + log.error("自动派单失败: queueId={}, reason={}", queueId, result.getMessage()); + return false; + } + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean manualDispatch(Long queueId, Long assigneeId) { + log.info("手动派单: queueId={}, assigneeId={}", queueId, assigneeId); + + // 1. 查询队列记录 + OrderQueueDTO queueDTO = orderQueueService.getById(queueId); + if (queueDTO == null) { + log.error("队列记录不存在: queueId={}", queueId); + return false; + } + + // 2. 查询工单 + OpsOrderDO order = orderMapper.selectById(queueDTO.getOpsOrderId()); + if (order == null) { + log.error("工单不存在: orderId={}", queueDTO.getOpsOrderId()); + return false; + } + + // 3. 更新工单的执行人 + order.setAssigneeId(assigneeId); + orderMapper.updateById(order); + + // 4. 使用生命周期管理器直接派单 + orderLifecycleManager.dispatch( + com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest.builder() + .orderId(order.getId()) + .targetStatus(WorkOrderStatusEnum.DISPATCHED) + .assigneeId(assigneeId) + .queueId(queueId) + .operatorType(OperatorTypeEnum.ADMIN) + .operatorId(assigneeId) + .reason("管理员手动派单") + .build() + ); + + log.info("手动派单成功: queueId={}, assigneeId={}", queueId, assigneeId); + return true; + } + + @Override + public int batchAutoDispatch(Long areaId, int batchSize) { + log.info("批量自动派单: areaId={}, batchSize={}", areaId, batchSize); + + // 查询所有等待派单的任务 + List allWaitingTasks = orderQueueService.getWaitingQueue(); + if (allWaitingTasks.isEmpty()) { + log.info("没有等待派单的任务: areaId={}", areaId); + return 0; + } + + // 过滤指定区域的任务 + List waitingTasks = areaId != null + ? allWaitingTasks.stream() + .filter(t -> { + OpsOrderDO order = orderMapper.selectById(t.getOpsOrderId()); + return order != null && order.getAreaId().equals(areaId); + }) + .limit(batchSize) + .collect(Collectors.toList()) + : allWaitingTasks.stream().limit(batchSize).collect(Collectors.toList()); + + if (waitingTasks.isEmpty()) { + log.info("该区域没有等待派单的任务: areaId={}", areaId); + return 0; + } + + int successCount = 0; + for (OrderQueueDTO task : waitingTasks) { + try { + if (autoDispatch(task.getId())) { + successCount++; + } + } catch (Exception e) { + log.error("批量派单失败: queueId={}", task.getId(), e); + } + } + + log.info("批量派单完成: total={}, success={}", waitingTasks.size(), successCount); + return successCount; + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean urgentDispatch(Long queueId) { + log.warn("紧急派单: queueId={}", queueId); + + // 1. 查询队列记录 + OrderQueueDTO queueDTO = orderQueueService.getById(queueId); + if (queueDTO == null) { + log.error("队列记录不存在: queueId={}", queueId); + return false; + } + + // 2. 查询工单 + OpsOrderDO order = orderMapper.selectById(queueDTO.getOpsOrderId()); + if (order == null) { + log.error("工单不存在: orderId={}", queueDTO.getOpsOrderId()); + return false; + } + + // 3. 使用新引擎的紧急打断方法 + DispatchResult result = dispatchEngine.urgentInterrupt(order.getId(), queueDTO.getUserId()); + + if (result.isSuccess()) { + log.warn("紧急派单成功: queueId={}, assigneeId={}", queueId, result.getAssigneeId()); + return true; + } else { + log.error("紧急派单失败: queueId={}, reason={}", queueId, result.getMessage()); + return false; + } + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean handlePriorityInterrupt(Long orderId) { + log.warn("处理优先级升级打断: orderId={}", orderId); + + // 1. 查询工单 + OpsOrderDO order = orderMapper.selectById(orderId); + if (order == null) { + log.error("工单不存在: orderId={}", orderId); + return false; + } + + // 2. 查询队列记录 + OrderQueueDTO queueDTO = orderQueueService.getByOpsOrderId(orderId); + if (queueDTO == null) { + log.warn("工单不在队列中: orderId={}", orderId); + return true; + } + + // 3. 使用新引擎的紧急打断方法 + DispatchResult result = dispatchEngine.urgentInterrupt(orderId, queueDTO.getUserId()); + + if (result.isSuccess()) { + log.warn("优先级升级打断成功: orderId={}", orderId); + return true; + } else { + log.error("优先级升级打断失败: orderId={}, reason={}", orderId, result.getMessage()); + return false; + } + } + + @Override + public List findAvailableAssignees(Long areaId, String assigneeType) { + log.info("查询可用执行人: areaId={}, assigneeType={}", areaId, assigneeType); + + // 根据业务类型查询可用人员 + // 这里简化处理,直接返回空列表 + // 实际应该调用相应的分配策略获取可用人员 + log.warn("findAvailableAssignees 方法暂未实现,返回空列表"); + return Collections.emptyList(); + } + + @Override + public Long recommendAssignee(Long queueId, com.viewsh.module.ops.enums.DispatchStrategyEnum dispatchStrategy) { + log.info("推荐执行人: queueId={}, strategy={}", queueId, dispatchStrategy); + + // 1. 查询队列记录 + OrderQueueDTO queueDTO = orderQueueService.getById(queueId); + if (queueDTO == null) { + log.error("队列记录不存在: queueId={}", queueId); + return null; + } + + // 2. 查询工单 + OpsOrderDO order = orderMapper.selectById(queueDTO.getOpsOrderId()); + if (order == null) { + log.error("工单不存在: orderId={}", queueDTO.getOpsOrderId()); + return null; + } + + // 3. 使用新引擎的推荐方法 + OrderDispatchContext context = OrderDispatchContext.builder() + .orderId(order.getId()) + .orderCode(order.getOrderCode()) + .orderTitle(order.getTitle()) + .businessType(order.getOrderType()) + .areaId(order.getAreaId()) + .priority(PriorityEnum.fromPriority(order.getPriority())) + .build(); + + com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation recommendation = + dispatchEngine.recommendAssignee(context); + + if (recommendation != null && recommendation.hasRecommendation()) { + log.info("推荐执行人成功: queueId={}, assigneeId={}, assigneeName={}", + queueId, recommendation.getAssigneeId(), recommendation.getAssigneeName()); + return recommendation.getAssigneeId(); + } + + log.warn("未找到合适的执行人: queueId={}", queueId); + return null; + } + + @Override + public DispatchSuggestion getDispatchSuggestion(Long queueId) { + log.info("获取派单建议: queueId={}", queueId); + + // 1. 查询队列记录 + OrderQueueDTO queueDTO = orderQueueService.getById(queueId); + if (queueDTO == null) { + log.error("队列记录不存在: queueId={}", queueId); + return new DispatchSuggestion(null, null, "队列记录不存在", 0); + } + + // 2. 查询工单 + OpsOrderDO order = orderMapper.selectById(queueDTO.getOpsOrderId()); + if (order == null) { + log.error("工单不存在: orderId={}", queueDTO.getOpsOrderId()); + return new DispatchSuggestion(null, null, "工单不存在", 0); + } + + // 3. 使用新引擎的推荐方法 + OrderDispatchContext context = OrderDispatchContext.builder() + .orderId(order.getId()) + .orderCode(order.getOrderCode()) + .orderTitle(order.getTitle()) + .businessType(order.getOrderType()) + .areaId(order.getAreaId()) + .priority(PriorityEnum.fromPriority(order.getPriority())) + .build(); + + com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation recommendation = + dispatchEngine.recommendAssignee(context); + + if (recommendation != null && recommendation.hasRecommendation()) { + return new DispatchSuggestion( + recommendation.getAssigneeId(), + recommendation.getAssigneeName(), + recommendation.getReason(), + recommendation.getScore() + ); + } + + return new DispatchSuggestion(null, null, "未找到合适的执行人", 0); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean cancelDispatch(Long queueId, String reason) { + log.info("取消派单: queueId={}, reason={}", queueId, reason); + + // 1. 查询队列记录 + OrderQueueDTO queueDTO = orderQueueService.getById(queueId); + if (queueDTO == null) { + log.error("队列记录不存在: queueId={}", queueId); + return false; + } + + // 2. 使用生命周期管理器取消工单 + try { + orderLifecycleManager.cancelOrder( + queueDTO.getOpsOrderId(), + queueDTO.getUserId(), + OperatorTypeEnum.ADMIN, + reason + ); + log.info("取消派单成功: queueId={}", queueId); + return true; + } catch (Exception e) { + log.error("取消派单失败: queueId={}", queueId, e); + return false; + } + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean redispatch(Long queueId) { + log.info("重新派单: queueId={}", queueId); + + // 1. 查询队列记录 + OrderQueueDTO queueDTO = orderQueueService.getById(queueId); + if (queueDTO == null) { + log.error("队列记录不存在: queueId={}", queueId); + return false; + } + + // 2. 重置队列状态为等待 + orderQueueService.updateStatus(queueId, com.viewsh.module.ops.enums.OrderQueueStatusEnum.WAITING); + + // 3. 重新执行自动派单 + return autoDispatch(queueId); + } + + @Override + public void startDispatchEngine() { + log.info("启动派单引擎"); + // 新架构下,派单引擎不使用定时任务 + // 派单由事件驱动触发 + log.info("派单引擎已启动(事件驱动模式)"); + } + + @Override + public void stopDispatchEngine() { + log.info("停止派单引擎"); + // 新架构下,派单引擎不使用定时任务 + log.info("派单引擎已停止"); + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchEngineServiceImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchEngineServiceImpl.java deleted file mode 100644 index 62a1f29..0000000 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchEngineServiceImpl.java +++ /dev/null @@ -1,324 +0,0 @@ -package com.viewsh.module.ops.service.dispatch; - -import com.viewsh.module.ops.api.dispatch.DispatchEngineService; -import com.viewsh.module.ops.api.queue.OrderQueueDTO; -import com.viewsh.module.ops.api.queue.OrderQueueService; -import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO; -import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper; -import com.viewsh.module.ops.enums.DispatchStrategyEnum; -import com.viewsh.module.ops.enums.OrderQueueStatusEnum; -import com.viewsh.module.ops.enums.PriorityEnum; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -/** - * 派单引擎服务实现 - * 提供通用的派单框架,各业务模块通过实现 DispatchStrategy 接口提供具体策略 - * - * 注意:新的队列模型是在工单创建时就指定了 userId(保洁员), - * 所以派单引擎的主要职责是: - * 1. 管理队列状态的流转(WAITING -> dispatched) - * 2. 处理P0紧急任务的插队和打断 - * - * @author lzh - */ -@Service -@Slf4j -public class DispatchEngineServiceImpl implements DispatchEngineService { - - @Resource - private OrderQueueService orderQueueService; - - @Resource - private OpsOrderQueueMapper orderQueueMapper; - - /** - * 派单策略注册表 - * Key: 策略名称 - * Value: 策略实现 - */ - private final Map strategyRegistry = new ConcurrentHashMap<>(); - - /** - * 执行人员类型与策略的映射 - */ - private final Map assigneeTypeStrategyMap = new ConcurrentHashMap<>(); - - @PostConstruct - public void init() { - log.info("派单引擎已初始化"); - } - - // ========== 策略管理 ========== - - /** - * 注册派单策略 - */ - public void registerStrategy(DispatchStrategy strategy) { - strategyRegistry.put(strategy.getName(), strategy); - log.info("派单策略已注册: strategyName={}", strategy.getName()); - } - - /** - * 注册执行人员类型与策略的映射 - */ - public void registerAssigneeTypeStrategy(String assigneeType, String strategyName) { - assigneeTypeStrategyMap.put(assigneeType, strategyName); - log.info("执行人员类型策略映射已注册: assigneeType={}, strategyName={}", - assigneeType, strategyName); - } - - private DispatchStrategy getStrategy(String strategyName) { - return strategyRegistry.get(strategyName); - } - - private DispatchStrategy getStrategyByAssigneeType(String assigneeType) { - String strategyName = assigneeTypeStrategyMap.get(assigneeType); - if (strategyName == null) { - log.warn("未找到执行人员类型对应的策略: assigneeType={}", assigneeType); - return null; - } - return getStrategy(strategyName); - } - - // ========== 派单方法实现 ========== - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean autoDispatch(Long queueId) { - log.info("开始自动派单: queueId={}", queueId); - - // 在新的队列模型中,工单入队时已经指定了 userId - // 派单引擎只需要将状态从 WAITING 改为 EXECUTING - boolean success = orderQueueService.startExecution(queueId); - - if (success) { - log.info("自动派单成功: queueId={}", queueId); - } else { - log.warn("自动派单失败: queueId={}", queueId); - } - - return success; - } - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean manualDispatch(Long queueId, Long assigneeId) { - log.info("手动派单: queueId={}, assigneeId={}", queueId, assigneeId); - - // 在新的队列模型中,userId 在入队时已经确定 - // 手动派单主要是重新分配 userId,然后开始执行 - OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); - if (queueDO == null) { - log.warn("队列记录不存在: queueId={}", queueId); - return false; - } - - // 更新 userId - queueDO.setUserId(assigneeId); - orderQueueMapper.updateById(queueDO); - - // 开始执行 - return autoDispatch(queueId); - } - - @Override - @Transactional(rollbackFor = Exception.class) - public int batchAutoDispatch(Long areaId, int batchSize) { - log.info("开始批量自动派单: batchSize={}", batchSize); - - // 查询等待中的队列 - List waitingQueue = orderQueueService.getWaitingQueue(); - if (waitingQueue.isEmpty()) { - log.info("没有待派单的任务"); - return 0; - } - - // 限制批次大小 - int processCount = Math.min(batchSize, waitingQueue.size()); - int successCount = 0; - - // 逐个派单 - for (int i = 0; i < processCount; i++) { - OrderQueueDTO queueDTO = waitingQueue.get(i); - try { - boolean success = autoDispatch(queueDTO.getId()); - if (success) { - successCount++; - } - } catch (Exception e) { - log.error("派单失败: queueId={}, opsOrderId={}", - queueDTO.getId(), queueDTO.getOpsOrderId(), e); - } - } - - log.info("批量派单完成: total={}, success={}", processCount, successCount); - return successCount; - } - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean urgentDispatch(Long queueId) { - log.warn("开始紧急派单(P0): queueId={}", queueId); - - OrderQueueDTO queueDTO = orderQueueService.getById(queueId); - if (queueDTO == null) { - log.warn("队列记录不存在: queueId={}", queueId); - return false; - } - - // P0 紧急任务的处理: - // 1. 直接开始执行 - // 2. 如果该用户正在执行其他任务,需要先打断 - - // 检查该用户是否正在执行其他任务 - OrderQueueDTO currentTask = orderQueueService.getCurrentTaskByUserId(queueDTO.getUserId()); - if (currentTask != null && !currentTask.getId().equals(queueId)) { - // 用户正在执行其他任务,需要打断 - log.warn("该用户正在执行其他任务,需要打断: userId={}, currentTaskId={}", - queueDTO.getUserId(), currentTask.getId()); - - // 判断是否可以打断 - String assigneeType = "CLEANER"; // TODO: 从用户表获取 - DispatchStrategy strategy = getStrategyByAssigneeType(assigneeType); - if (strategy != null && strategy.canInterrupt( - queueDTO.getUserId(), - currentTask.getOpsOrderId(), - queueDTO.getOpsOrderId())) { - // 打断当前任务 - interruptTask(currentTask.getId(), currentTask.getOpsOrderId(), queueDTO.getOpsOrderId()); - } else { - log.warn("不允许打断当前任务: currentTaskId={}", currentTask.getId()); - return false; - } - } - - // 开始执行紧急任务 - return autoDispatch(queueId); - } - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean handlePriorityInterrupt(Long orderId) { - log.warn("处理紧急插队: orderId={}", orderId); - - OrderQueueDTO queueDTO = orderQueueService.getByOpsOrderId(orderId); - if (queueDTO == null) { - log.warn("工单不在队列中: orderId={}", orderId); - return false; - } - - // 如果不是P0,不需要特殊处理 - if (!PriorityEnum.P0.getPriority().equals(queueDTO.getPriority())) { - log.info("非P0工单,无需插队处理: orderId={}, priority={}", - orderId, queueDTO.getPriority()); - return true; - } - - // 执行紧急派单 - return urgentDispatch(queueDTO.getId()); - } - - @Override - public List findAvailableAssignees(Long areaId, String assigneeType) { - log.debug("查询可用执行人员: areaId={}, assigneeType={}", areaId, assigneeType); - - // 根据执行人员类型获取策略 - DispatchStrategy strategy = getStrategyByAssigneeType(assigneeType); - if (strategy == null) { - log.warn("未找到派单策略: assigneeType={}", assigneeType); - return Collections.emptyList(); - } - - // 调用策略查询可用人员 - // TODO: 这里需要在策略接口中添加查询可用人员列表的方法 - return Collections.emptyList(); - } - - @Override - public Long recommendAssignee(Long queueId, DispatchStrategyEnum dispatchStrategy) { - log.debug("推荐执行人员: queueId={}", queueId); - - // 在新的队列模型中,推荐执行人员的逻辑应该在入队之前完成 - // 这里只是预留接口,实际应该返回 null - log.warn("在新队列模型中,推荐执行人员应该在入队时完成,而非派单时"); - return null; - } - - @Override - public DispatchSuggestion getDispatchSuggestion(Long queueId) { - // 在新的队列模型中,这个方法的意义不大 - // 因为 userId 在入队时已经确定 - log.warn("在新队列模型中,派单建议功能不适用"); - return null; - } - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean cancelDispatch(Long queueId, String reason) { - log.info("取消派单: queueId={}, reason={}", queueId, reason); - - OrderQueueDTO queueDTO = orderQueueService.getById(queueId); - if (queueDTO == null) { - return false; - } - - if (!queueDTO.canCancel()) { - log.warn("当前状态不允许取��: queueId={}, status={}", queueId, queueDTO.getQueueStatus()); - return false; - } - - return orderQueueService.updateStatus(queueId, OrderQueueStatusEnum.CANCELLED); - } - - @Override - @Transactional(rollbackFor = Exception.class) - public boolean redispatch(Long queueId) { - log.info("重新派单: queueId={}", queueId); - - // 重新派单就是重新开始执行 - return autoDispatch(queueId); - } - - @Override - public void startDispatchEngine() { - log.info("派单引擎已启动"); - // TODO: 启动定时任务,定时处理待派单队列 - } - - @Override - public void stopDispatchEngine() { - log.info("派单引擎已停止"); - // TODO: 停止定时任务 - } - - // ========== 私有方法 ========== - - /** - * 打断任务 - */ - private void interruptTask(Long queueId, Long currentOrderId, Long urgentOrderId) { - log.warn("打断任务: queueId={}, currentOrderId={}, urgentOrderId={}", - queueId, currentOrderId, urgentOrderId); - - // 暂停当前任务 - boolean paused = orderQueueService.pauseTask(queueId); - - if (paused) { - // 更新事件消息 - orderQueueService.updateEventMessage(queueId, - "任务被P0紧急任务打断,紧急工单ID: " + urgentOrderId); - - // TODO: 通知执行人员任务被打断(通过工牌推送消息) - // TODO: 更新工单状态为暂停(调用工单服务) - - log.info("任务已打断,执行人员可处理紧急任务: queueId={}", queueId); - } - } -} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchStrategy.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchStrategy.java deleted file mode 100644 index ced8d36..0000000 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchStrategy.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.viewsh.module.ops.service.dispatch; - -import com.viewsh.module.ops.api.queue.OrderQueueDTO; - -/** - * 派单策略接口 - * 各业务模块(保洁、安保、工程等)需要实现此接口,定义自己的派单逻辑 - * - * @author lzh - */ -public interface DispatchStrategy { - - /** - * 策略名称 - * 如:cleaner_area_priority, security_skill_match - * - * @return 策略名称 - */ - String getName(); - - /** - * 执行派单策略 - * 根据队列记录推荐合适的执行人员 - * - * @param queueDTO 队列记录 - * @return 推荐的执行人员ID,如果没有合适的返回null - */ - Long recommendAssignee(OrderQueueDTO queueDTO); - - /** - * 判断是否可打断当前任务 - * 当P0紧急任务需要插队时,判断是否可以打断当前执行的任务 - * - * @param currentAssigneeId 当前执行任务的执行人员ID - * @param currentOrderId 当前正在执行的工单ID - * @param urgentOrderId 紧急工单ID - * @return 是否可以打断 - */ - boolean canInterrupt(Long currentAssigneeId, Long currentOrderId, Long urgentOrderId); -} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/fsm/OrderStateMachine.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/fsm/OrderStateMachine.java index e486909..8a3e655 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/fsm/OrderStateMachine.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/fsm/OrderStateMachine.java @@ -133,17 +133,26 @@ public class OrderStateMachine { throw new IllegalArgumentException("目标状态不能为空"); } - // 2. 校验状态转换合法性 + // 2. 获取当前状态 WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.valueOf(order.getStatus()); + + // 3. 如果目标状态与当前状态相同,直接返回(避免不必要的操作) + if (currentStatus == newStatus) { + log.debug("工单状态未变化,跳过状态转换: orderId={}, status={}", + order.getId(), currentStatus); + return; + } + + // 4. 校验状态转换合法性 validateTransition(currentStatus, newStatus); - // 3. 更新工单状态和相关字段 + // 5. 更新工单状态和相关字段 WorkOrderStatusEnum oldStatus = currentStatus; order.setStatus(newStatus.name()); updateStatusFields(order, newStatus); opsOrderMapper.updateById(order); - // 4. 记录事件流 + // 6. 记录事件流 eventService.recordEvent( order.getId(), oldStatus.name(), @@ -154,7 +163,7 @@ public class OrderStateMachine { remark ); - // 5. 发布状态变更事件(替代监听器机制) + // 7. 发布状态变更事件(替代监听器机制) publishStateChangedEvent(order, oldStatus, newStatus, operatorType, operatorId, remark); log.info("工单状态转换成功: orderId={}, {} -> {}, operatorType={}, operatorId={}", diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderServiceImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderServiceImpl.java index b160761..d9db6bc 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderServiceImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderServiceImpl.java @@ -2,6 +2,7 @@ package com.viewsh.module.ops.service.order; import com.viewsh.framework.common.pojo.PageResult; import com.viewsh.framework.common.util.object.BeanUtils; +import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; import com.viewsh.module.ops.dal.dataobject.dto.*; import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; @@ -16,10 +17,18 @@ import org.springframework.transaction.annotation.Transactional; import jakarta.annotation.Resource; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import java.util.List; /** * 通用工单服务实现 + *

+ * 职责: + * 1. 工单CRUD基础操作(createOrder, updateOrder, deleteOrder, getOrder) + * 2. 简单状态转换(assignOrder, acceptOrder, completeOrder) + * 3. 复杂生命周期操作委托给 OrderLifecycleManager(pauseOrder, resumeOrder, cancelOrder) + *

+ * 架构说明: + * - 暂停/恢复/取消操作委托给 {@link OrderLifecycleManager} 以确保工单和队列状态同步 + * - 简单状态转换直接使用 {@link OrderStateMachine} * * @author lzh */ @@ -33,6 +42,12 @@ public class OpsOrderServiceImpl implements OpsOrderService { @Resource private OrderStateMachine orderStateMachine; + /** + * 工单生命周期管理器(用于处理需要同步队列状态的操作) + */ + @Resource + private OrderLifecycleManager orderLifecycleManager; + private static final DateTimeFormatter ORDER_CODE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmmss"); @Override @@ -58,14 +73,7 @@ public class OpsOrderServiceImpl implements OpsOrderService { // 4. 插入数据库 opsOrderMapper.insert(order); - // 5. 记录创建事件 - orderStateMachine.transition( - order, - WorkOrderStatusEnum.PENDING, - OperatorTypeEnum.SYSTEM, - null, - "创建工单" - ); + // 5. 发布工单创建事件 log.info("创建工单成功: orderId={}, orderCode={}, title={}", order.getId(), order.getOrderCode(), order.getTitle()); @@ -232,14 +240,8 @@ public class OpsOrderServiceImpl implements OpsOrderService { throw new RuntimeException("无权暂停工单,您不是该工单的执行人"); } - // 3. 通过状态机执行状态转换 - orderStateMachine.transition( - order, - WorkOrderStatusEnum.PAUSED, - OperatorTypeEnum.CLEANER, - userId, - reason - ); + // 3. 委托给 OrderLifecycleManager 处理(确保工单和队列状态同步) + orderLifecycleManager.pauseOrder(orderId, userId, reason); log.info("暂停工单成功: orderId={}, userId={}, reason={}", orderId, userId, reason); } @@ -253,14 +255,8 @@ public class OpsOrderServiceImpl implements OpsOrderService { throw new RuntimeException("工单不存在: " + orderId); } - // 2. 通过状态机执行状态转换 - orderStateMachine.transition( - order, - WorkOrderStatusEnum.ARRIVED, - OperatorTypeEnum.CLEANER, - userId, - "恢复工单" - ); + // 2. 委托给 OrderLifecycleManager 处理(确保工单和队列状态同步) + orderLifecycleManager.resumeOrder(orderId, userId); log.info("恢复工单成功: orderId={}, userId={}", orderId, userId); } @@ -279,14 +275,8 @@ public class OpsOrderServiceImpl implements OpsOrderService { throw new RuntimeException("已完成的工单不能取消"); } - // 3. 通过状态机执行状态转换 - orderStateMachine.transition( - order, - WorkOrderStatusEnum.CANCELLED, - operatorType, - operatorId, - reason - ); + // 3. 委托给 OrderLifecycleManager 处理(确保工单和队列状态同步) + orderLifecycleManager.cancelOrder(orderId, operatorId, operatorType, reason); log.info("取消工单成功: orderId={}, reason={}", orderId, reason); } diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java new file mode 100644 index 0000000..d9425ac --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java @@ -0,0 +1,689 @@ +package com.viewsh.module.ops.service.queue; + +import com.viewsh.module.ops.api.queue.OrderQueueDTO; +import com.viewsh.module.ops.api.queue.OrderQueueService; +import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO; +import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper; +import com.viewsh.module.ops.enums.OrderQueueStatusEnum; +import com.viewsh.module.ops.enums.PriorityEnum; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * 工单队列管理服务实现 + *

+ * 架构说明: + * 1. 写入:先写 MySQL(持久化),再异步写 Redis(高性能) + * 2. 读取:优先读 Redis,未命中则读 MySQL 并同步到 Redis + * 3. 同步:定时任务将 MySQL 数据同步到 Redis + * 4. 容灾:Redis 宕机时降级到纯 MySQL 模式 + * + * @author lzh + */ +@Slf4j +@Service +public class OrderQueueServiceEnhanced implements OrderQueueService { + + /** + * Score 计算公式:优先级分数 + 时间戳 + * 优先级分数:P0=0, P1=1000000, P2=2000000, P3=3000000 + * 时间戳:秒级时间戳 + * 结果:优先级高的排在前面,同优先级按时间排序 + */ + private static final Map PRIORITY_SCORES = Map.of( + 0, 0L, // P0: 0 + 1, 1000000L, // P1: 1,000,000 + 2, 2000000L, // P2: 2,000,000 + 3, 3000000L // P3: 3,000,000 + ); + + @Resource + private OpsOrderQueueMapper orderQueueMapper; + + @Resource + private RedisOrderQueueService redisQueueService; + + @Resource + private QueueSyncService queueSyncService; + + @Override + @Transactional(rollbackFor = Exception.class) + public Long enqueue(Long opsOrderId, Long userId, PriorityEnum priority, Integer queueIndex) { + // 1. 检查工单是否已在队列中(MySQL) + OpsOrderQueueDO existingQueue = orderQueueMapper.selectByOpsOrderId(opsOrderId); + if (existingQueue != null) { + log.warn("工单已在队列中: opsOrderId={}, userId={}", opsOrderId, userId); + return existingQueue.getId(); + } + + // 2. 计算队列分数 + LocalDateTime now = LocalDateTime.now(); + double queueScore = calculateQueueScore(priority.getPriority(), now); + + // 3. 创建队列记录(MySQL) + OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder() + .opsOrderId(opsOrderId) + .userId(userId) + .queueIndex(queueIndex != null ? queueIndex : calculateNextQueueIndex(priority)) + .priority(priority.getPriority()) + .queueScore(queueScore) + .queueStatus(OrderQueueStatusEnum.WAITING.getStatus()) + .enqueueTime(now) + .pausedDuration(0) + .eventMessage("工单入队,等待派单") + .build(); + + orderQueueMapper.insert(queueDO); + + log.info("工单已入队(MySQL): opsOrderId={}, userId={}, priority={}, queueId={}", + opsOrderId, userId, priority, queueDO.getId()); + + // 3. 异步写入 Redis(失败不影响主流程) + OrderQueueDTO dto = convertToDTO(queueDO); + CompletableFuture.runAsync(() -> { + try { + redisQueueService.enqueue(dto); + log.debug("工单已入队(Redis): queueId={}", dto.getId()); + } catch (Exception e) { + log.error("Redis 入队失败,依赖定时同步任务补偿: queueId={}", dto.getId(), e); + } + }); + + // 4. 如果是P0紧急任务,记录警告日志 + if (priority.isUrgent()) { + log.warn("检测到P0紧急任务入队,需立即处理: opsOrderId={}", opsOrderId); + // TODO: 触发紧急派单流程(在派单引擎中实现) + } + + return queueDO.getId(); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean dequeue(Long queueId) { + // 1. MySQL 删除 + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO == null) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } + + // 只允许删除终态的记录 + if (!isTerminalStatus(queueDO.getQueueStatus())) { + log.warn("只能删除终态的队列记录: queueId={}, status={}", queueId, queueDO.getQueueStatus()); + return false; + } + + int deleted = orderQueueMapper.deleteById(queueId); + + // 2. 异步删除 Redis + if (deleted > 0) { + CompletableFuture.runAsync(() -> { + try { + redisQueueService.remove(queueId); + log.debug("Redis 删除队列记录成功: queueId={}", queueId); + } catch (Exception e) { + log.error("Redis 删除队列记录失败: queueId={}", queueId, e); + } + }); + } + + log.info("队列记录已删除: queueId={}, deleted={}", queueId, deleted > 0); + return deleted > 0; + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean dequeueByOpsOrderId(Long opsOrderId) { + OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId); + if (queueDO == null) { + log.warn("工单不在队列中: opsOrderId={}", opsOrderId); + return false; + } + + return dequeue(queueDO.getId()); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean updateStatus(Long queueId, OrderQueueStatusEnum newStatus) { + // 1. MySQL 更新 + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO == null) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } + + String oldStatus = queueDO.getQueueStatus(); + + // 状态流转校验 + if (!validateStatusTransition(oldStatus, newStatus.getStatus())) { + log.warn("非法的状态流转: queueId={}, oldStatus={}, newStatus={}", + queueId, oldStatus, newStatus.getStatus()); + return false; + } + + queueDO.setQueueStatus(newStatus.getStatus().toUpperCase()); + updateStatusTimestamp(queueDO, newStatus); + + int updated = orderQueueMapper.updateById(queueDO); + + // 2. 异步更新 Redis + if (updated > 0) { + OrderQueueDTO dto = convertToDTO(queueDO); + CompletableFuture.runAsync(() -> { + try { + redisQueueService.updateStatus(queueId, newStatus.getStatus()); + log.debug("Redis 更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus()); + } catch (Exception e) { + log.error("Redis 更新状态失败: queueId={}", queueId, e); + } + }); + } + + log.info("队列状态已更新: queueId={}, opsOrderId={}, oldStatus={}, newStatus={}", + queueId, queueDO.getOpsOrderId(), oldStatus, newStatus.getStatus()); + + return updated > 0; + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean startExecution(Long queueId) { + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO == null) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } + + if (!OrderQueueStatusEnum.WAITING.getStatus().equals(queueDO.getQueueStatus())) { + log.warn("只有WAITING状态的任务可以开始执行: queueId={}, status={}", + queueId, queueDO.getQueueStatus()); + return false; + } + + // 队列状态流转:WAITING → PROCESSING + queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus()); + queueDO.setDequeueTime(LocalDateTime.now()); + queueDO.setEventMessage("派单成功,已分配给执行人员"); + + int updated = orderQueueMapper.updateById(queueDO); + + // 异步更新 Redis + if (updated > 0) { + CompletableFuture.runAsync(() -> { + try { + redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus()); + } catch (Exception e) { + log.error("Redis 更新状态失败: queueId={}", queueId, e); + } + }); + } + + log.info("派单成功: queueId={}, opsOrderId={}, assigneeId={}", + queueId, queueDO.getOpsOrderId(), queueDO.getUserId()); + + return updated > 0; + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean pauseTask(Long queueId) { + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO == null) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } + + // 允许 PROCESSING 状态的任务可以暂停 + String currentStatus = queueDO.getQueueStatus(); + if (!OrderQueueStatusEnum.PROCESSING.getStatus().equals(currentStatus)) { + log.warn("只有PROCESSING状态的任务可以暂停: queueId={}, status={}", + queueId, currentStatus); + return false; + } + + // 队列状态流转:PROCESSING → PAUSED + queueDO.setQueueStatus(OrderQueueStatusEnum.PAUSED.getStatus()); + + // 计算暂停时长 + if (queueDO.getDequeueTime() != null) { + long pausedSeconds = java.time.Duration.between( + queueDO.getDequeueTime(), + LocalDateTime.now() + ).getSeconds(); + queueDO.addPausedDuration((int) pausedSeconds); + } + + queueDO.setEventMessage("任务已暂停"); + int updated = orderQueueMapper.updateById(queueDO); + + // 异步更新 Redis + if (updated > 0) { + CompletableFuture.runAsync(() -> { + try { + redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PAUSED.getStatus()); + } catch (Exception e) { + log.error("Redis 更新状态失败: queueId={}", queueId, e); + } + }); + } + + log.info("任务已暂停: queueId={}, opsOrderId={}, pausedDuration={}", + queueId, queueDO.getOpsOrderId(), queueDO.getPausedDuration()); + + return updated > 0; + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean resumeTask(Long queueId) { + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO == null) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } + + // 只有 PAUSED 状态的任务可以恢复 + String currentStatus = queueDO.getQueueStatus(); + if (!OrderQueueStatusEnum.PAUSED.getStatus().equals(currentStatus)) { + log.warn("只有PAUSED状态的任务可以恢复: queueId={}, status={}", + queueId, currentStatus); + return false; + } + + // 队列状态流转:PAUSED → PROCESSING + queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus()); + queueDO.setDequeueTime(LocalDateTime.now()); // 重置出队时间 + queueDO.setEventMessage("任务已恢复执行"); + + int updated = orderQueueMapper.updateById(queueDO); + + // 异步更新 Redis + if (updated > 0) { + CompletableFuture.runAsync(() -> { + try { + redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus()); + } catch (Exception e) { + log.error("Redis 更新状态失败: queueId={}", queueId, e); + } + }); + } + + log.info("任务已恢复执行: queueId={}, opsOrderId={}", queueId, queueDO.getOpsOrderId()); + + return updated > 0; + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean adjustPriority(Long queueId, PriorityEnum newPriority, String reason) { + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO == null) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } + + Integer oldPriority = queueDO.getPriority(); + + // 如果优先级没有变化,直接返回 + if (oldPriority.equals(newPriority.getPriority())) { + log.info("优先级未变化: queueId={}, priority={}", queueId, newPriority); + return true; + } + + queueDO.setPriorityEnum(newPriority); + // 重新计算队列顺序 + queueDO.setQueueIndex(calculateNextQueueIndex(newPriority)); + // 重新计算队列分数(使用原入队时间保持时间戳不变) + LocalDateTime enqueueTime = queueDO.getEnqueueTime() != null ? queueDO.getEnqueueTime() : LocalDateTime.now(); + double newQueueScore = calculateQueueScore(newPriority.getPriority(), enqueueTime); + queueDO.setQueueScore(newQueueScore); + queueDO.setEventMessage("优先级调整: " + oldPriority + " -> " + newPriority + ", 原因: " + reason); + + int updated = orderQueueMapper.updateById(queueDO); + + // 异步更新 Redis + if (updated > 0) { + CompletableFuture.runAsync(() -> { + try { + redisQueueService.updatePriority(queueId, newPriority.getPriority()); + } catch (Exception e) { + log.error("Redis 更新优先级失败: queueId={}", queueId, e); + } + }); + } + + log.info("优先级已调整: queueId={}, opsOrderId={}, oldPriority={}, newPriority={}, reason={}, newScore={}", + queueId, queueDO.getOpsOrderId(), oldPriority, newPriority, reason, newQueueScore); + + // 如果升级为P0,触发紧急派单 + if (newPriority.isUrgent()) { + log.warn("任务升级为P0紧急,需立即处理: queueId={}, opsOrderId={}", + queueId, queueDO.getOpsOrderId()); + // TODO: 触发紧急派单流程 + } + + return updated > 0; + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean adjustPriorityByOpsOrderId(Long opsOrderId, PriorityEnum newPriority, String reason) { + OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId); + if (queueDO == null) { + log.warn("工单不在队列中: opsOrderId={}", opsOrderId); + return false; + } + + return adjustPriority(queueDO.getId(), newPriority, reason); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean upgradeToUrgent(Long opsOrderId, String reason) { + return adjustPriorityByOpsOrderId(opsOrderId, PriorityEnum.P0, reason); + } + + @Override + public List getWaitingQueue() { + // 优先从 Redis 获取 + // TODO: 实现全局等待队列(需要聚合所有用户的队列) + // 这里暂时使用 MySQL + List list = orderQueueMapper.selectListWaiting(); + return convertToDTO(list); + } + + @Override + public List getUrgentOrders() { + // TODO: 优先从 Redis 获取 + List list = orderQueueMapper.selectUrgentOrders(); + return convertToDTO(list); + } + + @Override + public OrderQueueDTO getCurrentTaskByUserId(Long userId) { + // 1. 优先从 Redis 获取 + List redisTasks = redisQueueService.peekTasks(userId, 1); + if (redisTasks != null && !redisTasks.isEmpty()) { + return redisTasks.get(0); + } + + // 2. Redis 未命中,从 MySQL 获取并同步到 Redis + OpsOrderQueueDO queueDO = orderQueueMapper.selectCurrentExecutingByUserId(userId); + if (queueDO != null) { + // 同步到 Redis + OrderQueueDTO dto = convertToDTO(queueDO); + CompletableFuture.runAsync(() -> { + try { + redisQueueService.enqueue(dto); + } catch (Exception e) { + log.error("同步到 Redis 失败: queueId={}", dto.getId(), e); + } + }); + return dto; + } + + return null; + } + + @Override + public List getTasksByUserId(Long userId) { + // 1. 优先从 Redis 获取 + List redisTasks = redisQueueService.getTasksByUserId(userId); + if (redisTasks != null && !redisTasks.isEmpty()) { + return redisTasks; + } + + // 2. Redis 未命中,从 MySQL 获取并同步到 Redis + List mysqlList = orderQueueMapper.selectListByUserId(userId); + if (mysqlList != null && !mysqlList.isEmpty()) { + // 同步到 Redis + List dtoList = convertToDTO(mysqlList); + CompletableFuture.runAsync(() -> { + try { + redisQueueService.batchEnqueue(dtoList); + } catch (Exception e) { + log.error("批量同步到 Redis 失败: userId={}", userId, e); + } + }); + return dtoList; + } + + return Collections.emptyList(); + } + + @Override + public List getWaitingTasksByUserId(Long userId) { + // 获取所有任务 + List allTasks = getTasksByUserId(userId); + + // 过滤出 WAITING 状态的任务,并按队列分数排序 + return allTasks.stream() + .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus())) + .sorted((a, b) -> { + // 优先使用队列分数 score 排序(已在入队时计算好) + if (a.getQueueScore() != null && b.getQueueScore() != null) { + return Double.compare(a.getQueueScore(), b.getQueueScore()); + } + + // 兜底:如果 score 为空,则按优先级+时间排序 + // 按优先级排序(P0 > P1 > P2 > P3) + int priorityCompare = Integer.compare( + b.getPriority() != null ? b.getPriority() : 999, + a.getPriority() != null ? a.getPriority() : 999 + ); + if (priorityCompare != 0) { + return priorityCompare; + } + // 优先级相同,按入队时间排序 + return a.getEnqueueTime().compareTo(b.getEnqueueTime()); + }) + .collect(Collectors.toList()); + } + + @Override + public List getInterruptedTasksByUserId(Long userId) { + // 获取所有任务 + List allTasks = getTasksByUserId(userId); + + // 过滤出 PAUSED 状态的任务,并按中断时间排序 + return allTasks.stream() + .filter(task -> OrderQueueStatusEnum.PAUSED.getStatus().equals(task.getQueueStatus())) + .sorted((a, b) -> { + // 按中断时间排序(最早中断的排在前面) + // 如果没有中断时间字段,则按入队时间排序 + if (a.getEnqueueTime() != null && b.getEnqueueTime() != null) { + return a.getEnqueueTime().compareTo(b.getEnqueueTime()); + } + return 0; + }) + .collect(Collectors.toList()); + } + + @Override + public OrderQueueDTO getById(Long queueId) { + // 1. 优先从 Redis Hash 获取 + OrderQueueDTO redisDTO = redisQueueService.getByQueueId(queueId); + if (redisDTO != null) { + return redisDTO; + } + + // 2. Redis 未命中,从 MySQL 获取 + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO != null) { + OrderQueueDTO dto = convertToDTO(queueDO); + // 同步到 Redis + CompletableFuture.runAsync(() -> { + try { + redisQueueService.enqueue(dto); + } catch (Exception e) { + log.error("同步到 Redis 失败: queueId={}", dto.getId(), e); + } + }); + return dto; + } + + return null; + } + + @Override + public OrderQueueDTO getByOpsOrderId(Long opsOrderId) { + // 1. 优先从 Redis 获取(遍历所有用户队列,性能较差,慎用) + OrderQueueDTO redisDTO = redisQueueService.getByOrderId(opsOrderId); + if (redisDTO != null) { + return redisDTO; + } + + // 2. Redis 未命中,从 MySQL 获取 + OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId); + if (queueDO != null) { + return convertToDTO(queueDO); + } + + return null; + } + + @Override + public Long countWaiting() { + // 优先从 Redis 获取(需要聚合所有用户) + // TODO: 实现分布式计数器 + return orderQueueMapper.countWaiting(); + } + + @Override + public Long countByUserId(Long userId) { + // 优先从 Redis 获取 + long redisCount = redisQueueService.getQueueSize(userId); + if (redisCount > 0) { + return redisCount; + } + + // Redis 未命中,从 MySQL 获取 + return orderQueueMapper.countByUserId(userId); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public boolean updateEventMessage(Long queueId, String eventMessage) { + OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId); + if (queueDO == null) { + log.warn("队列记录不存在: queueId={}", queueId); + return false; + } + + queueDO.setEventMessage(eventMessage); + int updated = orderQueueMapper.updateById(queueDO); + + log.info("事件消息已更新: queueId={}, eventMessage={}", queueId, eventMessage); + + return updated > 0; + } + + // ========== 私有方法 ========== + + /** + * 计算队列分数(用于排序) + * 公式:优先级分数 + 时间戳 + * + * @param priority 优先级(0=P0, 1=P1, 2=P2, 3=P3) + * @param enqueueTime 入队时间 + * @return 队列分数 + */ + private double calculateQueueScore(Integer priority, LocalDateTime enqueueTime) { + // 获取优先级分数 + long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L); + + // 计算时间戳(秒级) + long timestamp; + if (enqueueTime != null) { + timestamp = enqueueTime.atZone(ZoneId.systemDefault()).toEpochSecond(); + } else { + timestamp = System.currentTimeMillis() / 1000; + } + + return priorityScore + timestamp; + } + + /** + * 计算下一个队列顺序号 + */ + private Integer calculateNextQueueIndex(PriorityEnum priority) { + // TODO: 查询当前优先级的最大 queueIndex,然后 +1 + // 这里简化处理,返回默认值 + return priority.getQueueOrder() * 1000; + } + + /** + * 判断是否为终态 + */ + private boolean isTerminalStatus(String status) { + return "CANCELLED".equals(status) || "COMPLETED".equals(status) + || "REMOVED".equals(status) || "FAILED".equals(status); + } + + /** + * 校验状态流转是否合法 + */ + private boolean validateStatusTransition(String oldStatus, String newStatus) { + // 终态不能再变更 + if (isTerminalStatus(oldStatus)) { + return false; + } + + // 允许的状态流转(支持新旧状态) + return switch (oldStatus.toUpperCase()) { + case "PENDING" -> "WAITING".equals(newStatus) || "CANCELLED".equals(newStatus); + case "WAITING" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus) + || "CANCELLED".equals(newStatus); + case "PROCESSING", "DISPATCHED" -> "PAUSED".equals(newStatus) + || "REMOVED".equals(newStatus) || "COMPLETED".equals(newStatus); + case "PAUSED" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus); + default -> false; + }; + } + + /** + * 根据状态更新时间戳 + */ + private void updateStatusTimestamp(OpsOrderQueueDO queueDO, OrderQueueStatusEnum status) { + switch (status.getStatus().toUpperCase()) { + case "DISPATCHED" -> { + if (queueDO.getDequeueTime() == null) { + queueDO.setDequeueTime(LocalDateTime.now()); + } + } + case "REMOVED", "COMPLETED" -> { + // 可以记录完成时间(如果表有该字段) + } + } + } + + /** + * 转换为 DTO + */ + private OrderQueueDTO convertToDTO(OpsOrderQueueDO queueDO) { + OrderQueueDTO dto = new OrderQueueDTO(); + BeanUtils.copyProperties(queueDO, dto); + return dto; + } + + /** + * 批量转换为 DTO + */ + private List convertToDTO(List list) { + return list.stream() + .map(this::convertToDTO) + .collect(Collectors.toList()); + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java index 6724561..57168a0 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java @@ -6,12 +6,10 @@ import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper; import com.viewsh.module.ops.enums.OrderQueueStatusEnum; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -36,10 +34,10 @@ public class QueueSyncService { private RedisOrderQueueService redisQueueService; /** - * 定时同步任务:每5分钟执行一次 + * 同步 MySQL 数据到 Redis * 将最近1小时内变更的数据同步到 Redis + * 由 xxl-job 定时任务调用 */ - @Scheduled(cron = "0 */5 * * * ?") public void syncMySQLToRedis() { try { log.info("开始定时同步:MySQL -> Redis"); @@ -51,7 +49,7 @@ public class QueueSyncService { .ge(OpsOrderQueueDO::getUpdateTime, startTime) .in(OpsOrderQueueDO::getQueueStatus, OrderQueueStatusEnum.WAITING.getStatus(), - OrderQueueStatusEnum.DISPATCHED.getStatus(), + OrderQueueStatusEnum.PROCESSING.getStatus(), OrderQueueStatusEnum.PAUSED.getStatus()) ); @@ -93,7 +91,7 @@ public class QueueSyncService { .eq(OpsOrderQueueDO::getUserId, cleanerId) .in(OpsOrderQueueDO::getQueueStatus, OrderQueueStatusEnum.WAITING.getStatus(), - OrderQueueStatusEnum.DISPATCHED.getStatus(), + OrderQueueStatusEnum.PROCESSING.getStatus(), OrderQueueStatusEnum.PAUSED.getStatus()) ); @@ -141,7 +139,7 @@ public class QueueSyncService { .eq(OpsOrderQueueDO::getUserId, cleanerId) .in(OpsOrderQueueDO::getQueueStatus, OrderQueueStatusEnum.WAITING.getStatus(), - OrderQueueStatusEnum.DISPATCHED.getStatus(), + OrderQueueStatusEnum.PROCESSING.getStatus(), OrderQueueStatusEnum.PAUSED.getStatus()) ); @@ -181,7 +179,7 @@ public class QueueSyncService { new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper() .in(OpsOrderQueueDO::getQueueStatus, OrderQueueStatusEnum.WAITING.getStatus(), - OrderQueueStatusEnum.DISPATCHED.getStatus(), + OrderQueueStatusEnum.PROCESSING.getStatus(), OrderQueueStatusEnum.PAUSED.getStatus()) );