Merge branch 'master' into feat/multi-tenant

吸收 master 今日 9 个工单链路修复:
- autoDispatchNext/dispatch 空闲兜底 + FOR UPDATE 并发防护
- 状态转换审计闭环(AFTER_COMMIT/AFTER_ROLLBACK)
- 队列楼层权重强优先 + 三级 baseline 兜底 + N+1 优化
- 工牌 nickname 回填
- CleanOrderAutoCancelJob 超时工单自动取消
This commit is contained in:
lzh
2026-04-20 16:04:46 +08:00
24 changed files with 1899 additions and 64 deletions

View File

@@ -15,9 +15,6 @@ import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;
/**
* 工牌设备状态事件监听器
@@ -87,9 +84,6 @@ public class BadgeDeviceStatusEventListener {
@Resource
private OrderLifecycleManager orderLifecycleManager;
@Resource
private PlatformTransactionManager transactionManager;
/**
* 监听工单状态变更事件,同步更新设备工单关联
* <p>
@@ -180,40 +174,27 @@ public class BadgeDeviceStatusEventListener {
/**
* 处理工单推送状态(首次设置工单关联)
* <p>
* 若 Redis 里检测到旧 orderId正常业务不应出现仅打 ERROR 告警并清理 Redis 关联。
* 此前版本会在此处"自动取消旧工单",但那是对"数据已错乱"场景的暴力兜底:
* <ul>
* <li>取消使用 REQUIRES_NEW 独立事务且吞异常,失败时新单照常落地,旧单残留,形成越清越多</li>
* <li>真正的防线应在 DispatchEngine.autoDispatchNext 入口做设备空闲校验</li>
* </ul>
* 现改为被动告警,暴露问题等待定位,避免误杀保洁员正在执行的任务。
*/
private void handleDispatched(Long deviceId, Long orderId, OpsOrderDO order) {
// 检查并清理旧工单(防止工单切换时状态残留)
BadgeDeviceStatusDTO deviceStatus = badgeDeviceStatusService.getBadgeStatus(deviceId);
if (deviceStatus != null && deviceStatus.getCurrentOpsOrderId() != null) {
Long oldOrderId = deviceStatus.getCurrentOpsOrderId();
if (!oldOrderId.equals(orderId)) {
log.warn("[BadgeDeviceStatusEventListener] 派发新工单时检测到旧工单残留: " +
"deviceId={}, oldOrderId={}, newOrderId={}", deviceId, oldOrderId, orderId);
// 检查旧工单是否仍在进行中,如果是则先取消
OpsOrderDO oldOrder = opsOrderMapper.selectById(oldOrderId);
if (oldOrder != null) {
WorkOrderStatusEnum oldStatus = WorkOrderStatusEnum.fromStatus(oldOrder.getStatus());
if (oldStatus == WorkOrderStatusEnum.DISPATCHED
|| oldStatus == WorkOrderStatusEnum.CONFIRMED
|| oldStatus == WorkOrderStatusEnum.ARRIVED) {
// 旧工单仍在进行,先取消
// 使用 REQUIRES_NEW 独立事务,避免内层异常标记外层事务 rollback-only
log.warn("[BadgeDeviceStatusEventListener] 取消残留的旧工单: oldOrderId={}", oldOrderId);
try {
TransactionTemplate txTemplate = new TransactionTemplate(transactionManager);
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
txTemplate.executeWithoutResult(status -> {
orderLifecycleManager.cancelOrder(oldOrderId, deviceId,
OperatorTypeEnum.SYSTEM, "新工单派发,自动取消旧工单");
});
} catch (Exception e) {
log.error("[BadgeDeviceStatusEventListener] 取消旧工单失败: oldOrderId={}", oldOrderId, e);
}
}
}
String oldStatus = oldOrder != null ? oldOrder.getStatus() : "NOT_FOUND";
log.error("[BadgeDeviceStatusEventListener] 派发新工单时检测到旧工单残留(数据可能已错乱,需人工核查): " +
"deviceId={}, oldOrderId={}, oldStatus={}, newOrderId={}",
deviceId, oldOrderId, oldStatus, orderId);
// 确保设备状态清理(无论旧工单是否取消成功
// 清理 Redis 中对旧工单的关联(纯 Redis 操作,不触达状态机
badgeDeviceStatusService.clearCurrentOrder(deviceId);
}
}

View File

@@ -4,7 +4,9 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.viewsh.framework.common.pojo.CommonResult;
import com.viewsh.framework.tenant.core.job.TenantJob;
import com.viewsh.module.iot.api.device.IotDeviceQueryApi;
import com.viewsh.module.iot.api.device.IotDeviceStatusQueryApi;
import com.viewsh.module.iot.api.device.dto.IotDeviceSimpleRespDTO;
import com.viewsh.module.iot.api.device.dto.status.DeviceStatusRespDTO;
import com.viewsh.module.ops.api.badge.BadgeDeviceStatusDTO;
import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
@@ -18,6 +20,8 @@ import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -47,6 +51,9 @@ public class BadgeDeviceStatusSyncJob {
@Resource
private IotDeviceStatusQueryApi iotDeviceStatusQueryApi;
@Resource
private IotDeviceQueryApi iotDeviceQueryApi;
@Resource
private OpsAreaDeviceRelationMapper areaDeviceRelationMapper;
@@ -120,6 +127,9 @@ public class BadgeDeviceStatusSyncJob {
OpsAreaDeviceRelationDO::getAreaId,
(existing, replacement) -> existing));
// 3b. 批量查询设备 nicknameIoT 是唯一可信源),防止 Redis key 丢失后降级到 deviceCode
Map<Long, String> deviceNicknameMap = loadDeviceNicknameMap(deviceIds);
// 4. 逐一对账并修正
for (DeviceStatusRespDTO iotStatus : iotResult.getData()) {
// 4a. 工单一致性检查(修复残留的已终态工单关联)
@@ -135,7 +145,10 @@ public class BadgeDeviceStatusSyncJob {
}
// 4b. IoT 在线/离线状态对账
boolean corrected = syncSingleDevice(iotStatus, deviceAreaMap.get(iotStatus.getDeviceId()));
boolean corrected = syncSingleDevice(
iotStatus,
deviceAreaMap.get(iotStatus.getDeviceId()),
deviceNicknameMap.get(iotStatus.getDeviceId()));
syncCount++;
if (corrected) {
correctedCount++;
@@ -154,9 +167,10 @@ public class BadgeDeviceStatusSyncJob {
*
* @param iotStatus IoT 设备状态
* @param areaId 设备所属区域ID
* @param nickname 设备昵称(从 IoT 查到的权威值,允许 null
* @return 是否进行了修正
*/
private boolean syncSingleDevice(DeviceStatusRespDTO iotStatus, Long areaId) {
private boolean syncSingleDevice(DeviceStatusRespDTO iotStatus, Long areaId, String nickname) {
Long deviceId = iotStatus.getDeviceId();
try {
@@ -168,8 +182,20 @@ public class BadgeDeviceStatusSyncJob {
boolean opsOnline = opsStatus != null && opsStatus.getStatus() != null
&& opsStatus.getStatus().isActive();
// 如果状态一致,无需修正
// 如果状态一致,但 Redis 缺 nickname 而 IoT 有值,则补写一次防止派单时降级显示 deviceCode
if (iotOnline == opsOnline) {
if (iotOnline && nickname != null
&& (opsStatus == null || opsStatus.getNickname() == null)) {
badgeDeviceStatusService.updateBadgeOnlineStatus(
deviceId,
iotStatus.getDeviceCode(),
nickname,
areaId,
BadgeDeviceStatusEnum.IDLE,
"定时对账补写-昵称");
log.info("[SyncJob] 补写设备昵称deviceId={}, nickname={}", deviceId, nickname);
return true;
}
return false;
}
@@ -178,17 +204,17 @@ public class BadgeDeviceStatusSyncJob {
badgeDeviceStatusService.updateBadgeOnlineStatus(
deviceId,
iotStatus.getDeviceCode(),
null, // nickname: 对账场景不更新昵称保留Redis中已有值
nickname,
areaId,
BadgeDeviceStatusEnum.IDLE,
"定时对账修正-上线");
log.info("[SyncJob] 修正设备状态deviceId={}, IoT=ONLINE, Ops={}",
deviceId, opsOnline ? "ACTIVE" : "OFFLINE/NULL");
log.info("[SyncJob] 修正设备状态deviceId={}, IoT=ONLINE, Ops={}, nickname={}",
deviceId, opsOnline ? "ACTIVE" : "OFFLINE/NULL", nickname);
} else {
badgeDeviceStatusService.updateBadgeOnlineStatus(
deviceId,
iotStatus.getDeviceCode(),
null, // nickname: 对账场景不更新昵称保留Redis中已有值
nickname,
null,
BadgeDeviceStatusEnum.OFFLINE,
"定时对账修正-离线");
@@ -204,6 +230,35 @@ public class BadgeDeviceStatusSyncJob {
}
}
/**
* 批量从 IoT 查询设备昵称
* <p>
* Redis 中 ops:badge:device:{deviceId} 的 nickname 字段可能因 TTL/重启/缓存清理而缺失,
* 每次对账时以 IoT 为唯一可信源做回填,避免派单时降级为 deviceCode如 "43607737587")。
*/
private Map<Long, String> loadDeviceNicknameMap(List<Long> deviceIds) {
if (CollUtil.isEmpty(deviceIds)) {
return Collections.emptyMap();
}
try {
CommonResult<List<IotDeviceSimpleRespDTO>> result = iotDeviceQueryApi.batchGetDevices(deviceIds);
if (!result.isSuccess() || CollUtil.isEmpty(result.getData())) {
log.warn("[SyncJob] 查询设备昵称失败或为空: {}", result.getMsg());
return Collections.emptyMap();
}
Map<Long, String> map = new HashMap<>(result.getData().size());
for (IotDeviceSimpleRespDTO dto : result.getData()) {
if (dto.getId() != null && dto.getNickname() != null) {
map.put(dto.getId(), dto.getNickname());
}
}
return map;
} catch (Exception e) {
log.warn("[SyncJob] 批量查询设备昵称异常,本次对账跳过昵称回填", e);
return Collections.emptyMap();
}
}
/**
* 同步结果
*/

View File

@@ -0,0 +1,160 @@
package com.viewsh.module.ops.environment.job;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX;
import com.viewsh.framework.tenant.core.job.TenantJob;
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OperatorTypeEnum;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
/**
* 保洁工单超时自动取消 Job
* <p>
* 职责:
* 扫描所有保洁类order_type=CLEAN非终态工单
* 若最近一次进展update_time距今超过阈值默认 12 小时),
* 以 SYSTEM 身份走正常取消流程将其关闭。
* <p>
* 设计要点:
* 1. 时间基准使用 update_time 而非 create_time——任何状态转换/字段更新都会刷新 update_time
* 这样"按最新进展计算超时"才准确:刚被重派的 DISPATCHED 单不会因 create_time 老而被误杀。
* 2. 状态白名单 = PENDING / QUEUED / DISPATCHED / CONFIRMED / ARRIVED不含 PAUSED
* PAUSED 是 P0 打断的产物,应由 resumeInterruptedOrder 经状态机走 PAUSED → DISPATCHED
* 恢复。若此 Job 把 PAUSED 单直接 CANCELLEDP0 完成后的 resume 会在状态机检查
* "PAUSED → DISPATCHED" 时因源状态已变为 CANCELLED 而抛 IllegalStateException
* 进而破坏 P0 恢复链路。PAUSED 若真的卡死P0 也卡),交由人工审核,不自动化。
* 3. 取消调用 {@link OrderLifecycleManager#cancelOrder} 走完整责任链:
* StateTransitionHandler → QueueSyncHandler → EventPublishHandler
* → CleanOrderEventListener.onOrderStateChanged(CANCELLED) 会统一处理
* TTS 停播、设备工单关联回收、审计日志。
* 4. 单单独立事务 + try/catch 隔离,单条失败不影响批次其余工单。
* 5. 单次扫描限 batchSize 条,防止异常堆积时一次性取消过多触发事件风暴;
* 未处理完的工单留给下一轮 cron。
* 6. cancel 前再做一次乐观校验:重查 update_time 是否仍 &lt;= threshold。
* 候选装内存到实际 cancel 之间如果有用户触达(确认/到岗update_time 会被刷新;
* 此时放弃 cancel避免误杀用户刚触达的工单。
* <p>
* XXL-Job 配置建议:
* - JobHandler: cleanOrderAutoCancelJob
* - Cron: 0 17 * * * ? (每小时 :17 触发,避开整点尖峰)
*
* @author lzh
*/
@Slf4j
@Component
public class CleanOrderAutoCancelJob {
private static final String BUSINESS_TYPE_CLEAN = "CLEAN";
private static final String CANCEL_REASON = "超过12小时未处理系统自动完结";
@Resource
private OpsOrderMapper opsOrderMapper;
@Resource
private OrderLifecycleManager orderLifecycleManager;
/** 超时时长小时update_time 距今超过此值视为卡死 */
@Value("${viewsh.ops.clean.auto-cancel.timeout-hours:12}")
private int timeoutHours;
/** 单次最大扫描/取消工单数,防止事件风暴 */
@Value("${viewsh.ops.clean.auto-cancel.batch-size:200}")
private int batchSize;
@XxlJob("cleanOrderAutoCancelJob")
@TenantJob
public String execute() {
try {
CancelResult result = scanAndCancel();
return StrUtil.format(
"保洁工单超时自动取消完成: 扫描 {} 单, 成功 {}, 失败 {}, 跳过 {}, 耗时 {} ms",
result.scanned, result.succeeded, result.failed, result.skippedStale, result.durationMs);
} catch (Exception e) {
log.error("[CleanOrderAutoCancelJob] 执行失败", e);
return StrUtil.format("保洁工单超时自动取消失败: {}", e.getMessage());
}
}
public CancelResult scanAndCancel() {
long startTime = System.currentTimeMillis();
LocalDateTime threshold = LocalDateTime.now().minusHours(timeoutHours);
log.info("[CleanOrderAutoCancelJob] 开始扫描: timeoutHours={}, threshold={}, batchSize={}",
timeoutHours, threshold, batchSize);
List<OpsOrderDO> candidates = opsOrderMapper.selectList(new LambdaQueryWrapperX<OpsOrderDO>()
.eq(OpsOrderDO::getOrderType, BUSINESS_TYPE_CLEAN)
.notIn(OpsOrderDO::getStatus,
WorkOrderStatusEnum.COMPLETED.getStatus(),
WorkOrderStatusEnum.CANCELLED.getStatus(),
// PAUSED 交由 resumeInterruptedOrder 经状态机恢复,不在此 Job 自动化处理
WorkOrderStatusEnum.PAUSED.getStatus())
.le(OpsOrderDO::getUpdateTime, threshold)
.orderByAsc(OpsOrderDO::getUpdateTime)
.last("LIMIT " + batchSize));
if (CollUtil.isEmpty(candidates)) {
log.info("[CleanOrderAutoCancelJob] 无超时工单");
return new CancelResult(0, 0, 0, 0, System.currentTimeMillis() - startTime);
}
int succeeded = 0;
int failed = 0;
int skippedStale = 0;
for (OpsOrderDO order : candidates) {
Long orderId = order.getId();
try {
// 乐观校验:候选装内存→实际 cancel 之间,用户可能已触达工单刷新 update_time。
// 重查一次确认仍超时,避免把用户刚点过的工单一并 cancel 掉。
OpsOrderDO fresh = opsOrderMapper.selectById(orderId);
if (fresh == null
|| WorkOrderStatusEnum.COMPLETED.getStatus().equals(fresh.getStatus())
|| WorkOrderStatusEnum.CANCELLED.getStatus().equals(fresh.getStatus())
|| WorkOrderStatusEnum.PAUSED.getStatus().equals(fresh.getStatus())
|| fresh.getUpdateTime() == null
|| fresh.getUpdateTime().isAfter(threshold)) {
skippedStale++;
log.info("[CleanOrderAutoCancelJob] 并发触达/状态已变,跳过: orderId={}, snapshotStatus={}, latestStatus={}, latestUpdateTime={}",
orderId, order.getStatus(),
fresh != null ? fresh.getStatus() : "NOT_FOUND",
fresh != null ? fresh.getUpdateTime() : null);
continue;
}
orderLifecycleManager.cancelOrder(
orderId,
null,
OperatorTypeEnum.SYSTEM,
CANCEL_REASON);
succeeded++;
log.info("[CleanOrderAutoCancelJob] 自动取消成功: orderId={}, orderCode={}, status={}, updateTime={}",
orderId, order.getOrderCode(), order.getStatus(), order.getUpdateTime());
} catch (Exception e) {
failed++;
log.warn("[CleanOrderAutoCancelJob] 自动取消失败: orderId={}, orderCode={}, status={}, error={}",
orderId, order.getOrderCode(), order.getStatus(), e.getMessage(), e);
}
}
long duration = System.currentTimeMillis() - startTime;
log.info("[CleanOrderAutoCancelJob] 扫描完成: 扫描 {} 单, 成功 {}, 失败 {}, 跳过 {}, 耗时 {} ms",
candidates.size(), succeeded, failed, skippedStale, duration);
return new CancelResult(candidates.size(), succeeded, failed, skippedStale, duration);
}
public record CancelResult(int scanned, int succeeded, int failed, int skippedStale, long durationMs) {
}
}

View File

@@ -89,8 +89,7 @@ public class BadgeDeviceAreaAssignStrategy implements AssignStrategy {
if (selectedDevice != null) {
String reason = buildRecommendationReason(selectedDevice, context);
String assigneeName = selectedDevice.getNickname() != null
? selectedDevice.getNickname() : selectedDevice.getDeviceCode();
String assigneeName = resolveAssigneeName(selectedDevice);
return AssigneeRecommendation.of(
selectedDevice.getDeviceId(),
assigneeName,
@@ -118,8 +117,7 @@ public class BadgeDeviceAreaAssignStrategy implements AssignStrategy {
.map(device -> {
int score = calculateScore(device);
String reason = buildRecommendationReason(device, context);
String assigneeName = device.getNickname() != null
? device.getNickname() : device.getDeviceCode();
String assigneeName = resolveAssigneeName(device);
return AssigneeRecommendation.of(
device.getDeviceId(),
assigneeName,
@@ -133,6 +131,25 @@ public class BadgeDeviceAreaAssignStrategy implements AssignStrategy {
// ==================== 私有方法 ====================
/**
* 解析执行人展示名称。
* <p>
* 优先用 nicknamenickname 缺失时(例如 Redis 状态缓存被清理、IoT 侧未维护昵称),
* 返回 "工牌-尾号" 这样的可读降级文案,避免把 deviceCode/IMEI 这类长数字串直接当作人员名字暴露给调用方。
*/
private String resolveAssigneeName(BadgeDeviceStatusDTO device) {
String nickname = device.getNickname();
if (nickname != null && !nickname.isBlank()) {
return nickname;
}
String code = device.getDeviceCode();
if (code != null && !code.isBlank()) {
int len = code.length();
return "工牌-" + (len > 4 ? code.substring(len - 4) : code);
}
return device.getDeviceId() != null ? "工牌-" + device.getDeviceId() : "未知工牌";
}
/**
* 选择最佳设备
*/

View File

@@ -0,0 +1,198 @@
package com.viewsh.module.ops.environment.job;
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OperatorTypeEnum;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* 验证 CleanOrderAutoCancelJob 的五条不变量:
* <ol>
* <li>无候选 → 返回零结果,不触发取消</li>
* <li>正常批次 → 依次 cancel成功计数正确</li>
* <li>单条失败不中断其余 → try/catch 隔离</li>
* <li>候选到 cancel 间被用户触达 → 乐观锁跳过(避免误杀)</li>
* <li>候选到 cancel 间状态变为终态/PAUSED → 跳过</li>
* </ol>
*/
@ExtendWith(MockitoExtension.class)
class CleanOrderAutoCancelJobTest {
@Mock
private OpsOrderMapper opsOrderMapper;
@Mock
private OrderLifecycleManager orderLifecycleManager;
@InjectMocks
private CleanOrderAutoCancelJob job;
@BeforeEach
void setUp() {
ReflectionTestUtils.setField(job, "timeoutHours", 12);
ReflectionTestUtils.setField(job, "batchSize", 200);
}
@Test
void scanAndCancel_whenNoCandidates_shouldReturnZeroCounts() {
when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class)))
.thenReturn(Collections.emptyList());
CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel();
assertEquals(0, result.scanned());
assertEquals(0, result.succeeded());
assertEquals(0, result.failed());
assertEquals(0, result.skippedStale());
verify(orderLifecycleManager, never()).cancelOrder(anyLong(), any(), any(), any());
}
@Test
void scanAndCancel_whenAllCandidatesStillStale_shouldCancelAll() {
LocalDateTime staleTime = LocalDateTime.now().minusHours(13);
OpsOrderDO a = stale(101L, "WO-101", WorkOrderStatusEnum.DISPATCHED, staleTime);
OpsOrderDO b = stale(102L, "WO-102", WorkOrderStatusEnum.CONFIRMED, staleTime);
OpsOrderDO c = stale(103L, "WO-103", WorkOrderStatusEnum.ARRIVED, staleTime);
when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class)))
.thenReturn(List.of(a, b, c));
// Fresh fetch confirms all three are still stale
when(opsOrderMapper.selectById(101L)).thenReturn(a);
when(opsOrderMapper.selectById(102L)).thenReturn(b);
when(opsOrderMapper.selectById(103L)).thenReturn(c);
CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel();
assertEquals(3, result.scanned());
assertEquals(3, result.succeeded());
assertEquals(0, result.failed());
assertEquals(0, result.skippedStale());
verify(orderLifecycleManager, times(3))
.cancelOrder(anyLong(), eq(null), eq(OperatorTypeEnum.SYSTEM), any());
}
@Test
void scanAndCancel_whenOneCancelThrows_shouldNotAbortBatch() {
LocalDateTime staleTime = LocalDateTime.now().minusHours(13);
OpsOrderDO a = stale(201L, "WO-201", WorkOrderStatusEnum.DISPATCHED, staleTime);
OpsOrderDO b = stale(202L, "WO-202", WorkOrderStatusEnum.CONFIRMED, staleTime);
OpsOrderDO c = stale(203L, "WO-203", WorkOrderStatusEnum.ARRIVED, staleTime);
when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class)))
.thenReturn(List.of(a, b, c));
when(opsOrderMapper.selectById(201L)).thenReturn(a);
when(opsOrderMapper.selectById(202L)).thenReturn(b);
when(opsOrderMapper.selectById(203L)).thenReturn(c);
// 第二条取消抛异常,不应影响第一、第三条。
// 不能用 doThrow(...).when(mock).cancelOrder(eq(202L), ...)——strict stubs 会把"201L 调用和 202L 存根不匹配"判成错配。
// 改用 doAnswer 按 orderId 路由,覆盖所有 cancel 调用。
doAnswer(invocation -> {
Long orderId = invocation.getArgument(0);
if (orderId != null && orderId == 202L) {
throw new IllegalStateException("状态机非法转换");
}
return null;
}).when(orderLifecycleManager).cancelOrder(anyLong(), any(), any(), any());
CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel();
assertEquals(3, result.scanned());
assertEquals(2, result.succeeded());
assertEquals(1, result.failed());
assertEquals(0, result.skippedStale());
verify(orderLifecycleManager).cancelOrder(eq(201L), any(), any(), any());
verify(orderLifecycleManager).cancelOrder(eq(202L), any(), any(), any());
verify(orderLifecycleManager).cancelOrder(eq(203L), any(), any(), any());
}
@Test
void scanAndCancel_whenOrderTouchedBeforeCancel_shouldSkipAsStale() {
// 候选装内存时 update_time=13h ago实际 cancel 前用户刚刚点确认update_time 刷为"1 分钟前"。
// 乐观校验应跳过,避免误杀已被触达的工单。
LocalDateTime snapshotUpdate = LocalDateTime.now().minusHours(13);
LocalDateTime freshUpdate = LocalDateTime.now().minusMinutes(1);
OpsOrderDO snapshot = stale(301L, "WO-301", WorkOrderStatusEnum.DISPATCHED, snapshotUpdate);
OpsOrderDO fresh = stale(301L, "WO-301", WorkOrderStatusEnum.CONFIRMED, freshUpdate);
when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class)))
.thenReturn(List.of(snapshot));
when(opsOrderMapper.selectById(301L)).thenReturn(fresh);
CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel();
assertEquals(1, result.scanned());
assertEquals(0, result.succeeded());
assertEquals(1, result.skippedStale());
verify(orderLifecycleManager, never()).cancelOrder(anyLong(), any(), any(), any());
}
@Test
void scanAndCancel_whenOrderBecameTerminal_shouldSkip() {
// 候选装内存时还是 ARRIVED实际 cancel 前已被其他路径 forceComplete 为 COMPLETED
LocalDateTime staleTime = LocalDateTime.now().minusHours(13);
OpsOrderDO snapshot = stale(401L, "WO-401", WorkOrderStatusEnum.ARRIVED, staleTime);
OpsOrderDO fresh = stale(401L, "WO-401", WorkOrderStatusEnum.COMPLETED, staleTime);
when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class)))
.thenReturn(List.of(snapshot));
when(opsOrderMapper.selectById(401L)).thenReturn(fresh);
CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel();
assertEquals(1, result.skippedStale());
verify(orderLifecycleManager, never()).cancelOrder(anyLong(), any(), any(), any());
}
@Test
void scanAndCancel_whenOrderBecamePaused_shouldSkip() {
// 快照是 DISPATCHED刚被 P0 打断成 PAUSED——此 Job 应放行给 resumeInterruptedOrder
LocalDateTime staleTime = LocalDateTime.now().minusHours(13);
OpsOrderDO snapshot = stale(501L, "WO-501", WorkOrderStatusEnum.DISPATCHED, staleTime);
OpsOrderDO fresh = stale(501L, "WO-501", WorkOrderStatusEnum.PAUSED,
LocalDateTime.now().minusHours(14)); // update_time 刚刷新,但仍<=threshold状态变 PAUSED 就该跳过
when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class)))
.thenReturn(List.of(snapshot));
when(opsOrderMapper.selectById(501L)).thenReturn(fresh);
CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel();
assertEquals(1, result.skippedStale());
verify(orderLifecycleManager, never()).cancelOrder(anyLong(), any(), any(), any());
}
// ==================== Helpers ====================
private OpsOrderDO stale(Long id, String code, WorkOrderStatusEnum status, LocalDateTime updateTime) {
OpsOrderDO order = OpsOrderDO.builder()
.id(id)
.orderCode(code)
.status(status.getStatus())
.orderType("CLEAN")
.build();
order.setUpdateTime(updateTime);
return order;
}
}

