diff --git a/viewsh-module-ops/viewsh-module-security-biz/src/main/java/com/viewsh/module/ops/security/integration/listener/SecurityOrderEventListener.java b/viewsh-module-ops/viewsh-module-security-biz/src/main/java/com/viewsh/module/ops/security/integration/listener/SecurityOrderEventListener.java index eb00851..68e0f04 100644 --- a/viewsh-module-ops/viewsh-module-security-biz/src/main/java/com/viewsh/module/ops/security/integration/listener/SecurityOrderEventListener.java +++ b/viewsh-module-ops/viewsh-module-security-biz/src/main/java/com/viewsh/module/ops/security/integration/listener/SecurityOrderEventListener.java @@ -15,6 +15,8 @@ import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule; import com.viewsh.module.ops.infrastructure.log.enumeration.LogType; import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord; import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; import com.viewsh.module.ops.security.dal.dataobject.workorder.OpsOrderSecurityExtDO; import com.viewsh.module.ops.security.dal.mysql.workorder.OpsOrderSecurityExtMapper; import jakarta.annotation.Resource; @@ -47,6 +49,9 @@ public class SecurityOrderEventListener { @Resource private OpsOrderSecurityExtMapper securityExtMapper; + @Resource + private OpsOrderMapper opsOrderMapper; + @Resource private DispatchEngine dispatchEngine; @@ -124,6 +129,7 @@ public class SecurityOrderEventListener { log.info("安保工单状态变更: orderId={}, {} -> {}", orderId, event.getOldStatus(), newStatus); switch (newStatus) { + case QUEUED -> handleQueued(orderId, event); case DISPATCHED -> handleDispatched(orderId, event); case CONFIRMED -> handleConfirmed(orderId, event); case COMPLETED -> handleCompleted(orderId, event); @@ -153,6 +159,27 @@ public class SecurityOrderEventListener { // ==================== 状态处理方法 ==================== + private void handleQueued(Long orderId, OrderStateChangedEvent event) { + // 入队时先写入处理人信息(dispatchedTime 等派发时再写) + Long assigneeId = event.getPayloadLong("assigneeId"); + String assigneeName = (String) event.getPayload().get("assigneeName"); + String assigneePhone = (String) event.getPayload().get("assigneePhone"); + + if (assigneeId != null) { + OpsOrderSecurityExtDO extUpdate = new OpsOrderSecurityExtDO(); + extUpdate.setOpsOrderId(orderId); + extUpdate.setAssignedUserId(assigneeId); + extUpdate.setAssignedUserName(assigneeName); + extUpdate.setAssignedUserPhone(assigneePhone); + securityExtMapper.insertOrUpdateSelective(extUpdate); + } + + String message = assigneeName != null + ? String.format("工单已入队,分配给 %s,等待派发", assigneeName) + : "工单已入队等待派发"; + recordLog(EventDomain.DISPATCH, LogType.ORDER_DISPATCHED, message, orderId, assigneeId); + } + private void handleDispatched(Long orderId, OrderStateChangedEvent event) { // 1. 记录下发时间 + 处理人信息 Long assigneeId = event.getPayloadLong("assigneeId"); @@ -231,6 +258,25 @@ public class SecurityOrderEventListener { } recordLog(EventDomain.DISPATCH, LogType.ORDER_CANCELLED, message, orderId, operatorId); + + // 取消后自动派发下一个等待工单 + // EventPublishHandler 只在 COMPLETED 时发布 OrderCompletedEvent, + // CANCELLED 不会触发 onOrderCompleted,需要在此处补充调用 + Long assigneeId = event.getPayloadLong("assigneeId"); + if (assigneeId == null) { + // 兜底从工单表获取(管理员取消等场景 payload 可能无 assigneeId) + OpsOrderDO order = opsOrderMapper.selectById(orderId); + if (order != null) { + assigneeId = order.getAssigneeId(); + } + } + if (assigneeId != null) { + try { + dispatchEngine.autoDispatchNext(orderId, assigneeId); + } catch (Exception e) { + log.error("安保工单取消后自动派送下一个失败: orderId={}", orderId, e); + } + } } private void handlePaused(Long orderId, OrderStateChangedEvent event) { diff --git a/viewsh-module-ops/viewsh-module-security-biz/src/main/java/com/viewsh/module/ops/security/service/dispatch/SecurityAreaAssignStrategy.java b/viewsh-module-ops/viewsh-module-security-biz/src/main/java/com/viewsh/module/ops/security/service/dispatch/SecurityAreaAssignStrategy.java index 32404c5..6969bd9 100644 --- a/viewsh-module-ops/viewsh-module-security-biz/src/main/java/com/viewsh/module/ops/security/service/dispatch/SecurityAreaAssignStrategy.java +++ b/viewsh-module-ops/viewsh-module-security-biz/src/main/java/com/viewsh/module/ops/security/service/dispatch/SecurityAreaAssignStrategy.java @@ -1,6 +1,7 @@ package com.viewsh.module.ops.security.service.dispatch; import cn.hutool.core.collection.CollUtil; +import com.viewsh.module.ops.api.dispatch.UserDispatchStatusDTO; import com.viewsh.module.ops.core.dispatch.DispatchEngine; import com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation; import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext; @@ -8,6 +9,7 @@ import com.viewsh.module.ops.core.dispatch.strategy.AssignStrategy; import com.viewsh.module.ops.enums.WorkOrderTypeEnum; import com.viewsh.module.ops.security.dal.dataobject.area.OpsAreaSecurityUserDO; import com.viewsh.module.ops.security.dal.mysql.area.OpsAreaSecurityUserMapper; +import com.viewsh.module.ops.service.dispatch.UserDispatchStatusService; import com.viewsh.module.system.api.user.AdminUserApi; import com.viewsh.module.system.api.user.dto.AdminUserRespDTO; import jakarta.annotation.PostConstruct; @@ -16,11 +18,17 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; /** * 安保工单区域分配策略 *

- * 根据区域绑定的安保人员随机分配。 + * 职责:决定谁来接单 + *

+ * 通过 {@link UserDispatchStatusService} 批量读取 Redis(Pipeline,一次网络往返), + * 选择活跃工单最少的人员,同等负载取 sort 最小的。 * * @author lzh */ @@ -40,6 +48,9 @@ public class SecurityAreaAssignStrategy implements AssignStrategy { @Resource private AdminUserApi adminUserApi; + @Resource + private UserDispatchStatusService userDispatchStatusService; + @PostConstruct public void init() { dispatchEngine.registerAssignStrategy(BUSINESS_TYPE, this); @@ -64,16 +75,45 @@ public class SecurityAreaAssignStrategy implements AssignStrategy { return AssigneeRecommendation.none(); } + // 1. 查询区域绑定的安保人员(已按 sort ASC 排序) List users = areaSecurityUserMapper.selectListByAreaId(areaId); if (CollUtil.isEmpty(users)) { log.info("区域 {} 无绑定安保人员,工单 {} 等待手动分配", areaId, context.getOrderId()); return AssigneeRecommendation.none(); } - // 选择 sort 值最小的人员(sort 越小优先级越高,由 Mapper 已按 sort ASC 排序) - OpsAreaSecurityUserDO chosen = users.get(0); + // 2. 批量读 Redis 获取每人的调度状态(Pipeline,一次网络往返) + List userIds = users.stream() + .map(OpsAreaSecurityUserDO::getUserId) + .toList(); + List statuses = userDispatchStatusService.batchGetStatus(userIds); - // 从系统用户表查询 nickname 和 mobile + Map loadMap = statuses.stream() + .filter(Objects::nonNull) + .collect(Collectors.toMap( + UserDispatchStatusDTO::getUserId, + UserDispatchStatusDTO::getActiveOrderCountSafe, + (a, b) -> a)); + + // 3. 选负载最小的人(users 已按 sort ASC 排序,遍历顺序保证同等负载取 sort 最小) + OpsAreaSecurityUserDO chosen = null; + int minLoad = Integer.MAX_VALUE; + for (OpsAreaSecurityUserDO user : users) { + int load = loadMap.getOrDefault(user.getUserId(), 0); + if (load < minLoad) { + minLoad = load; + chosen = user; + } + } + + if (chosen == null) { + return AssigneeRecommendation.none(); + } + + log.info("安保分配决策: areaId={}, chosen={}, load={}, totalCandidates={}", + areaId, chosen.getUserId(), minLoad, users.size()); + + // 4. 查询姓名和手机号 String assigneeName = chosen.getUserName(); // 兜底使用冗余字段 String assigneePhone = null; AdminUserRespDTO user = adminUserApi.getUser(chosen.getUserId()).getCheckedData(); @@ -82,11 +122,14 @@ public class SecurityAreaAssignStrategy implements AssignStrategy { assigneePhone = user.getMobile(); } + // 评分:负载越低分越高(基础分 50,无负载 +30,每少一个任务 +10,上限 100) + int score = Math.min(100, 50 + Math.max(0, (3 - minLoad)) * 10); + AssigneeRecommendation recommendation = AssigneeRecommendation.of( - chosen.getUserId(), assigneeName, 50, "区域排序优先分配"); + chosen.getUserId(), assigneeName, score, + String.format("负载均衡选择(当前活跃工单: %d)", minLoad)); recommendation.setAssigneePhone(assigneePhone); recommendation.setAreaId(areaId); return recommendation; } - } diff --git a/viewsh-module-ops/viewsh-module-security-biz/src/main/java/com/viewsh/module/ops/security/service/dispatch/SecurityScheduleStrategy.java b/viewsh-module-ops/viewsh-module-security-biz/src/main/java/com/viewsh/module/ops/security/service/dispatch/SecurityScheduleStrategy.java index 99a0f56..e8eff43 100644 --- a/viewsh-module-ops/viewsh-module-security-biz/src/main/java/com/viewsh/module/ops/security/service/dispatch/SecurityScheduleStrategy.java +++ b/viewsh-module-ops/viewsh-module-security-biz/src/main/java/com/viewsh/module/ops/security/service/dispatch/SecurityScheduleStrategy.java @@ -1,11 +1,15 @@ package com.viewsh.module.ops.security.service.dispatch; +import com.viewsh.module.ops.api.dispatch.UserDispatchStatusDTO; import com.viewsh.module.ops.core.dispatch.DispatchEngine; +import com.viewsh.module.ops.core.dispatch.model.AssigneeStatus; import com.viewsh.module.ops.core.dispatch.model.DispatchDecision; -import com.viewsh.module.ops.core.dispatch.model.DispatchPath; import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext; +import com.viewsh.module.ops.core.dispatch.strategy.InterruptDecision; import com.viewsh.module.ops.core.dispatch.strategy.ScheduleStrategy; import com.viewsh.module.ops.enums.WorkOrderTypeEnum; +import com.viewsh.module.ops.service.dispatch.UserAssigneeStatusAdapter; +import com.viewsh.module.ops.service.dispatch.UserDispatchStatusService; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; @@ -14,9 +18,15 @@ import org.springframework.stereotype.Component; /** * 安保工单调度策略 *

- * 安保工单调度相对简单: - * - 有可用人员 → 直接派单 - * - 人员忙碌 → 入队等待 + * 职责:决定怎么派单 + *

+ * 通过 {@link UserDispatchStatusService} 读取 Redis 中的人员状态(O(1)), + * 根据忙碌/空闲决定调度路径: + *

* * @author lzh */ @@ -30,6 +40,9 @@ public class SecurityScheduleStrategy implements ScheduleStrategy { @Resource private DispatchEngine dispatchEngine; + @Resource + private UserDispatchStatusService userDispatchStatusService; + @PostConstruct public void init() { dispatchEngine.registerScheduleStrategy(BUSINESS_TYPE, this); @@ -48,12 +61,44 @@ public class SecurityScheduleStrategy implements ScheduleStrategy { @Override public DispatchDecision decide(OrderDispatchContext context) { - // 安保工单默认直接派单 - // DispatchEngine 会根据执行人状态自动选择路径 - return DispatchDecision.builder() - .path(DispatchPath.DIRECT_DISPATCH) - .reason("安保工单直接派单") - .build(); + Long assigneeId = context.getRecommendedAssigneeId(); + if (assigneeId == null) { + return DispatchDecision.unavailable("未指定执行人"); + } + + // O(1) 读 Redis + UserDispatchStatusDTO userStatus = userDispatchStatusService.getStatus(assigneeId); + + // 适配为通用接口,设置到上下文 + AssigneeStatus assigneeStatus = new UserAssigneeStatusAdapter(userStatus); + context.setAssigneeStatus(assigneeStatus); + + // PAUSED 也视为忙碌,不应给暂停中的人员推送新工单 + boolean isBusy = userStatus != null && (userStatus.isBusy() || userStatus.isPaused()); + int waitingCount = userStatus != null ? userStatus.getWaitingTaskCountSafe() : 0; + + log.info("安保调度决策: assigneeId={}, status={}, isBusy={}, waitingCount={}, isUrgent={}", + assigneeId, + userStatus != null ? userStatus.getStatus() : "IDLE(null)", + isBusy, waitingCount, context.isUrgent()); + + // 决策调度路径 + if (!isBusy && waitingCount == 0) { + log.info("决策: DIRECT_DISPATCH - 人员空闲无等待任务"); + return DispatchDecision.directDispatch(); + } else if (!isBusy && waitingCount > 0) { + log.info("决策: PUSH_AND_ENQUEUE - 人员空闲但有等待任务"); + return DispatchDecision.pushAndEnqueue(); + } else { + log.info("决策: ENQUEUE_ONLY - 人员忙碌,任务入队等待"); + return DispatchDecision.enqueueOnly(); + } } + @Override + public InterruptDecision evaluateInterrupt(Long currentAssigneeId, Long currentOrderId, + OrderDispatchContext urgentContext) { + // 安保工单不支持抢断,与保洁一致(非抢占式队列派发) + return InterruptDecision.deny("安保工单不支持抢断", "工单将按队列优先级在下一轮派发"); + } }