chore: 【ops】保洁调度实现

This commit is contained in:
lzh
2026-01-09 17:35:54 +08:00
parent 23830961c8
commit 4a2801e97c
3 changed files with 416 additions and 163 deletions

View File

@@ -0,0 +1,271 @@
package com.viewsh.module.ops.environment.service.dispatch;
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
import com.viewsh.module.ops.api.queue.OrderQueueService;
import com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation;
import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext;
import com.viewsh.module.ops.core.dispatch.strategy.AssignStrategy;
import com.viewsh.module.ops.enums.PriorityEnum;
import com.viewsh.module.ops.environment.dal.dataobject.cleaner.OpsCleanerStatusDO;
import com.viewsh.module.ops.environment.integration.adapter.CleanerAssigneeStatusAdapter;
import com.viewsh.module.ops.environment.service.cleaner.CleanerStatusService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
/**
* 保洁区域优先分配策略
* <p>
* 职责:谁来接单
* <p>
* 策略规则:
* 1. 查询该区域的空闲保洁员(状态=IDLE
* 2. 优先选择电量充足、心跳最新的保洁员
* 3. 考虑工作量平衡,选择当前任务最少的
*
* @author lzh
*/
@Component
@Slf4j
public class CleanerAreaAssignStrategy implements AssignStrategy {
private static final String STRATEGY_NAME = "cleaner_area_priority";
private static final String BUSINESS_TYPE = "CLEAN";
@Resource
private CleanerStatusService cleanerStatusService;
@Resource
private OrderQueueService orderQueueService;
@Resource
private com.viewsh.module.ops.core.dispatch.DispatchEngine dispatchEngine;
@PostConstruct
public void init() {
dispatchEngine.registerAssignStrategy(BUSINESS_TYPE, this);
log.info("保洁分配策略已注册: strategyName={}, businessType={}", STRATEGY_NAME, BUSINESS_TYPE);
}
@Override
public String getName() {
return STRATEGY_NAME;
}
@Override
public String getSupportedBusinessType() {
return BUSINESS_TYPE;
}
@Override
public AssigneeRecommendation recommend(OrderDispatchContext context) {
log.info("执行保洁区域优先分配策略: orderId={}, areaId={}, priority={}",
context.getOrderId(), context.getAreaId(), context.getPriority());
// 查询该区域的保洁员
List<OpsCleanerStatusDO> cleaners = cleanerStatusService.listCleanersByArea(context.getAreaId());
if (cleaners.isEmpty()) {
log.warn("该区域没有保洁员: areaId={}", context.getAreaId());
return AssigneeRecommendation.none();
}
// 过滤在线的保洁员
List<OpsCleanerStatusDO> onlineCleaners = cleaners.stream()
.filter(c -> !com.viewsh.module.ops.enums.CleanerStatusEnum.OFFLINE.getCode().equals(c.getStatus()))
.collect(Collectors.toList());
if (onlineCleaners.isEmpty()) {
log.warn("该区域没有在线保洁员: areaId={}", context.getAreaId());
return AssigneeRecommendation.none();
}
// 选择最佳保洁员
OpsCleanerStatusDO selectedCleaner = selectBestCleaner(onlineCleaners, context);
if (selectedCleaner != null) {
String reason = buildRecommendationReason(selectedCleaner, context);
return AssigneeRecommendation.of(
selectedCleaner.getUserId(),
selectedCleaner.getUserName(),
calculateScore(selectedCleaner),
reason
);
}
return AssigneeRecommendation.none();
}
@Override
public List<AssigneeRecommendation> recommendBatch(OrderDispatchContext context, int limit) {
log.info("批量推荐保洁员: areaId={}, priority={}, limit={}",
context.getAreaId(), context.getPriority(), limit);
List<OpsCleanerStatusDO> cleaners = cleanerStatusService.listCleanersByArea(context.getAreaId());
if (cleaners.isEmpty()) {
return new ArrayList<>();
}
return cleaners.stream()
.filter(c -> !com.viewsh.module.ops.enums.CleanerStatusEnum.OFFLINE.getCode().equals(c.getStatus()))
.limit(limit)
.map(cleaner -> {
int score = calculateScore(cleaner);
String reason = buildRecommendationReason(cleaner, context);
return AssigneeRecommendation.of(
cleaner.getUserId(),
cleaner.getUserName(),
score,
reason
);
})
.sorted((a, b) -> Integer.compare(b.getScore(), a.getScore()))
.collect(Collectors.toList());
}
// ==================== 私有方法 ====================
/**
* 选择最佳保洁员
*/
private OpsCleanerStatusDO selectBestCleaner(List<OpsCleanerStatusDO> cleaners, OrderDispatchContext context) {
// 对于P0紧急任务优先选择IDLE状态的保洁员
if (context.isUrgent()) {
return cleaners.stream()
.filter(c -> com.viewsh.module.ops.enums.CleanerStatusEnum.IDLE.getCode().equals(c.getStatus()))
.max(Comparator
.comparing((OpsCleanerStatusDO c) -> c.getBatteryLevel() != null && c.getBatteryLevel() > 20 ? 1 : 0)
.thenComparing(OpsCleanerStatusDO::getLastHeartbeatTime, Comparator.nullsLast(Comparator.naturalOrder())))
.orElse(null);
}
// 普通任务优先选择IDLE状态其次是BUSY状态等待队列较少的
OpsCleanerStatusDO idleCleaner = cleaners.stream()
.filter(c -> com.viewsh.module.ops.enums.CleanerStatusEnum.IDLE.getCode().equals(c.getStatus()))
.max(Comparator
.comparing((OpsCleanerStatusDO c) -> c.getBatteryLevel() != null && c.getBatteryLevel() > 20 ? 1 : 0)
.thenComparing(OpsCleanerStatusDO::getLastHeartbeatTime, Comparator.nullsLast(Comparator.naturalOrder())))
.orElse(null);
if (idleCleaner != null) {
return idleCleaner;
}
// 没有空闲保洁员,选择等待队列较少的忙碌保洁员
return cleaners.stream()
.filter(c -> com.viewsh.module.ops.enums.CleanerStatusEnum.BUSY.getCode().equals(c.getStatus()))
.min(Comparator.comparing(c -> getWaitingTaskCount(c.getUserId())))
.orElse(null);
}
/**
* 获取等待任务数量
*/
private int getWaitingTaskCount(Long userId) {
try {
List<OrderQueueDTO> waitingTasks = orderQueueService.getWaitingTasksByUserId(userId);
return waitingTasks != null ? waitingTasks.size() : 0;
} catch (Exception e) {
log.warn("查询等待任务数量失败: userId={}", userId, e);
return 0;
}
}
/**
* 计算推荐分数0-100
*/
private int calculateScore(OpsCleanerStatusDO cleaner) {
int score = 50; // 基础分
com.viewsh.module.ops.enums.CleanerStatusEnum statusEnum = cleaner.getStatusEnum();
// 状态分数
if (statusEnum == com.viewsh.module.ops.enums.CleanerStatusEnum.IDLE) {
score += 30;
} else if (statusEnum == com.viewsh.module.ops.enums.CleanerStatusEnum.BUSY) {
score += 10;
}
// 电量分数
if (cleaner.getBatteryLevel() != null) {
if (cleaner.getBatteryLevel() > 80) {
score += 15;
} else if (cleaner.getBatteryLevel() > 50) {
score += 10;
} else if (cleaner.getBatteryLevel() > 20) {
score += 5;
}
}
// 心跳分数
if (cleaner.getLastHeartbeatTime() != null) {
long minutesSinceHeartbeat = java.time.Duration.between(
cleaner.getLastHeartbeatTime(),
java.time.LocalDateTime.now()
).toMinutes();
if (minutesSinceHeartbeat < 5) {
score += 5;
}
}
return Math.min(score, 100);
}
/**
* 构建推荐理由
*/
private String buildRecommendationReason(OpsCleanerStatusDO cleaner, OrderDispatchContext context) {
StringBuilder reason = new StringBuilder();
reason.append("同区域保洁员");
com.viewsh.module.ops.enums.CleanerStatusEnum statusEnum = cleaner.getStatusEnum();
reason.append("、状态=").append(statusEnum != null ? statusEnum.getDescription() : cleaner.getStatus());
if (cleaner.getBatteryLevel() != null) {
reason.append("、电量").append(cleaner.getBatteryLevel()).append("%");
}
if (context.isUrgent()) {
reason.append("、适合P0紧急任务");
}
return reason.toString();
}
// ==================== 兼容方法 ====================
/**
* 为新工单推<E58D95><E68EA8>保洁员兼容旧接口
* <p>
* 这是主要的推荐方法,用于在工单创建时选择合适的保洁员
*
* @param areaId 区域ID
* @param priority 工单优先级
* @return 推荐的保洁员ID如果没有合适的返回null
*/
public Long recommendCleanerForNewOrder(Long areaId, PriorityEnum priority) {
log.info("为新工单推荐保洁员: areaId={}, priority={}", areaId, priority);
// 使用新的策略接口方法
OrderDispatchContext context = OrderDispatchContext.builder()
.areaId(areaId)
.priority(priority)
.build();
AssigneeRecommendation recommendation = recommend(context);
if (recommendation != null && recommendation.hasRecommendation()) {
log.info("为新工单推荐保洁员: areaId={}, priority={}, cleanerId={}, cleanerName={}",
areaId, priority, recommendation.getAssigneeId(), recommendation.getAssigneeName());
return recommendation.getAssigneeId();
}
log.warn("未找到可用的保洁员: areaId={}", areaId);
return null;
}
}