View File

@@ -9,6 +9,7 @@ import com.viewsh.module.ops.core.dispatch.strategy.ScheduleStrategy;
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult;
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OperatorTypeEnum;
@@ -178,6 +179,22 @@ public class DispatchEngineImpl implements DispatchEngine {
public DispatchResult autoDispatchNext(Long completedOrderId, Long assigneeId) {
log.info("任务完成后自动派发下一单: completedOrderId={}, assigneeId={}", completedOrderId, assigneeId);
if (assigneeId == null) {
log.warn("autoDispatchNext 缺少执行人,跳过派发: completedOrderId={}", completedOrderId);
return DispatchResult.success("缺少执行人,跳过派发", null);
}
// 空闲校验若执行人仍挂着其他活跃工单DISPATCHED/CONFIRMED/ARRIVED/PAUSED
// 说明设备尚未真正空闲,不应再派发新任务——否则会触发"同一设备并行多单"的状态错乱,
// 典型场景是管理员手动取消一个僵尸 DISPATCHED 单时handleCancelled 会调到这里。
List<OpsOrderDO> activeOrders = orderMapper.selectActiveByAssignee(assigneeId, completedOrderId);
if (!activeOrders.isEmpty()) {
OpsOrderDO head = activeOrders.get(0);
log.info("执行人仍有活跃工单,跳过自动派发: assigneeId={}, completedOrderId={}, activeCount={}, sampleOrderId={}, sampleStatus={}",
assigneeId, completedOrderId, activeOrders.size(), head.getId(), head.getStatus());
return DispatchResult.success("执行人非空闲,跳过派发", assigneeId);
}
Long fallbackAreaId = null;
OpsOrderDO completedOrder = orderMapper.selectById(completedOrderId);
if (completedOrder != null) {
@@ -229,7 +246,9 @@ public class DispatchEngineImpl implements DispatchEngine {
.reason("等待队列动态重排后自动派发")
.build();
OrderTransitionResult result = orderLifecycleManager.transition(request);
// 走 dispatch() 而不是 transition()dispatch 内部会先做 FOR UPDATE 不变量检查
// Bug #2 防线),避免 autoDispatchNext 在"从队列派发"这一类入口绕过串行化。
OrderTransitionResult result = orderLifecycleManager.dispatch(request);
if (result.isSuccess()) {
return DispatchResult.success("已按队列总分派发下一单", assigneeId);
@@ -346,6 +365,23 @@ public class DispatchEngineImpl implements DispatchEngine {
Long orderId = context.getOrderId();
Long assigneeId = context.getRecommendedAssigneeId();
// 兜底校验:调度策略基于 Redis 的设备状态判空闲,可能与 MySQL 的 ops_order 实际活跃态不一致
// (例如设备 Redis 状态被某次 COMPLETED 清回 IDLE 但历史 CONFIRMED/DISPATCHED 单仍残留)。
// 若分配路径会真正推送工单给设备DIRECT_DISPATCH / PUSH_AND_ENQUEUE
// 此处再查一次 MySQL非空闲时强制降级到 ENQUEUE_ONLY避免同一设备并行多单的状态错乱。
if (assigneeId != null
&& (decision.getPath() == DispatchPath.DIRECT_DISPATCH
|| decision.getPath() == DispatchPath.PUSH_AND_ENQUEUE)) {
List<OpsOrderDO> activeOrders = orderMapper.selectActiveByAssignee(assigneeId, orderId);
if (!activeOrders.isEmpty()) {
OpsOrderDO head = activeOrders.get(0);
log.warn("调度决策为 {} 但执行人仍挂活跃工单,降级为仅入队: orderId={}, assigneeId={}, activeCount={}, sampleOrderId={}, sampleStatus={}",
decision.getPath(), orderId, assigneeId,
activeOrders.size(), head.getId(), head.getStatus());
return executeEnqueueOnly(context, assigneeId);
}
}
switch (decision.getPath()) {
case DIRECT_DISPATCH:
return executeDirectDispatch(context, assigneeId);
@@ -402,9 +438,25 @@ public class DispatchEngineImpl implements DispatchEngine {
DispatchPath.DIRECT_DISPATCH,
result.getQueueId()
);
} else {
return DispatchResult.fail("直接派单失败: " + result.getMessage());
}
// 并发冲突兜底dispatch 入口的 FOR UPDATE 判定执行人已有活跃工单,
// 此时工单仍在原状态(通常是 PENDING。如果仍是 PENDING直接降级为入队
// 避免工单悬空;若已是 QUEUED例如从队列派发被抢先则让它继续留在队列等下一轮。
if (result.getErrorCode() == TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER) {
OpsOrderDO order = orderMapper.selectById(context.getOrderId());
String currentStatus = order != null ? order.getStatus() : null;
if (WorkOrderStatusEnum.QUEUED.getStatus().equals(currentStatus)) {
log.warn("直接派单被 FOR UPDATE 拒绝且工单已在队列中,保持 QUEUED 等待下一轮: orderId={}, assigneeId={}",
context.getOrderId(), assigneeId);
return DispatchResult.fail("并发冲突,已留在队列等待: " + result.getMessage());
}
log.warn("直接派单被 FOR UPDATE 拒绝,降级为入队: orderId={}, assigneeId={}, reason={}",
context.getOrderId(), assigneeId, result.getMessage());
return executeEnqueueOnly(context, assigneeId);
}
return DispatchResult.fail("直接派单失败: " + result.getMessage());
}
/**
@@ -427,8 +479,15 @@ public class DispatchEngineImpl implements DispatchEngine {
.reason("自动推送等待任务")
.build();
orderLifecycleManager.dispatch(dispatchRequest);
log.info("已推送等待任务: taskId={}", firstWaiting.getId());
OrderTransitionResult pushResult = orderLifecycleManager.dispatch(dispatchRequest);
if (pushResult.isSuccess()) {
log.info("已推送等待任务: taskId={}", firstWaiting.getId());
} else {
// 可能被 dispatch() 里的 FOR UPDATE 拒绝:此处不中断新任务入队流程,
// 但要把"推送失败"清晰落在日志里,避免 "已推送" 说谎误导运维排查。
log.warn("推送等待任务失败,继续执行新任务入队: taskId={}, orderId={}, error={}",
firstWaiting.getId(), firstWaiting.getOpsOrderId(), pushResult.getMessage());
}
}
// 新任务入队

View File

@@ -0,0 +1,85 @@
package com.viewsh.module.ops.core.event;
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
import com.viewsh.module.ops.enums.OperatorTypeEnum;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* 工单状态转换"尝试"领域事件
* <p>
* 与 {@link OrderStateChangedEvent} 的区别:
* <ul>
* <li>{@code OrderStateChangedEvent} 仅在状态转换 <b>成功</b> 后发布EventPublishHandler
* 订阅方是业务层监听器TTS 播报、设备状态同步等)。</li>
* <li>{@code OrderTransitionAttemptedEvent} 在每一次 transition 尝试时都发布——成功、失败、
* FOR UPDATE 被拒 都发。订阅方是审计日志,用于打穿事务回滚造成的审计断链
* rollback 场景下 ops_order_event 无记录bus_log 需独立事务补齐)。</li>
* </ul>
* 事务边界:
* <ul>
* <li>发布方在主事务内 {@code publishEvent},事件会被 Spring 挂在当前事务的 synchronization 上。</li>
* <li>订阅方用 {@code @TransactionalEventListener(AFTER_COMMIT)} 或 {@code AFTER_ROLLBACK}
* 分别处理 commit 与 rollback 场景,保证两种结果都留痕。</li>
* </ul>
*
* @author lzh
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderTransitionAttemptedEvent {
/** 工单ID */
private Long orderId;
/** 工单类型CLEAN / SECURITY / REPAIR / SERVICE */
private String orderType;
/** 工单编号(冗余,便于日志检索) */
private String orderCode;
/** 原状态(查询时的当前状态) */
private WorkOrderStatusEnum fromStatus;
/** 目标状态 */
private WorkOrderStatusEnum targetStatus;
/** 执行人ID */
private Long assigneeId;
/** 操作人类型 */
private OperatorTypeEnum operatorType;
/** 操作人ID */
private Long operatorId;
/** 原因/备注 */
private String reason;
/**
* 发布时的"声明结果"。
* <p>
* 注意:这是发布瞬间的判断;如果后续 handler 抛异常导致整个事务 rollback
* 监听器在 {@code AFTER_ROLLBACK} 阶段应强制将其视为失败。
*/
private boolean success;
/** 失败错误码success=false 时有值) */
private TransitionErrorCode errorCode;
/** 失败原因简要消息success=false 时有值) */
private String errorMessage;
/** 异常摘要success=false 且存在异常时有值,只保留 class + message不带堆栈 */
private String causeSummary;
/** 事件时间 */
private LocalDateTime attemptedAt;
}

View File

@@ -1,5 +1,6 @@
package com.viewsh.module.ops.core.lifecycle;
import com.viewsh.module.ops.core.event.OrderTransitionAttemptedEvent;
import com.viewsh.module.ops.core.lifecycle.handler.EventPublishHandler;
import com.viewsh.module.ops.core.lifecycle.handler.QueueSyncHandler;
import com.viewsh.module.ops.core.lifecycle.handler.StateTransitionHandler;
@@ -7,6 +8,10 @@ import com.viewsh.module.ops.core.lifecycle.handler.TransitionHandler;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult;
import com.viewsh.module.ops.core.lifecycle.model.TransitionContext;
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
import org.springframework.context.ApplicationEventPublisher;
import java.time.LocalDateTime;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OperatorTypeEnum;
@@ -62,6 +67,9 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
@Resource
private EventLogRecorder eventLogRecorder;
@Resource
private ApplicationEventPublisher applicationEventPublisher;
/**
* 责任链处理器
*/
@@ -101,10 +109,15 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
// 4. 检查结果
if (context.hasError()) {
log.error("状态转换失败: orderId={}, error={}", order.getId(), context.getErrorMessage());
publishAttempt(order, oldStatus, request, false,
TransitionErrorCode.INVALID_TRANSITION,
context.getErrorMessage(),
summarizeThrowable(context.getCause()));
return OrderTransitionResult.fail(order.getId(), context.getErrorMessage());
}
log.info("状态转换成功: orderId={}, {} -> {}", order.getId(), oldStatus, request.getTargetStatus());
publishAttempt(order, oldStatus, request, true, null, null, null);
return OrderTransitionResult.builder()
.success(true)
.orderId(order.getId())
@@ -142,6 +155,35 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
// 设置目标状态
request.setTargetStatus(WorkOrderStatusEnum.DISPATCHED);
// 业务不变量:同一执行人在任一时刻最多只能有 1 条活跃工单
// DISPATCHED/CONFIRMED/ARRIVED。PAUSED 不纳入——P0 打断恢复走的就是
// PAUSED→DISPATCHED此处放行。对命中行加 FOR UPDATE配合 @Transactional
// 串行化并发派发;命中则本次派发被拒,由调用方决定降级策略
// DispatchEngineImpl.executeDirectDispatch 会降级为入队)。
if (request.getAssigneeId() != null) {
java.util.List<OpsOrderDO> activeOrders = opsOrderMapper.selectActiveByAssigneeForUpdate(
request.getAssigneeId(), request.getOrderId());
if (!activeOrders.isEmpty()) {
OpsOrderDO head = activeOrders.get(0);
String msg = "执行人已有活跃工单: orderId=" + head.getId() + ", status=" + head.getStatus();
log.warn("派发被拒:执行人已有活跃工单: assigneeId={}, requestOrderId={}, activeCount={}, sampleOrderId={}, sampleStatus={}",
request.getAssigneeId(), request.getOrderId(),
activeOrders.size(), head.getId(), head.getStatus());
// 审计:记录"派发被拒"尝试AFTER_COMMIT 监听器会写 bus_log
OpsOrderDO subject = opsOrderMapper.selectById(request.getOrderId());
WorkOrderStatusEnum fromStatus = subject != null
? WorkOrderStatusEnum.valueOf(subject.getStatus()) : null;
publishAttempt(subject != null ? subject : head, fromStatus, request, false,
TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER, msg, null);
return OrderTransitionResult.fail(
request.getOrderId(),
msg,
TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER);
}
}
// 派单时更新工单的 assigneeId从 PENDING -> DISPATCHED
if (request.getAssigneeId() != null) {
OpsOrderDO order = opsOrderMapper.selectById(request.getOrderId());
@@ -188,17 +230,22 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
public void resumeOrder(Long orderId, Long operatorId) {
log.info("开始恢复工单: orderId={}, operatorId={}", orderId, operatorId);
// 构建请求
// 取出工单自身的 assigneeId 透传给 dispatch使其 FOR UPDATE 不变量检查生效——
// 否则 P0 恢复与并发派发竞争时可能再出现"同一 assignee 两条 DISPATCHED"。
// assigneeId == null 的异常态(工单已卸人)下 dispatch 会跳过该检查,行为退化为原 transition。
OpsOrderDO order = opsOrderMapper.selectById(orderId);
Long assigneeId = order != null ? order.getAssigneeId() : null;
OrderTransitionRequest request = OrderTransitionRequest.builder()
.orderId(orderId)
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
.assigneeId(assigneeId)
.operatorType(OperatorTypeEnum.CLEANER)
.operatorId(operatorId)
.reason("恢复工单")
.build();
// 执行状态转换
OrderTransitionResult result = transition(request);
OrderTransitionResult result = dispatch(request);
if (!result.isSuccess()) {
throw new IllegalStateException("恢复工单失败: " + result.getMessage());
@@ -409,4 +456,49 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
|| WorkOrderStatusEnum.ARRIVED == status;
}
/**
* 发布状态转换尝试事件,覆盖成功、普通失败、并发冲突三种情况。
* 订阅方 {@code OrderTransitionAuditListener} 在 AFTER_COMMIT/AFTER_ROLLBACK
* 阶段落 bus_log保证事务回滚不断链。
*/
private void publishAttempt(OpsOrderDO order, WorkOrderStatusEnum fromStatus,
OrderTransitionRequest request, boolean success,
TransitionErrorCode errorCode, String errorMessage,
String causeSummary) {
try {
OrderTransitionAttemptedEvent event = OrderTransitionAttemptedEvent.builder()
.orderId(order != null ? order.getId() : request.getOrderId())
.orderType(order != null ? order.getOrderType() : null)
.orderCode(order != null ? order.getOrderCode() : null)
.fromStatus(fromStatus)
.targetStatus(request.getTargetStatus())
.assigneeId(request.getAssigneeId())
.operatorType(request.getOperatorType())
.operatorId(request.getOperatorId())
.reason(request.getReason())
.success(success)
.errorCode(errorCode)
.errorMessage(errorMessage)
.causeSummary(causeSummary)
.attemptedAt(LocalDateTime.now())
.build();
applicationEventPublisher.publishEvent(event);
} catch (Exception e) {
// 审计事件发布失败不应影响主流程
log.error("发布转换尝试事件失败: orderId={}, targetStatus={}",
request.getOrderId(), request.getTargetStatus(), e);
}
}
/**
* 摘要异常:只保留类名 + message不带堆栈防止 bus_log 爆炸。
*/
private String summarizeThrowable(Throwable t) {
if (t == null) {
return null;
}
String msg = t.getMessage();
return t.getClass().getSimpleName() + (msg != null ? ": " + msg : "");
}
}

View File

@@ -0,0 +1,173 @@
package com.viewsh.module.ops.core.lifecycle.audit;
import com.viewsh.module.ops.core.event.OrderTransitionAttemptedEvent;
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain;
import com.viewsh.module.ops.infrastructure.log.enumeration.EventLevel;
import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule;
import com.viewsh.module.ops.infrastructure.log.enumeration.LogType;
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord;
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import java.util.HashMap;
import java.util.Map;
/**
* 工单状态转换尝试审计监听器。
* <p>
* 闭环设计:
* <ul>
* <li><b>AFTER_COMMIT</b>:主事务成功提交,按事件本身的 success 标志写 bus_log。</li>
* <li><b>AFTER_ROLLBACK</b>主事务已回滚——事件里的数据ops_order_event 等)全部消失。
* 此时必须新开一个独立事务写 bus_log否则审计链断裂。</li>
* </ul>
* <p>
* 字段归位:
* <ul>
* <li>{@code eventLevel}:成功=INFO失败=WARN冲突被拒或 ERROR状态机异常</li>
* <li>{@code eventDomain}:统一用 DISPATCH派发域便于运维按域聚合</li>
* <li>{@code eventType}:成功→业务 LogType如 ORDER_DISPATCHED失败→TRANSITION_FAILED
* 或 DISPATCH_REJECTED</li>
* <li>{@code eventPayload}errorCode / fromStatus / targetStatus / operatorType / reason / cause</li>
* </ul>
*
* @author lzh
*/
@Slf4j
@Component
public class OrderTransitionAuditListener {
@Resource
private EventLogRecorder eventLogRecorder;
/**
* 主事务已提交:照事件声明写一条审计日志。
* <p>
* fallbackExecution=true在无事务上下文时也执行如测试、跨线程补写场景
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, fallbackExecution = true)
public void onAfterCommit(OrderTransitionAttemptedEvent event) {
try {
eventLogRecorder.recordSync(toRecord(event, /*rolledBack=*/false));
} catch (Exception e) {
log.error("[TransitionAudit] AFTER_COMMIT 写 bus_log 失败: orderId={}, success={}, errorCode={}",
event.getOrderId(), event.isSuccess(), event.getErrorCode(), e);
}
}
/**
* 主事务已回滚:无论事件里声称 success 与否,这次"尝试"都**实际未落库**。
* 必须开独立事务写 bus_log否则日志也会因同事务回滚而丢失。
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void onAfterRollback(OrderTransitionAttemptedEvent event) {
writeRollbackAudit(event);
}
/**
* 写入"事务已回滚"的审计记录。
* <p>
* 不加 @TransactionalAFTER_ROLLBACK 阶段主事务已彻底结束,当前线程无活跃事务;
* 且本方法由 onAfterRollback 自调用Spring 代理不会拦截,加注解也是死注解。
* 实际行为eventLogRecorder.recordSync 的 insert 在 auto-commit 模式下单条提交,
* 失败只丢这一行审计、不影响主业务(主业务早已回滚并报错给调用方)。
*/
public void writeRollbackAudit(OrderTransitionAttemptedEvent event) {
try {
eventLogRecorder.recordSync(toRecord(event, /*rolledBack=*/true));
} catch (Exception e) {
log.error("[TransitionAudit] AFTER_ROLLBACK 写 bus_log 失败: orderId={}, targetStatus={}",
event.getOrderId(), event.getTargetStatus(), e);
}
}
// ==================== 私有映射方法 ====================
private EventLogRecord toRecord(OrderTransitionAttemptedEvent event, boolean rolledBack) {
// rolledBack=true 时强制视为失败:即便发布时声明 success=true
// 事务 rollback 说明写入未真正生效。
boolean success = event.isSuccess() && !rolledBack;
EventLevel level = success ? EventLevel.INFO
: (event.getErrorCode() == TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER
? EventLevel.WARN : EventLevel.ERROR);
String eventTypeCode = resolveEventTypeCode(event, success, rolledBack);
Map<String, Object> payload = new HashMap<>();
payload.put("fromStatus", event.getFromStatus() != null ? event.getFromStatus().getStatus() : null);
payload.put("targetStatus", event.getTargetStatus() != null ? event.getTargetStatus().getStatus() : null);
payload.put("operatorType", event.getOperatorType() != null ? event.getOperatorType().getType() : null);
payload.put("reason", event.getReason());
payload.put("success", success);
payload.put("rolledBack", rolledBack);
if (event.getErrorCode() != null) {
payload.put("errorCode", event.getErrorCode().name());
}
if (event.getErrorMessage() != null) {
payload.put("errorMessage", event.getErrorMessage());
}
if (event.getCauseSummary() != null) {
payload.put("cause", event.getCauseSummary());
}
if (event.getOrderCode() != null) {
payload.put("orderCode", event.getOrderCode());
}
String message = buildMessage(event, success, rolledBack);
return EventLogRecord.builder()
.module(LogModule.fromOrderType(event.getOrderType()))
.domain(EventDomain.DISPATCH)
.eventType(eventTypeCode)
.level(level)
.message(message)
.targetId(event.getOrderId())
.targetType("order")
.deviceId(event.getAssigneeId())
.personId(event.getOperatorId())
.payload(payload)
.eventTime(event.getAttemptedAt())
.build();
}
private String resolveEventTypeCode(OrderTransitionAttemptedEvent event, boolean success, boolean rolledBack) {
if (!success && event.getErrorCode() == TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER) {
return LogType.DISPATCH_REJECTED.getCode();
}
if (!success) {
return LogType.TRANSITION_FAILED.getCode();
}
// 成功场景:按目标状态映射到业务 LogTypeops_order_event 已有时间轴,
// 这里 bus_log 仅作宽表镜像,便于运维按 domain/module 聚合查询。
if (event.getTargetStatus() == null) {
return LogType.SYSTEM_EVENT.getCode();
}
return switch (event.getTargetStatus()) {
case QUEUED -> LogType.ORDER_QUEUED.getCode();
case DISPATCHED -> LogType.ORDER_DISPATCHED.getCode();
case CONFIRMED -> LogType.ORDER_CONFIRM.getCode();
case ARRIVED -> LogType.ORDER_ARRIVED.getCode();
case PAUSED -> LogType.ORDER_PAUSED.getCode();
case COMPLETED -> LogType.ORDER_COMPLETED.getCode();
case CANCELLED -> LogType.ORDER_CANCELLED.getCode();
default -> LogType.SYSTEM_EVENT.getCode();
};
}
private String buildMessage(OrderTransitionAttemptedEvent event, boolean success, boolean rolledBack) {
String from = event.getFromStatus() != null ? event.getFromStatus().getStatus() : "?";
String to = event.getTargetStatus() != null ? event.getTargetStatus().getStatus() : "?";
if (success) {
return String.format("状态转换成功: %s -> %s", from, to);
}
String prefix = rolledBack ? "状态转换回滚" : "状态转换失败";
String detail = event.getErrorMessage() != null ? event.getErrorMessage() : "";
return String.format("%s: %s -> %s %s", prefix, from, to, detail).trim();
}
}

View File

@@ -47,6 +47,14 @@ public class OrderTransitionResult {
*/
private Long queueId;
/**
* 失败错误码(仅 success=false 时有值)
* <p>
* 调用方可据此区分需降级的失败(如 ASSIGNEE_HAS_ACTIVE_ORDER与硬失败
* 未显式设置时默认为 {@link TransitionErrorCode#OTHER}。
*/
private TransitionErrorCode errorCode;
/**
* 成功结果
*/
@@ -81,6 +89,7 @@ public class OrderTransitionResult {
return OrderTransitionResult.builder()
.success(false)
.message(message)
.errorCode(TransitionErrorCode.OTHER)
.build();
}
@@ -92,6 +101,19 @@ public class OrderTransitionResult {
.success(false)
.orderId(orderId)
.message(message)
.errorCode(TransitionErrorCode.OTHER)
.build();
}
/**
* 失败结果带工单ID 和错误码)
*/
public static OrderTransitionResult fail(Long orderId, String message, TransitionErrorCode errorCode) {
return OrderTransitionResult.builder()
.success(false)
.orderId(orderId)
.message(message)
.errorCode(errorCode)
.build();
}
}

View File

@@ -0,0 +1,35 @@
package com.viewsh.module.ops.core.lifecycle.model;
/**
* 状态转换失败的错误码
* <p>
* 用于调用方区分可恢复/需降级的失败场景(如并发冲突)与真正的硬失败(状态机非法转换等),
* 避免把"可降级"的结果误当成硬错误直接向用户暴露。
*
* @author lzh
*/
public enum TransitionErrorCode {
/**
* 执行人已有活跃工单DISPATCHED/CONFIRMED/ARRIVED不应再派发。
* <p>
* 发生在 OrderLifecycleManager.dispatch 入口的 FOR UPDATE 兜底检查命中时。
* 调用方应将工单降级到 QUEUED入队等待下一轮动态派发避免 PENDING 状态悬空。
*/
ASSIGNEE_HAS_ACTIVE_ORDER,
/**
* 状态机不允许此转换(非法的状态流转)
*/
INVALID_TRANSITION,
/**
* 工单不存在
*/
ORDER_NOT_FOUND,
/**
* 其他失败(无特定归类)
*/
OTHER;
}

View File

@@ -54,7 +54,7 @@ public interface OpsOrderQueueMapper extends BaseMapperX<OpsOrderQueueDO> {
}
/**
* 根据用户ID查询队列列表
* 根据用户ID查询队列列表(含历史 REMOVED 记录,通常用于审计/统计)
*/
default List<OpsOrderQueueDO> selectListByUserId(Long userId) {
return selectList(new LambdaQueryWrapperX<OpsOrderQueueDO>()
@@ -62,6 +62,19 @@ public interface OpsOrderQueueMapper extends BaseMapperX<OpsOrderQueueDO> {
.orderByDesc(OpsOrderQueueDO::getEnqueueTime));
}
/**
* 根据用户ID查询活跃队列列表仅 WAITING/PROCESSING/PAUSED排除 REMOVED/已终态)
* <p>
* 同步到 Redis、计算队列长度、查询当前任务等场景应走此方法避免
* 将历史 REMOVED 记录同步到 Redis 造成 ZSet / Hash 膨胀。
*/
default List<OpsOrderQueueDO> selectActiveListByUserId(Long userId) {
return selectList(new LambdaQueryWrapperX<OpsOrderQueueDO>()
.eq(OpsOrderQueueDO::getUserId, userId)
.in(OpsOrderQueueDO::getQueueStatus, "WAITING", "PROCESSING", "PAUSED")
.orderByDesc(OpsOrderQueueDO::getEnqueueTime));
}
/**
* 根据用户ID和状态查询队列列表
* 用于强制从 MySQL 读取最新数据

View File

@@ -92,6 +92,80 @@ public interface OpsOrderMapper extends BaseMapperX<OpsOrderDO> {
.last("LIMIT 1"));
}
/**
* 查询执行人名下尚未结束的工单DISPATCHED/CONFIRMED/ARRIVED/PAUSED
* <p>
* 用于 autoDispatchNext 等调度入口的空闲校验:若该执行人仍挂着活跃工单,
* 则不应再派发新任务,避免"越清越多"的级联派发。
*
* @param assigneeId 执行人ID工牌设备ID
* @param excludeOrderId 需要排除的工单ID通常是刚完成/取消触发本次调度的工单),可传 null
* @return 活跃工单列表,按创建时间升序
*/
default List<OpsOrderDO> selectActiveByAssignee(Long assigneeId, Long excludeOrderId) {
return selectList(new LambdaQueryWrapperX<OpsOrderDO>()
.eq(OpsOrderDO::getAssigneeId, assigneeId)
.in(OpsOrderDO::getStatus,
WorkOrderStatusEnum.DISPATCHED.getStatus(),
WorkOrderStatusEnum.CONFIRMED.getStatus(),
WorkOrderStatusEnum.ARRIVED.getStatus(),
WorkOrderStatusEnum.PAUSED.getStatus())
.ne(excludeOrderId != null, OpsOrderDO::getId, excludeOrderId)
.orderByAsc(OpsOrderDO::getCreateTime));
}
/**
* 查询执行人名下"正在执行"的工单并对命中行加行锁SELECT ... FOR UPDATE
* <p>
* 与 {@link #selectActiveByAssignee} 的区别:
* <ul>
* <li><b>不含 PAUSED</b>——PAUSED 代表 P0 打断后挂起的旧任务,不占用"当前时间片"
* 派发时(如 P0 结束后恢复)不应被它阻塞</li>
* <li>结果行加 FOR UPDATE 排他锁,用于 dispatch 入口做业务不变量校验:
* "同一执行人在任一时刻最多只能有 1 条活跃工单"。</li>
* </ul>
* 必须在事务中调用,否则锁无意义。
*
* @param assigneeId 执行人ID
* @param excludeOrderId 排除的工单ID通常是本次正在派发的工单本身
* @return 命中的活跃工单列表(通常空列表表示可派发)
*/
default List<OpsOrderDO> selectActiveByAssigneeForUpdate(Long assigneeId, Long excludeOrderId) {
return selectList(new LambdaQueryWrapperX<OpsOrderDO>()
.eq(OpsOrderDO::getAssigneeId, assigneeId)
.in(OpsOrderDO::getStatus,
WorkOrderStatusEnum.DISPATCHED.getStatus(),
WorkOrderStatusEnum.CONFIRMED.getStatus(),
WorkOrderStatusEnum.ARRIVED.getStatus())
.ne(excludeOrderId != null, OpsOrderDO::getId, excludeOrderId)
.orderByAsc(OpsOrderDO::getCreateTime)
.last("FOR UPDATE"));
}
/**
* 查询执行人最近一条已完成工单的区域(用于楼层基准兜底)
* <p>
* 用途:{@code OrderQueueServiceEnhanced.resolveBaselineAreaId} 的二级兜底。
* 当执行人当前没有 PROCESSING 工单时(短暂空闲),用最近完成的那一单的
* 区域作为"物理位置推断",保证楼层差评分在空闲期仍然生效。
* <p>
* 时间窗:通过 {@code since} 过滤,超过窗口仍空闲则认为轨迹失效,
* 返回 null 让调用方降级到更外层的兜底fallbackAreaId 或无楼层模式)。
*
* @param assigneeId 执行人ID
* @param since 只考虑 updateTime 晚于此时间的工单(如 now - 24h
* @return 最近一条 COMPLETED 工单的 areaId无匹配返回 null
*/
default Long selectLatestCompletedAreaIdByAssignee(Long assigneeId, LocalDateTime since) {
OpsOrderDO order = selectOne(new LambdaQueryWrapperX<OpsOrderDO>()
.eq(OpsOrderDO::getAssigneeId, assigneeId)
.eq(OpsOrderDO::getStatus, WorkOrderStatusEnum.COMPLETED.getStatus())
.ge(since != null, OpsOrderDO::getUpdateTime, since)
.orderByDesc(OpsOrderDO::getUpdateTime)
.last("LIMIT 1"));
return order != null ? order.getAreaId() : null;
}
// ==================== 统计聚合查询 ====================
/**

View File

@@ -47,7 +47,14 @@ public enum LogType {
COMPLETE_SUPPRESSED_INVALID("COMPLETE_SUPPRESSED_INVALID", "作业时长不足抑制"),
BEACON_COMPLETE_REQUESTED("BEACON_COMPLETE_REQUESTED", "信号丢失自动完成请求"),
TTS_REQUEST("TTS_REQUEST", "语音播报请求"),
ARRIVE_REJECTED("ARRIVE_REJECTED", "到岗请求被拒绝");
ARRIVE_REJECTED("ARRIVE_REJECTED", "到岗请求被拒绝"),
// ========== 状态机转换闭环审计 ==========
/** 状态转换尝试失败状态机异常、handler 抛错等) */
TRANSITION_FAILED("TRANSITION_FAILED", "状态转换失败"),
/** 派发被 FOR UPDATE 拒绝(同 assignee 已有活跃工单) */
DISPATCH_REJECTED("DISPATCH_REJECTED", "派发被拒绝");
private static final Map<String, LogType> CODE_MAP = new HashMap<>();

View File

@@ -25,6 +25,17 @@ public interface EventLogRecorder {
*/
void recordAsync(EventLogRecord record);
/**
* 记录事件日志(同步)
* <p>
* 需要确保日志真正落库的场景使用(如 AFTER_COMMIT 审计、事务回滚场景补写)。
* 调用方负责事务边界本方法内部不开启事务MyBatis 的 insert 会按当前线程的
* 事务上下文执行;若无活跃事务则自动单条提交。
*
* @param record 日志记录
*/
void recordSync(EventLogRecord record);
// ==================== 便捷方法:按级别记录 ====================
/**

View File

@@ -58,6 +58,7 @@ public class EventLogRecorderImpl implements EventLogRecorder {
* <p>
* 用于需要确认日志写入成功的场景(如测试、关键业务)
*/
@Override
public void recordSync(EventLogRecord record) {
doRecord(record);
}

View File

@@ -115,6 +115,9 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
// TODO: 触发紧急派单流程(在派单引擎中实现)
}
// 5. 事务提交后按全局楼层重排一次:新入队工单立即按楼层差参与排序,不等下一次 rebuild
triggerQueueRebuildAfterCommit(userId, null);
return queueDO.getId();
}
@@ -467,7 +470,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
}
// 2. Redis 未命中,从 MySQL 获取并同步到 Redis
List<OpsOrderQueueDO> mysqlList = orderQueueMapper.selectListByUserId(userId);
// 只同步活跃态WAITING/PROCESSING/PAUSED排除 REMOVED 历史记录,避免 Redis 膨胀
List<OpsOrderQueueDO> mysqlList = orderQueueMapper.selectActiveListByUserId(userId);
if (mysqlList != null && !mysqlList.isEmpty()) {
// 同步到 Redis
List<OrderQueueDTO> dtoList = convertToDTO(mysqlList);
@@ -511,10 +515,31 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
Integer baseFloorNo = resolveFloorNo(baselineAreaId);
LocalDateTime now = LocalDateTime.now();
// 批量装载 orders + areas消除 N+1100 条 WAITING 从 200 次 SELECT 降为 2 次。
List<Long> orderIds = waitingQueues.stream()
.map(OpsOrderQueueDO::getOpsOrderId)
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());
Map<Long, Long> orderIdToAreaId = orderIds.isEmpty()
? Collections.emptyMap()
: orderMapper.selectBatchIds(orderIds).stream()
.filter(o -> o.getAreaId() != null)
.collect(Collectors.toMap(OpsOrderDO::getId, OpsOrderDO::getAreaId,
(a, b) -> a));
List<Long> areaIds = orderIdToAreaId.values().stream().distinct().collect(Collectors.toList());
Map<Long, Integer> areaIdToFloorNo = areaIds.isEmpty()
? Collections.emptyMap()
: areaMapper.selectBatchIds(areaIds).stream()
.filter(a -> a.getFloorNo() != null)
.collect(Collectors.toMap(OpsBusAreaDO::getId, OpsBusAreaDO::getFloorNo,
(a, b) -> a));
List<OrderQueueDTO> rebuiltTasks = new ArrayList<>(waitingQueues.size());
for (OpsOrderQueueDO queueDO : waitingQueues) {
OrderQueueDTO dto = convertToDTO(queueDO);
Integer targetFloorNo = resolveFloorNo(resolveOrderAreaId(queueDO.getOpsOrderId()));
Long targetAreaId = orderIdToAreaId.get(queueDO.getOpsOrderId());
Integer targetFloorNo = targetAreaId != null ? areaIdToFloorNo.get(targetAreaId) : null;
QueueScoreResult result = queueScoreCalculator.calculate(QueueScoreContext.builder()
.priority(queueDO.getPriority())
.baseFloorNo(baseFloorNo)
@@ -739,7 +764,17 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
);
}
/**
* 解析楼层基准区域(三级兜底)
* <ol>
* <li>当前 PROCESSING 工单的区域——表示“正在做的楼层”</li>
* <li>最近 24 小时内已完成工单的区域——投射保洁员最近的物理位置</li>
* <li>调用方显式传入的 {@code fallbackAreaId}(如 autoDispatchNext 传的 completedOrder.areaId</li>
* </ol>
* 都未命中则返回 null本次排序降级为无楼层模式。
*/
private Long resolveBaselineAreaId(Long userId, Long fallbackAreaId) {
// 一级:当前正在执行的工单
OpsOrderQueueDO processingQueue = orderQueueMapper.selectCurrentExecutingByUserId(userId);
if (processingQueue != null) {
Long processingAreaId = resolveOrderAreaId(processingQueue.getOpsOrderId());
@@ -747,6 +782,13 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
return processingAreaId;
}
}
// 二级:最近 24 小时内的已完成工单,推断保洁员当前物理位置
Long recentAreaId = orderMapper.selectLatestCompletedAreaIdByAssignee(
userId, LocalDateTime.now().minusHours(24));
if (recentAreaId != null) {
return recentAreaId;
}
// 三级:调用方提示的区域(可为 null
return fallbackAreaId;
}
@@ -764,7 +806,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
}
private void syncUserQueueToRedis(Long userId, List<OrderQueueDTO> rebuiltWaitingTasks) {
List<OpsOrderQueueDO> queues = orderQueueMapper.selectListByUserId(userId);
// 只同步活跃态WAITING/PROCESSING/PAUSED避免把历史 REMOVED 记录回写 Redis ZSet/Hash
List<OpsOrderQueueDO> queues = orderQueueMapper.selectActiveListByUserId(userId);
if (queues == null || queues.isEmpty()) {
redisQueueService.clearQueue(userId);
return;
@@ -794,6 +837,21 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
redisQueueService.batchEnqueue(queueDTOs);
}
/**
* 在当前事务提交后触发一次等待队列重算。
* <p>
* <b>事务边界说明</b>:本方法在 afterCommit 阶段(即外层事务已提交)自调用
* {@link #rebuildWaitingTasksByUserId(Long, Long)},此时:
* <ul>
* <li>当前线程不在任何事务中(主事务刚提交完)</li>
* <li>自调用绕过 Spring 代理rebuild 方法上的 @Transactional 不生效</li>
* <li>实际运行在 auto-commit 模式:每个 updateById 独立提交</li>
* </ul>
* <b>后果</b>rebuild 中途抛异常时 MySQL 可能半更新、Redis 可能部分写入,
* 不强一致但最终一致——下一次 enqueue 会再触发一次完整 rebuild 自愈。
* 对“队列排序”这类可重放数据可以接受;若未来改为影响 MySQL 外表的写入,
* 需要把 rebuild 抽到独立 bean用代理调用走新事务。
*/
private void triggerQueueRebuildAfterCommit(Long userId, Long fallbackAreaId) {
Runnable rebuildAction = () -> {
try {

View File

@@ -9,7 +9,11 @@ import java.time.LocalDateTime;
public class QueueScoreCalculator {
static final int PRIORITY_WEIGHT = 1500;
static final int FLOOR_WEIGHT = 60;
/**
* 楼层差权重。10 层封顶 × 100 = 1000大于 aging 上限 720实现"强楼层优先"
* 等满 4 小时aging 上限)的任务也不会反超更近楼层的同优先级任务。
*/
static final int FLOOR_WEIGHT = 100;
static final int AGING_WEIGHT = 3;
static final int MAX_FLOOR_DIFF = 10;
static final int MAX_AGING_MINUTES = 240;
@@ -22,11 +26,11 @@ public class QueueScoreCalculator {
Integer targetFloorNo = context.getTargetFloorNo();
Integer floorDiff = null;
int floorDiffScore = 0;
// 语义对称:只要 baseFloor 或 targetFloor 任一缺失,就视为"信息不足"不参与楼层排序score=0
// 旧逻辑会在"有 base 无 target"时打 +600 罚分,导致同一工单在保洁员忙碌/空闲时排序不单调。
if (baseFloorNo != null && targetFloorNo != null) {
floorDiff = Math.abs(targetFloorNo - baseFloorNo);
floorDiffScore = Math.min(floorDiff, MAX_FLOOR_DIFF) * FLOOR_WEIGHT;
} else if (baseFloorNo != null) {
floorDiffScore = MAX_FLOOR_DIFF * FLOOR_WEIGHT;
}
long waitMinutes = 0;

View File

@@ -0,0 +1,170 @@
package com.viewsh.module.ops.core.dispatch;
import com.viewsh.module.ops.api.queue.OrderQueueService;
import com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation;
import com.viewsh.module.ops.core.dispatch.model.DispatchDecision;
import com.viewsh.module.ops.core.dispatch.model.DispatchPath;
import com.viewsh.module.ops.core.dispatch.model.DispatchResult;
import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext;
import com.viewsh.module.ops.core.dispatch.strategy.AssignStrategy;
import com.viewsh.module.ops.core.dispatch.strategy.ScheduleStrategy;
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult;
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* 验证 Bug #2FOR UPDATE 并发冲突降级路径。
* <p>
* 背景OrderLifecycleManager.dispatch 入口加了 selectActiveByAssigneeForUpdate 行锁,
* 命中时返 {@link TransitionErrorCode#ASSIGNEE_HAS_ACTIVE_ORDER}。
* DispatchEngine 需按工单当前状态分支处理:
* <ul>
* <li>PENDING从未入队→ 直接降级为入队,避免悬空</li>
* <li>QUEUED已在队列→ 保留排队,不做重复入队</li>
* <li>其他错误码(如 INVALID_TRANSITION→ 硬失败,不走降级</li>
* </ul>
*/
@ExtendWith(MockitoExtension.class)
class DispatchEngineConflictFallbackTest {
@Mock
private OrderLifecycleManager orderLifecycleManager;
@Mock
private OrderQueueService orderQueueService;
@Mock
private OpsOrderMapper orderMapper;
@Mock
private AssignStrategy assignStrategy;
@Mock
private ScheduleStrategy scheduleStrategy;
@InjectMocks
private DispatchEngineImpl dispatchEngine;
private static final String CLEAN = "CLEAN";
private static final Long ASSIGNEE_ID = 31L;
@BeforeEach
void setUp() {
dispatchEngine.registerAssignStrategy(CLEAN, assignStrategy);
dispatchEngine.registerScheduleStrategy(CLEAN, scheduleStrategy);
}
@Test
void directDispatch_onConflict_whenOrderIsPending_shouldDowngradeToEnqueue() {
// PENDING 工单派发被拒 → 降级 executeEnqueueOnly避免工单悬空
Long orderId = 400L;
OrderDispatchContext context = baseContext(orderId);
stubHappyPathUntilDispatch(orderId);
when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class)))
.thenReturn(OrderTransitionResult.fail(orderId,
"同执行人已有活跃工单 999",
TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER));
when(orderMapper.selectById(orderId)).thenReturn(OpsOrderDO.builder()
.id(orderId)
.status(WorkOrderStatusEnum.PENDING.getStatus())
.build());
when(orderLifecycleManager.enqueue(any(OrderTransitionRequest.class)))
.thenReturn(OrderTransitionResult.success(orderId,
WorkOrderStatusEnum.PENDING, WorkOrderStatusEnum.QUEUED, 6000L));
DispatchResult result = dispatchEngine.dispatch(context);
assertTrue(result.isSuccess());
assertEquals(DispatchPath.ENQUEUE_ONLY, result.getPath());
assertEquals(6000L, result.getQueueId());
verify(orderLifecycleManager).enqueue(any(OrderTransitionRequest.class));
}
@Test
void directDispatch_onConflict_whenOrderAlreadyQueued_shouldKeepInQueue() {
// QUEUED 工单(从队列中被拉出派发)再次被拒 → 不重复入队,继续留在队列等下一轮
Long orderId = 401L;
OrderDispatchContext context = baseContext(orderId);
stubHappyPathUntilDispatch(orderId);
when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class)))
.thenReturn(OrderTransitionResult.fail(orderId,
"同执行人已有活跃工单 998",
TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER));
when(orderMapper.selectById(orderId)).thenReturn(OpsOrderDO.builder()
.id(orderId)
.status(WorkOrderStatusEnum.QUEUED.getStatus())
.build());
DispatchResult result = dispatchEngine.dispatch(context);
assertFalse(result.isSuccess());
assertTrue(result.getMessage().contains("已留在队列等待"),
"冲突信息应说明工单已留在队列,实际: " + result.getMessage());
// 关键断言:不能再调一次 enqueue否则队列里会出现两条记录
verify(orderLifecycleManager, never()).enqueue(any(OrderTransitionRequest.class));
}
@Test
void directDispatch_onGeneralFailure_shouldNotDowngrade() {
// 非并发冲突的失败(例如非法状态转换)不走降级路径,且不查 selectById
Long orderId = 402L;
OrderDispatchContext context = baseContext(orderId);
stubHappyPathUntilDispatch(orderId);
when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class)))
.thenReturn(OrderTransitionResult.fail(orderId,
"非法状态转换",
TransitionErrorCode.INVALID_TRANSITION));
DispatchResult result = dispatchEngine.dispatch(context);
assertFalse(result.isSuccess());
assertTrue(result.getMessage().contains("直接派单失败"),
"一般失败应归类为直接派单失败,实际: " + result.getMessage());
verify(orderLifecycleManager, never()).enqueue(any(OrderTransitionRequest.class));
// 非冲突错误不应触发工单状态复核
verify(orderMapper, never()).selectById(orderId);
}
// ==================== Helpers ====================
private OrderDispatchContext baseContext(Long orderId) {
return OrderDispatchContext.builder()
.orderId(orderId)
.orderCode("WO-TEST-" + orderId)
.businessType(CLEAN)
.areaId(501L)
.build();
}
/**
* 装配 dispatch 路径上到 orderLifecycleManager.dispatch() 之前的全部 stub
* 策略推荐成功 + 决策为 DIRECT_DISPATCH + 兜底查询 MySQL 为空闲。
* 留给测试自己控制 orderLifecycleManager.dispatch 的返回。
*/
private void stubHappyPathUntilDispatch(Long orderId) {
when(assignStrategy.recommend(any())).thenReturn(
AssigneeRecommendation.of(ASSIGNEE_ID, "工牌31", 80, "区域最近"));
when(scheduleStrategy.decide(any())).thenReturn(DispatchDecision.directDispatch());
when(orderQueueService.getTasksByUserId(ASSIGNEE_ID)).thenReturn(List.of());
when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, orderId)).thenReturn(List.of());
}
}

