fix(ops): 修复工单队列一致性与设备状态覆盖问题
This commit is contained in:
@@ -2,6 +2,7 @@ package com.viewsh.module.ops.environment.integration.listener;
|
|||||||
|
|
||||||
import com.viewsh.module.iot.api.device.IotDeviceControlApi;
|
import com.viewsh.module.iot.api.device.IotDeviceControlApi;
|
||||||
import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO;
|
import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO;
|
||||||
|
import com.viewsh.module.ops.api.queue.OrderQueueService;
|
||||||
import com.viewsh.module.ops.core.dispatch.DispatchEngine;
|
import com.viewsh.module.ops.core.dispatch.DispatchEngine;
|
||||||
import com.viewsh.module.ops.core.dispatch.model.DispatchResult;
|
import com.viewsh.module.ops.core.dispatch.model.DispatchResult;
|
||||||
import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext;
|
import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext;
|
||||||
@@ -83,6 +84,9 @@ public class CleanOrderEventListener {
|
|||||||
@Resource
|
@Resource
|
||||||
private EventLogRecorder eventLogRecorder;
|
private EventLogRecorder eventLogRecorder;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private OrderQueueService orderQueueService;
|
||||||
|
|
||||||
// ==================== 工单创建事件 ====================
|
// ==================== 工单创建事件 ====================
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -231,6 +235,9 @@ public class CleanOrderEventListener {
|
|||||||
// 设备状态由 BadgeDeviceStatusEventListener 统一处理
|
// 设备状态由 BadgeDeviceStatusEventListener 统一处理
|
||||||
log.debug("[CleanOrderEventListener] CANCELLED 状态已处理: orderId={}", event.getOrderId());
|
log.debug("[CleanOrderEventListener] CANCELLED 状态已处理: orderId={}", event.getOrderId());
|
||||||
break;
|
break;
|
||||||
|
case QUEUED:
|
||||||
|
handleQueued(event);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -265,6 +272,34 @@ public class CleanOrderEventListener {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理入队状态
|
||||||
|
*/
|
||||||
|
@Async("ops-task-executor")
|
||||||
|
public void handleQueued(OrderStateChangedEvent event) {
|
||||||
|
Long orderId = event.getOrderId();
|
||||||
|
Long deviceId = event.getPayloadLong("assigneeId");
|
||||||
|
|
||||||
|
if (deviceId == null) {
|
||||||
|
deviceId = event.getOperatorId();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (deviceId != null) {
|
||||||
|
try {
|
||||||
|
// 获取等待中的任务列表
|
||||||
|
var waitingTasks = orderQueueService.getWaitingTasksByUserId(deviceId);
|
||||||
|
int queueCount = waitingTasks != null ? waitingTasks.size() : 0;
|
||||||
|
|
||||||
|
// 发送待办增加通知
|
||||||
|
sendQueuedOrderNotification(deviceId, queueCount);
|
||||||
|
|
||||||
|
log.info("[CleanOrderEventListener] 入队语音播报已发送: deviceId={}, queueCount={}", deviceId, queueCount);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("[CleanOrderEventListener] 播报入队语音失败: deviceId={}, orderId={}", deviceId, orderId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理已确认状态
|
* 处理已确认状态
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -242,11 +242,30 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
String key = BADGE_STATUS_KEY_PREFIX + deviceId;
|
String key = BADGE_STATUS_KEY_PREFIX + deviceId;
|
||||||
Long now = System.currentTimeMillis();
|
|
||||||
|
|
||||||
// 获取当前状态
|
// 获取当前状态
|
||||||
Map<Object, Object> currentMap = redisTemplate.opsForHash().entries(key);
|
Map<Object, Object> currentMap = redisTemplate.opsForHash().entries(key);
|
||||||
|
|
||||||
|
// 核心修复逻辑:如果不为 OFFLINE (即 IDLE/BUSY),且当前有正在进行的工单,则强制保持 BUSY 状态
|
||||||
|
// 防止设备心跳/上线事件将 BUSY 重置为 IDLE,导致调度状态不一致
|
||||||
|
if (status != BadgeDeviceStatusEnum.OFFLINE && currentMap.containsKey("currentOpsOrderId")) {
|
||||||
|
Object currentOrderIdObj = currentMap.get("currentOpsOrderId");
|
||||||
|
Object currentStatusObj = currentMap.get("status");
|
||||||
|
|
||||||
|
if (currentOrderIdObj != null && currentStatusObj != null) {
|
||||||
|
String currentStatusStr = currentStatusObj.toString();
|
||||||
|
if (BadgeDeviceStatusEnum.BUSY.getCode().equals(currentStatusStr)
|
||||||
|
|| BadgeDeviceStatusEnum.PAUSED.getCode().equals(currentStatusStr)) {
|
||||||
|
log.info("[updateBadgeOnlineStatus] 设备 {} 重新上线,但有进行中工单 {},保持状态 {}",
|
||||||
|
deviceId, currentOrderIdObj, currentStatusStr);
|
||||||
|
// 强制修正传入的状态为当前实际状态
|
||||||
|
status = BadgeDeviceStatusEnum.valueOf(currentStatusStr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Long now = System.currentTimeMillis();
|
||||||
|
|
||||||
// 构建状态数据
|
// 构建状态数据
|
||||||
Map<String, Object> statusMap = new HashMap<>();
|
Map<String, Object> statusMap = new HashMap<>();
|
||||||
statusMap.put("deviceId", deviceId);
|
statusMap.put("deviceId", deviceId);
|
||||||
@@ -263,8 +282,8 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
|||||||
// 更新实时物理区域信息 (Key2)
|
// 更新实时物理区域信息 (Key2)
|
||||||
if (areaId != null) {
|
if (areaId != null) {
|
||||||
statusMap.put("currentAreaId", areaId);
|
statusMap.put("currentAreaId", areaId);
|
||||||
// 同时更新区域设备索引缓存
|
// 同时更新区域设备索引缓存 (这里注释掉了,保持原样)
|
||||||
// addToAreaIndex(deviceId, areaId);
|
// addToAreaIndex(deviceId, areaId);
|
||||||
} else {
|
} else {
|
||||||
// 保持现有实时物理区域信息
|
// 保持现有实时物理区域信息
|
||||||
Object existingAreaId = currentMap.get("currentAreaId");
|
Object existingAreaId = currentMap.get("currentAreaId");
|
||||||
|
|||||||
@@ -34,41 +34,34 @@ public class QueueSyncHandler extends TransitionHandler {
|
|||||||
log.debug("队列同步处理器: orderId={}, targetStatus={}, queueId={}",
|
log.debug("队列同步处理器: orderId={}, targetStatus={}, queueId={}",
|
||||||
context.getOrder().getId(), targetStatus, queueId);
|
context.getOrder().getId(), targetStatus, queueId);
|
||||||
|
|
||||||
try {
|
// 根据目标状态同步队列状态
|
||||||
// 根据目标状态同步队列状态
|
switch (targetStatus) {
|
||||||
switch (targetStatus) {
|
case QUEUED:
|
||||||
case QUEUED:
|
handleQueued(context);
|
||||||
handleQueued(context);
|
break;
|
||||||
break;
|
|
||||||
|
|
||||||
case DISPATCHED:
|
case DISPATCHED:
|
||||||
handleDispatched(context);
|
handleDispatched(context);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case ARRIVED:
|
case ARRIVED:
|
||||||
// 队列状态保持 PROCESSING
|
// 队列状态保持 PROCESSING
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case COMPLETED:
|
case COMPLETED:
|
||||||
case CANCELLED:
|
case CANCELLED:
|
||||||
handleCompletedOrCancelled(context);
|
handleCompletedOrCancelled(context);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case PAUSED:
|
case PAUSED:
|
||||||
handlePaused(context);
|
handlePaused(context);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
log.debug("目标状态无需同步队列: targetStatus={}", targetStatus);
|
log.debug("目标状态无需同步队列: targetStatus={}", targetStatus);
|
||||||
}
|
|
||||||
|
|
||||||
log.debug("队列同步成功: orderId={}, targetStatus={}", context.getOrder().getId(), targetStatus);
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
error(context, "队列同步失败: " + e.getMessage(), e);
|
|
||||||
log.error("队列同步失败: orderId={}, targetStatus={}",
|
|
||||||
context.getOrder().getId(), targetStatus, e);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.debug("队列同步成功: orderId={}, targetStatus={}", context.getOrder().getId(), targetStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -79,20 +72,27 @@ public class QueueSyncHandler extends TransitionHandler {
|
|||||||
Long assigneeId = request.getAssigneeId();
|
Long assigneeId = request.getAssigneeId();
|
||||||
|
|
||||||
if (assigneeId == null) {
|
if (assigneeId == null) {
|
||||||
log.warn("入队时缺少执行人ID: orderId={}", context.getOrder().getId());
|
// 必须抛出异常以触发事务回滚,防止工单状态变更但队列记录未创建
|
||||||
return;
|
String errorMsg = String.format("入队时缺少执行人ID: orderId=%s", context.getOrder().getId());
|
||||||
|
log.error(errorMsg);
|
||||||
|
throw new IllegalArgumentException(errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 创建队列记录
|
try {
|
||||||
Long queueId = orderQueueService.enqueue(
|
// 创建队列记录
|
||||||
context.getOrder().getId(),
|
Long queueId = orderQueueService.enqueue(
|
||||||
assigneeId,
|
context.getOrder().getId(),
|
||||||
context.getOrder().getPriorityEnum(),
|
assigneeId,
|
||||||
request.getQueueIndex()
|
context.getOrder().getPriorityEnum(),
|
||||||
);
|
request.getQueueIndex());
|
||||||
|
|
||||||
context.setQueueId(queueId);
|
context.setQueueId(queueId);
|
||||||
log.debug("队列记录已创建: queueId={}", queueId);
|
log.debug("队列记录已创建: queueId={}", queueId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 捕获异常并重新抛出,确保事务回滚
|
||||||
|
log.error("创建队列记录失败: orderId={}", context.getOrder().getId(), e);
|
||||||
|
throw new IllegalStateException("创建队列记录失败", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -108,9 +108,14 @@ public class QueueSyncHandler extends TransitionHandler {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 队列状态从 WAITING 变更为 PROCESSING
|
try {
|
||||||
orderQueueService.startExecution(queueId);
|
// 队列状态从 WAITING 变更为 PROCESSING
|
||||||
log.debug("队列状态已更新为PROCESSING: queueId={}", queueId);
|
orderQueueService.startExecution(queueId);
|
||||||
|
log.debug("队列状态已更新为PROCESSING: queueId={}", queueId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("更新队列状态失败: queueId={}", queueId, e);
|
||||||
|
throw new IllegalStateException("更新队列状态失败", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user