View File

@@ -1,163 +0,0 @@
package com.viewsh.module.ops.environment.service.dispatch;
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
import com.viewsh.module.ops.dal.dataobject.cleaner.OpsCleanerStatusDO;
import com.viewsh.module.ops.enums.CleanerStatusEnum;
import com.viewsh.module.ops.enums.PriorityEnum;
import com.viewsh.module.ops.environment.service.cleaner.CleanerStatusService;
import com.viewsh.module.ops.service.dispatch.DispatchStrategy;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Comparator;
import java.util.List;
/**
* 保洁区域优先派单策略
* 优先分配给该区域的空闲保洁员,按以下规则排序:
* 1. 状态必须是IDLE空闲
* 2. 优先选择同区域的保洁员
* 3. 考虑保洁员的工作量平衡
*
* @author lzh
*/
@Component
@Slf4j
public class CleanerAreaPriorityStrategy implements DispatchStrategy {
@Resource
private CleanerStatusService cleanerStatusService;
@Resource
private com.viewsh.module.ops.service.dispatch.DispatchEngineServiceImpl dispatchEngineService;
/**
* 策略名称
*/
private static final String STRATEGY_NAME = "cleaner_area_priority";
@PostConstruct
public void init() {
// 注册策略到派单引擎
dispatchEngineService.registerStrategy(this);
// 注册执行人员类型与策略的映射
dispatchEngineService.registerAssigneeTypeStrategy("CLEANER", STRATEGY_NAME);
log.info("保洁区域优先派单策略已注册: strategyName={}", STRATEGY_NAME);
}
@Override
public String getName() {
return STRATEGY_NAME;
}
@Override
public Long recommendAssignee(OrderQueueDTO queueDTO) {
log.info("执行保洁区域优先派单策略: opsOrderId={}", queueDTO.getOpsOrderId());
// 在新的队列模型中,推荐逻辑应该在工单创建时完成
// 这里主要用于手动派单或重新派单的场景
// TODO: 从 eventPayload 中解析区域ID然后查询可用保洁员
log.warn("在新队列模型中,推荐保洁员应该在工单创建时完成: opsOrderId={}",
queueDTO.getOpsOrderId());
return null;
}
@Override
public boolean canInterrupt(Long currentAssigneeId, Long currentOrderId, Long urgentOrderId) {
log.warn("判断是否可打断保洁员任务: assigneeId={}, currentOrderId={}, urgentOrderId={}",
currentAssigneeId, currentOrderId, urgentOrderId);
// P0任务可以打断P1/P2/P3任务
// 简化处理:默认允许打断
log.info("允许打断:紧急任务优先级更高: urgentOrderId={}, currentOrderId={}",
urgentOrderId, currentOrderId);
return true;
}
/**
* 选择最佳保洁员
* 综合考虑以下因素:
* 1. 是否在该区域(已过滤)
* 2. 当前工作量(已完成的工单数少优先)
*/
private OpsCleanerStatusDO selectBestCleaner(List<OpsCleanerStatusDO> cleaners,
OrderQueueDTO queueDTO) {
// 使用Comparator进行多条件排序
return cleaners.stream()
.max(Comparator
// 1. 电量充足优先(电量>20%
.comparing((OpsCleanerStatusDO cleaner) ->
cleaner.getBatteryLevel() != null && cleaner.getBatteryLevel() > 20 ? 1 : 0)
// 2. 心跳时间最新的优先(在线状态好)
.thenComparing(OpsCleanerStatusDO::getLastHeartbeatTime, Comparator.nullsLast(Comparator.naturalOrder()))
// 3. 没有当前工单的<E58D95><E79A84><EFBFBD>完全空闲
.thenComparing((OpsCleanerStatusDO cleaner) ->
cleaner.getCurrentOpsOrderId() == null ? 1 : 0)
// 4. 工作量少的优先(状态变更时间早的说明刚完成)
.thenComparing(OpsCleanerStatusDO::getUpdateTime, Comparator.nullsLast(Comparator.naturalOrder()))
)
.orElse(null);
}
/**
* 为新工单推荐保洁员(工单创建时调用)
* 这是主要的推荐方法,用于在工单创建时选择合适的保洁员
*
* @param areaId 区域ID
* @param priority 工单优先级
* @return 推荐的保洁员ID如果没有合适的返回null
*/
public Long recommendCleanerForNewOrder(Long areaId, PriorityEnum priority) {
log.info("为新工单推荐保洁员: areaId={}, priority={}", areaId, priority);
// TODO-lzh:目前场景是单个区域绑定单个保洁员(所以只需查出对应保洁返回即可)
// 查询该区域的可用保洁员IDLE状态
List<OpsCleanerStatusDO> availableCleaners =
cleanerStatusService.listAvailableCleaners(areaId);
if (availableCleaners.isEmpty()) {
log.warn("该区域没有可用保洁员: areaId={}", areaId);
return null;
}
log.info("找到 {} 个可用保洁员: areaId={}", availableCleaners.size(), areaId);
// 对于P0紧急任务优先选择电量充足、心跳最新的
if (priority.isUrgent()) {
log.warn("为P0紧急任务推荐保洁员优先选择最佳保洁员");
}
// 选择最佳保洁员
OpsCleanerStatusDO selectedCleaner = selectBestCleanerById(availableCleaners);
if (selectedCleaner != null) {
log.info("为新工单推荐保洁员: areaId={}, priority={}, cleanerId={}, cleanerName={}",
areaId, priority, selectedCleaner.getUserId(), selectedCleaner.getUserName());
return selectedCleaner.getUserId();
}
return null;
}
/**
* 从保洁员列表中选择最佳保洁员(简化版本,不需要 queueDTO
*/
private OpsCleanerStatusDO selectBestCleanerById(List<OpsCleanerStatusDO> cleaners) {
return cleaners.stream()
.max(Comparator
// 1. 电量充足优先(电量>20%
.comparing((OpsCleanerStatusDO cleaner) ->
cleaner.getBatteryLevel() != null && cleaner.getBatteryLevel() > 20 ? 1 : 0)
// 2. 心跳时间最新的优先(在线状态好)
.thenComparing(OpsCleanerStatusDO::getLastHeartbeatTime, Comparator.nullsLast(Comparator.naturalOrder()))
// 3. 没有当前工单的优先(完全空闲)
.thenComparing((OpsCleanerStatusDO cleaner) ->
cleaner.getCurrentOpsOrderId() == null ? 1 : 0)
)
.orElse(null);
}
}

View File

@@ -0,0 +1,145 @@
package com.viewsh.module.ops.environment.service.dispatch;
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
import com.viewsh.module.ops.api.queue.OrderQueueService;
import com.viewsh.module.ops.core.dispatch.model.AssigneeStatus;
import com.viewsh.module.ops.core.dispatch.model.DispatchDecision;
import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext;
import com.viewsh.module.ops.core.dispatch.strategy.InterruptDecision;
import com.viewsh.module.ops.core.dispatch.strategy.ScheduleStrategy;
import com.viewsh.module.ops.environment.dal.dataobject.cleaner.OpsCleanerStatusDO;
import com.viewsh.module.ops.environment.integration.adapter.CleanerAssigneeStatusAdapter;
import com.viewsh.module.ops.environment.service.cleaner.CleanerStatusService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 保洁优先级调度策略
* <p>
* 职责:怎么派单
* <p>
* 策略规则:
* <ul>
* <li>空闲无任务 → DIRECT_DISPATCH直接派单</li>
* <li>空闲有等待 → PUSH_AND_ENQUEUE推送等待+新任务入队)</li>
* <li>忙碌且非P0 → ENQUEUE_ONLY仅入队</li>
* <li>忙碌且P0 → INTERRUPT_AND_DISPATCH打断并派单</li>
* </ul>
*
* @author lzh
*/
@Component
@Slf4j
public class CleanerPriorityScheduleStrategy implements ScheduleStrategy {
private static final String STRATEGY_NAME = "cleaner_priority_schedule";
private static final String BUSINESS_TYPE = "CLEAN";
@Resource
private CleanerStatusService cleanerStatusService;
@Resource
private OrderQueueService orderQueueService;
@Resource
private com.viewsh.module.ops.core.dispatch.DispatchEngine dispatchEngine;
@PostConstruct
public void init() {
dispatchEngine.registerScheduleStrategy(BUSINESS_TYPE, this);
log.info("保洁调度策略已注册: strategyName={}, businessType={}", STRATEGY_NAME, BUSINESS_TYPE);
}
@Override
public String getName() {
return STRATEGY_NAME;
}
@Override
public String getSupportedBusinessType() {
return BUSINESS_TYPE;
}
@Override
public DispatchDecision decide(OrderDispatchContext context) {
Long assigneeId = context.getRecommendedAssigneeId();
if (assigneeId == null) {
return DispatchDecision.unavailable("未指定执行人");
}
// 查询保洁员状态
OpsCleanerStatusDO cleanerStatus = cleanerStatusService.getStatus(assigneeId);
if (cleanerStatus == null) {
return DispatchDecision.unavailable("保洁员不存在: " + assigneeId);
}
// 转换为通用状态接口
AssigneeStatus assigneeStatus = new CleanerAssigneeStatusAdapter(cleanerStatus);
context.setAssigneeStatus(assigneeStatus);
// 查询等待任务数量
List<OrderQueueDTO> waitingTasks = orderQueueService.getWaitingTasksByUserId(assigneeId);
int waitingCount = waitingTasks != null ? waitingTasks.size() : 0;
log.info("保洁员调度决策: assigneeId={}, status={}, waitingCount={}, orderIsUrgent={}",
assigneeId, assigneeStatus.getStatus(), waitingCount, context.isUrgent());
// 决策调度路径
if (assigneeStatus.isIdle() && assigneeStatus.getCurrentTaskCount() == 0) {
// 空闲且无正在执行的任务
if (waitingCount > 0) {
// 有等待任务,先推送等待任务
log.info("决策: PUSH_AND_ENQUEUE - 保洁员空闲但有等待任务");
return DispatchDecision.pushAndEnqueue();
} else {
// 直接派单
log.info("决策: DIRECT_DISPATCH - 保洁员空闲无等待任务");
return DispatchDecision.directDispatch();
}
} else if (context.isUrgent()) {
// P0紧急任务需要打断
if (assigneeStatus.getCurrentTaskCount() > 0) {
Long currentOrderId = cleanerStatus.getCurrentOpsOrderId();
log.warn("决策: INTERRUPT_AND_DISPATCH - P0紧急任务打断当前任务: currentOrderId={}", currentOrderId);
return DispatchDecision.interruptAndDispatch(currentOrderId);
} else {
log.info("决策: DIRECT_DISPATCH - P0紧急任务直接派单");
return DispatchDecision.directDispatch();
}
} else {
// 非紧急任务,保洁员忙碌,入队等待
log.info("决策: ENQUEUE_ONLY - 保洁员忙碌,任务入队等待");
return DispatchDecision.enqueueOnly();
}
}
@Override
public InterruptDecision evaluateInterrupt(Long currentAssigneeId, Long currentOrderId,
OrderDispatchContext urgentContext) {
log.info("评估是否可打断保洁员任务: assigneeId={}, currentOrderId={}, urgentOrderId={}, urgentPriority={}",
currentAssigneeId, currentOrderId, urgentContext.getOrderId(), urgentContext.getPriority());
// 检查保洁员状态
OpsCleanerStatusDO cleanerStatus = cleanerStatusService.getStatus(currentAssigneeId);
if (cleanerStatus == null) {
return InterruptDecision.deny("保洁员不存在", "无法执行打断");
}
// P0任务可以打断任何任务
if (urgentContext.isUrgent()) {
log.warn("允许打断: P0紧急任务可以打断当前任务");
return InterruptDecision.allowByDefault();
}
// P1/P2任务不能打断
log.info("拒绝打断: 非P0任务不能打断当前任务");
return InterruptDecision.deny(
"紧急任务优先级不足",
"建议等待当前任务完成"
);
}
}