From 9115e03878a184ed371c4b895db2e387295623c6 Mon Sep 17 00:00:00 2001 From: lzh Date: Wed, 25 Mar 2026 15:44:42 +0800 Subject: [PATCH] =?UTF-8?q?feat(ops):=20=E6=96=B0=E5=A2=9E=E9=80=9A?= =?UTF-8?q?=E7=94=A8=E4=BA=BA=E5=91=98=E8=B0=83=E5=BA=A6=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=20UserDispatchStatusService?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 基于 Redis Hash 维护人员维度的调度状态,供安保/工程/客服业务线共用。 与保洁的 BadgeDeviceStatusService(设备维度)并行。 核心设计: - Redis Key: ops:user:dispatch:{userId},存储 status/activeOrderCount/waitingTaskCount 等 - 所有写操作使用 Lua 脚本原子执行,保证多业务线并发安全 - 事件监听器 @EventListener(事务内同步)自动排除 CLEAN 类型 - Redis 丢数据时降级为 IDLE,下一次事件自动重建(自愈) 新增文件: - UserDispatchStatusDTO (ops-api) - UserDispatchStatusService 接口 (ops-biz) - UserDispatchStatusServiceImpl - Lua 脚本实现 (ops-biz) - UserDispatchStatusEventListener - 通用事件监听 (ops-biz) - UserAssigneeStatusAdapter - AssigneeStatus 适配器 (ops-biz) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../api/dispatch/UserDispatchStatusDTO.java | 89 ++++ .../dispatch/UserAssigneeStatusAdapter.java | 113 +++++ .../UserDispatchStatusEventListener.java | 81 ++++ .../dispatch/UserDispatchStatusService.java | 143 +++++++ .../UserDispatchStatusServiceImpl.java | 392 ++++++++++++++++++ .../job/IdleUserPendingOrderCheckJob.java | 140 +++++++ 6 files changed, 958 insertions(+) create mode 100644 viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/dispatch/UserDispatchStatusDTO.java create mode 100644 viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserAssigneeStatusAdapter.java create mode 100644 viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusEventListener.java create mode 100644 viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusService.java create mode 100644 viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusServiceImpl.java create mode 100644 viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/IdleUserPendingOrderCheckJob.java diff --git a/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/dispatch/UserDispatchStatusDTO.java b/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/dispatch/UserDispatchStatusDTO.java new file mode 100644 index 00000000..d8af27e0 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-api/src/main/java/com/viewsh/module/ops/api/dispatch/UserDispatchStatusDTO.java @@ -0,0 +1,89 @@ +package com.viewsh.module.ops.api.dispatch; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 人员调度状态 DTO + *

+ * 对应 Redis Hash: ops:user:dispatch:{userId} + *

