chore: 【ops】保洁员Dispatch初步搭建
This commit is contained in:
@@ -0,0 +1,163 @@
|
||||
package com.viewsh.module.ops.environment.service.dispatch;
|
||||
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
|
||||
import com.viewsh.module.ops.dal.dataobject.cleaner.OpsCleanerStatusDO;
|
||||
import com.viewsh.module.ops.enums.CleanerStatusEnum;
|
||||
import com.viewsh.module.ops.enums.PriorityEnum;
|
||||
import com.viewsh.module.ops.environment.service.cleaner.CleanerStatusService;
|
||||
import com.viewsh.module.ops.service.dispatch.DispatchStrategy;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 保洁区域优先派单策略
|
||||
* 优先分配给该区域的空闲保洁员,按以下规则排序:
|
||||
* 1. 状态必须是IDLE(空闲)
|
||||
* 2. 优先选择同区域的保洁员
|
||||
* 3. 考虑保洁员的工作量平衡
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class CleanerAreaPriorityStrategy implements DispatchStrategy {
|
||||
|
||||
@Resource
|
||||
private CleanerStatusService cleanerStatusService;
|
||||
|
||||
@Resource
|
||||
private com.viewsh.module.ops.service.dispatch.DispatchEngineServiceImpl dispatchEngineService;
|
||||
|
||||
/**
|
||||
* 策略名称
|
||||
*/
|
||||
private static final String STRATEGY_NAME = "cleaner_area_priority";
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
// 注册策略到派单引擎
|
||||
dispatchEngineService.registerStrategy(this);
|
||||
// 注册执行人员类型与策略的映射
|
||||
dispatchEngineService.registerAssigneeTypeStrategy("CLEANER", STRATEGY_NAME);
|
||||
log.info("保洁区域优先派单策略已注册: strategyName={}", STRATEGY_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return STRATEGY_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long recommendAssignee(OrderQueueDTO queueDTO) {
|
||||
log.info("执行保洁区域优先派单策略: opsOrderId={}", queueDTO.getOpsOrderId());
|
||||
|
||||
// 在新的队列模型中,推荐逻辑应该在工单创建时完成
|
||||
// 这里主要用于手动派单或重新派单的场景
|
||||
// TODO: 从 eventPayload 中解析区域ID,然后查询可用保洁员
|
||||
log.warn("在新队列模型中,推荐保洁员应该在工单创建时完成: opsOrderId={}",
|
||||
queueDTO.getOpsOrderId());
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canInterrupt(Long currentAssigneeId, Long currentOrderId, Long urgentOrderId) {
|
||||
log.warn("判断是否可打断保洁员任务: assigneeId={}, currentOrderId={}, urgentOrderId={}",
|
||||
currentAssigneeId, currentOrderId, urgentOrderId);
|
||||
|
||||
// P0任务可以打断P1/P2/P3任务
|
||||
// 简化处理:默认允许打断
|
||||
log.info("允许打断:紧急任务优先级更高: urgentOrderId={}, currentOrderId={}",
|
||||
urgentOrderId, currentOrderId);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 选择最佳保洁员
|
||||
* 综合考虑以下因素:
|
||||
* 1. 是否在该区域(已过滤)
|
||||
* 2. 当前工作量(已完成的工单数少优先)
|
||||
*/
|
||||
private OpsCleanerStatusDO selectBestCleaner(List<OpsCleanerStatusDO> cleaners,
|
||||
OrderQueueDTO queueDTO) {
|
||||
// 使用Comparator进行多条件排序
|
||||
return cleaners.stream()
|
||||
.max(Comparator
|
||||
// 1. 电量充足优先(电量>20%)
|
||||
.comparing((OpsCleanerStatusDO cleaner) ->
|
||||
cleaner.getBatteryLevel() != null && cleaner.getBatteryLevel() > 20 ? 1 : 0)
|
||||
// 2. 心跳时间最新的优先(在线状态好)
|
||||
.thenComparing(OpsCleanerStatusDO::getLastHeartbeatTime, Comparator.nullsLast(Comparator.naturalOrder()))
|
||||
// 3. 没有当前工单的<E58D95><E79A84><EFBFBD>先(完全空闲)
|
||||
.thenComparing((OpsCleanerStatusDO cleaner) ->
|
||||
cleaner.getCurrentOpsOrderId() == null ? 1 : 0)
|
||||
// 4. 工作量少的优先(状态变更时间早的说明刚完成)
|
||||
.thenComparing(OpsCleanerStatusDO::getUpdateTime, Comparator.nullsLast(Comparator.naturalOrder()))
|
||||
)
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* 为新工单推荐保洁员(工单创建时调用)
|
||||
* 这是主要的推荐方法,用于在工单创建时选择合适的保洁员
|
||||
*
|
||||
* @param areaId 区域ID
|
||||
* @param priority 工单优先级
|
||||
* @return 推荐的保洁员ID,如果没有合适的返回null
|
||||
*/
|
||||
public Long recommendCleanerForNewOrder(Long areaId, PriorityEnum priority) {
|
||||
log.info("为新工单推荐保洁员: areaId={}, priority={}", areaId, priority);
|
||||
|
||||
// TODO-lzh:目前场景是单个区域绑定单个保洁员(所以只需查出对应保洁返回即可)
|
||||
|
||||
// 查询该区域的可用保洁员(IDLE状态)
|
||||
List<OpsCleanerStatusDO> availableCleaners =
|
||||
cleanerStatusService.listAvailableCleaners(areaId);
|
||||
|
||||
if (availableCleaners.isEmpty()) {
|
||||
log.warn("该区域没有可用保洁员: areaId={}", areaId);
|
||||
return null;
|
||||
}
|
||||
|
||||
log.info("找到 {} 个可用保洁员: areaId={}", availableCleaners.size(), areaId);
|
||||
|
||||
// 对于P0紧急任务,优先选择电量充足、心跳最新的
|
||||
if (priority.isUrgent()) {
|
||||
log.warn("为P0紧急任务推荐保洁员,优先选择最佳保洁员");
|
||||
}
|
||||
|
||||
// 选择最佳保洁员
|
||||
OpsCleanerStatusDO selectedCleaner = selectBestCleanerById(availableCleaners);
|
||||
|
||||
if (selectedCleaner != null) {
|
||||
log.info("为新工单推荐保洁员: areaId={}, priority={}, cleanerId={}, cleanerName={}",
|
||||
areaId, priority, selectedCleaner.getUserId(), selectedCleaner.getUserName());
|
||||
return selectedCleaner.getUserId();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 从保洁员列表中选择最佳保洁员(简化版本,不需要 queueDTO)
|
||||
*/
|
||||
private OpsCleanerStatusDO selectBestCleanerById(List<OpsCleanerStatusDO> cleaners) {
|
||||
return cleaners.stream()
|
||||
.max(Comparator
|
||||
// 1. 电量充足优先(电量>20%)
|
||||
.comparing((OpsCleanerStatusDO cleaner) ->
|
||||
cleaner.getBatteryLevel() != null && cleaner.getBatteryLevel() > 20 ? 1 : 0)
|
||||
// 2. 心跳时间最新的优先(在线状态好)
|
||||
.thenComparing(OpsCleanerStatusDO::getLastHeartbeatTime, Comparator.nullsLast(Comparator.naturalOrder()))
|
||||
// 3. 没有当前工单的优先(完全空闲)
|
||||
.thenComparing((OpsCleanerStatusDO cleaner) ->
|
||||
cleaner.getCurrentOpsOrderId() == null ? 1 : 0)
|
||||
)
|
||||
.orElse(null);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,324 @@
|
||||
package com.viewsh.module.ops.service.dispatch;
|
||||
|
||||
import com.viewsh.module.ops.api.dispatch.DispatchEngineService;
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueService;
|
||||
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
|
||||
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
|
||||
import com.viewsh.module.ops.enums.DispatchStrategyEnum;
|
||||
import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
|
||||
import com.viewsh.module.ops.enums.PriorityEnum;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* 派单引擎服务实现
|
||||
* 提供通用的派单框架,各业务模块通过实现 DispatchStrategy 接口提供具体策略
|
||||
*
|
||||
* 注意:新的队列模型是在工单创建时就指定了 userId(保洁员),
|
||||
* 所以派单引擎的主要职责是:
|
||||
* 1. 管理队列状态的流转(WAITING -> dispatched)
|
||||
* 2. 处理P0紧急任务的插队和打断
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class DispatchEngineServiceImpl implements DispatchEngineService {
|
||||
|
||||
@Resource
|
||||
private OrderQueueService orderQueueService;
|
||||
|
||||
@Resource
|
||||
private OpsOrderQueueMapper orderQueueMapper;
|
||||
|
||||
/**
|
||||
* 派单策略注册表
|
||||
* Key: 策略名称
|
||||
* Value: 策略实现
|
||||
*/
|
||||
private final Map<String, DispatchStrategy> strategyRegistry = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 执行人员类型与策略的映射
|
||||
*/
|
||||
private final Map<String, String> assigneeTypeStrategyMap = new ConcurrentHashMap<>();
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
log.info("派单引擎已初始化");
|
||||
}
|
||||
|
||||
// ========== 策略管理 ==========
|
||||
|
||||
/**
|
||||
* 注册派单策略
|
||||
*/
|
||||
public void registerStrategy(DispatchStrategy strategy) {
|
||||
strategyRegistry.put(strategy.getName(), strategy);
|
||||
log.info("派单策略已注册: strategyName={}", strategy.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* 注册执行人员类型与策略的映射
|
||||
*/
|
||||
public void registerAssigneeTypeStrategy(String assigneeType, String strategyName) {
|
||||
assigneeTypeStrategyMap.put(assigneeType, strategyName);
|
||||
log.info("执行人员类型策略映射已注册: assigneeType={}, strategyName={}",
|
||||
assigneeType, strategyName);
|
||||
}
|
||||
|
||||
private DispatchStrategy getStrategy(String strategyName) {
|
||||
return strategyRegistry.get(strategyName);
|
||||
}
|
||||
|
||||
private DispatchStrategy getStrategyByAssigneeType(String assigneeType) {
|
||||
String strategyName = assigneeTypeStrategyMap.get(assigneeType);
|
||||
if (strategyName == null) {
|
||||
log.warn("未找到执行人员类型对应的策略: assigneeType={}", assigneeType);
|
||||
return null;
|
||||
}
|
||||
return getStrategy(strategyName);
|
||||
}
|
||||
|
||||
// ========== 派单方法实现 ==========
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean autoDispatch(Long queueId) {
|
||||
log.info("开始自动派单: queueId={}", queueId);
|
||||
|
||||
// 在新的队列模型中,工单入队时已经指定了 userId
|
||||
// 派单引擎只需要将状态从 WAITING 改为 EXECUTING
|
||||
boolean success = orderQueueService.startExecution(queueId);
|
||||
|
||||
if (success) {
|
||||
log.info("自动派单成功: queueId={}", queueId);
|
||||
} else {
|
||||
log.warn("自动派单失败: queueId={}", queueId);
|
||||
}
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean manualDispatch(Long queueId, Long assigneeId) {
|
||||
log.info("手动派单: queueId={}, assigneeId={}", queueId, assigneeId);
|
||||
|
||||
// 在新的队列模型中,userId 在入队时已经确定
|
||||
// 手动派单主要是重新分配 userId,然后开始执行
|
||||
OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
|
||||
if (queueDO == null) {
|
||||
log.warn("队列记录不存在: queueId={}", queueId);
|
||||
return false;
|
||||
}
|
||||
|
||||
// 更新 userId
|
||||
queueDO.setUserId(assigneeId);
|
||||
orderQueueMapper.updateById(queueDO);
|
||||
|
||||
// 开始执行
|
||||
return autoDispatch(queueId);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public int batchAutoDispatch(Long areaId, int batchSize) {
|
||||
log.info("开始批量自动派单: batchSize={}", batchSize);
|
||||
|
||||
// 查询等待中的队列
|
||||
List<OrderQueueDTO> waitingQueue = orderQueueService.getWaitingQueue();
|
||||
if (waitingQueue.isEmpty()) {
|
||||
log.info("没有待派单的任务");
|
||||
return 0;
|
||||
}
|
||||
|
||||
// 限制批次大小
|
||||
int processCount = Math.min(batchSize, waitingQueue.size());
|
||||
int successCount = 0;
|
||||
|
||||
// 逐个派单
|
||||
for (int i = 0; i < processCount; i++) {
|
||||
OrderQueueDTO queueDTO = waitingQueue.get(i);
|
||||
try {
|
||||
boolean success = autoDispatch(queueDTO.getId());
|
||||
if (success) {
|
||||
successCount++;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("派单失败: queueId={}, opsOrderId={}",
|
||||
queueDTO.getId(), queueDTO.getOpsOrderId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("批量派单完成: total={}, success={}", processCount, successCount);
|
||||
return successCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean urgentDispatch(Long queueId) {
|
||||
log.warn("开始紧急派单(P0): queueId={}", queueId);
|
||||
|
||||
OrderQueueDTO queueDTO = orderQueueService.getById(queueId);
|
||||
if (queueDTO == null) {
|
||||
log.warn("队列记录不存在: queueId={}", queueId);
|
||||
return false;
|
||||
}
|
||||
|
||||
// P0 紧急任务的处理:
|
||||
// 1. 直接开始执行
|
||||
// 2. 如果该用户正在执行其他任务,需要先打断
|
||||
|
||||
// 检查该用户是否正在执行其他任务
|
||||
OrderQueueDTO currentTask = orderQueueService.getCurrentTaskByUserId(queueDTO.getUserId());
|
||||
if (currentTask != null && !currentTask.getId().equals(queueId)) {
|
||||
// 用户正在执行其他任务,需要打断
|
||||
log.warn("该用户正在执行其他任务,需要打断: userId={}, currentTaskId={}",
|
||||
queueDTO.getUserId(), currentTask.getId());
|
||||
|
||||
// 判断是否可以打断
|
||||
String assigneeType = "CLEANER"; // TODO: 从用户表获取
|
||||
DispatchStrategy strategy = getStrategyByAssigneeType(assigneeType);
|
||||
if (strategy != null && strategy.canInterrupt(
|
||||
queueDTO.getUserId(),
|
||||
currentTask.getOpsOrderId(),
|
||||
queueDTO.getOpsOrderId())) {
|
||||
// 打断当前任务
|
||||
interruptTask(currentTask.getId(), currentTask.getOpsOrderId(), queueDTO.getOpsOrderId());
|
||||
} else {
|
||||
log.warn("不允许打断当前任务: currentTaskId={}", currentTask.getId());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// 开始执行紧急任务
|
||||
return autoDispatch(queueId);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean handlePriorityInterrupt(Long orderId) {
|
||||
log.warn("处理紧急插队: orderId={}", orderId);
|
||||
|
||||
OrderQueueDTO queueDTO = orderQueueService.getByOpsOrderId(orderId);
|
||||
if (queueDTO == null) {
|
||||
log.warn("工单不在队列中: orderId={}", orderId);
|
||||
return false;
|
||||
}
|
||||
|
||||
// 如果不是P0,不需要特殊处理
|
||||
if (!PriorityEnum.P0.getPriority().equals(queueDTO.getPriority())) {
|
||||
log.info("非P0工单,无需插队处理: orderId={}, priority={}",
|
||||
orderId, queueDTO.getPriority());
|
||||
return true;
|
||||
}
|
||||
|
||||
// 执行紧急派单
|
||||
return urgentDispatch(queueDTO.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Long> findAvailableAssignees(Long areaId, String assigneeType) {
|
||||
log.debug("查询可用执行人员: areaId={}, assigneeType={}", areaId, assigneeType);
|
||||
|
||||
// 根据执行人员类型获取策略
|
||||
DispatchStrategy strategy = getStrategyByAssigneeType(assigneeType);
|
||||
if (strategy == null) {
|
||||
log.warn("未找到派单策略: assigneeType={}", assigneeType);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// 调用策略查询可用人员
|
||||
// TODO: 这里需要在策略接口中添加查询可用人员列表的方法
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long recommendAssignee(Long queueId, DispatchStrategyEnum dispatchStrategy) {
|
||||
log.debug("推荐执行人员: queueId={}", queueId);
|
||||
|
||||
// 在新的队列模型中,推荐执行人员的逻辑应该在入队之前完成
|
||||
// 这里只是预留接口,实际应该返回 null
|
||||
log.warn("在新队列模型中,推荐执行人员应该在入队时完成,而非派单时");
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DispatchSuggestion getDispatchSuggestion(Long queueId) {
|
||||
// 在新的队列模型中,这个方法的意义不大
|
||||
// 因为 userId 在入队时已经确定
|
||||
log.warn("在新队列模型中,派单建议功能不适用");
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean cancelDispatch(Long queueId, String reason) {
|
||||
log.info("取消派单: queueId={}, reason={}", queueId, reason);
|
||||
|
||||
OrderQueueDTO queueDTO = orderQueueService.getById(queueId);
|
||||
if (queueDTO == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!queueDTO.canCancel()) {
|
||||
log.warn("当前状态不允许取<EFBFBD><EFBFBD>: queueId={}, status={}", queueId, queueDTO.getQueueStatus());
|
||||
return false;
|
||||
}
|
||||
|
||||
return orderQueueService.updateStatus(queueId, OrderQueueStatusEnum.CANCELLED);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean redispatch(Long queueId) {
|
||||
log.info("重新派单: queueId={}", queueId);
|
||||
|
||||
// 重新派单就是重新开始执行
|
||||
return autoDispatch(queueId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startDispatchEngine() {
|
||||
log.info("派单引擎已启动");
|
||||
// TODO: 启动定时任务,定时处理待派单队列
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopDispatchEngine() {
|
||||
log.info("派单引擎已停止");
|
||||
// TODO: 停止定时任务
|
||||
}
|
||||
|
||||
// ========== 私有方法 ==========
|
||||
|
||||
/**
|
||||
* 打断任务
|
||||
*/
|
||||
private void interruptTask(Long queueId, Long currentOrderId, Long urgentOrderId) {
|
||||
log.warn("打断任务: queueId={}, currentOrderId={}, urgentOrderId={}",
|
||||
queueId, currentOrderId, urgentOrderId);
|
||||
|
||||
// 暂停当前任务
|
||||
boolean paused = orderQueueService.pauseTask(queueId);
|
||||
|
||||
if (paused) {
|
||||
// 更新事件消息
|
||||
orderQueueService.updateEventMessage(queueId,
|
||||
"任务被P0紧急任务打断,紧急工单ID: " + urgentOrderId);
|
||||
|
||||
// TODO: 通知执行人员任务被打断(通过工牌推送消息)
|
||||
// TODO: 更新工单状态为暂停(调用工单服务)
|
||||
|
||||
log.info("任务已打断,执行人员可处理紧急任务: queueId={}", queueId);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package com.viewsh.module.ops.service.dispatch;
|
||||
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
|
||||
|
||||
/**
|
||||
* 派单策略接口
|
||||
* 各业务模块(保洁、安保、工程等)需要实现此接口,定义自己的派单逻辑
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
public interface DispatchStrategy {
|
||||
|
||||
/**
|
||||
* 策略名称
|
||||
* 如:cleaner_area_priority, security_skill_match
|
||||
*
|
||||
* @return 策略名称
|
||||
*/
|
||||
String getName();
|
||||
|
||||
/**
|
||||
* 执行派单策略
|
||||
* 根据队列记录推荐合适的执行人员
|
||||
*
|
||||
* @param queueDTO 队列记录
|
||||
* @return 推荐的执行人员ID,如果没有合适的返回null
|
||||
*/
|
||||
Long recommendAssignee(OrderQueueDTO queueDTO);
|
||||
|
||||
/**
|
||||
* 判断是否可打断当前任务
|
||||
* 当P0紧急任务需要插队时,判断是否可以打断当前执行的任务
|
||||
*
|
||||
* @param currentAssigneeId 当前执行任务的执行人员ID
|
||||
* @param currentOrderId 当前正在执行的工单ID
|
||||
* @param urgentOrderId 紧急工单ID
|
||||
* @return 是否可以打断
|
||||
*/
|
||||
boolean canInterrupt(Long currentAssigneeId, Long currentOrderId, Long urgentOrderId);
|
||||
}
|
||||
Reference in New Issue
Block a user