From 4d349517997a90f87faafc09b9772f38cf7d53a2 Mon Sep 17 00:00:00 2001 From: lzh Date: Fri, 30 Jan 2026 14:50:38 +0800 Subject: [PATCH] =?UTF-8?q?fix(ops):=20=E4=BF=AE=E5=A4=8D=E5=B7=A5?= =?UTF-8?q?=E5=8D=95=E9=98=9F=E5=88=97=E4=B8=80=E8=87=B4=E6=80=A7=E4=B8=8E?= =?UTF-8?q?=E8=AE=BE=E5=A4=87=E7=8A=B6=E6=80=81=E8=A6=86=E7=9B=96=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/CleanOrderEventListener.java | 35 +++++++ .../badge/BadgeDeviceStatusServiceImpl.java | 25 ++++- .../lifecycle/handler/QueueSyncHandler.java | 91 ++++++++++--------- 3 files changed, 105 insertions(+), 46 deletions(-) diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java index 08f1d1f..7e128f7 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/integration/listener/CleanOrderEventListener.java @@ -2,6 +2,7 @@ package com.viewsh.module.ops.environment.integration.listener; import com.viewsh.module.iot.api.device.IotDeviceControlApi; import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO; +import com.viewsh.module.ops.api.queue.OrderQueueService; import com.viewsh.module.ops.core.dispatch.DispatchEngine; import com.viewsh.module.ops.core.dispatch.model.DispatchResult; import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext; @@ -83,6 +84,9 @@ public class CleanOrderEventListener { @Resource private EventLogRecorder eventLogRecorder; + @Resource + private OrderQueueService orderQueueService; + // ==================== 工单创建事件 ==================== /** @@ -231,6 +235,9 @@ public class CleanOrderEventListener { // 设备状态由 BadgeDeviceStatusEventListener 统一处理 log.debug("[CleanOrderEventListener] CANCELLED 状态已处理: orderId={}", event.getOrderId()); break; + case QUEUED: + handleQueued(event); + break; default: break; } @@ -265,6 +272,34 @@ public class CleanOrderEventListener { } } + /** + * 处理入队状态 + */ + @Async("ops-task-executor") + public void handleQueued(OrderStateChangedEvent event) { + Long orderId = event.getOrderId(); + Long deviceId = event.getPayloadLong("assigneeId"); + + if (deviceId == null) { + deviceId = event.getOperatorId(); + } + + if (deviceId != null) { + try { + // 获取等待中的任务列表 + var waitingTasks = orderQueueService.getWaitingTasksByUserId(deviceId); + int queueCount = waitingTasks != null ? waitingTasks.size() : 0; + + // 发送待办增加通知 + sendQueuedOrderNotification(deviceId, queueCount); + + log.info("[CleanOrderEventListener] 入队语音播报已发送: deviceId={}, queueCount={}", deviceId, queueCount); + } catch (Exception e) { + log.warn("[CleanOrderEventListener] 播报入队语音失败: deviceId={}, orderId={}", deviceId, orderId); + } + } + } + /** * 处理已确认状态 */ diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java index 3af4fcf..769759b 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java @@ -242,11 +242,30 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I try { String key = BADGE_STATUS_KEY_PREFIX + deviceId; - Long now = System.currentTimeMillis(); // 获取当前状态 Map currentMap = redisTemplate.opsForHash().entries(key); + // 核心修复逻辑:如果不为 OFFLINE (即 IDLE/BUSY),且当前有正在进行的工单,则强制保持 BUSY 状态 + // 防止设备心跳/上线事件将 BUSY 重置为 IDLE,导致调度状态不一致 + if (status != BadgeDeviceStatusEnum.OFFLINE && currentMap.containsKey("currentOpsOrderId")) { + Object currentOrderIdObj = currentMap.get("currentOpsOrderId"); + Object currentStatusObj = currentMap.get("status"); + + if (currentOrderIdObj != null && currentStatusObj != null) { + String currentStatusStr = currentStatusObj.toString(); + if (BadgeDeviceStatusEnum.BUSY.getCode().equals(currentStatusStr) + || BadgeDeviceStatusEnum.PAUSED.getCode().equals(currentStatusStr)) { + log.info("[updateBadgeOnlineStatus] 设备 {} 重新上线,但有进行中工单 {},保持状态 {}", + deviceId, currentOrderIdObj, currentStatusStr); + // 强制修正传入的状态为当前实际状态 + status = BadgeDeviceStatusEnum.valueOf(currentStatusStr); + } + } + } + + Long now = System.currentTimeMillis(); + // 构建状态数据 Map statusMap = new HashMap<>(); statusMap.put("deviceId", deviceId); @@ -263,8 +282,8 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I // 更新实时物理区域信息 (Key2) if (areaId != null) { statusMap.put("currentAreaId", areaId); - // 同时更新区域设备索引缓存 -// addToAreaIndex(deviceId, areaId); + // 同时更新区域设备索引缓存 (这里注释掉了,保持原样) + // addToAreaIndex(deviceId, areaId); } else { // 保持现有实时物理区域信息 Object existingAreaId = currentMap.get("currentAreaId"); diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/handler/QueueSyncHandler.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/handler/QueueSyncHandler.java index 4f96765..a173208 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/handler/QueueSyncHandler.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/core/lifecycle/handler/QueueSyncHandler.java @@ -34,41 +34,34 @@ public class QueueSyncHandler extends TransitionHandler { log.debug("队列同步处理器: orderId={}, targetStatus={}, queueId={}", context.getOrder().getId(), targetStatus, queueId); - try { - // 根据目标状态同步队列状态 - switch (targetStatus) { - case QUEUED: - handleQueued(context); - break; + // 根据目标状态同步队列状态 + switch (targetStatus) { + case QUEUED: + handleQueued(context); + break; - case DISPATCHED: - handleDispatched(context); - break; + case DISPATCHED: + handleDispatched(context); + break; - case ARRIVED: - // 队列状态保持 PROCESSING - break; + case ARRIVED: + // 队列状态保持 PROCESSING + break; - case COMPLETED: - case CANCELLED: - handleCompletedOrCancelled(context); - break; + case COMPLETED: + case CANCELLED: + handleCompletedOrCancelled(context); + break; - case PAUSED: - handlePaused(context); - break; + case PAUSED: + handlePaused(context); + break; - default: - log.debug("目标状态无需同步队列: targetStatus={}", targetStatus); - } - - log.debug("队列同步成功: orderId={}, targetStatus={}", context.getOrder().getId(), targetStatus); - - } catch (Exception e) { - error(context, "队列同步失败: " + e.getMessage(), e); - log.error("队列同步失败: orderId={}, targetStatus={}", - context.getOrder().getId(), targetStatus, e); + default: + log.debug("目标状态无需同步队列: targetStatus={}", targetStatus); } + + log.debug("队列同步成功: orderId={}, targetStatus={}", context.getOrder().getId(), targetStatus); } /** @@ -79,20 +72,27 @@ public class QueueSyncHandler extends TransitionHandler { Long assigneeId = request.getAssigneeId(); if (assigneeId == null) { - log.warn("入队时缺少执行人ID: orderId={}", context.getOrder().getId()); - return; + // 必须抛出异常以触发事务回滚,防止工单状态变更但队列记录未创建 + String errorMsg = String.format("入队时缺少执行人ID: orderId=%s", context.getOrder().getId()); + log.error(errorMsg); + throw new IllegalArgumentException(errorMsg); } - // 创建队列记录 - Long queueId = orderQueueService.enqueue( - context.getOrder().getId(), - assigneeId, - context.getOrder().getPriorityEnum(), - request.getQueueIndex() - ); + try { + // 创建队列记录 + Long queueId = orderQueueService.enqueue( + context.getOrder().getId(), + assigneeId, + context.getOrder().getPriorityEnum(), + request.getQueueIndex()); - context.setQueueId(queueId); - log.debug("队列记录已创建: queueId={}", queueId); + context.setQueueId(queueId); + log.debug("队列记录已创建: queueId={}", queueId); + } catch (Exception e) { + // 捕获异常并重新抛出,确保事务回滚 + log.error("创建队列记录失败: orderId={}", context.getOrder().getId(), e); + throw new IllegalStateException("创建队列记录失败", e); + } } /** @@ -108,9 +108,14 @@ public class QueueSyncHandler extends TransitionHandler { return; } - // 队列状态从 WAITING 变更为 PROCESSING - orderQueueService.startExecution(queueId); - log.debug("队列状态已更新为PROCESSING: queueId={}", queueId); + try { + // 队列状态从 WAITING 变更为 PROCESSING + orderQueueService.startExecution(queueId); + log.debug("队列状态已更新为PROCESSING: queueId={}", queueId); + } catch (Exception e) { + log.error("更新队列状态失败: queueId={}", queueId, e); + throw new IllegalStateException("更新队列状态失败", e); + } } /**