diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusService.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusService.java index bd3059f..30ede0f 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusService.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusService.java @@ -151,4 +151,14 @@ public interface UserDispatchStatusService { * @return 活跃工单数,无记录返回 0 */ int getActiveOrderCount(Long userId); + + /** + * 启动校准:从 DB 重建人员调度状态 + *

+ * 扫描 Redis 中所有 ops:user:dispatch:* key,校验 currentOrderId 对应的工单是否仍在进行中, + * 并根据 DB 中实际活跃工单数修正 activeOrderCount / waitingTaskCount / status。 + * + * @return 校准的用户数 + */ + int calibrateFromDb(); } diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusServiceImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusServiceImpl.java index 4529cfb..9546a49 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusServiceImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/dispatch/UserDispatchStatusServiceImpl.java @@ -1,6 +1,9 @@ package com.viewsh.module.ops.service.dispatch; import com.viewsh.module.ops.api.dispatch.UserDispatchStatusDTO; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; @@ -9,6 +12,7 @@ import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Service; import java.util.*; +import java.util.stream.Collectors; /** * 通用人员调度状态服务 - Redis 实现 @@ -31,6 +35,9 @@ public class UserDispatchStatusServiceImpl implements UserDispatchStatusService @Resource private StringRedisTemplate stringRedisTemplate; + @Resource + private OpsOrderMapper opsOrderMapper; + // ==================== Lua 脚本 ==================== private DefaultRedisScript dispatchScript; @@ -297,6 +304,107 @@ public class UserDispatchStatusServiceImpl implements UserDispatchStatusService } } + // ==================== 启动校准 ==================== + + @Override + public int calibrateFromDb() { + // 1. SCAN 所有 user dispatch key + Set keys = stringRedisTemplate.keys(KEY_PREFIX + "*"); + if (keys == null || keys.isEmpty()) { + return 0; + } + + int calibrated = 0; + + for (String key : keys) { + try { + String userIdStr = key.substring(KEY_PREFIX.length()); + Long userId = Long.parseLong(userIdStr); + + // 2. 查 DB:该用户所有非终态、非保洁的活跃工单 + List activeOrders = opsOrderMapper.selectList( + new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper() + .eq(OpsOrderDO::getAssigneeId, userId) + .ne(OpsOrderDO::getOrderType, "CLEAN") + .notIn(OpsOrderDO::getStatus, + WorkOrderStatusEnum.COMPLETED.getStatus(), + WorkOrderStatusEnum.CANCELLED.getStatus()) + ); + + if (activeOrders.isEmpty()) { + // 无活跃工单,但 Redis 中有记录 → 应该是 IDLE + Map current = stringRedisTemplate.opsForHash().entries(key); + String currentStatus = current.get("status") != null ? current.get("status").toString() : null; + if (!"IDLE".equals(currentStatus) || !"0".equals(Objects.toString(current.get("activeOrderCount"), "0"))) { + // 重置为 IDLE + stringRedisTemplate.opsForHash().putAll(key, Map.of( + "status", "IDLE", + "activeOrderCount", "0", + "waitingTaskCount", "0", + "lastUpdateTime", String.valueOf(System.currentTimeMillis()) + )); + stringRedisTemplate.opsForHash().delete(key, "currentOrderId", "currentOrderType", "currentOrderStatus"); + log.info("[校准] 人员状态重置为IDLE: userId={}, 原状态={}", userId, currentStatus); + calibrated++; + } + } else { + // 有活跃工单 → 重建计数 + int activeCount = activeOrders.size(); + int waitingCount = (int) activeOrders.stream() + .filter(o -> WorkOrderStatusEnum.QUEUED.getStatus().equals(o.getStatus())) + .count(); + + // 找当前正在执行的工单(DISPATCHED/CONFIRMED/ARRIVED 中最新的) + OpsOrderDO currentOrder = activeOrders.stream() + .filter(o -> { + String s = o.getStatus(); + return WorkOrderStatusEnum.DISPATCHED.getStatus().equals(s) + || WorkOrderStatusEnum.CONFIRMED.getStatus().equals(s) + || WorkOrderStatusEnum.ARRIVED.getStatus().equals(s); + }) + .max(Comparator.comparing(OpsOrderDO::getUpdateTime)) + .orElse(null); + + // 判断是否有暂停的工单 + boolean hasPaused = activeOrders.stream() + .anyMatch(o -> WorkOrderStatusEnum.PAUSED.getStatus().equals(o.getStatus())); + + String status = hasPaused ? "PAUSED" : "BUSY"; + + Map newHash = new HashMap<>(); + newHash.put("status", status); + newHash.put("activeOrderCount", String.valueOf(activeCount)); + newHash.put("waitingTaskCount", String.valueOf(waitingCount)); + newHash.put("lastUpdateTime", String.valueOf(System.currentTimeMillis())); + if (currentOrder != null) { + newHash.put("currentOrderId", String.valueOf(currentOrder.getId())); + newHash.put("currentOrderType", currentOrder.getOrderType()); + newHash.put("currentOrderStatus", currentOrder.getStatus()); + } + + // 比对 Redis 当前值,有差异则修正 + Map current = stringRedisTemplate.opsForHash().entries(key); + boolean needUpdate = !String.valueOf(activeCount).equals(Objects.toString(current.get("activeOrderCount"), "")) + || !String.valueOf(waitingCount).equals(Objects.toString(current.get("waitingTaskCount"), "")) + || !status.equals(Objects.toString(current.get("status"), "")); + + if (needUpdate) { + stringRedisTemplate.delete(key); + stringRedisTemplate.opsForHash().putAll(key, newHash); + stringRedisTemplate.expire(key, java.time.Duration.ofSeconds(TTL_SECONDS)); + log.info("[校准] 人员状态已修正: userId={}, status={}, active={}, waiting={}", + userId, status, activeCount, waitingCount); + calibrated++; + } + } + } catch (Exception e) { + log.warn("[校准] 人员状态校准失败: key={}", key, e); + } + } + + return calibrated; + } + // ==================== Lua 脚本定义 ==================== /** diff --git a/viewsh-module-ops/viewsh-module-ops-server/src/main/java/com/viewsh/module/ops/framework/job/dispatch/UserDispatchStatusInitializer.java b/viewsh-module-ops/viewsh-module-ops-server/src/main/java/com/viewsh/module/ops/framework/job/dispatch/UserDispatchStatusInitializer.java new file mode 100644 index 0000000..5db37d4 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-server/src/main/java/com/viewsh/module/ops/framework/job/dispatch/UserDispatchStatusInitializer.java @@ -0,0 +1,43 @@ +package com.viewsh.module.ops.framework.job.dispatch; + +import com.viewsh.framework.tenant.core.util.TenantUtils; +import com.viewsh.module.ops.service.dispatch.UserDispatchStatusService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +/** + * 人员调度状态 Redis 启动校准器 + *

+ * 职责:服务启动时,扫描 Redis 中所有人员调度状态 key, + * 与 DB 中的实际活跃工单比对,修正 status / activeOrderCount / waitingTaskCount。 + *

+ * 解决场景: + * - 服务重启期间工单完成/取消事件丢失,导致人员状态卡在 BUSY + * - 计数器漂移(increment/decrement 不对称) + * + * @author AI + */ +@Slf4j +@Component +public class UserDispatchStatusInitializer implements ApplicationRunner { + + @Resource + private UserDispatchStatusService userDispatchStatusService; + + @Override + public void run(ApplicationArguments args) { + log.info("[初始化] 开始校准人员调度状态..."); + + try { + TenantUtils.executeIgnore(() -> { + int calibrated = userDispatchStatusService.calibrateFromDb(); + log.info("[初始化] 人员调度状态校准完成:修正 {} 个用户", calibrated); + }); + } catch (Exception e) { + log.error("[初始化] 人员调度状态校准失败", e); + } + } +}