diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/BadgeDeviceStatusEventListener.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/BadgeDeviceStatusEventListener.java index a9cc44c5..cd0b7857 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/BadgeDeviceStatusEventListener.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/BadgeDeviceStatusEventListener.java @@ -15,9 +15,6 @@ import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.support.TransactionTemplate; /** * 工牌设备状态事件监听器 @@ -87,9 +84,6 @@ public class BadgeDeviceStatusEventListener { @Resource private OrderLifecycleManager orderLifecycleManager; - @Resource - private PlatformTransactionManager transactionManager; - /** * 监听工单状态变更事件,同步更新设备工单关联 *

@@ -180,40 +174,27 @@ public class BadgeDeviceStatusEventListener { /** * 处理工单推送状态(首次设置工单关联) + *

+ * 若 Redis 里检测到旧 orderId(正常业务不应出现),仅打 ERROR 告警并清理 Redis 关联。 + * 此前版本会在此处"自动取消旧工单",但那是对"数据已错乱"场景的暴力兜底: + *

+ * 现改为被动告警,暴露问题等待定位,避免误杀保洁员正在执行的任务。 */ private void handleDispatched(Long deviceId, Long orderId, OpsOrderDO order) { - // 检查并清理旧工单(防止工单切换时状态残留) BadgeDeviceStatusDTO deviceStatus = badgeDeviceStatusService.getBadgeStatus(deviceId); if (deviceStatus != null && deviceStatus.getCurrentOpsOrderId() != null) { Long oldOrderId = deviceStatus.getCurrentOpsOrderId(); if (!oldOrderId.equals(orderId)) { - log.warn("[BadgeDeviceStatusEventListener] 派发新工单时检测到旧工单残留: " + - "deviceId={}, oldOrderId={}, newOrderId={}", deviceId, oldOrderId, orderId); - - // 检查旧工单是否仍在进行中,如果是则先取消 OpsOrderDO oldOrder = opsOrderMapper.selectById(oldOrderId); - if (oldOrder != null) { - WorkOrderStatusEnum oldStatus = WorkOrderStatusEnum.fromStatus(oldOrder.getStatus()); - if (oldStatus == WorkOrderStatusEnum.DISPATCHED - || oldStatus == WorkOrderStatusEnum.CONFIRMED - || oldStatus == WorkOrderStatusEnum.ARRIVED) { - // 旧工单仍在进行,先取消 - // 使用 REQUIRES_NEW 独立事务,避免内层异常标记外层事务 rollback-only - log.warn("[BadgeDeviceStatusEventListener] 取消残留的旧工单: oldOrderId={}", oldOrderId); - try { - TransactionTemplate txTemplate = new TransactionTemplate(transactionManager); - txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); - txTemplate.executeWithoutResult(status -> { - orderLifecycleManager.cancelOrder(oldOrderId, deviceId, - OperatorTypeEnum.SYSTEM, "新工单派发,自动取消旧工单"); - }); - } catch (Exception e) { - log.error("[BadgeDeviceStatusEventListener] 取消旧工单失败: oldOrderId={}", oldOrderId, e); - } - } - } + String oldStatus = oldOrder != null ? oldOrder.getStatus() : "NOT_FOUND"; + log.error("[BadgeDeviceStatusEventListener] 派发新工单时检测到旧工单残留(数据可能已错乱,需人工核查): " + + "deviceId={}, oldOrderId={}, oldStatus={}, newOrderId={}", + deviceId, oldOrderId, oldStatus, orderId); - // 确保设备状态清理(无论旧工单是否取消成功) + // 清理 Redis 中对旧工单的关联(纯 Redis 操作,不触达状态机) badgeDeviceStatusService.clearCurrentOrder(deviceId); } } diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusSyncJob.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusSyncJob.java index fdb052ef..c17c81f6 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusSyncJob.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/BadgeDeviceStatusSyncJob.java @@ -4,7 +4,9 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; import com.viewsh.framework.common.pojo.CommonResult; import com.viewsh.framework.tenant.core.job.TenantJob; +import com.viewsh.module.iot.api.device.IotDeviceQueryApi; import com.viewsh.module.iot.api.device.IotDeviceStatusQueryApi; +import com.viewsh.module.iot.api.device.dto.IotDeviceSimpleRespDTO; import com.viewsh.module.iot.api.device.dto.status.DeviceStatusRespDTO; import com.viewsh.module.ops.api.badge.BadgeDeviceStatusDTO; import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO; @@ -18,6 +20,8 @@ import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -47,6 +51,9 @@ public class BadgeDeviceStatusSyncJob { @Resource private IotDeviceStatusQueryApi iotDeviceStatusQueryApi; + @Resource + private IotDeviceQueryApi iotDeviceQueryApi; + @Resource private OpsAreaDeviceRelationMapper areaDeviceRelationMapper; @@ -120,6 +127,9 @@ public class BadgeDeviceStatusSyncJob { OpsAreaDeviceRelationDO::getAreaId, (existing, replacement) -> existing)); + // 3b. 批量查询设备 nickname(IoT 是唯一可信源),防止 Redis key 丢失后降级到 deviceCode + Map deviceNicknameMap = loadDeviceNicknameMap(deviceIds); + // 4. 逐一对账并修正 for (DeviceStatusRespDTO iotStatus : iotResult.getData()) { // 4a. 工单一致性检查(修复残留的已终态工单关联) @@ -135,7 +145,10 @@ public class BadgeDeviceStatusSyncJob { } // 4b. IoT 在线/离线状态对账 - boolean corrected = syncSingleDevice(iotStatus, deviceAreaMap.get(iotStatus.getDeviceId())); + boolean corrected = syncSingleDevice( + iotStatus, + deviceAreaMap.get(iotStatus.getDeviceId()), + deviceNicknameMap.get(iotStatus.getDeviceId())); syncCount++; if (corrected) { correctedCount++; @@ -154,9 +167,10 @@ public class BadgeDeviceStatusSyncJob { * * @param iotStatus IoT 设备状态 * @param areaId 设备所属区域ID + * @param nickname 设备昵称(从 IoT 查到的权威值,允许 null) * @return 是否进行了修正 */ - private boolean syncSingleDevice(DeviceStatusRespDTO iotStatus, Long areaId) { + private boolean syncSingleDevice(DeviceStatusRespDTO iotStatus, Long areaId, String nickname) { Long deviceId = iotStatus.getDeviceId(); try { @@ -168,8 +182,20 @@ public class BadgeDeviceStatusSyncJob { boolean opsOnline = opsStatus != null && opsStatus.getStatus() != null && opsStatus.getStatus().isActive(); - // 如果状态一致,无需修正 + // 如果状态一致,但 Redis 缺 nickname 而 IoT 有值,则补写一次防止派单时降级显示 deviceCode if (iotOnline == opsOnline) { + if (iotOnline && nickname != null + && (opsStatus == null || opsStatus.getNickname() == null)) { + badgeDeviceStatusService.updateBadgeOnlineStatus( + deviceId, + iotStatus.getDeviceCode(), + nickname, + areaId, + BadgeDeviceStatusEnum.IDLE, + "定时对账补写-昵称"); + log.info("[SyncJob] 补写设备昵称:deviceId={}, nickname={}", deviceId, nickname); + return true; + } return false; } @@ -178,17 +204,17 @@ public class BadgeDeviceStatusSyncJob { badgeDeviceStatusService.updateBadgeOnlineStatus( deviceId, iotStatus.getDeviceCode(), - null, // nickname: 对账场景不更新昵称,保留Redis中已有值 + nickname, areaId, BadgeDeviceStatusEnum.IDLE, "定时对账修正-上线"); - log.info("[SyncJob] 修正设备状态:deviceId={}, IoT=ONLINE, Ops={}", - deviceId, opsOnline ? "ACTIVE" : "OFFLINE/NULL"); + log.info("[SyncJob] 修正设备状态:deviceId={}, IoT=ONLINE, Ops={}, nickname={}", + deviceId, opsOnline ? "ACTIVE" : "OFFLINE/NULL", nickname); } else { badgeDeviceStatusService.updateBadgeOnlineStatus( deviceId, iotStatus.getDeviceCode(), - null, // nickname: 对账场景不更新昵称,保留Redis中已有值 + nickname, null, BadgeDeviceStatusEnum.OFFLINE, "定时对账修正-离线"); @@ -204,6 +230,35 @@ public class BadgeDeviceStatusSyncJob { } } + /** + * 批量从 IoT 查询设备昵称 + *

+ * Redis 中 ops:badge:device:{deviceId} 的 nickname 字段可能因 TTL/重启/缓存清理而缺失, + * 每次对账时以 IoT 为唯一可信源做回填,避免派单时降级为 deviceCode(如 "43607737587")。 + */ + private Map loadDeviceNicknameMap(List deviceIds) { + if (CollUtil.isEmpty(deviceIds)) { + return Collections.emptyMap(); + } + try { + CommonResult> result = iotDeviceQueryApi.batchGetDevices(deviceIds); + if (!result.isSuccess() || CollUtil.isEmpty(result.getData())) { + log.warn("[SyncJob] 查询设备昵称失败或为空: {}", result.getMsg()); + return Collections.emptyMap(); + } + Map map = new HashMap<>(result.getData().size()); + for (IotDeviceSimpleRespDTO dto : result.getData()) { + if (dto.getId() != null && dto.getNickname() != null) { + map.put(dto.getId(), dto.getNickname()); + } + } + return map; + } catch (Exception e) { + log.warn("[SyncJob] 批量查询设备昵称异常,本次对账跳过昵称回填", e); + return Collections.emptyMap(); + } + } + /** * 同步结果 */ diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/CleanOrderAutoCancelJob.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/CleanOrderAutoCancelJob.java new file mode 100644 index 00000000..d2071e63 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/job/CleanOrderAutoCancelJob.java @@ -0,0 +1,160 @@ +package com.viewsh.module.ops.environment.job; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX; +import com.viewsh.framework.tenant.core.job.TenantJob; +import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; +import com.viewsh.module.ops.enums.OperatorTypeEnum; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; +import com.xxl.job.core.handler.annotation.XxlJob; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.List; + +/** + * 保洁工单超时自动取消 Job + *

+ * 职责: + * 扫描所有保洁类(order_type=CLEAN)非终态工单, + * 若最近一次进展(update_time)距今超过阈值(默认 12 小时), + * 以 SYSTEM 身份走正常取消流程将其关闭。 + *

+ * 设计要点: + * 1. 时间基准使用 update_time 而非 create_time——任何状态转换/字段更新都会刷新 update_time, + * 这样"按最新进展计算超时"才准确:刚被重派的 DISPATCHED 单不会因 create_time 老而被误杀。 + * 2. 状态白名单 = PENDING / QUEUED / DISPATCHED / CONFIRMED / ARRIVED(不含 PAUSED)。 + * PAUSED 是 P0 打断的产物,应由 resumeInterruptedOrder 经状态机走 PAUSED → DISPATCHED + * 恢复。若此 Job 把 PAUSED 单直接 CANCELLED,P0 完成后的 resume 会在状态机检查 + * "PAUSED → DISPATCHED" 时因源状态已变为 CANCELLED 而抛 IllegalStateException, + * 进而破坏 P0 恢复链路。PAUSED 若真的卡死(P0 也卡),交由人工审核,不自动化。 + * 3. 取消调用 {@link OrderLifecycleManager#cancelOrder} 走完整责任链: + * StateTransitionHandler → QueueSyncHandler → EventPublishHandler + * → CleanOrderEventListener.onOrderStateChanged(CANCELLED) 会统一处理 + * TTS 停播、设备工单关联回收、审计日志。 + * 4. 单单独立事务 + try/catch 隔离,单条失败不影响批次其余工单。 + * 5. 单次扫描限 batchSize 条,防止异常堆积时一次性取消过多触发事件风暴; + * 未处理完的工单留给下一轮 cron。 + * 6. cancel 前再做一次乐观校验:重查 update_time 是否仍 <= threshold。 + * 候选装内存到实际 cancel 之间如果有用户触达(确认/到岗),update_time 会被刷新; + * 此时放弃 cancel,避免误杀用户刚触达的工单。 + *

+ * XXL-Job 配置建议: + * - JobHandler: cleanOrderAutoCancelJob + * - Cron: 0 17 * * * ? (每小时 :17 触发,避开整点尖峰) + * + * @author lzh + */ +@Slf4j +@Component +public class CleanOrderAutoCancelJob { + + private static final String BUSINESS_TYPE_CLEAN = "CLEAN"; + private static final String CANCEL_REASON = "超过12小时未处理,系统自动完结"; + + @Resource + private OpsOrderMapper opsOrderMapper; + + @Resource + private OrderLifecycleManager orderLifecycleManager; + + /** 超时时长(小时),update_time 距今超过此值视为卡死 */ + @Value("${viewsh.ops.clean.auto-cancel.timeout-hours:12}") + private int timeoutHours; + + /** 单次最大扫描/取消工单数,防止事件风暴 */ + @Value("${viewsh.ops.clean.auto-cancel.batch-size:200}") + private int batchSize; + + @XxlJob("cleanOrderAutoCancelJob") + @TenantJob + public String execute() { + try { + CancelResult result = scanAndCancel(); + return StrUtil.format( + "保洁工单超时自动取消完成: 扫描 {} 单, 成功 {}, 失败 {}, 跳过 {}, 耗时 {} ms", + result.scanned, result.succeeded, result.failed, result.skippedStale, result.durationMs); + } catch (Exception e) { + log.error("[CleanOrderAutoCancelJob] 执行失败", e); + return StrUtil.format("保洁工单超时自动取消失败: {}", e.getMessage()); + } + } + + public CancelResult scanAndCancel() { + long startTime = System.currentTimeMillis(); + LocalDateTime threshold = LocalDateTime.now().minusHours(timeoutHours); + + log.info("[CleanOrderAutoCancelJob] 开始扫描: timeoutHours={}, threshold={}, batchSize={}", + timeoutHours, threshold, batchSize); + + List candidates = opsOrderMapper.selectList(new LambdaQueryWrapperX() + .eq(OpsOrderDO::getOrderType, BUSINESS_TYPE_CLEAN) + .notIn(OpsOrderDO::getStatus, + WorkOrderStatusEnum.COMPLETED.getStatus(), + WorkOrderStatusEnum.CANCELLED.getStatus(), + // PAUSED 交由 resumeInterruptedOrder 经状态机恢复,不在此 Job 自动化处理 + WorkOrderStatusEnum.PAUSED.getStatus()) + .le(OpsOrderDO::getUpdateTime, threshold) + .orderByAsc(OpsOrderDO::getUpdateTime) + .last("LIMIT " + batchSize)); + + if (CollUtil.isEmpty(candidates)) { + log.info("[CleanOrderAutoCancelJob] 无超时工单"); + return new CancelResult(0, 0, 0, 0, System.currentTimeMillis() - startTime); + } + + int succeeded = 0; + int failed = 0; + int skippedStale = 0; + + for (OpsOrderDO order : candidates) { + Long orderId = order.getId(); + try { + // 乐观校验:候选装内存→实际 cancel 之间,用户可能已触达工单刷新 update_time。 + // 重查一次确认仍超时,避免把用户刚点过的工单一并 cancel 掉。 + OpsOrderDO fresh = opsOrderMapper.selectById(orderId); + if (fresh == null + || WorkOrderStatusEnum.COMPLETED.getStatus().equals(fresh.getStatus()) + || WorkOrderStatusEnum.CANCELLED.getStatus().equals(fresh.getStatus()) + || WorkOrderStatusEnum.PAUSED.getStatus().equals(fresh.getStatus()) + || fresh.getUpdateTime() == null + || fresh.getUpdateTime().isAfter(threshold)) { + skippedStale++; + log.info("[CleanOrderAutoCancelJob] 并发触达/状态已变,跳过: orderId={}, snapshotStatus={}, latestStatus={}, latestUpdateTime={}", + orderId, order.getStatus(), + fresh != null ? fresh.getStatus() : "NOT_FOUND", + fresh != null ? fresh.getUpdateTime() : null); + continue; + } + + orderLifecycleManager.cancelOrder( + orderId, + null, + OperatorTypeEnum.SYSTEM, + CANCEL_REASON); + succeeded++; + log.info("[CleanOrderAutoCancelJob] 自动取消成功: orderId={}, orderCode={}, status={}, updateTime={}", + orderId, order.getOrderCode(), order.getStatus(), order.getUpdateTime()); + } catch (Exception e) { + failed++; + log.warn("[CleanOrderAutoCancelJob] 自动取消失败: orderId={}, orderCode={}, status={}, error={}", + orderId, order.getOrderCode(), order.getStatus(), e.getMessage(), e); + } + } + + long duration = System.currentTimeMillis() - startTime; + log.info("[CleanOrderAutoCancelJob] 扫描完成: 扫描 {} 单, 成功 {}, 失败 {}, 跳过 {}, 耗时 {} ms", + candidates.size(), succeeded, failed, skippedStale, duration); + + return new CancelResult(candidates.size(), succeeded, failed, skippedStale, duration); + } + + public record CancelResult(int scanned, int succeeded, int failed, int skippedStale, long durationMs) { + } +} diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/dispatch/BadgeDeviceAreaAssignStrategy.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/dispatch/BadgeDeviceAreaAssignStrategy.java index b1923737..c3f9cfd0 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/dispatch/BadgeDeviceAreaAssignStrategy.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/dispatch/BadgeDeviceAreaAssignStrategy.java @@ -89,8 +89,7 @@ public class BadgeDeviceAreaAssignStrategy implements AssignStrategy { if (selectedDevice != null) { String reason = buildRecommendationReason(selectedDevice, context); - String assigneeName = selectedDevice.getNickname() != null - ? selectedDevice.getNickname() : selectedDevice.getDeviceCode(); + String assigneeName = resolveAssigneeName(selectedDevice); return AssigneeRecommendation.of( selectedDevice.getDeviceId(), assigneeName, @@ -118,8 +117,7 @@ public class BadgeDeviceAreaAssignStrategy implements AssignStrategy { .map(device -> { int score = calculateScore(device); String reason = buildRecommendationReason(device, context); - String assigneeName = device.getNickname() != null - ? device.getNickname() : device.getDeviceCode(); + String assigneeName = resolveAssigneeName(device); return AssigneeRecommendation.of( device.getDeviceId(), assigneeName, @@ -133,6 +131,25 @@ public class BadgeDeviceAreaAssignStrategy implements AssignStrategy { // ==================== 私有方法 ==================== + /** + * 解析执行人展示名称。 + *

+ * 优先用 nickname;nickname 缺失时(例如 Redis 状态缓存被清理、IoT 侧未维护昵称), + * 返回 "工牌-尾号" 这样的可读降级文案,避免把 deviceCode/IMEI 这类长数字串直接当作人员名字暴露给调用方。 + */ + private String resolveAssigneeName(BadgeDeviceStatusDTO device) { + String nickname = device.getNickname(); + if (nickname != null && !nickname.isBlank()) { + return nickname; + } + String code = device.getDeviceCode(); + if (code != null && !code.isBlank()) { + int len = code.length(); + return "工牌-" + (len > 4 ? code.substring(len - 4) : code); + } + return device.getDeviceId() != null ? "工牌-" + device.getDeviceId() : "未知工牌"; + } + /** * 选择最佳设备 */ diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/test/java/com/viewsh/module/ops/environment/job/CleanOrderAutoCancelJobTest.java b/viewsh-module-ops/viewsh-module-environment-biz/src/test/java/com/viewsh/module/ops/environment/job/CleanOrderAutoCancelJobTest.java new file mode 100644 index 00000000..a18dbf22 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/test/java/com/viewsh/module/ops/environment/job/CleanOrderAutoCancelJobTest.java @@ -0,0 +1,198 @@ +package com.viewsh.module.ops.environment.job; + +import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; +import com.viewsh.module.ops.enums.OperatorTypeEnum; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; + +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * 验证 CleanOrderAutoCancelJob 的五条不变量: + *

    + *
  1. 无候选 → 返回零结果,不触发取消
  2. + *
  3. 正常批次 → 依次 cancel,成功计数正确
  4. + *
  5. 单条失败不中断其余 → try/catch 隔离
  6. + *
  7. 候选到 cancel 间被用户触达 → 乐观锁跳过(避免误杀)
  8. + *
  9. 候选到 cancel 间状态变为终态/PAUSED → 跳过
  10. + *
+ */ +@ExtendWith(MockitoExtension.class) +class CleanOrderAutoCancelJobTest { + + @Mock + private OpsOrderMapper opsOrderMapper; + @Mock + private OrderLifecycleManager orderLifecycleManager; + + @InjectMocks + private CleanOrderAutoCancelJob job; + + @BeforeEach + void setUp() { + ReflectionTestUtils.setField(job, "timeoutHours", 12); + ReflectionTestUtils.setField(job, "batchSize", 200); + } + + @Test + void scanAndCancel_whenNoCandidates_shouldReturnZeroCounts() { + when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class))) + .thenReturn(Collections.emptyList()); + + CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel(); + + assertEquals(0, result.scanned()); + assertEquals(0, result.succeeded()); + assertEquals(0, result.failed()); + assertEquals(0, result.skippedStale()); + verify(orderLifecycleManager, never()).cancelOrder(anyLong(), any(), any(), any()); + } + + @Test + void scanAndCancel_whenAllCandidatesStillStale_shouldCancelAll() { + LocalDateTime staleTime = LocalDateTime.now().minusHours(13); + OpsOrderDO a = stale(101L, "WO-101", WorkOrderStatusEnum.DISPATCHED, staleTime); + OpsOrderDO b = stale(102L, "WO-102", WorkOrderStatusEnum.CONFIRMED, staleTime); + OpsOrderDO c = stale(103L, "WO-103", WorkOrderStatusEnum.ARRIVED, staleTime); + + when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class))) + .thenReturn(List.of(a, b, c)); + // Fresh fetch confirms all three are still stale + when(opsOrderMapper.selectById(101L)).thenReturn(a); + when(opsOrderMapper.selectById(102L)).thenReturn(b); + when(opsOrderMapper.selectById(103L)).thenReturn(c); + + CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel(); + + assertEquals(3, result.scanned()); + assertEquals(3, result.succeeded()); + assertEquals(0, result.failed()); + assertEquals(0, result.skippedStale()); + verify(orderLifecycleManager, times(3)) + .cancelOrder(anyLong(), eq(null), eq(OperatorTypeEnum.SYSTEM), any()); + } + + @Test + void scanAndCancel_whenOneCancelThrows_shouldNotAbortBatch() { + LocalDateTime staleTime = LocalDateTime.now().minusHours(13); + OpsOrderDO a = stale(201L, "WO-201", WorkOrderStatusEnum.DISPATCHED, staleTime); + OpsOrderDO b = stale(202L, "WO-202", WorkOrderStatusEnum.CONFIRMED, staleTime); + OpsOrderDO c = stale(203L, "WO-203", WorkOrderStatusEnum.ARRIVED, staleTime); + + when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class))) + .thenReturn(List.of(a, b, c)); + when(opsOrderMapper.selectById(201L)).thenReturn(a); + when(opsOrderMapper.selectById(202L)).thenReturn(b); + when(opsOrderMapper.selectById(203L)).thenReturn(c); + // 第二条取消抛异常,不应影响第一、第三条。 + // 不能用 doThrow(...).when(mock).cancelOrder(eq(202L), ...)——strict stubs 会把"201L 调用和 202L 存根不匹配"判成错配。 + // 改用 doAnswer 按 orderId 路由,覆盖所有 cancel 调用。 + doAnswer(invocation -> { + Long orderId = invocation.getArgument(0); + if (orderId != null && orderId == 202L) { + throw new IllegalStateException("状态机非法转换"); + } + return null; + }).when(orderLifecycleManager).cancelOrder(anyLong(), any(), any(), any()); + + CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel(); + + assertEquals(3, result.scanned()); + assertEquals(2, result.succeeded()); + assertEquals(1, result.failed()); + assertEquals(0, result.skippedStale()); + verify(orderLifecycleManager).cancelOrder(eq(201L), any(), any(), any()); + verify(orderLifecycleManager).cancelOrder(eq(202L), any(), any(), any()); + verify(orderLifecycleManager).cancelOrder(eq(203L), any(), any(), any()); + } + + @Test + void scanAndCancel_whenOrderTouchedBeforeCancel_shouldSkipAsStale() { + // 候选装内存时 update_time=13h ago,实际 cancel 前用户刚刚点确认,update_time 刷为"1 分钟前"。 + // 乐观校验应跳过,避免误杀已被触达的工单。 + LocalDateTime snapshotUpdate = LocalDateTime.now().minusHours(13); + LocalDateTime freshUpdate = LocalDateTime.now().minusMinutes(1); + + OpsOrderDO snapshot = stale(301L, "WO-301", WorkOrderStatusEnum.DISPATCHED, snapshotUpdate); + OpsOrderDO fresh = stale(301L, "WO-301", WorkOrderStatusEnum.CONFIRMED, freshUpdate); + + when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class))) + .thenReturn(List.of(snapshot)); + when(opsOrderMapper.selectById(301L)).thenReturn(fresh); + + CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel(); + + assertEquals(1, result.scanned()); + assertEquals(0, result.succeeded()); + assertEquals(1, result.skippedStale()); + verify(orderLifecycleManager, never()).cancelOrder(anyLong(), any(), any(), any()); + } + + @Test + void scanAndCancel_whenOrderBecameTerminal_shouldSkip() { + // 候选装内存时还是 ARRIVED,实际 cancel 前已被其他路径 forceComplete 为 COMPLETED + LocalDateTime staleTime = LocalDateTime.now().minusHours(13); + OpsOrderDO snapshot = stale(401L, "WO-401", WorkOrderStatusEnum.ARRIVED, staleTime); + OpsOrderDO fresh = stale(401L, "WO-401", WorkOrderStatusEnum.COMPLETED, staleTime); + + when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class))) + .thenReturn(List.of(snapshot)); + when(opsOrderMapper.selectById(401L)).thenReturn(fresh); + + CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel(); + + assertEquals(1, result.skippedStale()); + verify(orderLifecycleManager, never()).cancelOrder(anyLong(), any(), any(), any()); + } + + @Test + void scanAndCancel_whenOrderBecamePaused_shouldSkip() { + // 快照是 DISPATCHED,刚被 P0 打断成 PAUSED——此 Job 应放行给 resumeInterruptedOrder + LocalDateTime staleTime = LocalDateTime.now().minusHours(13); + OpsOrderDO snapshot = stale(501L, "WO-501", WorkOrderStatusEnum.DISPATCHED, staleTime); + OpsOrderDO fresh = stale(501L, "WO-501", WorkOrderStatusEnum.PAUSED, + LocalDateTime.now().minusHours(14)); // update_time 刚刷新,但仍<=threshold;状态变 PAUSED 就该跳过 + + when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class))) + .thenReturn(List.of(snapshot)); + when(opsOrderMapper.selectById(501L)).thenReturn(fresh); + + CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel(); + + assertEquals(1, result.skippedStale()); + verify(orderLifecycleManager, never()).cancelOrder(anyLong(), any(), any(), any()); + } + + // ==================== Helpers ==================== + + private OpsOrderDO stale(Long id, String code, WorkOrderStatusEnum status, LocalDateTime updateTime) { + OpsOrderDO order = OpsOrderDO.builder() + .id(id) + .orderCode(code) + .status(status.getStatus()) + .orderType("CLEAN") + .build(); + order.setUpdateTime(updateTime); + return order; + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java index 48ab3741..b65f4582 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/dispatch/DispatchEngineImpl.java @@ -9,6 +9,7 @@ import com.viewsh.module.ops.core.dispatch.strategy.ScheduleStrategy; import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult; +import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode; import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; import com.viewsh.module.ops.enums.OperatorTypeEnum; @@ -178,6 +179,22 @@ public class DispatchEngineImpl implements DispatchEngine { public DispatchResult autoDispatchNext(Long completedOrderId, Long assigneeId) { log.info("任务完成后自动派发下一单: completedOrderId={}, assigneeId={}", completedOrderId, assigneeId); + if (assigneeId == null) { + log.warn("autoDispatchNext 缺少执行人,跳过派发: completedOrderId={}", completedOrderId); + return DispatchResult.success("缺少执行人,跳过派发", null); + } + + // 空闲校验:若执行人仍挂着其他活跃工单(DISPATCHED/CONFIRMED/ARRIVED/PAUSED), + // 说明设备尚未真正空闲,不应再派发新任务——否则会触发"同一设备并行多单"的状态错乱, + // 典型场景是管理员手动取消一个僵尸 DISPATCHED 单时,handleCancelled 会调到这里。 + List activeOrders = orderMapper.selectActiveByAssignee(assigneeId, completedOrderId); + if (!activeOrders.isEmpty()) { + OpsOrderDO head = activeOrders.get(0); + log.info("执行人仍有活跃工单,跳过自动派发: assigneeId={}, completedOrderId={}, activeCount={}, sampleOrderId={}, sampleStatus={}", + assigneeId, completedOrderId, activeOrders.size(), head.getId(), head.getStatus()); + return DispatchResult.success("执行人非空闲,跳过派发", assigneeId); + } + Long fallbackAreaId = null; OpsOrderDO completedOrder = orderMapper.selectById(completedOrderId); if (completedOrder != null) { @@ -229,7 +246,9 @@ public class DispatchEngineImpl implements DispatchEngine { .reason("等待队列动态重排后自动派发") .build(); - OrderTransitionResult result = orderLifecycleManager.transition(request); + // 走 dispatch() 而不是 transition():dispatch 内部会先做 FOR UPDATE 不变量检查 + // (Bug #2 防线),避免 autoDispatchNext 在"从队列派发"这一类入口绕过串行化。 + OrderTransitionResult result = orderLifecycleManager.dispatch(request); if (result.isSuccess()) { return DispatchResult.success("已按队列总分派发下一单", assigneeId); @@ -346,6 +365,23 @@ public class DispatchEngineImpl implements DispatchEngine { Long orderId = context.getOrderId(); Long assigneeId = context.getRecommendedAssigneeId(); + // 兜底校验:调度策略基于 Redis 的设备状态判空闲,可能与 MySQL 的 ops_order 实际活跃态不一致 + // (例如设备 Redis 状态被某次 COMPLETED 清回 IDLE 但历史 CONFIRMED/DISPATCHED 单仍残留)。 + // 若分配路径会真正推送工单给设备(DIRECT_DISPATCH / PUSH_AND_ENQUEUE), + // 此处再查一次 MySQL,非空闲时强制降级到 ENQUEUE_ONLY,避免同一设备并行多单的状态错乱。 + if (assigneeId != null + && (decision.getPath() == DispatchPath.DIRECT_DISPATCH + || decision.getPath() == DispatchPath.PUSH_AND_ENQUEUE)) { + List activeOrders = orderMapper.selectActiveByAssignee(assigneeId, orderId); + if (!activeOrders.isEmpty()) { + OpsOrderDO head = activeOrders.get(0); + log.warn("调度决策为 {} 但执行人仍挂活跃工单,降级为仅入队: orderId={}, assigneeId={}, activeCount={}, sampleOrderId={}, sampleStatus={}", + decision.getPath(), orderId, assigneeId, + activeOrders.size(), head.getId(), head.getStatus()); + return executeEnqueueOnly(context, assigneeId); + } + } + switch (decision.getPath()) { case DIRECT_DISPATCH: return executeDirectDispatch(context, assigneeId); @@ -402,9 +438,25 @@ public class DispatchEngineImpl implements DispatchEngine { DispatchPath.DIRECT_DISPATCH, result.getQueueId() ); - } else { - return DispatchResult.fail("直接派单失败: " + result.getMessage()); } + + // 并发冲突兜底:dispatch 入口的 FOR UPDATE 判定执行人已有活跃工单, + // 此时工单仍在原状态(通常是 PENDING)。如果仍是 PENDING,直接降级为入队, + // 避免工单悬空;若已是 QUEUED(例如从队列派发被抢先),则让它继续留在队列等下一轮。 + if (result.getErrorCode() == TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER) { + OpsOrderDO order = orderMapper.selectById(context.getOrderId()); + String currentStatus = order != null ? order.getStatus() : null; + if (WorkOrderStatusEnum.QUEUED.getStatus().equals(currentStatus)) { + log.warn("直接派单被 FOR UPDATE 拒绝且工单已在队列中,保持 QUEUED 等待下一轮: orderId={}, assigneeId={}", + context.getOrderId(), assigneeId); + return DispatchResult.fail("并发冲突,已留在队列等待: " + result.getMessage()); + } + log.warn("直接派单被 FOR UPDATE 拒绝,降级为入队: orderId={}, assigneeId={}, reason={}", + context.getOrderId(), assigneeId, result.getMessage()); + return executeEnqueueOnly(context, assigneeId); + } + + return DispatchResult.fail("直接派单失败: " + result.getMessage()); } /** @@ -427,8 +479,15 @@ public class DispatchEngineImpl implements DispatchEngine { .reason("自动推送等待任务") .build(); - orderLifecycleManager.dispatch(dispatchRequest); - log.info("已推送等待任务: taskId={}", firstWaiting.getId()); + OrderTransitionResult pushResult = orderLifecycleManager.dispatch(dispatchRequest); + if (pushResult.isSuccess()) { + log.info("已推送等待任务: taskId={}", firstWaiting.getId()); + } else { + // 可能被 dispatch() 里的 FOR UPDATE 拒绝:此处不中断新任务入队流程, + // 但要把"推送失败"清晰落在日志里,避免 "已推送" 说谎误导运维排查。 + log.warn("推送等待任务失败,继续执行新任务入队: taskId={}, orderId={}, error={}", + firstWaiting.getId(), firstWaiting.getOpsOrderId(), pushResult.getMessage()); + } } // 新任务入队 diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/event/OrderTransitionAttemptedEvent.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/event/OrderTransitionAttemptedEvent.java new file mode 100644 index 00000000..6c3f112b --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/event/OrderTransitionAttemptedEvent.java @@ -0,0 +1,85 @@ +package com.viewsh.module.ops.core.event; + +import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode; +import com.viewsh.module.ops.enums.OperatorTypeEnum; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +/** + * 工单状态转换"尝试"领域事件 + *

+ * 与 {@link OrderStateChangedEvent} 的区别: + *

    + *
  • {@code OrderStateChangedEvent} 仅在状态转换 成功 后发布(EventPublishHandler), + * 订阅方是业务层监听器(TTS 播报、设备状态同步等)。
  • + *
  • {@code OrderTransitionAttemptedEvent} 在每一次 transition 尝试时都发布——成功、失败、 + * FOR UPDATE 被拒 都发。订阅方是审计日志,用于打穿事务回滚造成的审计断链 + * (rollback 场景下 ops_order_event 无记录,bus_log 需独立事务补齐)。
  • + *
+ * 事务边界: + *
    + *
  • 发布方在主事务内 {@code publishEvent},事件会被 Spring 挂在当前事务的 synchronization 上。
  • + *
  • 订阅方用 {@code @TransactionalEventListener(AFTER_COMMIT)} 或 {@code AFTER_ROLLBACK} + * 分别处理 commit 与 rollback 场景,保证两种结果都留痕。
  • + *
+ * + * @author lzh + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class OrderTransitionAttemptedEvent { + + /** 工单ID */ + private Long orderId; + + /** 工单类型(CLEAN / SECURITY / REPAIR / SERVICE) */ + private String orderType; + + /** 工单编号(冗余,便于日志检索) */ + private String orderCode; + + /** 原状态(查询时的当前状态) */ + private WorkOrderStatusEnum fromStatus; + + /** 目标状态 */ + private WorkOrderStatusEnum targetStatus; + + /** 执行人ID */ + private Long assigneeId; + + /** 操作人类型 */ + private OperatorTypeEnum operatorType; + + /** 操作人ID */ + private Long operatorId; + + /** 原因/备注 */ + private String reason; + + /** + * 发布时的"声明结果"。 + *

+ * 注意:这是发布瞬间的判断;如果后续 handler 抛异常导致整个事务 rollback, + * 监听器在 {@code AFTER_ROLLBACK} 阶段应强制将其视为失败。 + */ + private boolean success; + + /** 失败错误码(success=false 时有值) */ + private TransitionErrorCode errorCode; + + /** 失败原因(简要消息,success=false 时有值) */ + private String errorMessage; + + /** 异常摘要(success=false 且存在异常时有值,只保留 class + message,不带堆栈) */ + private String causeSummary; + + /** 事件时间 */ + private LocalDateTime attemptedAt; +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java index 3864f49f..4770afa7 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/OrderLifecycleManagerImpl.java @@ -1,5 +1,6 @@ package com.viewsh.module.ops.core.lifecycle; +import com.viewsh.module.ops.core.event.OrderTransitionAttemptedEvent; import com.viewsh.module.ops.core.lifecycle.handler.EventPublishHandler; import com.viewsh.module.ops.core.lifecycle.handler.QueueSyncHandler; import com.viewsh.module.ops.core.lifecycle.handler.StateTransitionHandler; @@ -7,6 +8,10 @@ import com.viewsh.module.ops.core.lifecycle.handler.TransitionHandler; import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult; import com.viewsh.module.ops.core.lifecycle.model.TransitionContext; +import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode; +import org.springframework.context.ApplicationEventPublisher; + +import java.time.LocalDateTime; import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; import com.viewsh.module.ops.enums.OperatorTypeEnum; @@ -62,6 +67,9 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager { @Resource private EventLogRecorder eventLogRecorder; + @Resource + private ApplicationEventPublisher applicationEventPublisher; + /** * 责任链处理器 */ @@ -101,10 +109,15 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager { // 4. 检查结果 if (context.hasError()) { log.error("状态转换失败: orderId={}, error={}", order.getId(), context.getErrorMessage()); + publishAttempt(order, oldStatus, request, false, + TransitionErrorCode.INVALID_TRANSITION, + context.getErrorMessage(), + summarizeThrowable(context.getCause())); return OrderTransitionResult.fail(order.getId(), context.getErrorMessage()); } log.info("状态转换成功: orderId={}, {} -> {}", order.getId(), oldStatus, request.getTargetStatus()); + publishAttempt(order, oldStatus, request, true, null, null, null); return OrderTransitionResult.builder() .success(true) .orderId(order.getId()) @@ -142,6 +155,35 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager { // 设置目标状态 request.setTargetStatus(WorkOrderStatusEnum.DISPATCHED); + // 业务不变量:同一执行人在任一时刻最多只能有 1 条活跃工单 + // (DISPATCHED/CONFIRMED/ARRIVED)。PAUSED 不纳入——P0 打断恢复走的就是 + // PAUSED→DISPATCHED,此处放行。对命中行加 FOR UPDATE,配合 @Transactional + // 串行化并发派发;命中则本次派发被拒,由调用方决定降级策略 + // (DispatchEngineImpl.executeDirectDispatch 会降级为入队)。 + if (request.getAssigneeId() != null) { + java.util.List activeOrders = opsOrderMapper.selectActiveByAssigneeForUpdate( + request.getAssigneeId(), request.getOrderId()); + if (!activeOrders.isEmpty()) { + OpsOrderDO head = activeOrders.get(0); + String msg = "执行人已有活跃工单: orderId=" + head.getId() + ", status=" + head.getStatus(); + log.warn("派发被拒:执行人已有活跃工单: assigneeId={}, requestOrderId={}, activeCount={}, sampleOrderId={}, sampleStatus={}", + request.getAssigneeId(), request.getOrderId(), + activeOrders.size(), head.getId(), head.getStatus()); + + // 审计:记录"派发被拒"尝试,AFTER_COMMIT 监听器会写 bus_log + OpsOrderDO subject = opsOrderMapper.selectById(request.getOrderId()); + WorkOrderStatusEnum fromStatus = subject != null + ? WorkOrderStatusEnum.valueOf(subject.getStatus()) : null; + publishAttempt(subject != null ? subject : head, fromStatus, request, false, + TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER, msg, null); + + return OrderTransitionResult.fail( + request.getOrderId(), + msg, + TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER); + } + } + // 派单时更新工单的 assigneeId(从 PENDING -> DISPATCHED) if (request.getAssigneeId() != null) { OpsOrderDO order = opsOrderMapper.selectById(request.getOrderId()); @@ -188,17 +230,22 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager { public void resumeOrder(Long orderId, Long operatorId) { log.info("开始恢复工单: orderId={}, operatorId={}", orderId, operatorId); - // 构建请求 + // 取出工单自身的 assigneeId 透传给 dispatch,使其 FOR UPDATE 不变量检查生效—— + // 否则 P0 恢复与并发派发竞争时可能再出现"同一 assignee 两条 DISPATCHED"。 + // assigneeId == null 的异常态(工单已卸人)下 dispatch 会跳过该检查,行为退化为原 transition。 + OpsOrderDO order = opsOrderMapper.selectById(orderId); + Long assigneeId = order != null ? order.getAssigneeId() : null; + OrderTransitionRequest request = OrderTransitionRequest.builder() .orderId(orderId) .targetStatus(WorkOrderStatusEnum.DISPATCHED) + .assigneeId(assigneeId) .operatorType(OperatorTypeEnum.CLEANER) .operatorId(operatorId) .reason("恢复工单") .build(); - // 执行状态转换 - OrderTransitionResult result = transition(request); + OrderTransitionResult result = dispatch(request); if (!result.isSuccess()) { throw new IllegalStateException("恢复工单失败: " + result.getMessage()); @@ -409,4 +456,49 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager { || WorkOrderStatusEnum.ARRIVED == status; } + /** + * 发布状态转换尝试事件,覆盖成功、普通失败、并发冲突三种情况。 + * 订阅方 {@code OrderTransitionAuditListener} 在 AFTER_COMMIT/AFTER_ROLLBACK + * 阶段落 bus_log,保证事务回滚不断链。 + */ + private void publishAttempt(OpsOrderDO order, WorkOrderStatusEnum fromStatus, + OrderTransitionRequest request, boolean success, + TransitionErrorCode errorCode, String errorMessage, + String causeSummary) { + try { + OrderTransitionAttemptedEvent event = OrderTransitionAttemptedEvent.builder() + .orderId(order != null ? order.getId() : request.getOrderId()) + .orderType(order != null ? order.getOrderType() : null) + .orderCode(order != null ? order.getOrderCode() : null) + .fromStatus(fromStatus) + .targetStatus(request.getTargetStatus()) + .assigneeId(request.getAssigneeId()) + .operatorType(request.getOperatorType()) + .operatorId(request.getOperatorId()) + .reason(request.getReason()) + .success(success) + .errorCode(errorCode) + .errorMessage(errorMessage) + .causeSummary(causeSummary) + .attemptedAt(LocalDateTime.now()) + .build(); + applicationEventPublisher.publishEvent(event); + } catch (Exception e) { + // 审计事件发布失败不应影响主流程 + log.error("发布转换尝试事件失败: orderId={}, targetStatus={}", + request.getOrderId(), request.getTargetStatus(), e); + } + } + + /** + * 摘要异常:只保留类名 + message,不带堆栈,防止 bus_log 爆炸。 + */ + private String summarizeThrowable(Throwable t) { + if (t == null) { + return null; + } + String msg = t.getMessage(); + return t.getClass().getSimpleName() + (msg != null ? ": " + msg : ""); + } + } diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/audit/OrderTransitionAuditListener.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/audit/OrderTransitionAuditListener.java new file mode 100644 index 00000000..bb10af99 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/audit/OrderTransitionAuditListener.java @@ -0,0 +1,173 @@ +package com.viewsh.module.ops.core.lifecycle.audit; + +import com.viewsh.module.ops.core.event.OrderTransitionAttemptedEvent; +import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode; +import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain; +import com.viewsh.module.ops.infrastructure.log.enumeration.EventLevel; +import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule; +import com.viewsh.module.ops.infrastructure.log.enumeration.LogType; +import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord; +import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.transaction.event.TransactionPhase; +import org.springframework.transaction.event.TransactionalEventListener; + +import java.util.HashMap; +import java.util.Map; + +/** + * 工单状态转换尝试审计监听器。 + *

+ * 闭环设计: + *

    + *
  • AFTER_COMMIT:主事务成功提交,按事件本身的 success 标志写 bus_log。
  • + *
  • AFTER_ROLLBACK:主事务已回滚——事件里的数据(ops_order_event 等)全部消失。 + * 此时必须新开一个独立事务写 bus_log,否则审计链断裂。
  • + *
+ *

+ * 字段归位: + *

    + *
  • {@code eventLevel}:成功=INFO;失败=WARN(冲突被拒)或 ERROR(状态机异常)
  • + *
  • {@code eventDomain}:统一用 DISPATCH(派发域),便于运维按域聚合
  • + *
  • {@code eventType}:成功→业务 LogType(如 ORDER_DISPATCHED);失败→TRANSITION_FAILED + * 或 DISPATCH_REJECTED
  • + *
  • {@code eventPayload}:errorCode / fromStatus / targetStatus / operatorType / reason / cause
  • + *
+ * + * @author lzh + */ +@Slf4j +@Component +public class OrderTransitionAuditListener { + + @Resource + private EventLogRecorder eventLogRecorder; + + /** + * 主事务已提交:照事件声明写一条审计日志。 + *

+ * fallbackExecution=true:在无事务上下文时也执行(如测试、跨线程补写场景)。 + */ + @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, fallbackExecution = true) + public void onAfterCommit(OrderTransitionAttemptedEvent event) { + try { + eventLogRecorder.recordSync(toRecord(event, /*rolledBack=*/false)); + } catch (Exception e) { + log.error("[TransitionAudit] AFTER_COMMIT 写 bus_log 失败: orderId={}, success={}, errorCode={}", + event.getOrderId(), event.isSuccess(), event.getErrorCode(), e); + } + } + + /** + * 主事务已回滚:无论事件里声称 success 与否,这次"尝试"都**实际未落库**。 + * 必须开独立事务写 bus_log,否则日志也会因同事务回滚而丢失。 + */ + @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK) + public void onAfterRollback(OrderTransitionAttemptedEvent event) { + writeRollbackAudit(event); + } + + /** + * 写入"事务已回滚"的审计记录。 + *

+ * 不加 @Transactional:AFTER_ROLLBACK 阶段主事务已彻底结束,当前线程无活跃事务; + * 且本方法由 onAfterRollback 自调用,Spring 代理不会拦截,加注解也是死注解。 + * 实际行为:eventLogRecorder.recordSync 的 insert 在 auto-commit 模式下单条提交, + * 失败只丢这一行审计、不影响主业务(主业务早已回滚并报错给调用方)。 + */ + public void writeRollbackAudit(OrderTransitionAttemptedEvent event) { + try { + eventLogRecorder.recordSync(toRecord(event, /*rolledBack=*/true)); + } catch (Exception e) { + log.error("[TransitionAudit] AFTER_ROLLBACK 写 bus_log 失败: orderId={}, targetStatus={}", + event.getOrderId(), event.getTargetStatus(), e); + } + } + + // ==================== 私有映射方法 ==================== + + private EventLogRecord toRecord(OrderTransitionAttemptedEvent event, boolean rolledBack) { + // rolledBack=true 时强制视为失败:即便发布时声明 success=true, + // 事务 rollback 说明写入未真正生效。 + boolean success = event.isSuccess() && !rolledBack; + + EventLevel level = success ? EventLevel.INFO + : (event.getErrorCode() == TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER + ? EventLevel.WARN : EventLevel.ERROR); + + String eventTypeCode = resolveEventTypeCode(event, success, rolledBack); + + Map payload = new HashMap<>(); + payload.put("fromStatus", event.getFromStatus() != null ? event.getFromStatus().getStatus() : null); + payload.put("targetStatus", event.getTargetStatus() != null ? event.getTargetStatus().getStatus() : null); + payload.put("operatorType", event.getOperatorType() != null ? event.getOperatorType().getType() : null); + payload.put("reason", event.getReason()); + payload.put("success", success); + payload.put("rolledBack", rolledBack); + if (event.getErrorCode() != null) { + payload.put("errorCode", event.getErrorCode().name()); + } + if (event.getErrorMessage() != null) { + payload.put("errorMessage", event.getErrorMessage()); + } + if (event.getCauseSummary() != null) { + payload.put("cause", event.getCauseSummary()); + } + if (event.getOrderCode() != null) { + payload.put("orderCode", event.getOrderCode()); + } + + String message = buildMessage(event, success, rolledBack); + + return EventLogRecord.builder() + .module(LogModule.fromOrderType(event.getOrderType())) + .domain(EventDomain.DISPATCH) + .eventType(eventTypeCode) + .level(level) + .message(message) + .targetId(event.getOrderId()) + .targetType("order") + .deviceId(event.getAssigneeId()) + .personId(event.getOperatorId()) + .payload(payload) + .eventTime(event.getAttemptedAt()) + .build(); + } + + private String resolveEventTypeCode(OrderTransitionAttemptedEvent event, boolean success, boolean rolledBack) { + if (!success && event.getErrorCode() == TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER) { + return LogType.DISPATCH_REJECTED.getCode(); + } + if (!success) { + return LogType.TRANSITION_FAILED.getCode(); + } + // 成功场景:按目标状态映射到业务 LogType;ops_order_event 已有时间轴, + // 这里 bus_log 仅作宽表镜像,便于运维按 domain/module 聚合查询。 + if (event.getTargetStatus() == null) { + return LogType.SYSTEM_EVENT.getCode(); + } + return switch (event.getTargetStatus()) { + case QUEUED -> LogType.ORDER_QUEUED.getCode(); + case DISPATCHED -> LogType.ORDER_DISPATCHED.getCode(); + case CONFIRMED -> LogType.ORDER_CONFIRM.getCode(); + case ARRIVED -> LogType.ORDER_ARRIVED.getCode(); + case PAUSED -> LogType.ORDER_PAUSED.getCode(); + case COMPLETED -> LogType.ORDER_COMPLETED.getCode(); + case CANCELLED -> LogType.ORDER_CANCELLED.getCode(); + default -> LogType.SYSTEM_EVENT.getCode(); + }; + } + + private String buildMessage(OrderTransitionAttemptedEvent event, boolean success, boolean rolledBack) { + String from = event.getFromStatus() != null ? event.getFromStatus().getStatus() : "?"; + String to = event.getTargetStatus() != null ? event.getTargetStatus().getStatus() : "?"; + if (success) { + return String.format("状态转换成功: %s -> %s", from, to); + } + String prefix = rolledBack ? "状态转换回滚" : "状态转换失败"; + String detail = event.getErrorMessage() != null ? event.getErrorMessage() : ""; + return String.format("%s: %s -> %s %s", prefix, from, to, detail).trim(); + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/model/OrderTransitionResult.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/model/OrderTransitionResult.java index e32a9c91..66e81d66 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/model/OrderTransitionResult.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/model/OrderTransitionResult.java @@ -47,6 +47,14 @@ public class OrderTransitionResult { */ private Long queueId; + /** + * 失败错误码(仅 success=false 时有值) + *

+ * 调用方可据此区分需降级的失败(如 ASSIGNEE_HAS_ACTIVE_ORDER)与硬失败, + * 未显式设置时默认为 {@link TransitionErrorCode#OTHER}。 + */ + private TransitionErrorCode errorCode; + /** * 成功结果 */ @@ -81,6 +89,7 @@ public class OrderTransitionResult { return OrderTransitionResult.builder() .success(false) .message(message) + .errorCode(TransitionErrorCode.OTHER) .build(); } @@ -92,6 +101,19 @@ public class OrderTransitionResult { .success(false) .orderId(orderId) .message(message) + .errorCode(TransitionErrorCode.OTHER) + .build(); + } + + /** + * 失败结果(带工单ID 和错误码) + */ + public static OrderTransitionResult fail(Long orderId, String message, TransitionErrorCode errorCode) { + return OrderTransitionResult.builder() + .success(false) + .orderId(orderId) + .message(message) + .errorCode(errorCode) .build(); } } diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/model/TransitionErrorCode.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/model/TransitionErrorCode.java new file mode 100644 index 00000000..0ae7fac1 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/model/TransitionErrorCode.java @@ -0,0 +1,35 @@ +package com.viewsh.module.ops.core.lifecycle.model; + +/** + * 状态转换失败的错误码 + *

+ * 用于调用方区分可恢复/需降级的失败场景(如并发冲突)与真正的硬失败(状态机非法转换等), + * 避免把"可降级"的结果误当成硬错误直接向用户暴露。 + * + * @author lzh + */ +public enum TransitionErrorCode { + + /** + * 执行人已有活跃工单(DISPATCHED/CONFIRMED/ARRIVED),不应再派发。 + *

+ * 发生在 OrderLifecycleManager.dispatch 入口的 FOR UPDATE 兜底检查命中时。 + * 调用方应将工单降级到 QUEUED(入队等待下一轮动态派发),避免 PENDING 状态悬空。 + */ + ASSIGNEE_HAS_ACTIVE_ORDER, + + /** + * 状态机不允许此转换(非法的状态流转) + */ + INVALID_TRANSITION, + + /** + * 工单不存在 + */ + ORDER_NOT_FOUND, + + /** + * 其他失败(无特定归类) + */ + OTHER; +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java index f6ed6f62..a25c207d 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/queue/OpsOrderQueueMapper.java @@ -54,7 +54,7 @@ public interface OpsOrderQueueMapper extends BaseMapperX { } /** - * 根据用户ID查询队列列表 + * 根据用户ID查询队列列表(含历史 REMOVED 记录,通常用于审计/统计) */ default List selectListByUserId(Long userId) { return selectList(new LambdaQueryWrapperX() @@ -62,6 +62,19 @@ public interface OpsOrderQueueMapper extends BaseMapperX { .orderByDesc(OpsOrderQueueDO::getEnqueueTime)); } + /** + * 根据用户ID查询活跃队列列表(仅 WAITING/PROCESSING/PAUSED,排除 REMOVED/已终态) + *

+ * 同步到 Redis、计算队列长度、查询当前任务等场景应走此方法,避免 + * 将历史 REMOVED 记录同步到 Redis 造成 ZSet / Hash 膨胀。 + */ + default List selectActiveListByUserId(Long userId) { + return selectList(new LambdaQueryWrapperX() + .eq(OpsOrderQueueDO::getUserId, userId) + .in(OpsOrderQueueDO::getQueueStatus, "WAITING", "PROCESSING", "PAUSED") + .orderByDesc(OpsOrderQueueDO::getEnqueueTime)); + } + /** * 根据用户ID和状态查询队列列表 * 用于强制从 MySQL 读取最新数据 diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/workorder/OpsOrderMapper.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/workorder/OpsOrderMapper.java index d3c7b436..b59d0637 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/workorder/OpsOrderMapper.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/workorder/OpsOrderMapper.java @@ -92,6 +92,80 @@ public interface OpsOrderMapper extends BaseMapperX { .last("LIMIT 1")); } + /** + * 查询执行人名下尚未结束的工单(DISPATCHED/CONFIRMED/ARRIVED/PAUSED) + *

+ * 用于 autoDispatchNext 等调度入口的空闲校验:若该执行人仍挂着活跃工单, + * 则不应再派发新任务,避免"越清越多"的级联派发。 + * + * @param assigneeId 执行人ID(工牌设备ID) + * @param excludeOrderId 需要排除的工单ID(通常是刚完成/取消触发本次调度的工单),可传 null + * @return 活跃工单列表,按创建时间升序 + */ + default List selectActiveByAssignee(Long assigneeId, Long excludeOrderId) { + return selectList(new LambdaQueryWrapperX() + .eq(OpsOrderDO::getAssigneeId, assigneeId) + .in(OpsOrderDO::getStatus, + WorkOrderStatusEnum.DISPATCHED.getStatus(), + WorkOrderStatusEnum.CONFIRMED.getStatus(), + WorkOrderStatusEnum.ARRIVED.getStatus(), + WorkOrderStatusEnum.PAUSED.getStatus()) + .ne(excludeOrderId != null, OpsOrderDO::getId, excludeOrderId) + .orderByAsc(OpsOrderDO::getCreateTime)); + } + + /** + * 查询执行人名下"正在执行"的工单,并对命中行加行锁(SELECT ... FOR UPDATE) + *

+ * 与 {@link #selectActiveByAssignee} 的区别: + *

    + *
  • 不含 PAUSED——PAUSED 代表 P0 打断后挂起的旧任务,不占用"当前时间片", + * 派发时(如 P0 结束后恢复)不应被它阻塞
  • + *
  • 结果行加 FOR UPDATE 排他锁,用于 dispatch 入口做业务不变量校验: + * "同一执行人在任一时刻最多只能有 1 条活跃工单"。
  • + *
+ * 必须在事务中调用,否则锁无意义。 + * + * @param assigneeId 执行人ID + * @param excludeOrderId 排除的工单ID(通常是本次正在派发的工单本身) + * @return 命中的活跃工单列表(通常空列表表示可派发) + */ + default List selectActiveByAssigneeForUpdate(Long assigneeId, Long excludeOrderId) { + return selectList(new LambdaQueryWrapperX() + .eq(OpsOrderDO::getAssigneeId, assigneeId) + .in(OpsOrderDO::getStatus, + WorkOrderStatusEnum.DISPATCHED.getStatus(), + WorkOrderStatusEnum.CONFIRMED.getStatus(), + WorkOrderStatusEnum.ARRIVED.getStatus()) + .ne(excludeOrderId != null, OpsOrderDO::getId, excludeOrderId) + .orderByAsc(OpsOrderDO::getCreateTime) + .last("FOR UPDATE")); + } + + /** + * 查询执行人最近一条已完成工单的区域(用于楼层基准兜底) + *

+ * 用途:{@code OrderQueueServiceEnhanced.resolveBaselineAreaId} 的二级兜底。 + * 当执行人当前没有 PROCESSING 工单时(短暂空闲),用最近完成的那一单的 + * 区域作为"物理位置推断",保证楼层差评分在空闲期仍然生效。 + *

+ * 时间窗:通过 {@code since} 过滤,超过窗口仍空闲则认为轨迹失效, + * 返回 null 让调用方降级到更外层的兜底(fallbackAreaId 或无楼层模式)。 + * + * @param assigneeId 执行人ID + * @param since 只考虑 updateTime 晚于此时间的工单(如 now - 24h) + * @return 最近一条 COMPLETED 工单的 areaId;无匹配返回 null + */ + default Long selectLatestCompletedAreaIdByAssignee(Long assigneeId, LocalDateTime since) { + OpsOrderDO order = selectOne(new LambdaQueryWrapperX() + .eq(OpsOrderDO::getAssigneeId, assigneeId) + .eq(OpsOrderDO::getStatus, WorkOrderStatusEnum.COMPLETED.getStatus()) + .ge(since != null, OpsOrderDO::getUpdateTime, since) + .orderByDesc(OpsOrderDO::getUpdateTime) + .last("LIMIT 1")); + return order != null ? order.getAreaId() : null; + } + // ==================== 统计聚合查询 ==================== /** diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/enumeration/LogType.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/enumeration/LogType.java index aba71be4..9e5584f7 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/enumeration/LogType.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/enumeration/LogType.java @@ -47,7 +47,14 @@ public enum LogType { COMPLETE_SUPPRESSED_INVALID("COMPLETE_SUPPRESSED_INVALID", "作业时长不足抑制"), BEACON_COMPLETE_REQUESTED("BEACON_COMPLETE_REQUESTED", "信号丢失自动完成请求"), TTS_REQUEST("TTS_REQUEST", "语音播报请求"), - ARRIVE_REJECTED("ARRIVE_REJECTED", "到岗请求被拒绝"); + ARRIVE_REJECTED("ARRIVE_REJECTED", "到岗请求被拒绝"), + + // ========== 状态机转换闭环审计 ========== + + /** 状态转换尝试失败(状态机异常、handler 抛错等) */ + TRANSITION_FAILED("TRANSITION_FAILED", "状态转换失败"), + /** 派发被 FOR UPDATE 拒绝(同 assignee 已有活跃工单) */ + DISPATCH_REJECTED("DISPATCH_REJECTED", "派发被拒绝"); private static final Map CODE_MAP = new HashMap<>(); diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/recorder/EventLogRecorder.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/recorder/EventLogRecorder.java index 7440065c..f044d14a 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/recorder/EventLogRecorder.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/recorder/EventLogRecorder.java @@ -25,6 +25,17 @@ public interface EventLogRecorder { */ void recordAsync(EventLogRecord record); + /** + * 记录事件日志(同步) + *

+ * 需要确保日志真正落库的场景使用(如 AFTER_COMMIT 审计、事务回滚场景补写)。 + * 调用方负责事务边界:本方法内部不开启事务,MyBatis 的 insert 会按当前线程的 + * 事务上下文执行;若无活跃事务则自动单条提交。 + * + * @param record 日志记录 + */ + void recordSync(EventLogRecord record); + // ==================== 便捷方法:按级别记录 ==================== /** diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/recorder/EventLogRecorderImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/recorder/EventLogRecorderImpl.java index ea12055b..92bab9f0 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/recorder/EventLogRecorderImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/infrastructure/log/recorder/EventLogRecorderImpl.java @@ -58,6 +58,7 @@ public class EventLogRecorderImpl implements EventLogRecorder { *

* 用于需要确认日志写入成功的场景(如测试、关键业务) */ + @Override public void recordSync(EventLogRecord record) { doRecord(record); } diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java index aeb3fc2b..216a37a2 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java @@ -115,6 +115,9 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { // TODO: 触发紧急派单流程(在派单引擎中实现) } + // 5. 事务提交后按全局楼层重排一次:新入队工单立即按楼层差参与排序,不等下一次 rebuild + triggerQueueRebuildAfterCommit(userId, null); + return queueDO.getId(); } @@ -467,7 +470,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { } // 2. Redis 未命中,从 MySQL 获取并同步到 Redis - List mysqlList = orderQueueMapper.selectListByUserId(userId); + // 只同步活跃态(WAITING/PROCESSING/PAUSED),排除 REMOVED 历史记录,避免 Redis 膨胀 + List mysqlList = orderQueueMapper.selectActiveListByUserId(userId); if (mysqlList != null && !mysqlList.isEmpty()) { // 同步到 Redis List dtoList = convertToDTO(mysqlList); @@ -511,10 +515,31 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { Integer baseFloorNo = resolveFloorNo(baselineAreaId); LocalDateTime now = LocalDateTime.now(); + // 批量装载 orders + areas,消除 N+1:100 条 WAITING 从 200 次 SELECT 降为 2 次。 + List orderIds = waitingQueues.stream() + .map(OpsOrderQueueDO::getOpsOrderId) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); + Map orderIdToAreaId = orderIds.isEmpty() + ? Collections.emptyMap() + : orderMapper.selectBatchIds(orderIds).stream() + .filter(o -> o.getAreaId() != null) + .collect(Collectors.toMap(OpsOrderDO::getId, OpsOrderDO::getAreaId, + (a, b) -> a)); + List areaIds = orderIdToAreaId.values().stream().distinct().collect(Collectors.toList()); + Map areaIdToFloorNo = areaIds.isEmpty() + ? Collections.emptyMap() + : areaMapper.selectBatchIds(areaIds).stream() + .filter(a -> a.getFloorNo() != null) + .collect(Collectors.toMap(OpsBusAreaDO::getId, OpsBusAreaDO::getFloorNo, + (a, b) -> a)); + List rebuiltTasks = new ArrayList<>(waitingQueues.size()); for (OpsOrderQueueDO queueDO : waitingQueues) { OrderQueueDTO dto = convertToDTO(queueDO); - Integer targetFloorNo = resolveFloorNo(resolveOrderAreaId(queueDO.getOpsOrderId())); + Long targetAreaId = orderIdToAreaId.get(queueDO.getOpsOrderId()); + Integer targetFloorNo = targetAreaId != null ? areaIdToFloorNo.get(targetAreaId) : null; QueueScoreResult result = queueScoreCalculator.calculate(QueueScoreContext.builder() .priority(queueDO.getPriority()) .baseFloorNo(baseFloorNo) @@ -739,7 +764,17 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { ); } + /** + * 解析楼层基准区域(三级兜底) + *

    + *
  1. 当前 PROCESSING 工单的区域——表示“正在做的楼层”
  2. + *
  3. 最近 24 小时内已完成工单的区域——投射保洁员最近的物理位置
  4. + *
  5. 调用方显式传入的 {@code fallbackAreaId}(如 autoDispatchNext 传的 completedOrder.areaId)
  6. + *
+ * 都未命中则返回 null,本次排序降级为无楼层模式。 + */ private Long resolveBaselineAreaId(Long userId, Long fallbackAreaId) { + // 一级:当前正在执行的工单 OpsOrderQueueDO processingQueue = orderQueueMapper.selectCurrentExecutingByUserId(userId); if (processingQueue != null) { Long processingAreaId = resolveOrderAreaId(processingQueue.getOpsOrderId()); @@ -747,6 +782,13 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { return processingAreaId; } } + // 二级:最近 24 小时内的已完成工单,推断保洁员当前物理位置 + Long recentAreaId = orderMapper.selectLatestCompletedAreaIdByAssignee( + userId, LocalDateTime.now().minusHours(24)); + if (recentAreaId != null) { + return recentAreaId; + } + // 三级:调用方提示的区域(可为 null) return fallbackAreaId; } @@ -764,7 +806,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { } private void syncUserQueueToRedis(Long userId, List rebuiltWaitingTasks) { - List queues = orderQueueMapper.selectListByUserId(userId); + // 只同步活跃态(WAITING/PROCESSING/PAUSED),避免把历史 REMOVED 记录回写 Redis ZSet/Hash + List queues = orderQueueMapper.selectActiveListByUserId(userId); if (queues == null || queues.isEmpty()) { redisQueueService.clearQueue(userId); return; @@ -794,6 +837,21 @@ public class OrderQueueServiceEnhanced implements OrderQueueService { redisQueueService.batchEnqueue(queueDTOs); } + /** + * 在当前事务提交后触发一次等待队列重算。 + *

+ * 事务边界说明:本方法在 afterCommit 阶段(即外层事务已提交)自调用 + * {@link #rebuildWaitingTasksByUserId(Long, Long)},此时: + *

    + *
  • 当前线程不在任何事务中(主事务刚提交完)
  • + *
  • 自调用绕过 Spring 代理,rebuild 方法上的 @Transactional 不生效
  • + *
  • 实际运行在 auto-commit 模式:每个 updateById 独立提交
  • + *
+ * 后果:rebuild 中途抛异常时 MySQL 可能半更新、Redis 可能部分写入, + * 不强一致但最终一致——下一次 enqueue 会再触发一次完整 rebuild 自愈。 + * 对“队列排序”这类可重放数据可以接受;若未来改为影响 MySQL 外表的写入, + * 需要把 rebuild 抽到独立 bean,用代理调用走新事务。 + */ private void triggerQueueRebuildAfterCommit(Long userId, Long fallbackAreaId) { Runnable rebuildAction = () -> { try { diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueScoreCalculator.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueScoreCalculator.java index 5e22b1f3..bd161f51 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueScoreCalculator.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/QueueScoreCalculator.java @@ -9,7 +9,11 @@ import java.time.LocalDateTime; public class QueueScoreCalculator { static final int PRIORITY_WEIGHT = 1500; - static final int FLOOR_WEIGHT = 60; + /** + * 楼层差权重。10 层封顶 × 100 = 1000,大于 aging 上限 720,实现"强楼层优先": + * 等满 4 小时(aging 上限)的任务也不会反超更近楼层的同优先级任务。 + */ + static final int FLOOR_WEIGHT = 100; static final int AGING_WEIGHT = 3; static final int MAX_FLOOR_DIFF = 10; static final int MAX_AGING_MINUTES = 240; @@ -22,11 +26,11 @@ public class QueueScoreCalculator { Integer targetFloorNo = context.getTargetFloorNo(); Integer floorDiff = null; int floorDiffScore = 0; + // 语义对称:只要 baseFloor 或 targetFloor 任一缺失,就视为"信息不足",不参与楼层排序(score=0)。 + // 旧逻辑会在"有 base 无 target"时打 +600 罚分,导致同一工单在保洁员忙碌/空闲时排序不单调。 if (baseFloorNo != null && targetFloorNo != null) { floorDiff = Math.abs(targetFloorNo - baseFloorNo); floorDiffScore = Math.min(floorDiff, MAX_FLOOR_DIFF) * FLOOR_WEIGHT; - } else if (baseFloorNo != null) { - floorDiffScore = MAX_FLOOR_DIFF * FLOOR_WEIGHT; } long waitMinutes = 0; diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/dispatch/DispatchEngineConflictFallbackTest.java b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/dispatch/DispatchEngineConflictFallbackTest.java new file mode 100644 index 00000000..12a3fd50 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/dispatch/DispatchEngineConflictFallbackTest.java @@ -0,0 +1,170 @@ +package com.viewsh.module.ops.core.dispatch; + +import com.viewsh.module.ops.api.queue.OrderQueueService; +import com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation; +import com.viewsh.module.ops.core.dispatch.model.DispatchDecision; +import com.viewsh.module.ops.core.dispatch.model.DispatchPath; +import com.viewsh.module.ops.core.dispatch.model.DispatchResult; +import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext; +import com.viewsh.module.ops.core.dispatch.strategy.AssignStrategy; +import com.viewsh.module.ops.core.dispatch.strategy.ScheduleStrategy; +import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; +import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; +import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult; +import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * 验证 Bug #2:FOR UPDATE 并发冲突降级路径。 + *

+ * 背景:OrderLifecycleManager.dispatch 入口加了 selectActiveByAssigneeForUpdate 行锁, + * 命中时返 {@link TransitionErrorCode#ASSIGNEE_HAS_ACTIVE_ORDER}。 + * DispatchEngine 需按工单当前状态分支处理: + *

    + *
  • PENDING(从未入队)→ 直接降级为入队,避免悬空
  • + *
  • QUEUED(已在队列)→ 保留排队,不做重复入队
  • + *
  • 其他错误码(如 INVALID_TRANSITION)→ 硬失败,不走降级
  • + *
+ */ +@ExtendWith(MockitoExtension.class) +class DispatchEngineConflictFallbackTest { + + @Mock + private OrderLifecycleManager orderLifecycleManager; + @Mock + private OrderQueueService orderQueueService; + @Mock + private OpsOrderMapper orderMapper; + @Mock + private AssignStrategy assignStrategy; + @Mock + private ScheduleStrategy scheduleStrategy; + + @InjectMocks + private DispatchEngineImpl dispatchEngine; + + private static final String CLEAN = "CLEAN"; + private static final Long ASSIGNEE_ID = 31L; + + @BeforeEach + void setUp() { + dispatchEngine.registerAssignStrategy(CLEAN, assignStrategy); + dispatchEngine.registerScheduleStrategy(CLEAN, scheduleStrategy); + } + + @Test + void directDispatch_onConflict_whenOrderIsPending_shouldDowngradeToEnqueue() { + // PENDING 工单派发被拒 → 降级 executeEnqueueOnly,避免工单悬空 + Long orderId = 400L; + OrderDispatchContext context = baseContext(orderId); + + stubHappyPathUntilDispatch(orderId); + when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class))) + .thenReturn(OrderTransitionResult.fail(orderId, + "同执行人已有活跃工单 999", + TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER)); + when(orderMapper.selectById(orderId)).thenReturn(OpsOrderDO.builder() + .id(orderId) + .status(WorkOrderStatusEnum.PENDING.getStatus()) + .build()); + when(orderLifecycleManager.enqueue(any(OrderTransitionRequest.class))) + .thenReturn(OrderTransitionResult.success(orderId, + WorkOrderStatusEnum.PENDING, WorkOrderStatusEnum.QUEUED, 6000L)); + + DispatchResult result = dispatchEngine.dispatch(context); + + assertTrue(result.isSuccess()); + assertEquals(DispatchPath.ENQUEUE_ONLY, result.getPath()); + assertEquals(6000L, result.getQueueId()); + verify(orderLifecycleManager).enqueue(any(OrderTransitionRequest.class)); + } + + @Test + void directDispatch_onConflict_whenOrderAlreadyQueued_shouldKeepInQueue() { + // QUEUED 工单(从队列中被拉出派发)再次被拒 → 不重复入队,继续留在队列等下一轮 + Long orderId = 401L; + OrderDispatchContext context = baseContext(orderId); + + stubHappyPathUntilDispatch(orderId); + when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class))) + .thenReturn(OrderTransitionResult.fail(orderId, + "同执行人已有活跃工单 998", + TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER)); + when(orderMapper.selectById(orderId)).thenReturn(OpsOrderDO.builder() + .id(orderId) + .status(WorkOrderStatusEnum.QUEUED.getStatus()) + .build()); + + DispatchResult result = dispatchEngine.dispatch(context); + + assertFalse(result.isSuccess()); + assertTrue(result.getMessage().contains("已留在队列等待"), + "冲突信息应说明工单已留在队列,实际: " + result.getMessage()); + // 关键断言:不能再调一次 enqueue,否则队列里会出现两条记录 + verify(orderLifecycleManager, never()).enqueue(any(OrderTransitionRequest.class)); + } + + @Test + void directDispatch_onGeneralFailure_shouldNotDowngrade() { + // 非并发冲突的失败(例如非法状态转换)不走降级路径,且不查 selectById + Long orderId = 402L; + OrderDispatchContext context = baseContext(orderId); + + stubHappyPathUntilDispatch(orderId); + when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class))) + .thenReturn(OrderTransitionResult.fail(orderId, + "非法状态转换", + TransitionErrorCode.INVALID_TRANSITION)); + + DispatchResult result = dispatchEngine.dispatch(context); + + assertFalse(result.isSuccess()); + assertTrue(result.getMessage().contains("直接派单失败"), + "一般失败应归类为直接派单失败,实际: " + result.getMessage()); + verify(orderLifecycleManager, never()).enqueue(any(OrderTransitionRequest.class)); + // 非冲突错误不应触发工单状态复核 + verify(orderMapper, never()).selectById(orderId); + } + + // ==================== Helpers ==================== + + private OrderDispatchContext baseContext(Long orderId) { + return OrderDispatchContext.builder() + .orderId(orderId) + .orderCode("WO-TEST-" + orderId) + .businessType(CLEAN) + .areaId(501L) + .build(); + } + + /** + * 装配 dispatch 路径上到 orderLifecycleManager.dispatch() 之前的全部 stub: + * 策略推荐成功 + 决策为 DIRECT_DISPATCH + 兜底查询 MySQL 为空闲。 + * 留给测试自己控制 orderLifecycleManager.dispatch 的返回。 + */ + private void stubHappyPathUntilDispatch(Long orderId) { + when(assignStrategy.recommend(any())).thenReturn( + AssigneeRecommendation.of(ASSIGNEE_ID, "工牌31", 80, "区域最近")); + when(scheduleStrategy.decide(any())).thenReturn(DispatchDecision.directDispatch()); + when(orderQueueService.getTasksByUserId(ASSIGNEE_ID)).thenReturn(List.of()); + when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, orderId)).thenReturn(List.of()); + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/dispatch/DispatchEngineIdleCheckTest.java b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/dispatch/DispatchEngineIdleCheckTest.java new file mode 100644 index 00000000..4c04ac10 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/dispatch/DispatchEngineIdleCheckTest.java @@ -0,0 +1,264 @@ +package com.viewsh.module.ops.core.dispatch; + +import com.viewsh.module.ops.api.queue.OrderQueueDTO; +import com.viewsh.module.ops.api.queue.OrderQueueService; +import com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation; +import com.viewsh.module.ops.core.dispatch.model.DispatchDecision; +import com.viewsh.module.ops.core.dispatch.model.DispatchPath; +import com.viewsh.module.ops.core.dispatch.model.DispatchResult; +import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext; +import com.viewsh.module.ops.core.dispatch.strategy.AssignStrategy; +import com.viewsh.module.ops.core.dispatch.strategy.ScheduleStrategy; +import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager; +import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest; +import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult; +import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO; +import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +/** + * 验证 Bug #1(autoDispatchNext 空闲兜底)+ Bug #4(executeDispatch 前置检查)。 + *

+ * 产线事故:管理员 cancel 一个僵尸 DISPATCHED 单 → handleCancelled → autoDispatchNext, + * 若不校验活跃态,就会在同一设备上派发新单、旧单不死,最终 0002=CONFIRMED + 0003=DISPATCHED 并存。 + */ +@ExtendWith(MockitoExtension.class) +class DispatchEngineIdleCheckTest { + + @Mock + private OrderLifecycleManager orderLifecycleManager; + @Mock + private OrderQueueService orderQueueService; + @Mock + private OpsOrderMapper orderMapper; + @Mock + private AssignStrategy assignStrategy; + @Mock + private ScheduleStrategy scheduleStrategy; + + @InjectMocks + private DispatchEngineImpl dispatchEngine; + + private static final String CLEAN = "CLEAN"; + private static final Long ASSIGNEE_ID = 31L; + + @BeforeEach + void setUp() { + dispatchEngine.registerAssignStrategy(CLEAN, assignStrategy); + dispatchEngine.registerScheduleStrategy(CLEAN, scheduleStrategy); + } + + @Test + void autoDispatchNext_shouldSkip_whenAssigneeStillHasActiveOrder() { + // 场景:completedOrderId=100 刚被 cancel,但设备 31 名下还挂着 200L CONFIRMED 单 + Long completedOrderId = 100L; + when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, completedOrderId)) + .thenReturn(List.of(OpsOrderDO.builder() + .id(200L) + .status(WorkOrderStatusEnum.CONFIRMED.getStatus()) + .build())); + + DispatchResult result = dispatchEngine.autoDispatchNext(completedOrderId, ASSIGNEE_ID); + + assertTrue(result.isSuccess()); + assertEquals("执行人非空闲,跳过派发", result.getMessage()); + assertEquals(ASSIGNEE_ID, result.getAssigneeId()); + // 不应触发后续队列重排和派发 + verifyNoInteractions(orderQueueService); + verifyNoInteractions(orderLifecycleManager); + } + + @Test + void autoDispatchNext_shouldSkip_whenAssigneeHasPausedOrder() { + // PAUSED 也视为"仍有任务",不能派发新单(否则 PAUSED 恢复回来就和新单冲突) + Long completedOrderId = 101L; + when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, completedOrderId)) + .thenReturn(List.of(OpsOrderDO.builder() + .id(201L) + .status(WorkOrderStatusEnum.PAUSED.getStatus()) + .build())); + + DispatchResult result = dispatchEngine.autoDispatchNext(completedOrderId, ASSIGNEE_ID); + + assertTrue(result.isSuccess()); + assertEquals("执行人非空闲,跳过派发", result.getMessage()); + verifyNoInteractions(orderLifecycleManager); + } + + @Test + void autoDispatchNext_shouldReturnEarly_whenAssigneeIdIsNull() { + // 入参校验:assigneeId 空直接返回,不查活跃态 + DispatchResult result = dispatchEngine.autoDispatchNext(100L, null); + + assertTrue(result.isSuccess()); + assertEquals("缺少执行人,跳过派发", result.getMessage()); + verifyNoInteractions(orderMapper); + verifyNoInteractions(orderQueueService); + } + + @Test + void executeDispatch_shouldDowngradeDirectDispatchToEnqueue_whenMysqlShowsActive() { + // Bug #4:Redis 说设备空闲,但 MySQL 仍有活跃态 → 兜底把 DIRECT_DISPATCH 降级为 ENQUEUE_ONLY + Long orderId = 300L; + OrderDispatchContext context = baseContext(orderId); + + when(assignStrategy.recommend(any())).thenReturn( + AssigneeRecommendation.of(ASSIGNEE_ID, "工牌31", 80, "区域最近")); + when(scheduleStrategy.decide(any())).thenReturn(DispatchDecision.directDispatch()); + when(orderQueueService.getTasksByUserId(ASSIGNEE_ID)).thenReturn(List.of()); + when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, orderId)) + .thenReturn(List.of(OpsOrderDO.builder() + .id(999L) + .status(WorkOrderStatusEnum.ARRIVED.getStatus()) + .build())); + when(orderLifecycleManager.enqueue(any(OrderTransitionRequest.class))) + .thenReturn(successEnqueue(orderId, 5000L)); + + DispatchResult result = dispatchEngine.dispatch(context); + + assertTrue(result.isSuccess()); + assertEquals(DispatchPath.ENQUEUE_ONLY, result.getPath()); + assertEquals(5000L, result.getQueueId()); + verify(orderLifecycleManager, never()).dispatch(any(OrderTransitionRequest.class)); + verify(orderLifecycleManager).enqueue(any(OrderTransitionRequest.class)); + } + + @Test + void executeDispatch_shouldDowngradePushAndEnqueue_whenMysqlShowsActive() { + // PUSH_AND_ENQUEUE 路径同样要兜底:本应"推送旧队首 + 新单入队", + // 但旧队首已活跃,推送会触发 FOR UPDATE 冲突,所以直接降级为 ENQUEUE_ONLY + Long orderId = 301L; + OrderDispatchContext context = baseContext(orderId); + + when(assignStrategy.recommend(any())).thenReturn( + AssigneeRecommendation.of(ASSIGNEE_ID, "工牌31", 80, "区域最近")); + when(scheduleStrategy.decide(any())).thenReturn(DispatchDecision.pushAndEnqueue()); + when(orderQueueService.getTasksByUserId(ASSIGNEE_ID)).thenReturn(List.of()); + when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, orderId)) + .thenReturn(List.of(OpsOrderDO.builder() + .id(998L) + .status(WorkOrderStatusEnum.DISPATCHED.getStatus()) + .build())); + when(orderLifecycleManager.enqueue(any(OrderTransitionRequest.class))) + .thenReturn(successEnqueue(orderId, 5001L)); + + DispatchResult result = dispatchEngine.dispatch(context); + + assertTrue(result.isSuccess()); + assertEquals(DispatchPath.ENQUEUE_ONLY, result.getPath()); + verify(orderLifecycleManager, never()).dispatch(any(OrderTransitionRequest.class)); + verify(orderLifecycleManager).enqueue(any(OrderTransitionRequest.class)); + } + + @Test + void executeDispatch_shouldNotQueryMysql_whenPathIsEnqueueOnly() { + // ENQUEUE_ONLY 本来就不推送,无需兜底查询——避免给每一次入队都叠加一次 SQL 开销 + Long orderId = 302L; + OrderDispatchContext context = baseContext(orderId); + + when(assignStrategy.recommend(any())).thenReturn( + AssigneeRecommendation.of(ASSIGNEE_ID, "工牌31", 80, "区域最近")); + when(scheduleStrategy.decide(any())).thenReturn(DispatchDecision.enqueueOnly()); + when(orderLifecycleManager.enqueue(any(OrderTransitionRequest.class))) + .thenReturn(successEnqueue(orderId, 5002L)); + + DispatchResult result = dispatchEngine.dispatch(context); + + assertTrue(result.isSuccess()); + assertEquals(DispatchPath.ENQUEUE_ONLY, result.getPath()); + // 关键:ENQUEUE_ONLY 不应触发兜底查询 + verify(orderMapper, never()).selectActiveByAssignee(any(), any()); + } + + @Test + void executeDispatch_shouldProceedDirectDispatch_whenMysqlConfirmsIdle() { + // 反向用例:MySQL 也确认空闲 → 正常走 DIRECT_DISPATCH,不降级 + Long orderId = 303L; + OrderDispatchContext context = baseContext(orderId); + + when(assignStrategy.recommend(any())).thenReturn( + AssigneeRecommendation.of(ASSIGNEE_ID, "工牌31", 80, "区域最近")); + when(scheduleStrategy.decide(any())).thenReturn(DispatchDecision.directDispatch()); + when(orderQueueService.getTasksByUserId(ASSIGNEE_ID)).thenReturn(List.of()); + when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, orderId)).thenReturn(List.of()); + when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class))) + .thenReturn(OrderTransitionResult.success(orderId, + WorkOrderStatusEnum.PENDING, WorkOrderStatusEnum.DISPATCHED)); + + DispatchResult result = dispatchEngine.dispatch(context); + + assertFalse(!result.isSuccess()); // 确认成功 + assertEquals(DispatchPath.DIRECT_DISPATCH, result.getPath()); + verify(orderLifecycleManager).dispatch(any(OrderTransitionRequest.class)); + verify(orderLifecycleManager, never()).enqueue(any(OrderTransitionRequest.class)); + } + + // ==================== Helpers ==================== + + private OrderDispatchContext baseContext(Long orderId) { + return OrderDispatchContext.builder() + .orderId(orderId) + .orderCode("WO-TEST-" + orderId) + .businessType(CLEAN) + .areaId(501L) + .build(); + } + + private OrderTransitionResult successEnqueue(Long orderId, Long queueId) { + return OrderTransitionResult.success(orderId, + WorkOrderStatusEnum.PENDING, WorkOrderStatusEnum.QUEUED, queueId); + } + + @Test + void autoDispatchNext_whenDispatchingFromQueue_shouldGoThroughDispatchNotTransition() { + // 锁死 P1 修复:从队列派发必须走 dispatch(),以继承 Bug #2 的 FOR UPDATE 串行化防线。 + // 如果未来有人改回 transition(),本测试会红:autoDispatchNext 绕过 FOR UPDATE 的漏洞就回来了。 + Long completedOrderId = 700L; + Long waitingOrderId = 701L; + Long queueId = 800L; + + when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, completedOrderId)).thenReturn(List.of()); + when(orderMapper.selectById(completedOrderId)).thenReturn(OpsOrderDO.builder() + .id(completedOrderId).areaId(501L).build()); + OrderQueueDTO waitingDTO = new OrderQueueDTO(); + waitingDTO.setId(queueId); + waitingDTO.setOpsOrderId(waitingOrderId); + waitingDTO.setQueueScore(1000.0); + waitingDTO.setFloorDiff(1); + waitingDTO.setWaitMinutes(2L); + when(orderQueueService.rebuildWaitingTasksByUserId(ASSIGNEE_ID, 501L)) + .thenReturn(List.of(waitingDTO)); + when(orderMapper.selectById(waitingOrderId)).thenReturn(OpsOrderDO.builder() + .id(waitingOrderId) + .status(WorkOrderStatusEnum.QUEUED.getStatus()) + .build()); + when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class))) + .thenReturn(OrderTransitionResult.success(waitingOrderId, + WorkOrderStatusEnum.QUEUED, WorkOrderStatusEnum.DISPATCHED)); + + DispatchResult result = dispatchEngine.autoDispatchNext(completedOrderId, ASSIGNEE_ID); + + assertTrue(result.isSuccess()); + assertEquals("已按队列总分派发下一单", result.getMessage()); + // 关键断言:必须调 dispatch()(带 FOR UPDATE)而不是 transition()(裸责任链) + verify(orderLifecycleManager).dispatch(any(OrderTransitionRequest.class)); + verify(orderLifecycleManager, never()).transition(any(OrderTransitionRequest.class)); + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/lifecycle/audit/OrderTransitionAuditListenerTest.java b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/lifecycle/audit/OrderTransitionAuditListenerTest.java new file mode 100644 index 00000000..246d8d72 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/core/lifecycle/audit/OrderTransitionAuditListenerTest.java @@ -0,0 +1,211 @@ +package com.viewsh.module.ops.core.lifecycle.audit; + +import com.viewsh.module.ops.core.event.OrderTransitionAttemptedEvent; +import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode; +import com.viewsh.module.ops.enums.OperatorTypeEnum; +import com.viewsh.module.ops.enums.WorkOrderStatusEnum; +import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain; +import com.viewsh.module.ops.infrastructure.log.enumeration.EventLevel; +import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule; +import com.viewsh.module.ops.infrastructure.log.enumeration.LogType; +import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord; +import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.LocalDateTime; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.verify; + +/** + * 验证 Bug #7:状态转换审计闭环。 + *

+ * 三条路径: + *

    + *
  1. AFTER_COMMIT + 成功 → INFO + 业务 LogType(如 ORDER_DISPATCHED)
  2. + *
  3. AFTER_COMMIT + 并发冲突失败 → WARN + DISPATCH_REJECTED
  4. + *
  5. AFTER_COMMIT + 一般失败 → ERROR + TRANSITION_FAILED
  6. + *
  7. AFTER_ROLLBACK → 无论事件声明 success 与否都视为失败(事务已回滚),独立事务补写
  8. + *
+ */ +@ExtendWith(MockitoExtension.class) +class OrderTransitionAuditListenerTest { + + @Mock + private EventLogRecorder eventLogRecorder; + + @InjectMocks + private OrderTransitionAuditListener listener; + + @Test + void onAfterCommit_success_shouldRecordInfoWithBusinessEventType() { + OrderTransitionAttemptedEvent event = successEvent(100L, WorkOrderStatusEnum.QUEUED, + WorkOrderStatusEnum.DISPATCHED); + + listener.onAfterCommit(event); + + EventLogRecord rec = captureRecord(); + assertEquals(EventLevel.INFO, rec.getLevel()); + assertEquals(EventDomain.DISPATCH, rec.getDomain()); + assertEquals(LogType.ORDER_DISPATCHED.getCode(), rec.getEventType()); + assertEquals(LogModule.CLEAN, rec.getModule()); + assertEquals(100L, rec.getTargetId()); + assertEquals("order", rec.getTargetType()); + assertEquals(Boolean.TRUE, rec.getPayload().get("success")); + assertEquals(Boolean.FALSE, rec.getPayload().get("rolledBack")); + assertEquals(WorkOrderStatusEnum.QUEUED.getStatus(), rec.getPayload().get("fromStatus")); + assertEquals(WorkOrderStatusEnum.DISPATCHED.getStatus(), rec.getPayload().get("targetStatus")); + } + + @Test + void onAfterCommit_success_shouldMapAllBusinessStatusesToLogType() { + // 验证关键目标状态都能映射到对应 LogType(避免回归导致全映射到 SYSTEM_EVENT) + assertLogTypeForTarget(WorkOrderStatusEnum.QUEUED, LogType.ORDER_QUEUED); + assertLogTypeForTarget(WorkOrderStatusEnum.DISPATCHED, LogType.ORDER_DISPATCHED); + assertLogTypeForTarget(WorkOrderStatusEnum.CONFIRMED, LogType.ORDER_CONFIRM); + assertLogTypeForTarget(WorkOrderStatusEnum.ARRIVED, LogType.ORDER_ARRIVED); + assertLogTypeForTarget(WorkOrderStatusEnum.PAUSED, LogType.ORDER_PAUSED); + assertLogTypeForTarget(WorkOrderStatusEnum.COMPLETED, LogType.ORDER_COMPLETED); + assertLogTypeForTarget(WorkOrderStatusEnum.CANCELLED, LogType.ORDER_CANCELLED); + } + + @Test + void onAfterCommit_forUpdateRejected_shouldRecordWarnWithDispatchRejected() { + OrderTransitionAttemptedEvent event = OrderTransitionAttemptedEvent.builder() + .orderId(200L) + .orderType("CLEAN") + .fromStatus(WorkOrderStatusEnum.PENDING) + .targetStatus(WorkOrderStatusEnum.DISPATCHED) + .assigneeId(31L) + .operatorType(OperatorTypeEnum.SYSTEM) + .success(false) + .errorCode(TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER) + .errorMessage("同执行人已有活跃工单 999") + .attemptedAt(LocalDateTime.now()) + .build(); + + listener.onAfterCommit(event); + + EventLogRecord rec = captureRecord(); + // 并发冲突只是业务层拒绝,不是系统异常,所以是 WARN 而不是 ERROR + assertEquals(EventLevel.WARN, rec.getLevel()); + assertEquals(LogType.DISPATCH_REJECTED.getCode(), rec.getEventType()); + assertEquals("ASSIGNEE_HAS_ACTIVE_ORDER", rec.getPayload().get("errorCode")); + assertEquals("同执行人已有活跃工单 999", rec.getPayload().get("errorMessage")); + assertTrue(rec.getMessage().contains("状态转换失败"), + "消息应标明转换失败,实际: " + rec.getMessage()); + } + + @Test + void onAfterCommit_generalFailure_shouldRecordErrorWithTransitionFailed() { + OrderTransitionAttemptedEvent event = OrderTransitionAttemptedEvent.builder() + .orderId(300L) + .orderType("SECURITY") + .fromStatus(WorkOrderStatusEnum.PENDING) + .targetStatus(WorkOrderStatusEnum.DISPATCHED) + .success(false) + .errorCode(TransitionErrorCode.INVALID_TRANSITION) + .errorMessage("PENDING → ARRIVED 非法") + .causeSummary("IllegalStateException: PENDING → ARRIVED 非法") + .attemptedAt(LocalDateTime.now()) + .build(); + + listener.onAfterCommit(event); + + EventLogRecord rec = captureRecord(); + assertEquals(EventLevel.ERROR, rec.getLevel()); + assertEquals(LogType.TRANSITION_FAILED.getCode(), rec.getEventType()); + assertEquals(LogModule.SECURITY, rec.getModule()); + assertEquals("INVALID_TRANSITION", rec.getPayload().get("errorCode")); + assertEquals("IllegalStateException: PENDING → ARRIVED 非法", + rec.getPayload().get("cause")); + } + + @Test + void writeRollbackAudit_evenIfEventClaimsSuccess_shouldForceFailure() { + // 即便发布时声明 success=true,事务 rollback 就是没真正落库,必须按失败记录 + OrderTransitionAttemptedEvent event = OrderTransitionAttemptedEvent.builder() + .orderId(400L) + .orderType("CLEAN") + .fromStatus(WorkOrderStatusEnum.QUEUED) + .targetStatus(WorkOrderStatusEnum.DISPATCHED) + .success(true) // 发布时乐观声明 + .attemptedAt(LocalDateTime.now()) + .build(); + + listener.writeRollbackAudit(event); + + EventLogRecord rec = captureRecord(); + assertEquals(EventLevel.ERROR, rec.getLevel()); + assertEquals(LogType.TRANSITION_FAILED.getCode(), rec.getEventType()); + assertEquals(Boolean.TRUE, rec.getPayload().get("rolledBack")); + assertEquals(Boolean.FALSE, rec.getPayload().get("success")); + assertTrue(rec.getMessage().contains("状态转换回滚"), + "回滚消息应明确标注,实际: " + rec.getMessage()); + } + + @Test + void writeRollbackAudit_withForUpdateRejected_stillMapsToDispatchRejected() { + // 回滚 + 冲突错误码 → 依然归类为 DISPATCH_REJECTED(而不是 TRANSITION_FAILED) + OrderTransitionAttemptedEvent event = OrderTransitionAttemptedEvent.builder() + .orderId(500L) + .orderType("CLEAN") + .fromStatus(WorkOrderStatusEnum.PENDING) + .targetStatus(WorkOrderStatusEnum.DISPATCHED) + .success(false) + .errorCode(TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER) + .attemptedAt(LocalDateTime.now()) + .build(); + + listener.writeRollbackAudit(event); + + EventLogRecord rec = captureRecord(); + // 冲突型错误即便回滚也应是 WARN + DISPATCH_REJECTED,方便运维过滤 + assertEquals(EventLevel.WARN, rec.getLevel()); + assertEquals(LogType.DISPATCH_REJECTED.getCode(), rec.getEventType()); + assertFalse((Boolean) rec.getPayload().get("success")); + } + + // ==================== Helpers ==================== + + private OrderTransitionAttemptedEvent successEvent(Long orderId, + WorkOrderStatusEnum from, + WorkOrderStatusEnum to) { + return OrderTransitionAttemptedEvent.builder() + .orderId(orderId) + .orderType("CLEAN") + .orderCode("WO-" + orderId) + .fromStatus(from) + .targetStatus(to) + .assigneeId(31L) + .operatorType(OperatorTypeEnum.SYSTEM) + .operatorId(31L) + .reason("test") + .success(true) + .attemptedAt(LocalDateTime.now()) + .build(); + } + + private void assertLogTypeForTarget(WorkOrderStatusEnum target, LogType expected) { + org.mockito.Mockito.reset(eventLogRecorder); + listener.onAfterCommit(successEvent(1000L + target.ordinal(), + WorkOrderStatusEnum.PENDING, target)); + + EventLogRecord rec = captureRecord(); + assertEquals(expected.getCode(), rec.getEventType(), + "target=" + target + " 应映射到 " + expected); + } + + private EventLogRecord captureRecord() { + ArgumentCaptor captor = ArgumentCaptor.forClass(EventLogRecord.class); + verify(eventLogRecorder).recordSync(captor.capture()); + return captor.getValue(); + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhancedTest.java b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhancedTest.java index 19f5885a..bdf2a12c 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhancedTest.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhancedTest.java @@ -77,13 +77,21 @@ class OrderQueueServiceEnhancedTest { when(orderQueueMapper.selectListByUserIdAndStatus(userId, OrderQueueStatusEnum.WAITING.getStatus())) .thenReturn(List.of(olderFarTask, newerNearTask)); when(orderQueueMapper.selectCurrentExecutingByUserId(userId)).thenReturn(currentTask); - when(orderQueueMapper.selectListByUserId(userId)).thenReturn(List.of(olderFarTask, newerNearTask, currentTask)); + // syncUserQueueToRedis 走 selectActiveListByUserId(Bug#6),只返回活跃态 + when(orderQueueMapper.selectActiveListByUserId(userId)) + .thenReturn(List.of(olderFarTask, newerNearTask, currentTask)); + // resolveOrderAreaId 仍单条 selectById(PROCESSING 工单的 area) when(orderMapper.selectById(900L)).thenReturn(OpsOrderDO.builder().id(900L).areaId(501L).build()); - when(orderMapper.selectById(101L)).thenReturn(OpsOrderDO.builder().id(101L).areaId(503L).build()); - when(orderMapper.selectById(102L)).thenReturn(OpsOrderDO.builder().id(102L).areaId(502L).build()); + // WAITING 工单批量加载 + when(orderMapper.selectBatchIds(org.mockito.ArgumentMatchers.anyCollection())) + .thenReturn(List.of( + OpsOrderDO.builder().id(101L).areaId(503L).build(), + OpsOrderDO.builder().id(102L).areaId(502L).build())); when(areaMapper.selectById(501L)).thenReturn(OpsBusAreaDO.builder().id(501L).floorNo(5).build()); - when(areaMapper.selectById(502L)).thenReturn(OpsBusAreaDO.builder().id(502L).floorNo(6).build()); - when(areaMapper.selectById(503L)).thenReturn(OpsBusAreaDO.builder().id(503L).floorNo(8).build()); + when(areaMapper.selectBatchIds(org.mockito.ArgumentMatchers.anyCollection())) + .thenReturn(List.of( + OpsBusAreaDO.builder().id(503L).floorNo(8).build(), + OpsBusAreaDO.builder().id(502L).floorNo(6).build())); when(orderQueueMapper.updateById(any(OpsOrderQueueDO.class))).thenReturn(1); List rebuiltTasks = orderQueueService.rebuildWaitingTasksByUserId(userId, null); diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/QueueScoreCalculatorEnhancedTest.java b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/QueueScoreCalculatorEnhancedTest.java new file mode 100644 index 00000000..1604bc19 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/test/java/com/viewsh/module/ops/service/queue/QueueScoreCalculatorEnhancedTest.java @@ -0,0 +1,131 @@ +package com.viewsh.module.ops.service.queue; + +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * 楼层权重修复的补充测试(commit a5f916c)。 + *

+ * 与 {@link QueueScoreCalculatorTest}(基础行为)互补,覆盖这次改动的三个关键不变量: + *

    + *
  1. G 强楼层优先:FLOOR_WEIGHT=100,10 层封顶 1000 > aging 封顶 720, + * 保证等满 4 小时的任务也不会反超近楼层任务
  2. + *
  3. B 语义对称:base 或 target 任一缺失 → floorScore=0,不再有"有 base 无 target → +600"罚分
  4. + *
  5. floor 封顶:楼层差超过 MAX_FLOOR_DIFF=10 时按 10 计算
  6. + *
+ */ +class QueueScoreCalculatorEnhancedTest { + + private final QueueScoreCalculator calculator = new QueueScoreCalculator(); + + @Test + void strongFloorPriority_farLongWaitedTaskShouldNotOvertakeNearJustInTask() { + // G: 同 P1 优先级下,"远楼层+等满 4 小时" 不应反超 "近楼层+刚入队"。 + // 近刚入队: priority=1500, floor=0, aging=0 → 1500 + // 远等满: priority=1500, floor=10*100=1000, aging=720 → 1780 + // near.score (1500) < far.score (1780) → near 先派发,符合"强楼层优先" + LocalDateTime now = LocalDateTime.of(2026, 4, 20, 12, 0); + + QueueScoreResult nearJustIn = calculator.calculate(QueueScoreContext.builder() + .priority(1) + .baseFloorNo(3).targetFloorNo(3) + .enqueueTime(now).now(now) + .build()); + + QueueScoreResult farLongWaited = calculator.calculate(QueueScoreContext.builder() + .priority(1) + .baseFloorNo(3).targetFloorNo(13) // diff=10(封顶) + .enqueueTime(now.minusHours(4)) // aging 封顶 240 min + .now(now) + .build()); + + assertTrue(nearJustIn.getTotalScore() < farLongWaited.getTotalScore(), + "near=" + nearJustIn.getTotalScore() + " far=" + farLongWaited.getTotalScore() + + ":远楼层即便等满也不应反超近楼层"); + } + + @Test + void symmetricNullHandling_baseFloorMissing_shouldGiveZeroFloorScore() { + // B: baseFloor=null(执行人位置未知),不应被动扣分 + LocalDateTime now = LocalDateTime.of(2026, 4, 20, 12, 0); + QueueScoreResult result = calculator.calculate(QueueScoreContext.builder() + .priority(1) + .baseFloorNo(null).targetFloorNo(5) + .enqueueTime(now).now(now) + .build()); + + // priorityScore=1500, floorScore=0, aging=0 → 1500 + assertEquals(1500.0, result.getTotalScore(), 0.001); + assertNull(result.getFloorDiff(), "floorDiff 应为 null,表示楼层信息不完整"); + } + + @Test + void symmetricNullHandling_targetFloorMissing_shouldGiveZeroFloorScore() { + // B: targetFloor=null(工单区域未登记楼层)同样应得 floorScore=0——与 baseFloor 缺失等价 + LocalDateTime now = LocalDateTime.of(2026, 4, 20, 12, 0); + QueueScoreResult result = calculator.calculate(QueueScoreContext.builder() + .priority(1) + .baseFloorNo(5).targetFloorNo(null) + .enqueueTime(now).now(now) + .build()); + + assertEquals(1500.0, result.getTotalScore(), 0.001); + assertNull(result.getFloorDiff()); + } + + @Test + void symmetricNullHandling_bothTasksWithPartialFloor_shouldSortByAgingOnly() { + // 关键回归:旧逻辑会给"有 base 无 target"+600 罚分,导致同一工单在不同 base 场景排序不单调。 + // 现在两个任务楼层信息同等"不完整"应仅靠 aging 排序,"等得久"的排前。 + LocalDateTime now = LocalDateTime.of(2026, 4, 20, 12, 0); + + QueueScoreResult newerNoTarget = calculator.calculate(QueueScoreContext.builder() + .priority(1) + .baseFloorNo(5).targetFloorNo(null) + .enqueueTime(now.minusMinutes(5)) + .now(now) + .build()); + + QueueScoreResult olderNoTarget = calculator.calculate(QueueScoreContext.builder() + .priority(1) + .baseFloorNo(5).targetFloorNo(null) + .enqueueTime(now.minusMinutes(80)) + .now(now) + .build()); + + // 两者 floorScore 都 =0,aging 越大分越低 → older 先派 + assertTrue(olderNoTarget.getTotalScore() < newerNoTarget.getTotalScore()); + } + + @Test + void floorCappedAtMaxFloorDiff_evenWhenActualDiffMuchLarger() { + // 楼层差 25 应按 10 封顶:floorScore = 10 × 100 = 1000 + LocalDateTime now = LocalDateTime.of(2026, 4, 20, 12, 0); + QueueScoreResult result = calculator.calculate(QueueScoreContext.builder() + .priority(1) + .baseFloorNo(0).targetFloorNo(25) + .enqueueTime(now).now(now) + .build()); + + assertEquals(25, result.getFloorDiff(), "floorDiff 透传原始差值,便于诊断"); + // priorityScore=1500 + floorScore(capped)=1000 - aging=0 = 2500 + assertEquals(2500.0, result.getTotalScore(), 0.001, + "超过 10 层差应按 10 层封顶计算分数"); + } + + @Test + void floorWeightShouldDominateAgingCap() { + // 锁死这次改动的核心不变量:FLOOR_WEIGHT * MAX_FLOOR_DIFF > AGING_WEIGHT * MAX_AGING_MINUTES + // 即 100 * 10 = 1000 > 3 * 240 = 720 + int floorMax = QueueScoreCalculator.FLOOR_WEIGHT * QueueScoreCalculator.MAX_FLOOR_DIFF; + int agingMax = QueueScoreCalculator.AGING_WEIGHT * QueueScoreCalculator.MAX_AGING_MINUTES; + assertTrue(floorMax > agingMax, + "权重失衡:floor 封顶 " + floorMax + " 不再压倒 aging 封顶 " + agingMax + + ",会导致等得久的远楼层任务反超近楼层"); + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application.yaml b/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application.yaml index 3b48f1ea..b72e80c4 100644 --- a/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application.yaml +++ b/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application.yaml @@ -146,6 +146,12 @@ viewsh: connect-timeout: 5000 read-timeout: 10000 max-retry: 2 + clean: + auto-cancel: + # 保洁工单 update_time 距今超过此小时数视为卡死,由 CleanOrderAutoCancelJob 自动取消 + timeout-hours: 12 + # 单次扫描/取消上限,防止事件风暴;超出的工单留给下一轮 cron + batch-size: 200 # API 签名配置:外部系统调用开放接口时使用(如安保工单的告警系统) signature: apps: