diff --git a/viewsh-module-ops/viewsh-module-environment-biz/pom.xml b/viewsh-module-ops/viewsh-module-environment-biz/pom.xml
index 3816476..9e94a54 100644
--- a/viewsh-module-ops/viewsh-module-environment-biz/pom.xml
+++ b/viewsh-module-ops/viewsh-module-environment-biz/pom.xml
@@ -48,5 +48,18 @@
com.viewsh
viewsh-spring-boot-starter-biz-tenant
+
+
+
+ com.viewsh
+ viewsh-spring-boot-starter-mq
+
+
+
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+ true
+
diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchEngineServiceAdapter.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchEngineServiceAdapter.java
new file mode 100644
index 0000000..16ee5e3
--- /dev/null
+++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchEngineServiceAdapter.java
@@ -0,0 +1,396 @@
+package com.viewsh.module.ops.service.dispatch;
+
+import com.viewsh.module.ops.api.dispatch.DispatchEngineService;
+import com.viewsh.module.ops.api.queue.OrderQueueDTO;
+import com.viewsh.module.ops.api.queue.OrderQueueService;
+import com.viewsh.module.ops.core.dispatch.DispatchEngine;
+import com.viewsh.module.ops.core.dispatch.model.DispatchResult;
+import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext;
+import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
+import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
+import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
+import com.viewsh.module.ops.enums.OperatorTypeEnum;
+import com.viewsh.module.ops.enums.PriorityEnum;
+import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * 派单引擎服务适配器
+ *
+ * 职责:
+ * 1. 适配 {@link DispatchEngineService} API 到 {@link DispatchEngine} 实现
+ * 2. 使用新的调度引擎处理派单逻辑
+ *
+ * 设计模式:适配器模式
+ * - 将旧的 API 调用转换为新的调度引擎调用
+ * - 新架构:策略模式 + 责任链模式
+ *
+ * @author lzh
+ */
+@Slf4j
+@Service
+public class DispatchEngineServiceAdapter implements DispatchEngineService {
+
+ @Resource
+ private DispatchEngine dispatchEngine;
+
+ @Resource
+ private OrderLifecycleManager orderLifecycleManager;
+
+ @Resource
+ private OrderQueueService orderQueueService;
+
+ @Resource
+ private OpsOrderMapper orderMapper;
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean autoDispatch(Long queueId) {
+ log.info("自动派单: queueId={}", queueId);
+
+ // 1. 查询队列记录
+ OrderQueueDTO queueDTO = orderQueueService.getById(queueId);
+ if (queueDTO == null) {
+ log.error("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ // 2. 查询工单
+ OpsOrderDO order = orderMapper.selectById(queueDTO.getOpsOrderId());
+ if (order == null) {
+ log.error("工单不存在: orderId={}", queueDTO.getOpsOrderId());
+ return false;
+ }
+
+ // 3. 构建调度上下文
+ OrderDispatchContext context = OrderDispatchContext.builder()
+ .orderId(order.getId())
+ .orderCode(order.getOrderCode())
+ .orderTitle(order.getTitle())
+ .businessType(order.getOrderType())
+ .areaId(order.getAreaId())
+ .priority(PriorityEnum.fromPriority(order.getPriority()))
+ .recommendedAssigneeId(queueDTO.getUserId())
+ .build();
+
+ // 4. 执行调度(使用新引擎)
+ DispatchResult result = dispatchEngine.dispatch(context);
+
+ if (result.isSuccess()) {
+ log.info("自动派单成功: queueId={}, assigneeId={}, path={}",
+ queueId, result.getAssigneeId(), result.getPath());
+ return true;
+ } else {
+ log.error("自动派单失败: queueId={}, reason={}", queueId, result.getMessage());
+ return false;
+ }
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean manualDispatch(Long queueId, Long assigneeId) {
+ log.info("手动派单: queueId={}, assigneeId={}", queueId, assigneeId);
+
+ // 1. 查询队列记录
+ OrderQueueDTO queueDTO = orderQueueService.getById(queueId);
+ if (queueDTO == null) {
+ log.error("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ // 2. 查询工单
+ OpsOrderDO order = orderMapper.selectById(queueDTO.getOpsOrderId());
+ if (order == null) {
+ log.error("工单不存在: orderId={}", queueDTO.getOpsOrderId());
+ return false;
+ }
+
+ // 3. 更新工单的执行人
+ order.setAssigneeId(assigneeId);
+ orderMapper.updateById(order);
+
+ // 4. 使用生命周期管理器直接派单
+ orderLifecycleManager.dispatch(
+ com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest.builder()
+ .orderId(order.getId())
+ .targetStatus(WorkOrderStatusEnum.DISPATCHED)
+ .assigneeId(assigneeId)
+ .queueId(queueId)
+ .operatorType(OperatorTypeEnum.ADMIN)
+ .operatorId(assigneeId)
+ .reason("管理员手动派单")
+ .build()
+ );
+
+ log.info("手动派单成功: queueId={}, assigneeId={}", queueId, assigneeId);
+ return true;
+ }
+
+ @Override
+ public int batchAutoDispatch(Long areaId, int batchSize) {
+ log.info("批量自动派单: areaId={}, batchSize={}", areaId, batchSize);
+
+ // 查询所有等待派单的任务
+ List allWaitingTasks = orderQueueService.getWaitingQueue();
+ if (allWaitingTasks.isEmpty()) {
+ log.info("没有等待派单的任务: areaId={}", areaId);
+ return 0;
+ }
+
+ // 过滤指定区域的任务
+ List waitingTasks = areaId != null
+ ? allWaitingTasks.stream()
+ .filter(t -> {
+ OpsOrderDO order = orderMapper.selectById(t.getOpsOrderId());
+ return order != null && order.getAreaId().equals(areaId);
+ })
+ .limit(batchSize)
+ .collect(Collectors.toList())
+ : allWaitingTasks.stream().limit(batchSize).collect(Collectors.toList());
+
+ if (waitingTasks.isEmpty()) {
+ log.info("该区域没有等待派单的任务: areaId={}", areaId);
+ return 0;
+ }
+
+ int successCount = 0;
+ for (OrderQueueDTO task : waitingTasks) {
+ try {
+ if (autoDispatch(task.getId())) {
+ successCount++;
+ }
+ } catch (Exception e) {
+ log.error("批量派单失败: queueId={}", task.getId(), e);
+ }
+ }
+
+ log.info("批量派单完成: total={}, success={}", waitingTasks.size(), successCount);
+ return successCount;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean urgentDispatch(Long queueId) {
+ log.warn("紧急派单: queueId={}", queueId);
+
+ // 1. 查询队列记录
+ OrderQueueDTO queueDTO = orderQueueService.getById(queueId);
+ if (queueDTO == null) {
+ log.error("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ // 2. 查询工单
+ OpsOrderDO order = orderMapper.selectById(queueDTO.getOpsOrderId());
+ if (order == null) {
+ log.error("工单不存在: orderId={}", queueDTO.getOpsOrderId());
+ return false;
+ }
+
+ // 3. 使用新引擎的紧急打断方法
+ DispatchResult result = dispatchEngine.urgentInterrupt(order.getId(), queueDTO.getUserId());
+
+ if (result.isSuccess()) {
+ log.warn("紧急派单成功: queueId={}, assigneeId={}", queueId, result.getAssigneeId());
+ return true;
+ } else {
+ log.error("紧急派单失败: queueId={}, reason={}", queueId, result.getMessage());
+ return false;
+ }
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean handlePriorityInterrupt(Long orderId) {
+ log.warn("处理优先级升级打断: orderId={}", orderId);
+
+ // 1. 查询工单
+ OpsOrderDO order = orderMapper.selectById(orderId);
+ if (order == null) {
+ log.error("工单不存在: orderId={}", orderId);
+ return false;
+ }
+
+ // 2. 查询队列记录
+ OrderQueueDTO queueDTO = orderQueueService.getByOpsOrderId(orderId);
+ if (queueDTO == null) {
+ log.warn("工单不在队列中: orderId={}", orderId);
+ return true;
+ }
+
+ // 3. 使用新引擎的紧急打断方法
+ DispatchResult result = dispatchEngine.urgentInterrupt(orderId, queueDTO.getUserId());
+
+ if (result.isSuccess()) {
+ log.warn("优先级升级打断成功: orderId={}", orderId);
+ return true;
+ } else {
+ log.error("优先级升级打断失败: orderId={}, reason={}", orderId, result.getMessage());
+ return false;
+ }
+ }
+
+ @Override
+ public List findAvailableAssignees(Long areaId, String assigneeType) {
+ log.info("查询可用执行人: areaId={}, assigneeType={}", areaId, assigneeType);
+
+ // 根据业务类型查询可用人员
+ // 这里简化处理,直接返回空列表
+ // 实际应该调用相应的分配策略获取可用人员
+ log.warn("findAvailableAssignees 方法暂未实现,返回空列表");
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Long recommendAssignee(Long queueId, com.viewsh.module.ops.enums.DispatchStrategyEnum dispatchStrategy) {
+ log.info("推荐执行人: queueId={}, strategy={}", queueId, dispatchStrategy);
+
+ // 1. 查询队列记录
+ OrderQueueDTO queueDTO = orderQueueService.getById(queueId);
+ if (queueDTO == null) {
+ log.error("队列记录不存在: queueId={}", queueId);
+ return null;
+ }
+
+ // 2. 查询工单
+ OpsOrderDO order = orderMapper.selectById(queueDTO.getOpsOrderId());
+ if (order == null) {
+ log.error("工单不存在: orderId={}", queueDTO.getOpsOrderId());
+ return null;
+ }
+
+ // 3. 使用新引擎的推荐方法
+ OrderDispatchContext context = OrderDispatchContext.builder()
+ .orderId(order.getId())
+ .orderCode(order.getOrderCode())
+ .orderTitle(order.getTitle())
+ .businessType(order.getOrderType())
+ .areaId(order.getAreaId())
+ .priority(PriorityEnum.fromPriority(order.getPriority()))
+ .build();
+
+ com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation recommendation =
+ dispatchEngine.recommendAssignee(context);
+
+ if (recommendation != null && recommendation.hasRecommendation()) {
+ log.info("推荐执行人成功: queueId={}, assigneeId={}, assigneeName={}",
+ queueId, recommendation.getAssigneeId(), recommendation.getAssigneeName());
+ return recommendation.getAssigneeId();
+ }
+
+ log.warn("未找到合适的执行人: queueId={}", queueId);
+ return null;
+ }
+
+ @Override
+ public DispatchSuggestion getDispatchSuggestion(Long queueId) {
+ log.info("获取派单建议: queueId={}", queueId);
+
+ // 1. 查询队列记录
+ OrderQueueDTO queueDTO = orderQueueService.getById(queueId);
+ if (queueDTO == null) {
+ log.error("队列记录不存在: queueId={}", queueId);
+ return new DispatchSuggestion(null, null, "队列记录不存在", 0);
+ }
+
+ // 2. 查询工单
+ OpsOrderDO order = orderMapper.selectById(queueDTO.getOpsOrderId());
+ if (order == null) {
+ log.error("工单不存在: orderId={}", queueDTO.getOpsOrderId());
+ return new DispatchSuggestion(null, null, "工单不存在", 0);
+ }
+
+ // 3. 使用新引擎的推荐方法
+ OrderDispatchContext context = OrderDispatchContext.builder()
+ .orderId(order.getId())
+ .orderCode(order.getOrderCode())
+ .orderTitle(order.getTitle())
+ .businessType(order.getOrderType())
+ .areaId(order.getAreaId())
+ .priority(PriorityEnum.fromPriority(order.getPriority()))
+ .build();
+
+ com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation recommendation =
+ dispatchEngine.recommendAssignee(context);
+
+ if (recommendation != null && recommendation.hasRecommendation()) {
+ return new DispatchSuggestion(
+ recommendation.getAssigneeId(),
+ recommendation.getAssigneeName(),
+ recommendation.getReason(),
+ recommendation.getScore()
+ );
+ }
+
+ return new DispatchSuggestion(null, null, "未找到合适的执行人", 0);
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean cancelDispatch(Long queueId, String reason) {
+ log.info("取消派单: queueId={}, reason={}", queueId, reason);
+
+ // 1. 查询队列记录
+ OrderQueueDTO queueDTO = orderQueueService.getById(queueId);
+ if (queueDTO == null) {
+ log.error("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ // 2. 使用生命周期管理器取消工单
+ try {
+ orderLifecycleManager.cancelOrder(
+ queueDTO.getOpsOrderId(),
+ queueDTO.getUserId(),
+ OperatorTypeEnum.ADMIN,
+ reason
+ );
+ log.info("取消派单成功: queueId={}", queueId);
+ return true;
+ } catch (Exception e) {
+ log.error("取消派单失败: queueId={}", queueId, e);
+ return false;
+ }
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean redispatch(Long queueId) {
+ log.info("重新派单: queueId={}", queueId);
+
+ // 1. 查询队列记录
+ OrderQueueDTO queueDTO = orderQueueService.getById(queueId);
+ if (queueDTO == null) {
+ log.error("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ // 2. 重置队列状态为等待
+ orderQueueService.updateStatus(queueId, com.viewsh.module.ops.enums.OrderQueueStatusEnum.WAITING);
+
+ // 3. 重新执行自动派单
+ return autoDispatch(queueId);
+ }
+
+ @Override
+ public void startDispatchEngine() {
+ log.info("启动派单引擎");
+ // 新架构下,派单引擎不使用定时任务
+ // 派单由事件驱动触发
+ log.info("派单引擎已启动(事件驱动模式)");
+ }
+
+ @Override
+ public void stopDispatchEngine() {
+ log.info("停止派单引擎");
+ // 新架构下,派单引擎不使用定时任务
+ log.info("派单引擎已停止");
+ }
+}
diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchEngineServiceImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchEngineServiceImpl.java
deleted file mode 100644
index 62a1f29..0000000
--- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchEngineServiceImpl.java
+++ /dev/null
@@ -1,324 +0,0 @@
-package com.viewsh.module.ops.service.dispatch;
-
-import com.viewsh.module.ops.api.dispatch.DispatchEngineService;
-import com.viewsh.module.ops.api.queue.OrderQueueDTO;
-import com.viewsh.module.ops.api.queue.OrderQueueService;
-import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
-import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
-import com.viewsh.module.ops.enums.DispatchStrategyEnum;
-import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
-import com.viewsh.module.ops.enums.PriorityEnum;
-import jakarta.annotation.PostConstruct;
-import jakarta.annotation.Resource;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * 派单引擎服务实现
- * 提供通用的派单框架,各业务模块通过实现 DispatchStrategy 接口提供具体策略
- *
- * 注意:新的队列模型是在工单创建时就指定了 userId(保洁员),
- * 所以派单引擎的主要职责是:
- * 1. 管理队列状态的流转(WAITING -> dispatched)
- * 2. 处理P0紧急任务的插队和打断
- *
- * @author lzh
- */
-@Service
-@Slf4j
-public class DispatchEngineServiceImpl implements DispatchEngineService {
-
- @Resource
- private OrderQueueService orderQueueService;
-
- @Resource
- private OpsOrderQueueMapper orderQueueMapper;
-
- /**
- * 派单策略注册表
- * Key: 策略名称
- * Value: 策略实现
- */
- private final Map strategyRegistry = new ConcurrentHashMap<>();
-
- /**
- * 执行人员类型与策略的映射
- */
- private final Map assigneeTypeStrategyMap = new ConcurrentHashMap<>();
-
- @PostConstruct
- public void init() {
- log.info("派单引擎已初始化");
- }
-
- // ========== 策略管理 ==========
-
- /**
- * 注册派单策略
- */
- public void registerStrategy(DispatchStrategy strategy) {
- strategyRegistry.put(strategy.getName(), strategy);
- log.info("派单策略已注册: strategyName={}", strategy.getName());
- }
-
- /**
- * 注册执行人员类型与策略的映射
- */
- public void registerAssigneeTypeStrategy(String assigneeType, String strategyName) {
- assigneeTypeStrategyMap.put(assigneeType, strategyName);
- log.info("执行人员类型策略映射已注册: assigneeType={}, strategyName={}",
- assigneeType, strategyName);
- }
-
- private DispatchStrategy getStrategy(String strategyName) {
- return strategyRegistry.get(strategyName);
- }
-
- private DispatchStrategy getStrategyByAssigneeType(String assigneeType) {
- String strategyName = assigneeTypeStrategyMap.get(assigneeType);
- if (strategyName == null) {
- log.warn("未找到执行人员类型对应的策略: assigneeType={}", assigneeType);
- return null;
- }
- return getStrategy(strategyName);
- }
-
- // ========== 派单方法实现 ==========
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean autoDispatch(Long queueId) {
- log.info("开始自动派单: queueId={}", queueId);
-
- // 在新的队列模型中,工单入队时已经指定了 userId
- // 派单引擎只需要将状态从 WAITING 改为 EXECUTING
- boolean success = orderQueueService.startExecution(queueId);
-
- if (success) {
- log.info("自动派单成功: queueId={}", queueId);
- } else {
- log.warn("自动派单失败: queueId={}", queueId);
- }
-
- return success;
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean manualDispatch(Long queueId, Long assigneeId) {
- log.info("手动派单: queueId={}, assigneeId={}", queueId, assigneeId);
-
- // 在新的队列模型中,userId 在入队时已经确定
- // 手动派单主要是重新分配 userId,然后开始执行
- OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
- if (queueDO == null) {
- log.warn("队列记录不存在: queueId={}", queueId);
- return false;
- }
-
- // 更新 userId
- queueDO.setUserId(assigneeId);
- orderQueueMapper.updateById(queueDO);
-
- // 开始执行
- return autoDispatch(queueId);
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public int batchAutoDispatch(Long areaId, int batchSize) {
- log.info("开始批量自动派单: batchSize={}", batchSize);
-
- // 查询等待中的队列
- List waitingQueue = orderQueueService.getWaitingQueue();
- if (waitingQueue.isEmpty()) {
- log.info("没有待派单的任务");
- return 0;
- }
-
- // 限制批次大小
- int processCount = Math.min(batchSize, waitingQueue.size());
- int successCount = 0;
-
- // 逐个派单
- for (int i = 0; i < processCount; i++) {
- OrderQueueDTO queueDTO = waitingQueue.get(i);
- try {
- boolean success = autoDispatch(queueDTO.getId());
- if (success) {
- successCount++;
- }
- } catch (Exception e) {
- log.error("派单失败: queueId={}, opsOrderId={}",
- queueDTO.getId(), queueDTO.getOpsOrderId(), e);
- }
- }
-
- log.info("批量派单完成: total={}, success={}", processCount, successCount);
- return successCount;
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean urgentDispatch(Long queueId) {
- log.warn("开始紧急派单(P0): queueId={}", queueId);
-
- OrderQueueDTO queueDTO = orderQueueService.getById(queueId);
- if (queueDTO == null) {
- log.warn("队列记录不存在: queueId={}", queueId);
- return false;
- }
-
- // P0 紧急任务的处理:
- // 1. 直接开始执行
- // 2. 如果该用户正在执行其他任务,需要先打断
-
- // 检查该用户是否正在执行其他任务
- OrderQueueDTO currentTask = orderQueueService.getCurrentTaskByUserId(queueDTO.getUserId());
- if (currentTask != null && !currentTask.getId().equals(queueId)) {
- // 用户正在执行其他任务,需要打断
- log.warn("该用户正在执行其他任务,需要打断: userId={}, currentTaskId={}",
- queueDTO.getUserId(), currentTask.getId());
-
- // 判断是否可以打断
- String assigneeType = "CLEANER"; // TODO: 从用户表获取
- DispatchStrategy strategy = getStrategyByAssigneeType(assigneeType);
- if (strategy != null && strategy.canInterrupt(
- queueDTO.getUserId(),
- currentTask.getOpsOrderId(),
- queueDTO.getOpsOrderId())) {
- // 打断当前任务
- interruptTask(currentTask.getId(), currentTask.getOpsOrderId(), queueDTO.getOpsOrderId());
- } else {
- log.warn("不允许打断当前任务: currentTaskId={}", currentTask.getId());
- return false;
- }
- }
-
- // 开始执行紧急任务
- return autoDispatch(queueId);
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean handlePriorityInterrupt(Long orderId) {
- log.warn("处理紧急插队: orderId={}", orderId);
-
- OrderQueueDTO queueDTO = orderQueueService.getByOpsOrderId(orderId);
- if (queueDTO == null) {
- log.warn("工单不在队列中: orderId={}", orderId);
- return false;
- }
-
- // 如果不是P0,不需要特殊处理
- if (!PriorityEnum.P0.getPriority().equals(queueDTO.getPriority())) {
- log.info("非P0工单,无需插队处理: orderId={}, priority={}",
- orderId, queueDTO.getPriority());
- return true;
- }
-
- // 执行紧急派单
- return urgentDispatch(queueDTO.getId());
- }
-
- @Override
- public List findAvailableAssignees(Long areaId, String assigneeType) {
- log.debug("查询可用执行人员: areaId={}, assigneeType={}", areaId, assigneeType);
-
- // 根据执行人员类型获取策略
- DispatchStrategy strategy = getStrategyByAssigneeType(assigneeType);
- if (strategy == null) {
- log.warn("未找到派单策略: assigneeType={}", assigneeType);
- return Collections.emptyList();
- }
-
- // 调用策略查询可用人员
- // TODO: 这里需要在策略接口中添加查询可用人员列表的方法
- return Collections.emptyList();
- }
-
- @Override
- public Long recommendAssignee(Long queueId, DispatchStrategyEnum dispatchStrategy) {
- log.debug("推荐执行人员: queueId={}", queueId);
-
- // 在新的队列模型中,推荐执行人员的逻辑应该在入队之前完成
- // 这里只是预留接口,实际应该返回 null
- log.warn("在新队列模型中,推荐执行人员应该在入队时完成,而非派单时");
- return null;
- }
-
- @Override
- public DispatchSuggestion getDispatchSuggestion(Long queueId) {
- // 在新的队列模型中,这个方法的意义不大
- // 因为 userId 在入队时已经确定
- log.warn("在新队列模型中,派单建议功能不适用");
- return null;
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean cancelDispatch(Long queueId, String reason) {
- log.info("取消派单: queueId={}, reason={}", queueId, reason);
-
- OrderQueueDTO queueDTO = orderQueueService.getById(queueId);
- if (queueDTO == null) {
- return false;
- }
-
- if (!queueDTO.canCancel()) {
- log.warn("当前状态不允许取��: queueId={}, status={}", queueId, queueDTO.getQueueStatus());
- return false;
- }
-
- return orderQueueService.updateStatus(queueId, OrderQueueStatusEnum.CANCELLED);
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean redispatch(Long queueId) {
- log.info("重新派单: queueId={}", queueId);
-
- // 重新派单就是重新开始执行
- return autoDispatch(queueId);
- }
-
- @Override
- public void startDispatchEngine() {
- log.info("派单引擎已启动");
- // TODO: 启动定时任务,定时处理待派单队列
- }
-
- @Override
- public void stopDispatchEngine() {
- log.info("派单引擎已停止");
- // TODO: 停止定时任务
- }
-
- // ========== 私有方法 ==========
-
- /**
- * 打断任务
- */
- private void interruptTask(Long queueId, Long currentOrderId, Long urgentOrderId) {
- log.warn("打断任务: queueId={}, currentOrderId={}, urgentOrderId={}",
- queueId, currentOrderId, urgentOrderId);
-
- // 暂停当前任务
- boolean paused = orderQueueService.pauseTask(queueId);
-
- if (paused) {
- // 更新事件消息
- orderQueueService.updateEventMessage(queueId,
- "任务被P0紧急任务打断,紧急工单ID: " + urgentOrderId);
-
- // TODO: 通知执行人员任务被打断(通过工牌推送消息)
- // TODO: 更新工单状态为暂停(调用工单服务)
-
- log.info("任务已打断,执行人员可处理紧急任务: queueId={}", queueId);
- }
- }
-}
diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchStrategy.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchStrategy.java
deleted file mode 100644
index ced8d36..0000000
--- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/DispatchStrategy.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.viewsh.module.ops.service.dispatch;
-
-import com.viewsh.module.ops.api.queue.OrderQueueDTO;
-
-/**
- * 派单策略接口
- * 各业务模块(保洁、安保、工程等)需要实现此接口,定义自己的派单逻辑
- *
- * @author lzh
- */
-public interface DispatchStrategy {
-
- /**
- * 策略名称
- * 如:cleaner_area_priority, security_skill_match
- *
- * @return 策略名称
- */
- String getName();
-
- /**
- * 执行派单策略
- * 根据队列记录推荐合适的执行人员
- *
- * @param queueDTO 队列记录
- * @return 推荐的执行人员ID,如果没有合适的返回null
- */
- Long recommendAssignee(OrderQueueDTO queueDTO);
-
- /**
- * 判断是否可打断当前任务
- * 当P0紧急任务需要插队时,判断是否可以打断当前执行的任务
- *
- * @param currentAssigneeId 当前执行任务的执行人员ID
- * @param currentOrderId 当前正在执行的工单ID
- * @param urgentOrderId 紧急工单ID
- * @return 是否可以打断
- */
- boolean canInterrupt(Long currentAssigneeId, Long currentOrderId, Long urgentOrderId);
-}
diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/fsm/OrderStateMachine.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/fsm/OrderStateMachine.java
index e486909..8a3e655 100644
--- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/fsm/OrderStateMachine.java
+++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/fsm/OrderStateMachine.java
@@ -133,17 +133,26 @@ public class OrderStateMachine {
throw new IllegalArgumentException("目标状态不能为空");
}
- // 2. 校验状态转换合法性
+ // 2. 获取当前状态
WorkOrderStatusEnum currentStatus = WorkOrderStatusEnum.valueOf(order.getStatus());
+
+ // 3. 如果目标状态与当前状态相同,直接返回(避免不必要的操作)
+ if (currentStatus == newStatus) {
+ log.debug("工单状态未变化,跳过状态转换: orderId={}, status={}",
+ order.getId(), currentStatus);
+ return;
+ }
+
+ // 4. 校验状态转换合法性
validateTransition(currentStatus, newStatus);
- // 3. 更新工单状态和相关字段
+ // 5. 更新工单状态和相关字段
WorkOrderStatusEnum oldStatus = currentStatus;
order.setStatus(newStatus.name());
updateStatusFields(order, newStatus);
opsOrderMapper.updateById(order);
- // 4. 记录事件流
+ // 6. 记录事件流
eventService.recordEvent(
order.getId(),
oldStatus.name(),
@@ -154,7 +163,7 @@ public class OrderStateMachine {
remark
);
- // 5. 发布状态变更事件(替代监听器机制)
+ // 7. 发布状态变更事件(替代监听器机制)
publishStateChangedEvent(order, oldStatus, newStatus, operatorType, operatorId, remark);
log.info("工单状态转换成功: orderId={}, {} -> {}, operatorType={}, operatorId={}",
diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderServiceImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderServiceImpl.java
index b160761..d9db6bc 100644
--- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderServiceImpl.java
+++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/order/OpsOrderServiceImpl.java
@@ -2,6 +2,7 @@ package com.viewsh.module.ops.service.order;
import com.viewsh.framework.common.pojo.PageResult;
import com.viewsh.framework.common.util.object.BeanUtils;
+import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
import com.viewsh.module.ops.dal.dataobject.dto.*;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
@@ -16,10 +17,18 @@ import org.springframework.transaction.annotation.Transactional;
import jakarta.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
-import java.util.List;
/**
* 通用工单服务实现
+ *
+ * 职责:
+ * 1. 工单CRUD基础操作(createOrder, updateOrder, deleteOrder, getOrder)
+ * 2. 简单状态转换(assignOrder, acceptOrder, completeOrder)
+ * 3. 复杂生命周期操作委托给 OrderLifecycleManager(pauseOrder, resumeOrder, cancelOrder)
+ *
+ * 架构说明:
+ * - 暂停/恢复/取消操作委托给 {@link OrderLifecycleManager} 以确保工单和队列状态同步
+ * - 简单状态转换直接使用 {@link OrderStateMachine}
*
* @author lzh
*/
@@ -33,6 +42,12 @@ public class OpsOrderServiceImpl implements OpsOrderService {
@Resource
private OrderStateMachine orderStateMachine;
+ /**
+ * 工单生命周期管理器(用于处理需要同步队列状态的操作)
+ */
+ @Resource
+ private OrderLifecycleManager orderLifecycleManager;
+
private static final DateTimeFormatter ORDER_CODE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
@Override
@@ -58,14 +73,7 @@ public class OpsOrderServiceImpl implements OpsOrderService {
// 4. 插入数据库
opsOrderMapper.insert(order);
- // 5. 记录创建事件
- orderStateMachine.transition(
- order,
- WorkOrderStatusEnum.PENDING,
- OperatorTypeEnum.SYSTEM,
- null,
- "创建工单"
- );
+ // 5. 发布工单创建事件
log.info("创建工单成功: orderId={}, orderCode={}, title={}",
order.getId(), order.getOrderCode(), order.getTitle());
@@ -232,14 +240,8 @@ public class OpsOrderServiceImpl implements OpsOrderService {
throw new RuntimeException("无权暂停工单,您不是该工单的执行人");
}
- // 3. 通过状态机执行状态转换
- orderStateMachine.transition(
- order,
- WorkOrderStatusEnum.PAUSED,
- OperatorTypeEnum.CLEANER,
- userId,
- reason
- );
+ // 3. 委托给 OrderLifecycleManager 处理(确保工单和队列状态同步)
+ orderLifecycleManager.pauseOrder(orderId, userId, reason);
log.info("暂停工单成功: orderId={}, userId={}, reason={}", orderId, userId, reason);
}
@@ -253,14 +255,8 @@ public class OpsOrderServiceImpl implements OpsOrderService {
throw new RuntimeException("工单不存在: " + orderId);
}
- // 2. 通过状态机执行状态转换
- orderStateMachine.transition(
- order,
- WorkOrderStatusEnum.ARRIVED,
- OperatorTypeEnum.CLEANER,
- userId,
- "恢复工单"
- );
+ // 2. 委托给 OrderLifecycleManager 处理(确保工单和队列状态同步)
+ orderLifecycleManager.resumeOrder(orderId, userId);
log.info("恢复工单成功: orderId={}, userId={}", orderId, userId);
}
@@ -279,14 +275,8 @@ public class OpsOrderServiceImpl implements OpsOrderService {
throw new RuntimeException("已完成的工单不能取消");
}
- // 3. 通过状态机执行状态转换
- orderStateMachine.transition(
- order,
- WorkOrderStatusEnum.CANCELLED,
- operatorType,
- operatorId,
- reason
- );
+ // 3. 委托给 OrderLifecycleManager 处理(确保工单和队列状态同步)
+ orderLifecycleManager.cancelOrder(orderId, operatorId, operatorType, reason);
log.info("取消工单成功: orderId={}, reason={}", orderId, reason);
}
diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java
new file mode 100644
index 0000000..d9425ac
--- /dev/null
+++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java
@@ -0,0 +1,689 @@
+package com.viewsh.module.ops.service.queue;
+
+import com.viewsh.module.ops.api.queue.OrderQueueDTO;
+import com.viewsh.module.ops.api.queue.OrderQueueService;
+import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
+import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
+import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
+import com.viewsh.module.ops.enums.PriorityEnum;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeanUtils;
+import org.springframework.context.annotation.Primary;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 工单队列管理服务实现
+ *
+ * 架构说明:
+ * 1. 写入:先写 MySQL(持久化),再异步写 Redis(高性能)
+ * 2. 读取:优先读 Redis,未命中则读 MySQL 并同步到 Redis
+ * 3. 同步:定时任务将 MySQL 数据同步到 Redis
+ * 4. 容灾:Redis 宕机时降级到纯 MySQL 模式
+ *
+ * @author lzh
+ */
+@Slf4j
+@Service
+public class OrderQueueServiceEnhanced implements OrderQueueService {
+
+ /**
+ * Score 计算公式:优先级分数 + 时间戳
+ * 优先级分数:P0=0, P1=1000000, P2=2000000, P3=3000000
+ * 时间戳:秒级时间戳
+ * 结果:优先级高的排在前面,同优先级按时间排序
+ */
+ private static final Map PRIORITY_SCORES = Map.of(
+ 0, 0L, // P0: 0
+ 1, 1000000L, // P1: 1,000,000
+ 2, 2000000L, // P2: 2,000,000
+ 3, 3000000L // P3: 3,000,000
+ );
+
+ @Resource
+ private OpsOrderQueueMapper orderQueueMapper;
+
+ @Resource
+ private RedisOrderQueueService redisQueueService;
+
+ @Resource
+ private QueueSyncService queueSyncService;
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public Long enqueue(Long opsOrderId, Long userId, PriorityEnum priority, Integer queueIndex) {
+ // 1. 检查工单是否已在队列中(MySQL)
+ OpsOrderQueueDO existingQueue = orderQueueMapper.selectByOpsOrderId(opsOrderId);
+ if (existingQueue != null) {
+ log.warn("工单已在队列中: opsOrderId={}, userId={}", opsOrderId, userId);
+ return existingQueue.getId();
+ }
+
+ // 2. 计算队列分数
+ LocalDateTime now = LocalDateTime.now();
+ double queueScore = calculateQueueScore(priority.getPriority(), now);
+
+ // 3. 创建队列记录(MySQL)
+ OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder()
+ .opsOrderId(opsOrderId)
+ .userId(userId)
+ .queueIndex(queueIndex != null ? queueIndex : calculateNextQueueIndex(priority))
+ .priority(priority.getPriority())
+ .queueScore(queueScore)
+ .queueStatus(OrderQueueStatusEnum.WAITING.getStatus())
+ .enqueueTime(now)
+ .pausedDuration(0)
+ .eventMessage("工单入队,等待派单")
+ .build();
+
+ orderQueueMapper.insert(queueDO);
+
+ log.info("工单已入队(MySQL): opsOrderId={}, userId={}, priority={}, queueId={}",
+ opsOrderId, userId, priority, queueDO.getId());
+
+ // 3. 异步写入 Redis(失败不影响主流程)
+ OrderQueueDTO dto = convertToDTO(queueDO);
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.enqueue(dto);
+ log.debug("工单已入队(Redis): queueId={}", dto.getId());
+ } catch (Exception e) {
+ log.error("Redis 入队失败,依赖定时同步任务补偿: queueId={}", dto.getId(), e);
+ }
+ });
+
+ // 4. 如果是P0紧急任务,记录警告日志
+ if (priority.isUrgent()) {
+ log.warn("检测到P0紧急任务入队,需立即处理: opsOrderId={}", opsOrderId);
+ // TODO: 触发紧急派单流程(在派单引擎中实现)
+ }
+
+ return queueDO.getId();
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean dequeue(Long queueId) {
+ // 1. MySQL 删除
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO == null) {
+ log.warn("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ // 只允许删除终态的记录
+ if (!isTerminalStatus(queueDO.getQueueStatus())) {
+ log.warn("只能删除终态的队列记录: queueId={}, status={}", queueId, queueDO.getQueueStatus());
+ return false;
+ }
+
+ int deleted = orderQueueMapper.deleteById(queueId);
+
+ // 2. 异步删除 Redis
+ if (deleted > 0) {
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.remove(queueId);
+ log.debug("Redis 删除队列记录成功: queueId={}", queueId);
+ } catch (Exception e) {
+ log.error("Redis 删除队列记录失败: queueId={}", queueId, e);
+ }
+ });
+ }
+
+ log.info("队列记录已删除: queueId={}, deleted={}", queueId, deleted > 0);
+ return deleted > 0;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean dequeueByOpsOrderId(Long opsOrderId) {
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId);
+ if (queueDO == null) {
+ log.warn("工单不在队列中: opsOrderId={}", opsOrderId);
+ return false;
+ }
+
+ return dequeue(queueDO.getId());
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean updateStatus(Long queueId, OrderQueueStatusEnum newStatus) {
+ // 1. MySQL 更新
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO == null) {
+ log.warn("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ String oldStatus = queueDO.getQueueStatus();
+
+ // 状态流转校验
+ if (!validateStatusTransition(oldStatus, newStatus.getStatus())) {
+ log.warn("非法的状态流转: queueId={}, oldStatus={}, newStatus={}",
+ queueId, oldStatus, newStatus.getStatus());
+ return false;
+ }
+
+ queueDO.setQueueStatus(newStatus.getStatus().toUpperCase());
+ updateStatusTimestamp(queueDO, newStatus);
+
+ int updated = orderQueueMapper.updateById(queueDO);
+
+ // 2. 异步更新 Redis
+ if (updated > 0) {
+ OrderQueueDTO dto = convertToDTO(queueDO);
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.updateStatus(queueId, newStatus.getStatus());
+ log.debug("Redis 更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus());
+ } catch (Exception e) {
+ log.error("Redis 更新状态失败: queueId={}", queueId, e);
+ }
+ });
+ }
+
+ log.info("队列状态已更新: queueId={}, opsOrderId={}, oldStatus={}, newStatus={}",
+ queueId, queueDO.getOpsOrderId(), oldStatus, newStatus.getStatus());
+
+ return updated > 0;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean startExecution(Long queueId) {
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO == null) {
+ log.warn("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ if (!OrderQueueStatusEnum.WAITING.getStatus().equals(queueDO.getQueueStatus())) {
+ log.warn("只有WAITING状态的任务可以开始执行: queueId={}, status={}",
+ queueId, queueDO.getQueueStatus());
+ return false;
+ }
+
+ // 队列状态流转:WAITING → PROCESSING
+ queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus());
+ queueDO.setDequeueTime(LocalDateTime.now());
+ queueDO.setEventMessage("派单成功,已分配给执行人员");
+
+ int updated = orderQueueMapper.updateById(queueDO);
+
+ // 异步更新 Redis
+ if (updated > 0) {
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus());
+ } catch (Exception e) {
+ log.error("Redis 更新状态失败: queueId={}", queueId, e);
+ }
+ });
+ }
+
+ log.info("派单成功: queueId={}, opsOrderId={}, assigneeId={}",
+ queueId, queueDO.getOpsOrderId(), queueDO.getUserId());
+
+ return updated > 0;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean pauseTask(Long queueId) {
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO == null) {
+ log.warn("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ // 允许 PROCESSING 状态的任务可以暂停
+ String currentStatus = queueDO.getQueueStatus();
+ if (!OrderQueueStatusEnum.PROCESSING.getStatus().equals(currentStatus)) {
+ log.warn("只有PROCESSING状态的任务可以暂停: queueId={}, status={}",
+ queueId, currentStatus);
+ return false;
+ }
+
+ // 队列状态流转:PROCESSING → PAUSED
+ queueDO.setQueueStatus(OrderQueueStatusEnum.PAUSED.getStatus());
+
+ // 计算暂停时长
+ if (queueDO.getDequeueTime() != null) {
+ long pausedSeconds = java.time.Duration.between(
+ queueDO.getDequeueTime(),
+ LocalDateTime.now()
+ ).getSeconds();
+ queueDO.addPausedDuration((int) pausedSeconds);
+ }
+
+ queueDO.setEventMessage("任务已暂停");
+ int updated = orderQueueMapper.updateById(queueDO);
+
+ // 异步更新 Redis
+ if (updated > 0) {
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PAUSED.getStatus());
+ } catch (Exception e) {
+ log.error("Redis 更新状态失败: queueId={}", queueId, e);
+ }
+ });
+ }
+
+ log.info("任务已暂停: queueId={}, opsOrderId={}, pausedDuration={}",
+ queueId, queueDO.getOpsOrderId(), queueDO.getPausedDuration());
+
+ return updated > 0;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean resumeTask(Long queueId) {
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO == null) {
+ log.warn("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ // 只有 PAUSED 状态的任务可以恢复
+ String currentStatus = queueDO.getQueueStatus();
+ if (!OrderQueueStatusEnum.PAUSED.getStatus().equals(currentStatus)) {
+ log.warn("只有PAUSED状态的任务可以恢复: queueId={}, status={}",
+ queueId, currentStatus);
+ return false;
+ }
+
+ // 队列状态流转:PAUSED → PROCESSING
+ queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus());
+ queueDO.setDequeueTime(LocalDateTime.now()); // 重置出队时间
+ queueDO.setEventMessage("任务已恢复执行");
+
+ int updated = orderQueueMapper.updateById(queueDO);
+
+ // 异步更新 Redis
+ if (updated > 0) {
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus());
+ } catch (Exception e) {
+ log.error("Redis 更新状态失败: queueId={}", queueId, e);
+ }
+ });
+ }
+
+ log.info("任务已恢复执行: queueId={}, opsOrderId={}", queueId, queueDO.getOpsOrderId());
+
+ return updated > 0;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean adjustPriority(Long queueId, PriorityEnum newPriority, String reason) {
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO == null) {
+ log.warn("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ Integer oldPriority = queueDO.getPriority();
+
+ // 如果优先级没有变化,直接返回
+ if (oldPriority.equals(newPriority.getPriority())) {
+ log.info("优先级未变化: queueId={}, priority={}", queueId, newPriority);
+ return true;
+ }
+
+ queueDO.setPriorityEnum(newPriority);
+ // 重新计算队列顺序
+ queueDO.setQueueIndex(calculateNextQueueIndex(newPriority));
+ // 重新计算队列分数(使用原入队时间保持时间戳不变)
+ LocalDateTime enqueueTime = queueDO.getEnqueueTime() != null ? queueDO.getEnqueueTime() : LocalDateTime.now();
+ double newQueueScore = calculateQueueScore(newPriority.getPriority(), enqueueTime);
+ queueDO.setQueueScore(newQueueScore);
+ queueDO.setEventMessage("优先级调整: " + oldPriority + " -> " + newPriority + ", 原因: " + reason);
+
+ int updated = orderQueueMapper.updateById(queueDO);
+
+ // 异步更新 Redis
+ if (updated > 0) {
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.updatePriority(queueId, newPriority.getPriority());
+ } catch (Exception e) {
+ log.error("Redis 更新优先级失败: queueId={}", queueId, e);
+ }
+ });
+ }
+
+ log.info("优先级已调整: queueId={}, opsOrderId={}, oldPriority={}, newPriority={}, reason={}, newScore={}",
+ queueId, queueDO.getOpsOrderId(), oldPriority, newPriority, reason, newQueueScore);
+
+ // 如果升级为P0,触发紧急派单
+ if (newPriority.isUrgent()) {
+ log.warn("任务升级为P0紧急,需立即处理: queueId={}, opsOrderId={}",
+ queueId, queueDO.getOpsOrderId());
+ // TODO: 触发紧急派单流程
+ }
+
+ return updated > 0;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean adjustPriorityByOpsOrderId(Long opsOrderId, PriorityEnum newPriority, String reason) {
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId);
+ if (queueDO == null) {
+ log.warn("工单不在队列中: opsOrderId={}", opsOrderId);
+ return false;
+ }
+
+ return adjustPriority(queueDO.getId(), newPriority, reason);
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean upgradeToUrgent(Long opsOrderId, String reason) {
+ return adjustPriorityByOpsOrderId(opsOrderId, PriorityEnum.P0, reason);
+ }
+
+ @Override
+ public List getWaitingQueue() {
+ // 优先从 Redis 获取
+ // TODO: 实现全局等待队列(需要聚合所有用户的队列)
+ // 这里暂时使用 MySQL
+ List list = orderQueueMapper.selectListWaiting();
+ return convertToDTO(list);
+ }
+
+ @Override
+ public List getUrgentOrders() {
+ // TODO: 优先从 Redis 获取
+ List list = orderQueueMapper.selectUrgentOrders();
+ return convertToDTO(list);
+ }
+
+ @Override
+ public OrderQueueDTO getCurrentTaskByUserId(Long userId) {
+ // 1. 优先从 Redis 获取
+ List redisTasks = redisQueueService.peekTasks(userId, 1);
+ if (redisTasks != null && !redisTasks.isEmpty()) {
+ return redisTasks.get(0);
+ }
+
+ // 2. Redis 未命中,从 MySQL 获取并同步到 Redis
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectCurrentExecutingByUserId(userId);
+ if (queueDO != null) {
+ // 同步到 Redis
+ OrderQueueDTO dto = convertToDTO(queueDO);
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.enqueue(dto);
+ } catch (Exception e) {
+ log.error("同步到 Redis 失败: queueId={}", dto.getId(), e);
+ }
+ });
+ return dto;
+ }
+
+ return null;
+ }
+
+ @Override
+ public List getTasksByUserId(Long userId) {
+ // 1. 优先从 Redis 获取
+ List redisTasks = redisQueueService.getTasksByUserId(userId);
+ if (redisTasks != null && !redisTasks.isEmpty()) {
+ return redisTasks;
+ }
+
+ // 2. Redis 未命中,从 MySQL 获取并同步到 Redis
+ List mysqlList = orderQueueMapper.selectListByUserId(userId);
+ if (mysqlList != null && !mysqlList.isEmpty()) {
+ // 同步到 Redis
+ List dtoList = convertToDTO(mysqlList);
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.batchEnqueue(dtoList);
+ } catch (Exception e) {
+ log.error("批量同步到 Redis 失败: userId={}", userId, e);
+ }
+ });
+ return dtoList;
+ }
+
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List getWaitingTasksByUserId(Long userId) {
+ // 获取所有任务
+ List allTasks = getTasksByUserId(userId);
+
+ // 过滤出 WAITING 状态的任务,并按队列分数排序
+ return allTasks.stream()
+ .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus()))
+ .sorted((a, b) -> {
+ // 优先使用队列分数 score 排序(已在入队时计算好)
+ if (a.getQueueScore() != null && b.getQueueScore() != null) {
+ return Double.compare(a.getQueueScore(), b.getQueueScore());
+ }
+
+ // 兜底:如果 score 为空,则按优先级+时间排序
+ // 按优先级排序(P0 > P1 > P2 > P3)
+ int priorityCompare = Integer.compare(
+ b.getPriority() != null ? b.getPriority() : 999,
+ a.getPriority() != null ? a.getPriority() : 999
+ );
+ if (priorityCompare != 0) {
+ return priorityCompare;
+ }
+ // 优先级相同,按入队时间排序
+ return a.getEnqueueTime().compareTo(b.getEnqueueTime());
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List getInterruptedTasksByUserId(Long userId) {
+ // 获取所有任务
+ List allTasks = getTasksByUserId(userId);
+
+ // 过滤出 PAUSED 状态的任务,并按中断时间排序
+ return allTasks.stream()
+ .filter(task -> OrderQueueStatusEnum.PAUSED.getStatus().equals(task.getQueueStatus()))
+ .sorted((a, b) -> {
+ // 按中断时间排序(最早中断的排在前面)
+ // 如果没有中断时间字段,则按入队时间排序
+ if (a.getEnqueueTime() != null && b.getEnqueueTime() != null) {
+ return a.getEnqueueTime().compareTo(b.getEnqueueTime());
+ }
+ return 0;
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public OrderQueueDTO getById(Long queueId) {
+ // 1. 优先从 Redis Hash 获取
+ OrderQueueDTO redisDTO = redisQueueService.getByQueueId(queueId);
+ if (redisDTO != null) {
+ return redisDTO;
+ }
+
+ // 2. Redis 未命中,从 MySQL 获取
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO != null) {
+ OrderQueueDTO dto = convertToDTO(queueDO);
+ // 同步到 Redis
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.enqueue(dto);
+ } catch (Exception e) {
+ log.error("同步到 Redis 失败: queueId={}", dto.getId(), e);
+ }
+ });
+ return dto;
+ }
+
+ return null;
+ }
+
+ @Override
+ public OrderQueueDTO getByOpsOrderId(Long opsOrderId) {
+ // 1. 优先从 Redis 获取(遍历所有用户队列,性能较差,慎用)
+ OrderQueueDTO redisDTO = redisQueueService.getByOrderId(opsOrderId);
+ if (redisDTO != null) {
+ return redisDTO;
+ }
+
+ // 2. Redis 未命中,从 MySQL 获取
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId);
+ if (queueDO != null) {
+ return convertToDTO(queueDO);
+ }
+
+ return null;
+ }
+
+ @Override
+ public Long countWaiting() {
+ // 优先从 Redis 获取(需要聚合所有用户)
+ // TODO: 实现分布式计数器
+ return orderQueueMapper.countWaiting();
+ }
+
+ @Override
+ public Long countByUserId(Long userId) {
+ // 优先从 Redis 获取
+ long redisCount = redisQueueService.getQueueSize(userId);
+ if (redisCount > 0) {
+ return redisCount;
+ }
+
+ // Redis 未命中,从 MySQL 获取
+ return orderQueueMapper.countByUserId(userId);
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean updateEventMessage(Long queueId, String eventMessage) {
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO == null) {
+ log.warn("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ queueDO.setEventMessage(eventMessage);
+ int updated = orderQueueMapper.updateById(queueDO);
+
+ log.info("事件消息已更新: queueId={}, eventMessage={}", queueId, eventMessage);
+
+ return updated > 0;
+ }
+
+ // ========== 私有方法 ==========
+
+ /**
+ * 计算队列分数(用于排序)
+ * 公式:优先级分数 + 时间戳
+ *
+ * @param priority 优先级(0=P0, 1=P1, 2=P2, 3=P3)
+ * @param enqueueTime 入队时间
+ * @return 队列分数
+ */
+ private double calculateQueueScore(Integer priority, LocalDateTime enqueueTime) {
+ // 获取优先级分数
+ long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L);
+
+ // 计算时间戳(秒级)
+ long timestamp;
+ if (enqueueTime != null) {
+ timestamp = enqueueTime.atZone(ZoneId.systemDefault()).toEpochSecond();
+ } else {
+ timestamp = System.currentTimeMillis() / 1000;
+ }
+
+ return priorityScore + timestamp;
+ }
+
+ /**
+ * 计算下一个队列顺序号
+ */
+ private Integer calculateNextQueueIndex(PriorityEnum priority) {
+ // TODO: 查询当前优先级的最大 queueIndex,然后 +1
+ // 这里简化处理,返回默认值
+ return priority.getQueueOrder() * 1000;
+ }
+
+ /**
+ * 判断是否为终态
+ */
+ private boolean isTerminalStatus(String status) {
+ return "CANCELLED".equals(status) || "COMPLETED".equals(status)
+ || "REMOVED".equals(status) || "FAILED".equals(status);
+ }
+
+ /**
+ * 校验状态流转是否合法
+ */
+ private boolean validateStatusTransition(String oldStatus, String newStatus) {
+ // 终态不能再变更
+ if (isTerminalStatus(oldStatus)) {
+ return false;
+ }
+
+ // 允许的状态流转(支持新旧状态)
+ return switch (oldStatus.toUpperCase()) {
+ case "PENDING" -> "WAITING".equals(newStatus) || "CANCELLED".equals(newStatus);
+ case "WAITING" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus)
+ || "CANCELLED".equals(newStatus);
+ case "PROCESSING", "DISPATCHED" -> "PAUSED".equals(newStatus)
+ || "REMOVED".equals(newStatus) || "COMPLETED".equals(newStatus);
+ case "PAUSED" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus);
+ default -> false;
+ };
+ }
+
+ /**
+ * 根据状态更新时间戳
+ */
+ private void updateStatusTimestamp(OpsOrderQueueDO queueDO, OrderQueueStatusEnum status) {
+ switch (status.getStatus().toUpperCase()) {
+ case "DISPATCHED" -> {
+ if (queueDO.getDequeueTime() == null) {
+ queueDO.setDequeueTime(LocalDateTime.now());
+ }
+ }
+ case "REMOVED", "COMPLETED" -> {
+ // 可以记录完成时间(如果表有该字段)
+ }
+ }
+ }
+
+ /**
+ * 转换为 DTO
+ */
+ private OrderQueueDTO convertToDTO(OpsOrderQueueDO queueDO) {
+ OrderQueueDTO dto = new OrderQueueDTO();
+ BeanUtils.copyProperties(queueDO, dto);
+ return dto;
+ }
+
+ /**
+ * 批量转换为 DTO
+ */
+ private List convertToDTO(List list) {
+ return list.stream()
+ .map(this::convertToDTO)
+ .collect(Collectors.toList());
+ }
+}
diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java
index 6724561..57168a0 100644
--- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java
+++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueSyncService.java
@@ -6,12 +6,10 @@ import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -36,10 +34,10 @@ public class QueueSyncService {
private RedisOrderQueueService redisQueueService;
/**
- * 定时同步任务:每5分钟执行一次
+ * 同步 MySQL 数据到 Redis
* 将最近1小时内变更的数据同步到 Redis
+ * 由 xxl-job 定时任务调用
*/
- @Scheduled(cron = "0 */5 * * * ?")
public void syncMySQLToRedis() {
try {
log.info("开始定时同步:MySQL -> Redis");
@@ -51,7 +49,7 @@ public class QueueSyncService {
.ge(OpsOrderQueueDO::getUpdateTime, startTime)
.in(OpsOrderQueueDO::getQueueStatus,
OrderQueueStatusEnum.WAITING.getStatus(),
- OrderQueueStatusEnum.DISPATCHED.getStatus(),
+ OrderQueueStatusEnum.PROCESSING.getStatus(),
OrderQueueStatusEnum.PAUSED.getStatus())
);
@@ -93,7 +91,7 @@ public class QueueSyncService {
.eq(OpsOrderQueueDO::getUserId, cleanerId)
.in(OpsOrderQueueDO::getQueueStatus,
OrderQueueStatusEnum.WAITING.getStatus(),
- OrderQueueStatusEnum.DISPATCHED.getStatus(),
+ OrderQueueStatusEnum.PROCESSING.getStatus(),
OrderQueueStatusEnum.PAUSED.getStatus())
);
@@ -141,7 +139,7 @@ public class QueueSyncService {
.eq(OpsOrderQueueDO::getUserId, cleanerId)
.in(OpsOrderQueueDO::getQueueStatus,
OrderQueueStatusEnum.WAITING.getStatus(),
- OrderQueueStatusEnum.DISPATCHED.getStatus(),
+ OrderQueueStatusEnum.PROCESSING.getStatus(),
OrderQueueStatusEnum.PAUSED.getStatus())
);
@@ -181,7 +179,7 @@ public class QueueSyncService {
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper()
.in(OpsOrderQueueDO::getQueueStatus,
OrderQueueStatusEnum.WAITING.getStatus(),
- OrderQueueStatusEnum.DISPATCHED.getStatus(),
+ OrderQueueStatusEnum.PROCESSING.getStatus(),
OrderQueueStatusEnum.PAUSED.getStatus())
);