+ * 适用于安保、工程、客服等人员维度的业务线。 + * 保洁使用设备维度的 {@code BadgeDeviceStatusDTO}。 + * + * @author lzh + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class UserDispatchStatusDTO { + + /** + * 人员ID + */ + private Long userId; + + /** + * 调度状态:IDLE / BUSY / PAUSED + */ + private String status; + + /** + * 当前执行中的工单ID(DISPATCHED/CONFIRMED/ARRIVED 状态) + */ + private Long currentOrderId; + + /** + * 当前工单类型(SECURITY / REPAIR / SERVICE) + */ + private String currentOrderType; + + /** + * 当前工单状态(DISPATCHED / CONFIRMED / ARRIVED) + */ + private String currentOrderStatus; + + /** + * 活跃工单总数(含直接派单 + 排队中,不依赖队列记录) + */ + private Integer activeOrderCount; + + /** + * 队列中 WAITING 状态的任务数 + */ + private Integer waitingTaskCount; + + /** + * 最后更新时间戳(毫秒) + */ + private Long lastUpdateTime; + + public boolean isIdle() { + return "IDLE".equals(status) || status == null; + } + + public boolean isBusy() { + return "BUSY".equals(status); + } + + public boolean isPaused() { + return "PAUSED".equals(status); + } + + /** + * 获取活跃工单数(null 安全) + */ + public int getActiveOrderCountSafe() { + return activeOrderCount != null ? activeOrderCount : 0; + } + + /** + * 获取等待任务数(null 安全) + */ + public int getWaitingTaskCountSafe() { + return waitingTaskCount != null ? waitingTaskCount : 0; + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserAssigneeStatusAdapter.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserAssigneeStatusAdapter.java new file mode 100644 index 00000000..302f4dce --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserAssigneeStatusAdapter.java @@ -0,0 +1,113 @@ +package com.viewsh.module.ops.service.dispatch; + +import com.viewsh.module.ops.api.dispatch.UserDispatchStatusDTO; +import com.viewsh.module.ops.core.dispatch.model.AssigneeStatus; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; + +/** + * 人员调度状态适配器 + *

+ * 将 {@link UserDispatchStatusDTO} 适配为通用的 {@link AssigneeStatus} 接口, + * 供调度引擎和调度策略使用。 + *

+ * 与保洁的 {@code BadgeDeviceAssigneeStatusAdapter}(设备维度)对等。 + * + * @author lzh + */ +public class UserAssigneeStatusAdapter implements AssigneeStatus { + + private final UserDispatchStatusDTO userStatus; + + public UserAssigneeStatusAdapter(UserDispatchStatusDTO userStatus) { + this.userStatus = userStatus; + } + + @Override + public String getStatus() { + return userStatus != null ? userStatus.getStatus() : "IDLE"; + } + + @Override + public boolean isIdle() { + return userStatus == null || userStatus.isIdle(); + } + + @Override + public boolean isBusy() { + return userStatus != null && userStatus.isBusy(); + } + + @Override + public boolean isOnline() { + // 人员维度无在线/离线概念(无 IoT 设备),有状态记录即视为在线 + return userStatus != null; + } + + @Override + public Long getCurrentTaskCount() { + if (userStatus == null) { + return 0L; + } + return userStatus.getCurrentOrderId() != null ? 1L : 0L; + } + + @Override + public Long getWaitingTaskCount() { + if (userStatus == null) { + return 0L; + } + return (long) userStatus.getWaitingTaskCountSafe(); + } + + @Override + public Long getAssigneeId() { + return userStatus != null ? userStatus.getUserId() : null; + } + + @Override + public String getAssigneeName() { + // 人员姓名不存储在 Redis 状态中,由 AssignStrategy 提供 + return null; + } + + @Override + public Long getAreaId() { + // 区域信息不存储在人员状态中,由 AssignStrategy 提供 + return null; + } + + @Override + public LocalDateTime getLastHeartbeatTime() { + if (userStatus == null || userStatus.getLastUpdateTime() == null) { + return null; + } + return LocalDateTime.ofInstant( + Instant.ofEpochMilli(userStatus.getLastUpdateTime()), + ZoneId.systemDefault()); + } + + @Override + public Object getExtension(String key) { + if (userStatus == null) { + return null; + } + return switch (key) { + case "currentOrderId" -> userStatus.getCurrentOrderId(); + case "currentOrderType" -> userStatus.getCurrentOrderType(); + case "currentOrderStatus" -> userStatus.getCurrentOrderStatus(); + case "activeOrderCount" -> userStatus.getActiveOrderCount(); + case "waitingTaskCount" -> userStatus.getWaitingTaskCount(); + default -> null; + }; + } + + /** + * 获取原始 DTO + */ + public UserDispatchStatusDTO getUserStatus() { + return userStatus; + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusEventListener.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusEventListener.java new file mode 100644 index 00000000..99ad67f1 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusEventListener.java @@ -0,0 +1,81 @@ +package com.viewsh.module.ops.service.dispatch; + +import com.viewsh.module.ops.core.event.OrderStateChangedEvent; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +/** + * 通用人员调度状态事件监听器 + *

+ * 监听 {@link OrderStateChangedEvent},自动更新 Redis 中的人员调度状态。 + *

+ * 设计说明: + *

+ * + * @author lzh + */ +@Slf4j +@Component +public class UserDispatchStatusEventListener { + + @Resource + private UserDispatchStatusService userDispatchStatusService; + + /** + * 监听工单状态变更,同步更新人员调度状态 + */ + @EventListener + public void onOrderStateChanged(OrderStateChangedEvent event) { + // 排除保洁(保洁走设备维度 BadgeDeviceStatusEventListener) + if ("CLEAN".equals(event.getOrderType())) { + return; + } + + Long assigneeId = event.getPayloadLong("assigneeId"); + if (assigneeId == null) { + return; + } + + Long orderId = event.getOrderId(); + String orderType = event.getOrderType(); + WorkOrderStatusEnum newStatus = event.getNewStatus(); + WorkOrderStatusEnum oldStatus = event.getOldStatus(); + + log.debug("人员调度状态监听: orderId={}, orderType={}, {} -> {}, assigneeId={}", + orderId, orderType, oldStatus, newStatus, assigneeId); + + switch (newStatus) { + case DISPATCHED -> handleDispatched(assigneeId, orderId, orderType, oldStatus); + case QUEUED -> userDispatchStatusService.onOrderQueued(assigneeId, orderId, orderType); + case CONFIRMED, ARRIVED -> userDispatchStatusService.onOrderStatusAdvanced( + assigneeId, orderId, newStatus.getStatus()); + case COMPLETED, CANCELLED -> userDispatchStatusService.onOrderFinished(assigneeId, orderId); + case PAUSED -> userDispatchStatusService.onOrderPaused(assigneeId, orderId); + default -> log.debug("人员调度状态无需处理: orderId={}, status={}", orderId, newStatus); + } + } + + /** + * 处理 DISPATCHED 状态:区分直接派发 vs 从排队派发 + */ + private void handleDispatched(Long assigneeId, Long orderId, String orderType, + WorkOrderStatusEnum oldStatus) { + if (oldStatus == WorkOrderStatusEnum.QUEUED) { + // 从排队变为派发:waitingTaskCount-1,activeOrderCount 不变 + userDispatchStatusService.onQueuedOrderDispatched(assigneeId, orderId, orderType); + } else if (oldStatus == WorkOrderStatusEnum.PAUSED) { + // 从暂停恢复:status → BUSY + userDispatchStatusService.onOrderResumed(assigneeId, orderId); + } else { + // 直接派发(PENDING → DISPATCHED):activeOrderCount+1 + userDispatchStatusService.onOrderDispatched(assigneeId, orderId, orderType); + } + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusService.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusService.java new file mode 100644 index 00000000..f5e0ab98 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusService.java @@ -0,0 +1,143 @@ +package com.viewsh.module.ops.service.dispatch; + +import com.viewsh.module.ops.api.dispatch.UserDispatchStatusDTO; + +import java.util.List; + +/** + * 通用人员调度状态服务 + *

+ * 基于 Redis Hash 维护人员维度的调度状态,供安保、工程、客服等业务线共用。 + * 保洁使用设备维度的 {@code BadgeDeviceStatusService},与本服务并行。 + *

+ * 设计原则: + *

+ * + * @author lzh + */ +public interface UserDispatchStatusService { + + // ==================== 写操作(事件驱动调用)==================== + + /** + * 工单直接派发(PENDING → DISPATCHED) + *

+ * 效果:status=BUSY, currentOrderId 设置, activeOrderCount+1 + * + * @param userId 执行人ID + * @param orderId 工单ID + * @param orderType 工单类型(SECURITY / REPAIR / SERVICE) + */ + void onOrderDispatched(Long userId, Long orderId, String orderType); + + /** + * 工单入队(PENDING → QUEUED) + *

+ * 效果:activeOrderCount+1, waitingTaskCount+1, status=BUSY + * + * @param userId 执行人ID + * @param orderId 工单ID + * @param orderType 工单类型 + */ + void onOrderQueued(Long userId, Long orderId, String orderType); + + /** + * 排队中工单被派发(QUEUED → DISPATCHED) + *

+ * 效果:waitingTaskCount-1, currentOrderId 设置, status=BUSY + *

+ * 注意:activeOrderCount 不变(入队时已+1) + * + * @param userId 执行人ID + * @param orderId 工单ID + * @param orderType 工单类型 + */ + void onQueuedOrderDispatched(Long userId, Long orderId, String orderType); + + /** + * 工单状态推进(CONFIRMED / ARRIVED) + *

+ * 效果:更新 currentOrderStatus + * + * @param userId 执行人ID + * @param orderId 工单ID + * @param newStatus 新状态(CONFIRMED / ARRIVED) + */ + void onOrderStatusAdvanced(Long userId, Long orderId, String newStatus); + + /** + * 工单完成或取消 + *

+ * 效果:activeOrderCount-1, 若是当前工单则清除 currentOrderId, + * 若 activeOrderCount 归零则 status=IDLE + * + * @param userId 执行人ID + * @param orderId 工单ID + */ + void onOrderFinished(Long userId, Long orderId); + + /** + * 工单暂停 + *

+ * 效果:若是当前工单则 status=PAUSED + * + * @param userId 执行人ID + * @param orderId 工单ID + */ + void onOrderPaused(Long userId, Long orderId); + + /** + * 工单恢复(PAUSED → DISPATCHED) + *

+ * 效果:若是当前工单则 status=BUSY + * + * @param userId 执行人ID + * @param orderId 工单ID + */ + void onOrderResumed(Long userId, Long orderId); + + // ==================== 读操作(调度时使用)==================== + + /** + * 获取人员调度状态 + *

+ * O(1) Redis HGETALL,返回 null 表示无状态记录(视为空闲) + * + * @param userId 人员ID + * @return 调度状态,null 表示空闲 + */ + UserDispatchStatusDTO getStatus(Long userId); + + /** + * 批量获取人员调度状态 + *

+ * 使用 Redis Pipeline 一次网络往返获取多个用户状态 + * + * @param userIds 人员ID列表 + * @return 调度状态列表(与入参顺序对应,无状态的为 null) + */ + List batchGetStatus(List userIds); + + /** + * 是否空闲 + *

+ * 无 Redis 记录也视为空闲 + * + * @param userId 人员ID + * @return true=空闲 + */ + boolean isIdle(Long userId); + + /** + * 获取活跃工单数(用于负载均衡) + * + * @param userId 人员ID + * @return 活跃工单数,无记录返回 0 + */ + int getActiveOrderCount(Long userId); +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusServiceImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusServiceImpl.java new file mode 100644 index 00000000..6803a09e --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusServiceImpl.java @@ -0,0 +1,392 @@ +package com.viewsh.module.ops.service.dispatch; + +import com.viewsh.module.ops.api.dispatch.UserDispatchStatusDTO; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.stereotype.Service; + +import java.util.*; + +/** + * 通用人员调度状态服务 - Redis 实现 + *

+ * Redis Key: ops:user:dispatch:{userId} + * Redis Type: Hash + * TTL: 24h(每次写入刷新) + *

+ * 所有写操作使用 Lua 脚本原子执行,保证多业务线并发安全。 + * + * @author lzh + */ +@Slf4j +@Service +public class UserDispatchStatusServiceImpl implements UserDispatchStatusService { + + private static final String KEY_PREFIX = "ops:user:dispatch:"; + private static final long TTL_SECONDS = 24 * 3600; // 24h + + @Resource + private StringRedisTemplate stringRedisTemplate; + + // ==================== Lua 脚本 ==================== + + private DefaultRedisScript dispatchScript; + private DefaultRedisScript finishScript; + private DefaultRedisScript enqueueScript; + private DefaultRedisScript queuedToDispatchedScript; + private DefaultRedisScript pauseResumeScript; + private DefaultRedisScript statusAdvancedScript; + + @PostConstruct + public void init() { + dispatchScript = buildScript(LUA_DISPATCH); + finishScript = buildScript(LUA_FINISH); + enqueueScript = buildScript(LUA_ENQUEUE); + queuedToDispatchedScript = buildScript(LUA_QUEUED_TO_DISPATCHED); + pauseResumeScript = buildScript(LUA_PAUSE_RESUME); + statusAdvancedScript = buildScript(LUA_STATUS_ADVANCED); + log.info("人员调度状态服务已初始化,Lua 脚本已预加载"); + } + + private DefaultRedisScript buildScript(String script) { + DefaultRedisScript redisScript = new DefaultRedisScript<>(); + redisScript.setScriptText(script); + redisScript.setResultType(Long.class); + return redisScript; + } + + // ==================== 写操作 ==================== + + @Override + public void onOrderDispatched(Long userId, Long orderId, String orderType) { + if (userId == null || orderId == null) { + return; + } + try { + stringRedisTemplate.execute(dispatchScript, + List.of(keyOf(userId)), + String.valueOf(orderId), orderType, "DISPATCHED", + String.valueOf(System.currentTimeMillis())); + log.debug("人员状态更新[DISPATCH]: userId={}, orderId={}, orderType={}", userId, orderId, orderType); + } catch (Exception e) { + log.error("人员状态更新[DISPATCH]失败: userId={}, orderId={}", userId, orderId, e); + } + } + + @Override + public void onOrderQueued(Long userId, Long orderId, String orderType) { + if (userId == null) { + return; + } + try { + stringRedisTemplate.execute(enqueueScript, + List.of(keyOf(userId)), + String.valueOf(System.currentTimeMillis())); + log.debug("人员状态更新[ENQUEUE]: userId={}, orderId={}", userId, orderId); + } catch (Exception e) { + log.error("人员状态更新[ENQUEUE]失败: userId={}, orderId={}", userId, orderId, e); + } + } + + @Override + public void onQueuedOrderDispatched(Long userId, Long orderId, String orderType) { + if (userId == null || orderId == null) { + return; + } + try { + stringRedisTemplate.execute(queuedToDispatchedScript, + List.of(keyOf(userId)), + String.valueOf(orderId), orderType, + String.valueOf(System.currentTimeMillis())); + log.debug("人员状态更新[QUEUED→DISPATCHED]: userId={}, orderId={}", userId, orderId); + } catch (Exception e) { + log.error("人员状态更新[QUEUED→DISPATCHED]失败: userId={}, orderId={}", userId, orderId, e); + } + } + + @Override + public void onOrderStatusAdvanced(Long userId, Long orderId, String newStatus) { + if (userId == null || orderId == null) { + return; + } + try { + stringRedisTemplate.execute(statusAdvancedScript, + List.of(keyOf(userId)), + String.valueOf(orderId), newStatus, + String.valueOf(System.currentTimeMillis())); + log.debug("人员状态更新[STATUS_ADVANCED]: userId={}, orderId={}, newStatus={}", userId, orderId, newStatus); + } catch (Exception e) { + log.error("人员状态更新[STATUS_ADVANCED]失败: userId={}, orderId={}", userId, orderId, e); + } + } + + @Override + public void onOrderFinished(Long userId, Long orderId) { + if (userId == null || orderId == null) { + return; + } + try { + stringRedisTemplate.execute(finishScript, + List.of(keyOf(userId)), + String.valueOf(orderId), + String.valueOf(System.currentTimeMillis())); + log.debug("人员状态更新[FINISH]: userId={}, orderId={}", userId, orderId); + } catch (Exception e) { + log.error("人员状态更新[FINISH]失败: userId={}, orderId={}", userId, orderId, e); + } + } + + @Override + public void onOrderPaused(Long userId, Long orderId) { + if (userId == null || orderId == null) { + return; + } + try { + stringRedisTemplate.execute(pauseResumeScript, + List.of(keyOf(userId)), + String.valueOf(orderId), "PAUSED", + String.valueOf(System.currentTimeMillis())); + log.debug("人员状态更新[PAUSED]: userId={}, orderId={}", userId, orderId); + } catch (Exception e) { + log.error("人员状态更新[PAUSED]失败: userId={}, orderId={}", userId, orderId, e); + } + } + + @Override + public void onOrderResumed(Long userId, Long orderId) { + if (userId == null || orderId == null) { + return; + } + try { + stringRedisTemplate.execute(pauseResumeScript, + List.of(keyOf(userId)), + String.valueOf(orderId), "BUSY", + String.valueOf(System.currentTimeMillis())); + log.debug("人员状态更新[RESUMED]: userId={}, orderId={}", userId, orderId); + } catch (Exception e) { + log.error("人员状态更新[RESUMED]失败: userId={}, orderId={}", userId, orderId, e); + } + } + + // ==================== 读操作 ==================== + + @Override + public UserDispatchStatusDTO getStatus(Long userId) { + if (userId == null) { + return null; + } + try { + Map map = stringRedisTemplate.opsForHash().entries(keyOf(userId)); + if (map.isEmpty()) { + return null; + } + return mapToDto(userId, map); + } catch (Exception e) { + log.error("获取人员调度状态失败: userId={}", userId, e); + return null; + } + } + + @Override + public List batchGetStatus(List userIds) { + if (userIds == null || userIds.isEmpty()) { + return Collections.emptyList(); + } + + try { + // Pipeline 批量读取 + List results = stringRedisTemplate.executePipelined( + (org.springframework.data.redis.core.RedisCallback) connection -> { + for (Long userId : userIds) { + connection.hashCommands().hGetAll(keyOf(userId).getBytes()); + } + return null; + }); + + List statuses = new ArrayList<>(userIds.size()); + for (int i = 0; i < userIds.size(); i++) { + Object result = results.get(i); + if (result instanceof Map map && !map.isEmpty()) { + @SuppressWarnings("unchecked") + Map hashMap = (Map) map; + statuses.add(mapToDto(userIds.get(i), hashMap)); + } else { + statuses.add(null); + } + } + return statuses; + } catch (Exception e) { + log.error("批量获取人员调度状态失败: userIds={}", userIds, e); + return Collections.nCopies(userIds.size(), null); + } + } + + @Override + public boolean isIdle(Long userId) { + UserDispatchStatusDTO status = getStatus(userId); + return status == null || status.isIdle(); + } + + @Override + public int getActiveOrderCount(Long userId) { + UserDispatchStatusDTO status = getStatus(userId); + return status != null ? status.getActiveOrderCountSafe() : 0; + } + + // ==================== 私有方法 ==================== + + private String keyOf(Long userId) { + return KEY_PREFIX + userId; + } + + private UserDispatchStatusDTO mapToDto(Long userId, Map map) { + return UserDispatchStatusDTO.builder() + .userId(userId) + .status(getString(map, "status")) + .currentOrderId(getLong(map, "currentOrderId")) + .currentOrderType(getString(map, "currentOrderType")) + .currentOrderStatus(getString(map, "currentOrderStatus")) + .activeOrderCount(getInteger(map, "activeOrderCount")) + .waitingTaskCount(getInteger(map, "waitingTaskCount")) + .lastUpdateTime(getLong(map, "lastUpdateTime")) + .build(); + } + + private String getString(Map map, String key) { + Object v = map.get(key); + return v != null ? v.toString() : null; + } + + private Long getLong(Map map, String key) { + Object v = map.get(key); + if (v == null) return null; + try { + return Long.parseLong(v.toString()); + } catch (NumberFormatException e) { + return null; + } + } + + private Integer getInteger(Map map, String key) { + Object v = map.get(key); + if (v == null) return null; + try { + return Integer.parseInt(v.toString()); + } catch (NumberFormatException e) { + return null; + } + } + + // ==================== Lua 脚本定义 ==================== + + /** + * DISPATCH: 工单直接派发 + * KEYS[1] = hash key + * ARGV[1] = orderId, ARGV[2] = orderType, ARGV[3] = orderStatus, ARGV[4] = timestamp + */ + private static final String LUA_DISPATCH = """ + local count = redis.call('HINCRBY', KEYS[1], 'activeOrderCount', 1) + redis.call('HSET', KEYS[1], + 'status', 'BUSY', + 'currentOrderId', ARGV[1], + 'currentOrderType', ARGV[2], + 'currentOrderStatus', ARGV[3], + 'lastUpdateTime', ARGV[4]) + redis.call('EXPIRE', KEYS[1], 86400) + return count + """; + + /** + * FINISH: 工单完成/取消 + * KEYS[1] = hash key + * ARGV[1] = orderId, ARGV[2] = timestamp + */ + private static final String LUA_FINISH = """ + local count = redis.call('HINCRBY', KEYS[1], 'activeOrderCount', -1) + if count < 0 then + count = 0 + redis.call('HSET', KEYS[1], 'activeOrderCount', '0') + end + local cur = redis.call('HGET', KEYS[1], 'currentOrderId') + if cur == ARGV[1] then + redis.call('HDEL', KEYS[1], 'currentOrderId', 'currentOrderType', 'currentOrderStatus') + end + if count == 0 then + redis.call('HSET', KEYS[1], 'status', 'IDLE') + else + redis.call('HSET', KEYS[1], 'status', 'BUSY') + end + redis.call('HSET', KEYS[1], 'lastUpdateTime', ARGV[2]) + redis.call('EXPIRE', KEYS[1], 86400) + return count + """; + + /** + * ENQUEUE: 工单入队 + * KEYS[1] = hash key + * ARGV[1] = timestamp + */ + private static final String LUA_ENQUEUE = """ + local active = redis.call('HINCRBY', KEYS[1], 'activeOrderCount', 1) + redis.call('HINCRBY', KEYS[1], 'waitingTaskCount', 1) + if active > 0 then + redis.call('HSET', KEYS[1], 'status', 'BUSY') + end + redis.call('HSET', KEYS[1], 'lastUpdateTime', ARGV[1]) + redis.call('EXPIRE', KEYS[1], 86400) + return active + """; + + /** + * QUEUED_TO_DISPATCHED: 排队中工单被派发 + * KEYS[1] = hash key + * ARGV[1] = orderId, ARGV[2] = orderType, ARGV[3] = timestamp + */ + private static final String LUA_QUEUED_TO_DISPATCHED = """ + local waiting = redis.call('HINCRBY', KEYS[1], 'waitingTaskCount', -1) + if waiting < 0 then + redis.call('HSET', KEYS[1], 'waitingTaskCount', '0') + end + redis.call('HSET', KEYS[1], + 'status', 'BUSY', + 'currentOrderId', ARGV[1], + 'currentOrderType', ARGV[2], + 'currentOrderStatus', 'DISPATCHED', + 'lastUpdateTime', ARGV[3]) + redis.call('EXPIRE', KEYS[1], 86400) + return 1 + """; + + /** + * PAUSE_RESUME: 暂停/恢复(条件更新,仅当操作的是当前工单时才改状态) + * KEYS[1] = hash key + * ARGV[1] = orderId, ARGV[2] = targetStatus (PAUSED / BUSY), ARGV[3] = timestamp + */ + private static final String LUA_PAUSE_RESUME = """ + local cur = redis.call('HGET', KEYS[1], 'currentOrderId') + if cur == ARGV[1] then + redis.call('HSET', KEYS[1], 'status', ARGV[2]) + end + redis.call('HSET', KEYS[1], 'lastUpdateTime', ARGV[3]) + redis.call('EXPIRE', KEYS[1], 86400) + return 1 + """; + + /** + * STATUS_ADVANCED: 状态推进(CONFIRMED / ARRIVED),更新 currentOrderStatus + * KEYS[1] = hash key + * ARGV[1] = orderId, ARGV[2] = newStatus, ARGV[3] = timestamp + */ + private static final String LUA_STATUS_ADVANCED = """ + local cur = redis.call('HGET', KEYS[1], 'currentOrderId') + if cur == ARGV[1] then + redis.call('HSET', KEYS[1], 'currentOrderStatus', ARGV[2]) + end + redis.call('HSET', KEYS[1], 'lastUpdateTime', ARGV[3]) + redis.call('EXPIRE', KEYS[1], 86400) + return 1 + """; +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/IdleUserPendingOrderCheckJob.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/IdleUserPendingOrderCheckJob.java new file mode 100644 index 00000000..1e3207f6 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/IdleUserPendingOrderCheckJob.java @@ -0,0 +1,140 @@ +package com.viewsh.module.ops.service.job; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX; +import com.viewsh.framework.tenant.core.job.TenantJob; +import com.viewsh.module.ops.api.dispatch.UserDispatchStatusDTO; +import com.viewsh.module.ops.core.dispatch.DispatchEngine; +import com.viewsh.module.ops.core.dispatch.model.DispatchResult; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; +import com.viewsh.module.ops.service.dispatch.UserDispatchStatusService; +import com.xxl.job.core.handler.annotation.XxlJob; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * 空闲人员待办工单检查 Job + *

+ * 职责: + * 1. 每15分钟扫描所有 QUEUED 状态的人员维度工单(排除 CLEAN) + * 2. 检查 assignee 是否处于空闲状态(Redis 无记录或 status=IDLE) + * 3. 如是,调用 autoDispatchNext 推送下一个工单 + *

+ * 场景: + * - 服务重启导致事件丢失,工单卡在 QUEUED + * - 取消/完成后 autoDispatchNext 调用失败的补偿 + * - Redis 状态丢失后的自愈兜底 + *

+ * 与保洁的 {@code IdleDevicePendingOrderCheckJob} 对等, + * 但面向人员维度(安保/工程/客服),放在 ops-biz 通用层。 + *

+ * XXL-Job 配置: + * - JobHandler: idleUserPendingOrderCheckJob + * - Cron: 0 5/15 * * * ? (每 15 分钟,偏移 5 分钟避免与设备检查重叠) + * + * @author lzh + */ +@Slf4j +@Component +public class IdleUserPendingOrderCheckJob { + + @Resource + private OpsOrderMapper opsOrderMapper; + + @Resource + private DispatchEngine dispatchEngine; + + @Resource + private UserDispatchStatusService userDispatchStatusService; + + @XxlJob("idleUserPendingOrderCheckJob") + @TenantJob + public String execute() { + try { + CheckResult result = checkAndDispatchPendingOrders(); + return StrUtil.format( + "空闲人员待办检查完成: 发现 {} 个待办人员,空闲 {} 个,成功推送 {} 个工单,耗时 {} ms", + result.pendingUserCount, result.idleUserCount, result.dispatchedCount, result.durationMs); + } catch (Exception e) { + log.error("[IdleUserPendingOrderCheckJob] 执行失败", e); + return StrUtil.format("空闲人员待办检查失败: {}", e.getMessage()); + } + } + + public CheckResult checkAndDispatchPendingOrders() { + log.info("[IdleUserPendingOrderCheckJob] 开始检查空闲人员待办工单"); + long startTime = System.currentTimeMillis(); + int pendingUserCount = 0; + int idleUserCount = 0; + int dispatchedCount = 0; + + // 1. 查询所有 QUEUED 状态的人员维度工单,提取去重的 assigneeId + List queuedOrders = opsOrderMapper.selectList(new LambdaQueryWrapperX() + .eq(OpsOrderDO::getStatus, WorkOrderStatusEnum.QUEUED.getStatus()) + .ne(OpsOrderDO::getOrderType, "CLEAN") // 排除保洁(走设备维度 Job) + .isNotNull(OpsOrderDO::getAssigneeId) + .select(OpsOrderDO::getAssigneeId)); // 只查 assigneeId + + if (CollUtil.isEmpty(queuedOrders)) { + log.info("[IdleUserPendingOrderCheckJob] 无待办工单,跳过检查"); + return new CheckResult(0, 0, 0, System.currentTimeMillis() - startTime); + } + + // 去重 assigneeId + Set assigneeIds = new HashSet<>(); + for (OpsOrderDO order : queuedOrders) { + if (order.getAssigneeId() != null) { + assigneeIds.add(order.getAssigneeId()); + } + } + pendingUserCount = assigneeIds.size(); + log.info("[IdleUserPendingOrderCheckJob] 发现 {} 个人员有待办工单(共 {} 条 QUEUED)", + pendingUserCount, queuedOrders.size()); + + // 2. 逐个检查人员是否空闲 + for (Long assigneeId : assigneeIds) { + try { + UserDispatchStatusDTO status = userDispatchStatusService.getStatus(assigneeId); + + // null(Redis 无记录)或 IDLE 视为空闲 + boolean isIdle = status == null || status.isIdle(); + if (!isIdle) { + continue; + } + + idleUserCount++; + log.info("[IdleUserPendingOrderCheckJob] 发现空闲人员有待办工单: assigneeId={}, redisStatus={}", + assigneeId, status != null ? status.getStatus() : "null(IDLE)"); + + // 3. 自动推送下一个工单 + DispatchResult result = dispatchEngine.autoDispatchNext(null, assigneeId); + if (result.isSuccess()) { + dispatchedCount++; + log.info("[IdleUserPendingOrderCheckJob] 自动推送成功: assigneeId={}", assigneeId); + } else { + log.warn("[IdleUserPendingOrderCheckJob] 自动推送失败: assigneeId={}, reason={}", + assigneeId, result.getMessage()); + } + } catch (Exception e) { + log.error("[IdleUserPendingOrderCheckJob] 检查人员待办失败: assigneeId={}", assigneeId, e); + } + } + + long duration = System.currentTimeMillis() - startTime; + log.info("[IdleUserPendingOrderCheckJob] 检查完成: 待办人员 {} 个,空闲 {} 个,成功推送 {} 个,耗时 {} ms", + pendingUserCount, idleUserCount, dispatchedCount, duration); + + return new CheckResult(pendingUserCount, idleUserCount, dispatchedCount, duration); + } + + public record CheckResult(int pendingUserCount, int idleUserCount, int dispatchedCount, long durationMs) { + } +}