View File

@@ -0,0 +1,264 @@
package com.viewsh.module.ops.core.dispatch;
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
import com.viewsh.module.ops.api.queue.OrderQueueService;
import com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation;
import com.viewsh.module.ops.core.dispatch.model.DispatchDecision;
import com.viewsh.module.ops.core.dispatch.model.DispatchPath;
import com.viewsh.module.ops.core.dispatch.model.DispatchResult;
import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext;
import com.viewsh.module.ops.core.dispatch.strategy.AssignStrategy;
import com.viewsh.module.ops.core.dispatch.strategy.ScheduleStrategy;
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
/**
* 验证 Bug #1autoDispatchNext 空闲兜底)+ Bug #4executeDispatch 前置检查)。
* <p>
* 产线事故:管理员 cancel 一个僵尸 DISPATCHED 单 → handleCancelled → autoDispatchNext
* 若不校验活跃态,就会在同一设备上派发新单、旧单不死,最终 0002=CONFIRMED + 0003=DISPATCHED 并存。
*/
@ExtendWith(MockitoExtension.class)
class DispatchEngineIdleCheckTest {
@Mock
private OrderLifecycleManager orderLifecycleManager;
@Mock
private OrderQueueService orderQueueService;
@Mock
private OpsOrderMapper orderMapper;
@Mock
private AssignStrategy assignStrategy;
@Mock
private ScheduleStrategy scheduleStrategy;
@InjectMocks
private DispatchEngineImpl dispatchEngine;
private static final String CLEAN = "CLEAN";
private static final Long ASSIGNEE_ID = 31L;
@BeforeEach
void setUp() {
dispatchEngine.registerAssignStrategy(CLEAN, assignStrategy);
dispatchEngine.registerScheduleStrategy(CLEAN, scheduleStrategy);
}
@Test
void autoDispatchNext_shouldSkip_whenAssigneeStillHasActiveOrder() {
// 场景completedOrderId=100 刚被 cancel但设备 31 名下还挂着 200L CONFIRMED 单
Long completedOrderId = 100L;
when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, completedOrderId))
.thenReturn(List.of(OpsOrderDO.builder()
.id(200L)
.status(WorkOrderStatusEnum.CONFIRMED.getStatus())
.build()));
DispatchResult result = dispatchEngine.autoDispatchNext(completedOrderId, ASSIGNEE_ID);
assertTrue(result.isSuccess());
assertEquals("执行人非空闲,跳过派发", result.getMessage());
assertEquals(ASSIGNEE_ID, result.getAssigneeId());
// 不应触发后续队列重排和派发
verifyNoInteractions(orderQueueService);
verifyNoInteractions(orderLifecycleManager);
}
@Test
void autoDispatchNext_shouldSkip_whenAssigneeHasPausedOrder() {
// PAUSED 也视为"仍有任务",不能派发新单(否则 PAUSED 恢复回来就和新单冲突)
Long completedOrderId = 101L;
when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, completedOrderId))
.thenReturn(List.of(OpsOrderDO.builder()
.id(201L)
.status(WorkOrderStatusEnum.PAUSED.getStatus())
.build()));
DispatchResult result = dispatchEngine.autoDispatchNext(completedOrderId, ASSIGNEE_ID);
assertTrue(result.isSuccess());
assertEquals("执行人非空闲,跳过派发", result.getMessage());
verifyNoInteractions(orderLifecycleManager);
}
@Test
void autoDispatchNext_shouldReturnEarly_whenAssigneeIdIsNull() {
// 入参校验assigneeId 空直接返回,不查活跃态
DispatchResult result = dispatchEngine.autoDispatchNext(100L, null);
assertTrue(result.isSuccess());
assertEquals("缺少执行人,跳过派发", result.getMessage());
verifyNoInteractions(orderMapper);
verifyNoInteractions(orderQueueService);
}
@Test
void executeDispatch_shouldDowngradeDirectDispatchToEnqueue_whenMysqlShowsActive() {
// Bug #4Redis 说设备空闲,但 MySQL 仍有活跃态 → 兜底把 DIRECT_DISPATCH 降级为 ENQUEUE_ONLY
Long orderId = 300L;
OrderDispatchContext context = baseContext(orderId);
when(assignStrategy.recommend(any())).thenReturn(
AssigneeRecommendation.of(ASSIGNEE_ID, "工牌31", 80, "区域最近"));
when(scheduleStrategy.decide(any())).thenReturn(DispatchDecision.directDispatch());
when(orderQueueService.getTasksByUserId(ASSIGNEE_ID)).thenReturn(List.of());
when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, orderId))
.thenReturn(List.of(OpsOrderDO.builder()
.id(999L)
.status(WorkOrderStatusEnum.ARRIVED.getStatus())
.build()));
when(orderLifecycleManager.enqueue(any(OrderTransitionRequest.class)))
.thenReturn(successEnqueue(orderId, 5000L));
DispatchResult result = dispatchEngine.dispatch(context);
assertTrue(result.isSuccess());
assertEquals(DispatchPath.ENQUEUE_ONLY, result.getPath());
assertEquals(5000L, result.getQueueId());
verify(orderLifecycleManager, never()).dispatch(any(OrderTransitionRequest.class));
verify(orderLifecycleManager).enqueue(any(OrderTransitionRequest.class));
}
@Test
void executeDispatch_shouldDowngradePushAndEnqueue_whenMysqlShowsActive() {
// PUSH_AND_ENQUEUE 路径同样要兜底:本应"推送旧队首 + 新单入队"
// 但旧队首已活跃,推送会触发 FOR UPDATE 冲突,所以直接降级为 ENQUEUE_ONLY
Long orderId = 301L;
OrderDispatchContext context = baseContext(orderId);
when(assignStrategy.recommend(any())).thenReturn(
AssigneeRecommendation.of(ASSIGNEE_ID, "工牌31", 80, "区域最近"));
when(scheduleStrategy.decide(any())).thenReturn(DispatchDecision.pushAndEnqueue());
when(orderQueueService.getTasksByUserId(ASSIGNEE_ID)).thenReturn(List.of());
when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, orderId))
.thenReturn(List.of(OpsOrderDO.builder()
.id(998L)
.status(WorkOrderStatusEnum.DISPATCHED.getStatus())
.build()));
when(orderLifecycleManager.enqueue(any(OrderTransitionRequest.class)))
.thenReturn(successEnqueue(orderId, 5001L));
DispatchResult result = dispatchEngine.dispatch(context);
assertTrue(result.isSuccess());
assertEquals(DispatchPath.ENQUEUE_ONLY, result.getPath());
verify(orderLifecycleManager, never()).dispatch(any(OrderTransitionRequest.class));
verify(orderLifecycleManager).enqueue(any(OrderTransitionRequest.class));
}
@Test
void executeDispatch_shouldNotQueryMysql_whenPathIsEnqueueOnly() {
// ENQUEUE_ONLY 本来就不推送,无需兜底查询——避免给每一次入队都叠加一次 SQL 开销
Long orderId = 302L;
OrderDispatchContext context = baseContext(orderId);
when(assignStrategy.recommend(any())).thenReturn(
AssigneeRecommendation.of(ASSIGNEE_ID, "工牌31", 80, "区域最近"));
when(scheduleStrategy.decide(any())).thenReturn(DispatchDecision.enqueueOnly());
when(orderLifecycleManager.enqueue(any(OrderTransitionRequest.class)))
.thenReturn(successEnqueue(orderId, 5002L));
DispatchResult result = dispatchEngine.dispatch(context);
assertTrue(result.isSuccess());
assertEquals(DispatchPath.ENQUEUE_ONLY, result.getPath());
// 关键ENQUEUE_ONLY 不应触发兜底查询
verify(orderMapper, never()).selectActiveByAssignee(any(), any());
}
@Test
void executeDispatch_shouldProceedDirectDispatch_whenMysqlConfirmsIdle() {
// 反向用例MySQL 也确认空闲 → 正常走 DIRECT_DISPATCH不降级
Long orderId = 303L;
OrderDispatchContext context = baseContext(orderId);
when(assignStrategy.recommend(any())).thenReturn(
AssigneeRecommendation.of(ASSIGNEE_ID, "工牌31", 80, "区域最近"));
when(scheduleStrategy.decide(any())).thenReturn(DispatchDecision.directDispatch());
when(orderQueueService.getTasksByUserId(ASSIGNEE_ID)).thenReturn(List.of());
when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, orderId)).thenReturn(List.of());
when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class)))
.thenReturn(OrderTransitionResult.success(orderId,
WorkOrderStatusEnum.PENDING, WorkOrderStatusEnum.DISPATCHED));
DispatchResult result = dispatchEngine.dispatch(context);
assertFalse(!result.isSuccess()); // 确认成功
assertEquals(DispatchPath.DIRECT_DISPATCH, result.getPath());
verify(orderLifecycleManager).dispatch(any(OrderTransitionRequest.class));
verify(orderLifecycleManager, never()).enqueue(any(OrderTransitionRequest.class));
}
// ==================== Helpers ====================
private OrderDispatchContext baseContext(Long orderId) {
return OrderDispatchContext.builder()
.orderId(orderId)
.orderCode("WO-TEST-" + orderId)
.businessType(CLEAN)
.areaId(501L)
.build();
}
private OrderTransitionResult successEnqueue(Long orderId, Long queueId) {
return OrderTransitionResult.success(orderId,
WorkOrderStatusEnum.PENDING, WorkOrderStatusEnum.QUEUED, queueId);
}
@Test
void autoDispatchNext_whenDispatchingFromQueue_shouldGoThroughDispatchNotTransition() {
// 锁死 P1 修复:从队列派发必须走 dispatch(),以继承 Bug #2 的 FOR UPDATE 串行化防线。
// 如果未来有人改回 transition()本测试会红autoDispatchNext 绕过 FOR UPDATE 的漏洞就回来了。
Long completedOrderId = 700L;
Long waitingOrderId = 701L;
Long queueId = 800L;
when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, completedOrderId)).thenReturn(List.of());
when(orderMapper.selectById(completedOrderId)).thenReturn(OpsOrderDO.builder()
.id(completedOrderId).areaId(501L).build());
OrderQueueDTO waitingDTO = new OrderQueueDTO();
waitingDTO.setId(queueId);
waitingDTO.setOpsOrderId(waitingOrderId);
waitingDTO.setQueueScore(1000.0);
waitingDTO.setFloorDiff(1);
waitingDTO.setWaitMinutes(2L);
when(orderQueueService.rebuildWaitingTasksByUserId(ASSIGNEE_ID, 501L))
.thenReturn(List.of(waitingDTO));
when(orderMapper.selectById(waitingOrderId)).thenReturn(OpsOrderDO.builder()
.id(waitingOrderId)
.status(WorkOrderStatusEnum.QUEUED.getStatus())
.build());
when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class)))
.thenReturn(OrderTransitionResult.success(waitingOrderId,
WorkOrderStatusEnum.QUEUED, WorkOrderStatusEnum.DISPATCHED));
DispatchResult result = dispatchEngine.autoDispatchNext(completedOrderId, ASSIGNEE_ID);
assertTrue(result.isSuccess());
assertEquals("已按队列总分派发下一单", result.getMessage());
// 关键断言:必须调 dispatch()(带 FOR UPDATE而不是 transition()(裸责任链)
verify(orderLifecycleManager).dispatch(any(OrderTransitionRequest.class));
verify(orderLifecycleManager, never()).transition(any(OrderTransitionRequest.class));
}
}

