fix(ops): 启动时校准人员调度状态

- 为 UserDispatchStatusService 增加基于 DB 的重建能力\n- 扫描 Redis 中的人员调度 key,按实际活跃工单数修正 status、activeOrderCount、waitingTaskCount\n- 新增启动初始化器,服务启动时自动执行一次校准,缓解事件丢失导致的 BUSY 残留
This commit is contained in:
lzh
2026-03-31 22:58:09 +08:00
parent 1696aeb287
commit 306303ab16
3 changed files with 161 additions and 0 deletions

View File

@@ -151,4 +151,14 @@ public interface UserDispatchStatusService {
* @return 活跃工单数,无记录返回 0
*/
int getActiveOrderCount(Long userId);
/**
* 启动校准:从 DB 重建人员调度状态
* <p>
* 扫描 Redis 中所有 ops:user:dispatch:* key校验 currentOrderId 对应的工单是否仍在进行中,
* 并根据 DB 中实际活跃工单数修正 activeOrderCount / waitingTaskCount / status。
*
* @return 校准的用户数
*/
int calibrateFromDb();
}

View File

@@ -1,6 +1,9 @@
package com.viewsh.module.ops.service.dispatch;
import com.viewsh.module.ops.api.dispatch.UserDispatchStatusDTO;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
@@ -9,6 +12,7 @@ import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.stream.Collectors;
/**
* 通用人员调度状态服务 - Redis 实现
@@ -31,6 +35,9 @@ public class UserDispatchStatusServiceImpl implements UserDispatchStatusService
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private OpsOrderMapper opsOrderMapper;
// ==================== Lua 脚本 ====================
private DefaultRedisScript<Long> dispatchScript;
@@ -297,6 +304,107 @@ public class UserDispatchStatusServiceImpl implements UserDispatchStatusService
}
}
// ==================== 启动校准 ====================
@Override
public int calibrateFromDb() {
// 1. SCAN 所有 user dispatch key
Set<String> keys = stringRedisTemplate.keys(KEY_PREFIX + "*");
if (keys == null || keys.isEmpty()) {
return 0;
}
int calibrated = 0;
for (String key : keys) {
try {
String userIdStr = key.substring(KEY_PREFIX.length());
Long userId = Long.parseLong(userIdStr);
// 2. 查 DB该用户所有非终态、非保洁的活跃工单
List<OpsOrderDO> activeOrders = opsOrderMapper.selectList(
new com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper<OpsOrderDO>()
.eq(OpsOrderDO::getAssigneeId, userId)
.ne(OpsOrderDO::getOrderType, "CLEAN")
.notIn(OpsOrderDO::getStatus,
WorkOrderStatusEnum.COMPLETED.getStatus(),
WorkOrderStatusEnum.CANCELLED.getStatus())
);
if (activeOrders.isEmpty()) {
// 无活跃工单,但 Redis 中有记录 → 应该是 IDLE
Map<Object, Object> current = stringRedisTemplate.opsForHash().entries(key);
String currentStatus = current.get("status") != null ? current.get("status").toString() : null;
if (!"IDLE".equals(currentStatus) || !"0".equals(Objects.toString(current.get("activeOrderCount"), "0"))) {
// 重置为 IDLE
stringRedisTemplate.opsForHash().putAll(key, Map.of(
"status", "IDLE",
"activeOrderCount", "0",
"waitingTaskCount", "0",
"lastUpdateTime", String.valueOf(System.currentTimeMillis())
));
stringRedisTemplate.opsForHash().delete(key, "currentOrderId", "currentOrderType", "currentOrderStatus");
log.info("[校准] 人员状态重置为IDLE: userId={}, 原状态={}", userId, currentStatus);
calibrated++;
}
} else {
// 有活跃工单 → 重建计数
int activeCount = activeOrders.size();
int waitingCount = (int) activeOrders.stream()
.filter(o -> WorkOrderStatusEnum.QUEUED.getStatus().equals(o.getStatus()))
.count();
// 找当前正在执行的工单DISPATCHED/CONFIRMED/ARRIVED 中最新的)
OpsOrderDO currentOrder = activeOrders.stream()
.filter(o -> {
String s = o.getStatus();
return WorkOrderStatusEnum.DISPATCHED.getStatus().equals(s)
|| WorkOrderStatusEnum.CONFIRMED.getStatus().equals(s)
|| WorkOrderStatusEnum.ARRIVED.getStatus().equals(s);
})
.max(Comparator.comparing(OpsOrderDO::getUpdateTime))
.orElse(null);
// 判断是否有暂停的工单
boolean hasPaused = activeOrders.stream()
.anyMatch(o -> WorkOrderStatusEnum.PAUSED.getStatus().equals(o.getStatus()));
String status = hasPaused ? "PAUSED" : "BUSY";
Map<String, String> newHash = new HashMap<>();
newHash.put("status", status);
newHash.put("activeOrderCount", String.valueOf(activeCount));
newHash.put("waitingTaskCount", String.valueOf(waitingCount));
newHash.put("lastUpdateTime", String.valueOf(System.currentTimeMillis()));
if (currentOrder != null) {
newHash.put("currentOrderId", String.valueOf(currentOrder.getId()));
newHash.put("currentOrderType", currentOrder.getOrderType());
newHash.put("currentOrderStatus", currentOrder.getStatus());
}
// 比对 Redis 当前值,有差异则修正
Map<Object, Object> current = stringRedisTemplate.opsForHash().entries(key);
boolean needUpdate = !String.valueOf(activeCount).equals(Objects.toString(current.get("activeOrderCount"), ""))
|| !String.valueOf(waitingCount).equals(Objects.toString(current.get("waitingTaskCount"), ""))
|| !status.equals(Objects.toString(current.get("status"), ""));
if (needUpdate) {
stringRedisTemplate.delete(key);
stringRedisTemplate.opsForHash().putAll(key, newHash);
stringRedisTemplate.expire(key, java.time.Duration.ofSeconds(TTL_SECONDS));
log.info("[校准] 人员状态已修正: userId={}, status={}, active={}, waiting={}",
userId, status, activeCount, waitingCount);
calibrated++;
}
}
} catch (Exception e) {
log.warn("[校准] 人员状态校准失败: key={}", key, e);
}
}
return calibrated;
}
// ==================== Lua 脚本定义 ====================
/**

View File

@@ -0,0 +1,43 @@
package com.viewsh.module.ops.framework.job.dispatch;
import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.ops.service.dispatch.UserDispatchStatusService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
* 人员调度状态 Redis 启动校准器
* <p>
* 职责:服务启动时,扫描 Redis 中所有人员调度状态 key
* 与 DB 中的实际活跃工单比对,修正 status / activeOrderCount / waitingTaskCount。
* <p>
* 解决场景:
* - 服务重启期间工单完成/取消事件丢失,导致人员状态卡在 BUSY
* - 计数器漂移increment/decrement 不对称)
*
* @author AI
*/
@Slf4j
@Component
public class UserDispatchStatusInitializer implements ApplicationRunner {
@Resource
private UserDispatchStatusService userDispatchStatusService;
@Override
public void run(ApplicationArguments args) {
log.info("[初始化] 开始校准人员调度状态...");
try {
TenantUtils.executeIgnore(() -> {
int calibrated = userDispatchStatusService.calibrateFromDb();
log.info("[初始化] 人员调度状态校准完成:修正 {} 个用户", calibrated);
});
} catch (Exception e) {
log.error("[初始化] 人员调度状态校准失败", e);
}
}
}