feat(security): 安保工单接入队列调度,支持入队等待和负载均衡
改造安保调度策略,接入 UserDispatchStatusService: SecurityScheduleStrategy: - 读 Redis 判断人员忙碌/空闲,决策 DIRECT_DISPATCH / PUSH_AND_ENQUEUE / ENQUEUE_ONLY - PAUSED 状态视为忙碌,不给暂停中的人员推送新工单 - 替换原来的始终 DIRECT_DISPATCH 逻辑 SecurityAreaAssignStrategy: - Pipeline 批量读 Redis 获取每人活跃工单数 - 选负载最轻的人员,同等负载取 sort 最小 SecurityOrderEventListener: - 新增 QUEUED 分支,入队时写入 assignee 信息到扩展表 - 取消工单后自动派发下一个等待工单(兜底从 ops_order 表获取 assigneeId) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -15,6 +15,8 @@ import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.LogType;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.security.dal.dataobject.workorder.OpsOrderSecurityExtDO;
|
||||
import com.viewsh.module.ops.security.dal.mysql.workorder.OpsOrderSecurityExtMapper;
|
||||
import jakarta.annotation.Resource;
|
||||
@@ -47,6 +49,9 @@ public class SecurityOrderEventListener {
|
||||
@Resource
|
||||
private OpsOrderSecurityExtMapper securityExtMapper;
|
||||
|
||||
@Resource
|
||||
private OpsOrderMapper opsOrderMapper;
|
||||
|
||||
@Resource
|
||||
private DispatchEngine dispatchEngine;
|
||||
|
||||
@@ -124,6 +129,7 @@ public class SecurityOrderEventListener {
|
||||
log.info("安保工单状态变更: orderId={}, {} -> {}", orderId, event.getOldStatus(), newStatus);
|
||||
|
||||
switch (newStatus) {
|
||||
case QUEUED -> handleQueued(orderId, event);
|
||||
case DISPATCHED -> handleDispatched(orderId, event);
|
||||
case CONFIRMED -> handleConfirmed(orderId, event);
|
||||
case COMPLETED -> handleCompleted(orderId, event);
|
||||
@@ -153,6 +159,27 @@ public class SecurityOrderEventListener {
|
||||
|
||||
// ==================== 状态处理方法 ====================
|
||||
|
||||
private void handleQueued(Long orderId, OrderStateChangedEvent event) {
|
||||
// 入队时先写入处理人信息(dispatchedTime 等派发时再写)
|
||||
Long assigneeId = event.getPayloadLong("assigneeId");
|
||||
String assigneeName = (String) event.getPayload().get("assigneeName");
|
||||
String assigneePhone = (String) event.getPayload().get("assigneePhone");
|
||||
|
||||
if (assigneeId != null) {
|
||||
OpsOrderSecurityExtDO extUpdate = new OpsOrderSecurityExtDO();
|
||||
extUpdate.setOpsOrderId(orderId);
|
||||
extUpdate.setAssignedUserId(assigneeId);
|
||||
extUpdate.setAssignedUserName(assigneeName);
|
||||
extUpdate.setAssignedUserPhone(assigneePhone);
|
||||
securityExtMapper.insertOrUpdateSelective(extUpdate);
|
||||
}
|
||||
|
||||
String message = assigneeName != null
|
||||
? String.format("工单已入队,分配给 %s,等待派发", assigneeName)
|
||||
: "工单已入队等待派发";
|
||||
recordLog(EventDomain.DISPATCH, LogType.ORDER_DISPATCHED, message, orderId, assigneeId);
|
||||
}
|
||||
|
||||
private void handleDispatched(Long orderId, OrderStateChangedEvent event) {
|
||||
// 1. 记录下发时间 + 处理人信息
|
||||
Long assigneeId = event.getPayloadLong("assigneeId");
|
||||
@@ -231,6 +258,25 @@ public class SecurityOrderEventListener {
|
||||
}
|
||||
|
||||
recordLog(EventDomain.DISPATCH, LogType.ORDER_CANCELLED, message, orderId, operatorId);
|
||||
|
||||
// 取消后自动派发下一个等待工单
|
||||
// EventPublishHandler 只在 COMPLETED 时发布 OrderCompletedEvent,
|
||||
// CANCELLED 不会触发 onOrderCompleted,需要在此处补充调用
|
||||
Long assigneeId = event.getPayloadLong("assigneeId");
|
||||
if (assigneeId == null) {
|
||||
// 兜底从工单表获取(管理员取消等场景 payload 可能无 assigneeId)
|
||||
OpsOrderDO order = opsOrderMapper.selectById(orderId);
|
||||
if (order != null) {
|
||||
assigneeId = order.getAssigneeId();
|
||||
}
|
||||
}
|
||||
if (assigneeId != null) {
|
||||
try {
|
||||
dispatchEngine.autoDispatchNext(orderId, assigneeId);
|
||||
} catch (Exception e) {
|
||||
log.error("安保工单取消后自动派送下一个失败: orderId={}", orderId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handlePaused(Long orderId, OrderStateChangedEvent event) {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.viewsh.module.ops.security.service.dispatch;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.viewsh.module.ops.api.dispatch.UserDispatchStatusDTO;
|
||||
import com.viewsh.module.ops.core.dispatch.DispatchEngine;
|
||||
import com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation;
|
||||
import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext;
|
||||
@@ -8,6 +9,7 @@ import com.viewsh.module.ops.core.dispatch.strategy.AssignStrategy;
|
||||
import com.viewsh.module.ops.enums.WorkOrderTypeEnum;
|
||||
import com.viewsh.module.ops.security.dal.dataobject.area.OpsAreaSecurityUserDO;
|
||||
import com.viewsh.module.ops.security.dal.mysql.area.OpsAreaSecurityUserMapper;
|
||||
import com.viewsh.module.ops.service.dispatch.UserDispatchStatusService;
|
||||
import com.viewsh.module.system.api.user.AdminUserApi;
|
||||
import com.viewsh.module.system.api.user.dto.AdminUserRespDTO;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
@@ -16,11 +18,17 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 安保工单区域分配策略
|
||||
* <p>
|
||||
* 根据区域绑定的安保人员随机分配。
|
||||
* 职责:决定谁来接单
|
||||
* <p>
|
||||
* 通过 {@link UserDispatchStatusService} 批量读取 Redis(Pipeline,一次网络往返),
|
||||
* 选择活跃工单最少的人员,同等负载取 sort 最小的。
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@@ -40,6 +48,9 @@ public class SecurityAreaAssignStrategy implements AssignStrategy {
|
||||
@Resource
|
||||
private AdminUserApi adminUserApi;
|
||||
|
||||
@Resource
|
||||
private UserDispatchStatusService userDispatchStatusService;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
dispatchEngine.registerAssignStrategy(BUSINESS_TYPE, this);
|
||||
@@ -64,16 +75,45 @@ public class SecurityAreaAssignStrategy implements AssignStrategy {
|
||||
return AssigneeRecommendation.none();
|
||||
}
|
||||
|
||||
// 1. 查询区域绑定的安保人员(已按 sort ASC 排序)
|
||||
List<OpsAreaSecurityUserDO> users = areaSecurityUserMapper.selectListByAreaId(areaId);
|
||||
if (CollUtil.isEmpty(users)) {
|
||||
log.info("区域 {} 无绑定安保人员,工单 {} 等待手动分配", areaId, context.getOrderId());
|
||||
return AssigneeRecommendation.none();
|
||||
}
|
||||
|
||||
// 选择 sort 值最小的人员(sort 越小优先级越高,由 Mapper 已按 sort ASC 排序)
|
||||
OpsAreaSecurityUserDO chosen = users.get(0);
|
||||
// 2. 批量读 Redis 获取每人的调度状态(Pipeline,一次网络往返)
|
||||
List<Long> userIds = users.stream()
|
||||
.map(OpsAreaSecurityUserDO::getUserId)
|
||||
.toList();
|
||||
List<UserDispatchStatusDTO> statuses = userDispatchStatusService.batchGetStatus(userIds);
|
||||
|
||||
// 从系统用户表查询 nickname 和 mobile
|
||||
Map<Long, Integer> loadMap = statuses.stream()
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toMap(
|
||||
UserDispatchStatusDTO::getUserId,
|
||||
UserDispatchStatusDTO::getActiveOrderCountSafe,
|
||||
(a, b) -> a));
|
||||
|
||||
// 3. 选负载最小的人(users 已按 sort ASC 排序,遍历顺序保证同等负载取 sort 最小)
|
||||
OpsAreaSecurityUserDO chosen = null;
|
||||
int minLoad = Integer.MAX_VALUE;
|
||||
for (OpsAreaSecurityUserDO user : users) {
|
||||
int load = loadMap.getOrDefault(user.getUserId(), 0);
|
||||
if (load < minLoad) {
|
||||
minLoad = load;
|
||||
chosen = user;
|
||||
}
|
||||
}
|
||||
|
||||
if (chosen == null) {
|
||||
return AssigneeRecommendation.none();
|
||||
}
|
||||
|
||||
log.info("安保分配决策: areaId={}, chosen={}, load={}, totalCandidates={}",
|
||||
areaId, chosen.getUserId(), minLoad, users.size());
|
||||
|
||||
// 4. 查询姓名和手机号
|
||||
String assigneeName = chosen.getUserName(); // 兜底使用冗余字段
|
||||
String assigneePhone = null;
|
||||
AdminUserRespDTO user = adminUserApi.getUser(chosen.getUserId()).getCheckedData();
|
||||
@@ -82,11 +122,14 @@ public class SecurityAreaAssignStrategy implements AssignStrategy {
|
||||
assigneePhone = user.getMobile();
|
||||
}
|
||||
|
||||
// 评分:负载越低分越高(基础分 50,无负载 +30,每少一个任务 +10,上限 100)
|
||||
int score = Math.min(100, 50 + Math.max(0, (3 - minLoad)) * 10);
|
||||
|
||||
AssigneeRecommendation recommendation = AssigneeRecommendation.of(
|
||||
chosen.getUserId(), assigneeName, 50, "区域排序优先分配");
|
||||
chosen.getUserId(), assigneeName, score,
|
||||
String.format("负载均衡选择(当前活跃工单: %d)", minLoad));
|
||||
recommendation.setAssigneePhone(assigneePhone);
|
||||
recommendation.setAreaId(areaId);
|
||||
return recommendation;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
package com.viewsh.module.ops.security.service.dispatch;
|
||||
|
||||
import com.viewsh.module.ops.api.dispatch.UserDispatchStatusDTO;
|
||||
import com.viewsh.module.ops.core.dispatch.DispatchEngine;
|
||||
import com.viewsh.module.ops.core.dispatch.model.AssigneeStatus;
|
||||
import com.viewsh.module.ops.core.dispatch.model.DispatchDecision;
|
||||
import com.viewsh.module.ops.core.dispatch.model.DispatchPath;
|
||||
import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext;
|
||||
import com.viewsh.module.ops.core.dispatch.strategy.InterruptDecision;
|
||||
import com.viewsh.module.ops.core.dispatch.strategy.ScheduleStrategy;
|
||||
import com.viewsh.module.ops.enums.WorkOrderTypeEnum;
|
||||
import com.viewsh.module.ops.service.dispatch.UserAssigneeStatusAdapter;
|
||||
import com.viewsh.module.ops.service.dispatch.UserDispatchStatusService;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -14,9 +18,15 @@ import org.springframework.stereotype.Component;
|
||||
/**
|
||||
* 安保工单调度策略
|
||||
* <p>
|
||||
* 安保工单调度相对简单:
|
||||
* - 有可用人员 → 直接派单
|
||||
* - 人员忙碌 → 入队等待
|
||||
* 职责:决定怎么派单
|
||||
* <p>
|
||||
* 通过 {@link UserDispatchStatusService} 读取 Redis 中的人员状态(O(1)),
|
||||
* 根据忙碌/空闲决定调度路径:
|
||||
* <ul>
|
||||
* <li>完全空闲(无执行中、无等待)→ DIRECT_DISPATCH</li>
|
||||
* <li>无执行中但有等待 → PUSH_AND_ENQUEUE</li>
|
||||
* <li>忙碌 → ENQUEUE_ONLY</li>
|
||||
* </ul>
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@@ -30,6 +40,9 @@ public class SecurityScheduleStrategy implements ScheduleStrategy {
|
||||
@Resource
|
||||
private DispatchEngine dispatchEngine;
|
||||
|
||||
@Resource
|
||||
private UserDispatchStatusService userDispatchStatusService;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
dispatchEngine.registerScheduleStrategy(BUSINESS_TYPE, this);
|
||||
@@ -48,12 +61,44 @@ public class SecurityScheduleStrategy implements ScheduleStrategy {
|
||||
|
||||
@Override
|
||||
public DispatchDecision decide(OrderDispatchContext context) {
|
||||
// 安保工单默认直接派单
|
||||
// DispatchEngine 会根据执行人状态自动选择路径
|
||||
return DispatchDecision.builder()
|
||||
.path(DispatchPath.DIRECT_DISPATCH)
|
||||
.reason("安保工单直接派单")
|
||||
.build();
|
||||
Long assigneeId = context.getRecommendedAssigneeId();
|
||||
if (assigneeId == null) {
|
||||
return DispatchDecision.unavailable("未指定执行人");
|
||||
}
|
||||
|
||||
// O(1) 读 Redis
|
||||
UserDispatchStatusDTO userStatus = userDispatchStatusService.getStatus(assigneeId);
|
||||
|
||||
// 适配为通用接口,设置到上下文
|
||||
AssigneeStatus assigneeStatus = new UserAssigneeStatusAdapter(userStatus);
|
||||
context.setAssigneeStatus(assigneeStatus);
|
||||
|
||||
// PAUSED 也视为忙碌,不应给暂停中的人员推送新工单
|
||||
boolean isBusy = userStatus != null && (userStatus.isBusy() || userStatus.isPaused());
|
||||
int waitingCount = userStatus != null ? userStatus.getWaitingTaskCountSafe() : 0;
|
||||
|
||||
log.info("安保调度决策: assigneeId={}, status={}, isBusy={}, waitingCount={}, isUrgent={}",
|
||||
assigneeId,
|
||||
userStatus != null ? userStatus.getStatus() : "IDLE(null)",
|
||||
isBusy, waitingCount, context.isUrgent());
|
||||
|
||||
// 决策调度路径
|
||||
if (!isBusy && waitingCount == 0) {
|
||||
log.info("决策: DIRECT_DISPATCH - 人员空闲无等待任务");
|
||||
return DispatchDecision.directDispatch();
|
||||
} else if (!isBusy && waitingCount > 0) {
|
||||
log.info("决策: PUSH_AND_ENQUEUE - 人员空闲但有等待任务");
|
||||
return DispatchDecision.pushAndEnqueue();
|
||||
} else {
|
||||
log.info("决策: ENQUEUE_ONLY - 人员忙碌,任务入队等待");
|
||||
return DispatchDecision.enqueueOnly();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterruptDecision evaluateInterrupt(Long currentAssigneeId, Long currentOrderId,
|
||||
OrderDispatchContext urgentContext) {
|
||||
// 安保工单不支持抢断,与保洁一致(非抢占式队列派发)
|
||||
return InterruptDecision.deny("安保工单不支持抢断", "工单将按队列优先级在下一轮派发");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user