View File

@@ -0,0 +1,211 @@
package com.viewsh.module.ops.core.lifecycle.audit;
import com.viewsh.module.ops.core.event.OrderTransitionAttemptedEvent;
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
import com.viewsh.module.ops.enums.OperatorTypeEnum;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain;
import com.viewsh.module.ops.infrastructure.log.enumeration.EventLevel;
import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule;
import com.viewsh.module.ops.infrastructure.log.enumeration.LogType;
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord;
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.time.LocalDateTime;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.verify;
/**
* 验证 Bug #7状态转换审计闭环。
* <p>
* 三条路径:
* <ol>
* <li>AFTER_COMMIT + 成功 → INFO + 业务 LogType如 ORDER_DISPATCHED</li>
* <li>AFTER_COMMIT + 并发冲突失败 → WARN + DISPATCH_REJECTED</li>
* <li>AFTER_COMMIT + 一般失败 → ERROR + TRANSITION_FAILED</li>
* <li>AFTER_ROLLBACK → 无论事件声明 success 与否都视为失败(事务已回滚),独立事务补写</li>
* </ol>
*/
@ExtendWith(MockitoExtension.class)
class OrderTransitionAuditListenerTest {
@Mock
private EventLogRecorder eventLogRecorder;
@InjectMocks
private OrderTransitionAuditListener listener;
@Test
void onAfterCommit_success_shouldRecordInfoWithBusinessEventType() {
OrderTransitionAttemptedEvent event = successEvent(100L, WorkOrderStatusEnum.QUEUED,
WorkOrderStatusEnum.DISPATCHED);
listener.onAfterCommit(event);
EventLogRecord rec = captureRecord();
assertEquals(EventLevel.INFO, rec.getLevel());
assertEquals(EventDomain.DISPATCH, rec.getDomain());
assertEquals(LogType.ORDER_DISPATCHED.getCode(), rec.getEventType());
assertEquals(LogModule.CLEAN, rec.getModule());
assertEquals(100L, rec.getTargetId());
assertEquals("order", rec.getTargetType());
assertEquals(Boolean.TRUE, rec.getPayload().get("success"));
assertEquals(Boolean.FALSE, rec.getPayload().get("rolledBack"));
assertEquals(WorkOrderStatusEnum.QUEUED.getStatus(), rec.getPayload().get("fromStatus"));
assertEquals(WorkOrderStatusEnum.DISPATCHED.getStatus(), rec.getPayload().get("targetStatus"));
}
@Test
void onAfterCommit_success_shouldMapAllBusinessStatusesToLogType() {
// 验证关键目标状态都能映射到对应 LogType避免回归导致全映射到 SYSTEM_EVENT
assertLogTypeForTarget(WorkOrderStatusEnum.QUEUED, LogType.ORDER_QUEUED);
assertLogTypeForTarget(WorkOrderStatusEnum.DISPATCHED, LogType.ORDER_DISPATCHED);
assertLogTypeForTarget(WorkOrderStatusEnum.CONFIRMED, LogType.ORDER_CONFIRM);
assertLogTypeForTarget(WorkOrderStatusEnum.ARRIVED, LogType.ORDER_ARRIVED);
assertLogTypeForTarget(WorkOrderStatusEnum.PAUSED, LogType.ORDER_PAUSED);
assertLogTypeForTarget(WorkOrderStatusEnum.COMPLETED, LogType.ORDER_COMPLETED);
assertLogTypeForTarget(WorkOrderStatusEnum.CANCELLED, LogType.ORDER_CANCELLED);
}
@Test
void onAfterCommit_forUpdateRejected_shouldRecordWarnWithDispatchRejected() {
OrderTransitionAttemptedEvent event = OrderTransitionAttemptedEvent.builder()
.orderId(200L)
.orderType("CLEAN")
.fromStatus(WorkOrderStatusEnum.PENDING)
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
.assigneeId(31L)
.operatorType(OperatorTypeEnum.SYSTEM)
.success(false)
.errorCode(TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER)
.errorMessage("同执行人已有活跃工单 999")
.attemptedAt(LocalDateTime.now())
.build();
listener.onAfterCommit(event);
EventLogRecord rec = captureRecord();
// 并发冲突只是业务层拒绝,不是系统异常,所以是 WARN 而不是 ERROR
assertEquals(EventLevel.WARN, rec.getLevel());
assertEquals(LogType.DISPATCH_REJECTED.getCode(), rec.getEventType());
assertEquals("ASSIGNEE_HAS_ACTIVE_ORDER", rec.getPayload().get("errorCode"));
assertEquals("同执行人已有活跃工单 999", rec.getPayload().get("errorMessage"));
assertTrue(rec.getMessage().contains("状态转换失败"),
"消息应标明转换失败,实际: " + rec.getMessage());
}
@Test
void onAfterCommit_generalFailure_shouldRecordErrorWithTransitionFailed() {
OrderTransitionAttemptedEvent event = OrderTransitionAttemptedEvent.builder()
.orderId(300L)
.orderType("SECURITY")
.fromStatus(WorkOrderStatusEnum.PENDING)
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
.success(false)
.errorCode(TransitionErrorCode.INVALID_TRANSITION)
.errorMessage("PENDING → ARRIVED 非法")
.causeSummary("IllegalStateException: PENDING → ARRIVED 非法")
.attemptedAt(LocalDateTime.now())
.build();
listener.onAfterCommit(event);
EventLogRecord rec = captureRecord();
assertEquals(EventLevel.ERROR, rec.getLevel());
assertEquals(LogType.TRANSITION_FAILED.getCode(), rec.getEventType());
assertEquals(LogModule.SECURITY, rec.getModule());
assertEquals("INVALID_TRANSITION", rec.getPayload().get("errorCode"));
assertEquals("IllegalStateException: PENDING → ARRIVED 非法",
rec.getPayload().get("cause"));
}
@Test
void writeRollbackAudit_evenIfEventClaimsSuccess_shouldForceFailure() {
// 即便发布时声明 success=true事务 rollback 就是没真正落库,必须按失败记录
OrderTransitionAttemptedEvent event = OrderTransitionAttemptedEvent.builder()
.orderId(400L)
.orderType("CLEAN")
.fromStatus(WorkOrderStatusEnum.QUEUED)
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
.success(true) // 发布时乐观声明
.attemptedAt(LocalDateTime.now())
.build();
listener.writeRollbackAudit(event);
EventLogRecord rec = captureRecord();
assertEquals(EventLevel.ERROR, rec.getLevel());
assertEquals(LogType.TRANSITION_FAILED.getCode(), rec.getEventType());
assertEquals(Boolean.TRUE, rec.getPayload().get("rolledBack"));
assertEquals(Boolean.FALSE, rec.getPayload().get("success"));
assertTrue(rec.getMessage().contains("状态转换回滚"),
"回滚消息应明确标注,实际: " + rec.getMessage());
}
@Test
void writeRollbackAudit_withForUpdateRejected_stillMapsToDispatchRejected() {
// 回滚 + 冲突错误码 → 依然归类为 DISPATCH_REJECTED而不是 TRANSITION_FAILED
OrderTransitionAttemptedEvent event = OrderTransitionAttemptedEvent.builder()
.orderId(500L)
.orderType("CLEAN")
.fromStatus(WorkOrderStatusEnum.PENDING)
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
.success(false)
.errorCode(TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER)
.attemptedAt(LocalDateTime.now())
.build();
listener.writeRollbackAudit(event);
EventLogRecord rec = captureRecord();
// 冲突型错误即便回滚也应是 WARN + DISPATCH_REJECTED方便运维过滤
assertEquals(EventLevel.WARN, rec.getLevel());
assertEquals(LogType.DISPATCH_REJECTED.getCode(), rec.getEventType());
assertFalse((Boolean) rec.getPayload().get("success"));
}
// ==================== Helpers ====================
private OrderTransitionAttemptedEvent successEvent(Long orderId,
WorkOrderStatusEnum from,
WorkOrderStatusEnum to) {
return OrderTransitionAttemptedEvent.builder()
.orderId(orderId)
.orderType("CLEAN")
.orderCode("WO-" + orderId)
.fromStatus(from)
.targetStatus(to)
.assigneeId(31L)
.operatorType(OperatorTypeEnum.SYSTEM)
.operatorId(31L)
.reason("test")
.success(true)
.attemptedAt(LocalDateTime.now())
.build();
}
private void assertLogTypeForTarget(WorkOrderStatusEnum target, LogType expected) {
org.mockito.Mockito.reset(eventLogRecorder);
listener.onAfterCommit(successEvent(1000L + target.ordinal(),
WorkOrderStatusEnum.PENDING, target));
EventLogRecord rec = captureRecord();
assertEquals(expected.getCode(), rec.getEventType(),
"target=" + target + " 应映射到 " + expected);
}
private EventLogRecord captureRecord() {
ArgumentCaptor<EventLogRecord> captor = ArgumentCaptor.forClass(EventLogRecord.class);
verify(eventLogRecorder).recordSync(captor.capture());
return captor.getValue();
}
}

View File

@@ -77,13 +77,21 @@ class OrderQueueServiceEnhancedTest {
when(orderQueueMapper.selectListByUserIdAndStatus(userId, OrderQueueStatusEnum.WAITING.getStatus()))
.thenReturn(List.of(olderFarTask, newerNearTask));
when(orderQueueMapper.selectCurrentExecutingByUserId(userId)).thenReturn(currentTask);
when(orderQueueMapper.selectListByUserId(userId)).thenReturn(List.of(olderFarTask, newerNearTask, currentTask));
// syncUserQueueToRedis 走 selectActiveListByUserIdBug#6只返回活跃态
when(orderQueueMapper.selectActiveListByUserId(userId))
.thenReturn(List.of(olderFarTask, newerNearTask, currentTask));
// resolveOrderAreaId 仍单条 selectByIdPROCESSING 工单的 area
when(orderMapper.selectById(900L)).thenReturn(OpsOrderDO.builder().id(900L).areaId(501L).build());
when(orderMapper.selectById(101L)).thenReturn(OpsOrderDO.builder().id(101L).areaId(503L).build());
when(orderMapper.selectById(102L)).thenReturn(OpsOrderDO.builder().id(102L).areaId(502L).build());
// WAITING 工单批量加载
when(orderMapper.selectBatchIds(org.mockito.ArgumentMatchers.anyCollection()))
.thenReturn(List.of(
OpsOrderDO.builder().id(101L).areaId(503L).build(),
OpsOrderDO.builder().id(102L).areaId(502L).build()));
when(areaMapper.selectById(501L)).thenReturn(OpsBusAreaDO.builder().id(501L).floorNo(5).build());
when(areaMapper.selectById(502L)).thenReturn(OpsBusAreaDO.builder().id(502L).floorNo(6).build());
when(areaMapper.selectById(503L)).thenReturn(OpsBusAreaDO.builder().id(503L).floorNo(8).build());
when(areaMapper.selectBatchIds(org.mockito.ArgumentMatchers.anyCollection()))
.thenReturn(List.of(
OpsBusAreaDO.builder().id(503L).floorNo(8).build(),
OpsBusAreaDO.builder().id(502L).floorNo(6).build()));
when(orderQueueMapper.updateById(any(OpsOrderQueueDO.class))).thenReturn(1);
List<OrderQueueDTO> rebuiltTasks = orderQueueService.rebuildWaitingTasksByUserId(userId, null);

View File

@@ -0,0 +1,131 @@
package com.viewsh.module.ops.service.queue;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* 楼层权重修复的补充测试commit a5f916c
* <p>
* 与 {@link QueueScoreCalculatorTest}(基础行为)互补,覆盖这次改动的三个关键不变量:
* <ol>
* <li><b>G 强楼层优先</b>FLOOR_WEIGHT=10010 层封顶 1000 &gt; aging 封顶 720
* 保证等满 4 小时的任务也不会反超近楼层任务</li>
* <li><b>B 语义对称</b>base 或 target 任一缺失 → floorScore=0不再有"有 base 无 target → +600"罚分</li>
* <li><b>floor 封顶</b>:楼层差超过 MAX_FLOOR_DIFF=10 时按 10 计算</li>
* </ol>
*/
class QueueScoreCalculatorEnhancedTest {
private final QueueScoreCalculator calculator = new QueueScoreCalculator();
@Test
void strongFloorPriority_farLongWaitedTaskShouldNotOvertakeNearJustInTask() {
// G: 同 P1 优先级下,"远楼层+等满 4 小时" 不应反超 "近楼层+刚入队"。
// 近刚入队: priority=1500, floor=0, aging=0 → 1500
// 远等满: priority=1500, floor=10*100=1000, aging=720 → 1780
// near.score (1500) < far.score (1780) → near 先派发,符合"强楼层优先"
LocalDateTime now = LocalDateTime.of(2026, 4, 20, 12, 0);
QueueScoreResult nearJustIn = calculator.calculate(QueueScoreContext.builder()
.priority(1)
.baseFloorNo(3).targetFloorNo(3)
.enqueueTime(now).now(now)
.build());
QueueScoreResult farLongWaited = calculator.calculate(QueueScoreContext.builder()
.priority(1)
.baseFloorNo(3).targetFloorNo(13) // diff=10封顶
.enqueueTime(now.minusHours(4)) // aging 封顶 240 min
.now(now)
.build());
assertTrue(nearJustIn.getTotalScore() < farLongWaited.getTotalScore(),
"near=" + nearJustIn.getTotalScore() + " far=" + farLongWaited.getTotalScore()
+ ":远楼层即便等满也不应反超近楼层");
}
@Test
void symmetricNullHandling_baseFloorMissing_shouldGiveZeroFloorScore() {
// B: baseFloor=null执行人位置未知不应被动扣分
LocalDateTime now = LocalDateTime.of(2026, 4, 20, 12, 0);
QueueScoreResult result = calculator.calculate(QueueScoreContext.builder()
.priority(1)
.baseFloorNo(null).targetFloorNo(5)
.enqueueTime(now).now(now)
.build());
// priorityScore=1500, floorScore=0, aging=0 → 1500
assertEquals(1500.0, result.getTotalScore(), 0.001);
assertNull(result.getFloorDiff(), "floorDiff 应为 null表示楼层信息不完整");
}
@Test
void symmetricNullHandling_targetFloorMissing_shouldGiveZeroFloorScore() {
// B: targetFloor=null工单区域未登记楼层同样应得 floorScore=0——与 baseFloor 缺失等价
LocalDateTime now = LocalDateTime.of(2026, 4, 20, 12, 0);
QueueScoreResult result = calculator.calculate(QueueScoreContext.builder()
.priority(1)
.baseFloorNo(5).targetFloorNo(null)
.enqueueTime(now).now(now)
.build());
assertEquals(1500.0, result.getTotalScore(), 0.001);
assertNull(result.getFloorDiff());
}
@Test
void symmetricNullHandling_bothTasksWithPartialFloor_shouldSortByAgingOnly() {
// 关键回归:旧逻辑会给"有 base 无 target"+600 罚分,导致同一工单在不同 base 场景排序不单调。
// 现在两个任务楼层信息同等"不完整"应仅靠 aging 排序,"等得久"的排前。
LocalDateTime now = LocalDateTime.of(2026, 4, 20, 12, 0);
QueueScoreResult newerNoTarget = calculator.calculate(QueueScoreContext.builder()
.priority(1)
.baseFloorNo(5).targetFloorNo(null)
.enqueueTime(now.minusMinutes(5))
.now(now)
.build());
QueueScoreResult olderNoTarget = calculator.calculate(QueueScoreContext.builder()
.priority(1)
.baseFloorNo(5).targetFloorNo(null)
.enqueueTime(now.minusMinutes(80))
.now(now)
.build());
// 两者 floorScore 都 =0aging 越大分越低 → older 先派
assertTrue(olderNoTarget.getTotalScore() < newerNoTarget.getTotalScore());
}
@Test
void floorCappedAtMaxFloorDiff_evenWhenActualDiffMuchLarger() {
// 楼层差 25 应按 10 封顶floorScore = 10 × 100 = 1000
LocalDateTime now = LocalDateTime.of(2026, 4, 20, 12, 0);
QueueScoreResult result = calculator.calculate(QueueScoreContext.builder()
.priority(1)
.baseFloorNo(0).targetFloorNo(25)
.enqueueTime(now).now(now)
.build());
assertEquals(25, result.getFloorDiff(), "floorDiff 透传原始差值,便于诊断");
// priorityScore=1500 + floorScore(capped)=1000 - aging=0 = 2500
assertEquals(2500.0, result.getTotalScore(), 0.001,
"超过 10 层差应按 10 层封顶计算分数");
}
@Test
void floorWeightShouldDominateAgingCap() {
// 锁死这次改动的核心不变量FLOOR_WEIGHT * MAX_FLOOR_DIFF > AGING_WEIGHT * MAX_AGING_MINUTES
// 即 100 * 10 = 1000 > 3 * 240 = 720
int floorMax = QueueScoreCalculator.FLOOR_WEIGHT * QueueScoreCalculator.MAX_FLOOR_DIFF;
int agingMax = QueueScoreCalculator.AGING_WEIGHT * QueueScoreCalculator.MAX_AGING_MINUTES;
assertTrue(floorMax > agingMax,
"权重失衡floor 封顶 " + floorMax + " 不再压倒 aging 封顶 " + agingMax
+ ",会导致等得久的远楼层任务反超近楼层");
}
}

View File

@@ -146,6 +146,12 @@ viewsh:
connect-timeout: 5000
read-timeout: 10000
max-retry: 2
clean:
auto-cancel:
# 保洁工单 update_time 距今超过此小时数视为卡死,由 CleanOrderAutoCancelJob 自动取消
timeout-hours: 12
# 单次扫描/取消上限,防止事件风暴;超出的工单留给下一轮 cron
batch-size: 200
# API 签名配置:外部系统调用开放接口时使用(如安保工单的告警系统)
signature:
apps: