feat(ops): 客流阈值触发静默处理,工单完成时重置计数器防竞态

1. 已派发/已到达(DISPATCHED/CONFIRMED/ARRIVED)状态静默忽略客流触发,
   仅排队中(PENDING/QUEUED)状态才升级优先级
2. 工单完成时先重置IoT客流计数器再清除活跃标记,防止残留计数
   和MQ消息延迟导致的竞态误创建工单
3. 工单取消时仅清除活跃标记不重置计数器,保留客流数据以便尽快
   重新触发

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
lzh
2026-03-07 22:28:11 +08:00
parent a9fd9313cc
commit 713ae744ac
2 changed files with 104 additions and 13 deletions

View File

@@ -36,8 +36,8 @@ import java.util.concurrent.TimeUnit;
* <p>
* 客流触发逻辑(周期化):
* 1. 无活跃工单 → 创建新工单 → 标记活跃 → 重置阈值
* 2. 有未派发工单(PENDING/QUEUED) → 升级优先级一级 → 重置阈值
* 3. 有已派发工单(DISPATCHED/CONFIRMED/ARRIVED) → 忽略 → 重置阈值
* 2. 有排队中工单(PENDING/QUEUED) → 升级优先级一级 → 重置阈值
* 3. 有已派发/已到达工单(DISPATCHED/CONFIRMED/ARRIVED) → 静默处理,不升级不创建 → 重置阈值
* 4. 已是 P0 → 不升级,记录审计日志 → 重置阈值
* <p>
* RocketMQ 配置:
@@ -69,9 +69,9 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
private static final int DEDUP_TTL_SECONDS = 300;
/**
* 未派发状态集合(可升级优先级)
* 可升级优先级的状态集合(仅排队中,尚未派发
*/
private static final Set<String> UNDISPATCHED_STATUSES = Set.of(
private static final Set<String> UPGRADABLE_STATUSES = Set.of(
WorkOrderStatusEnum.PENDING.getStatus(),
WorkOrderStatusEnum.QUEUED.getStatus()
);
@@ -162,8 +162,8 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
if (activeOrder != null) {
String status = activeOrder.getStatus();
if (UNDISPATCHED_STATUSES.contains(status)) {
// 未派发 → 升级优先级一级
if (UPGRADABLE_STATUSES.contains(status)) {
// 排队中 → 升级优先级一级
PriorityEnum result = cleanOrderService.upgradeOneLevelPriority(
activeOrder.getOrderId(), "客流持续达标自动升级");
@@ -179,9 +179,10 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
areaId, activeOrder.getOrderId());
}
} else {
// 已派发DISPATCHED/CONFIRMED/ARRIVED忽略
log.info("[CleanOrderCreateEventHandler] 区域{}已有已派发工单{}(状态:{}),忽略本次客流触发",
areaId, activeOrder.getOrderId(), status);
// 已派发/已确认/已到达 → 保洁员已在处理中,静默忽略
log.info("[CleanOrderCreateEventHandler] 区域{}保洁员已在处理中(状态:{}),客流触发静默忽略: orderId={}",
areaId, status, activeOrder.getOrderId());
recordArrivedSilentLog(event, activeOrder.getOrderId());
}
// ★ 所有分支都重置阈值
@@ -389,6 +390,33 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
}
}
/**
* 记录已派发/已到达静默处理审计日志
*/
private void recordArrivedSilentLog(CleanOrderCreateEventDTO event, Long orderId) {
try {
Map<String, Object> extra = new HashMap<>();
extra.put("eventId", event.getEventId());
extra.put("areaId", event.getAreaId());
extra.put("reason", "保洁员已在处理中,客流触发静默忽略");
eventLogRecorder.record(EventLogRecord.builder()
.module("clean")
.domain(EventDomain.TRAFFIC)
.eventType("ARRIVED_SILENT_IGNORE")
.message(String.format("保洁员已在处理中,客流触发静默忽略 [区域:%d]", event.getAreaId()))
.targetId(orderId)
.targetType("order")
.deviceId(event.getTriggerDeviceId())
.level(EventLevel.INFO)
.payload(extra)
.build());
} catch (Exception e) {
log.warn("[CleanOrderCreateEventHandler] 记录静默处理日志失败: orderId={}", orderId, e);
}
}
/**
* 确定事件域
*/

View File

@@ -1,5 +1,7 @@
package com.viewsh.module.ops.environment.integration.listener;
import com.viewsh.module.iot.api.device.IotDeviceControlApi;
import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO;
import com.viewsh.module.ops.api.queue.OrderQueueService;
import com.viewsh.module.ops.core.dispatch.DispatchEngine;
import com.viewsh.module.ops.core.dispatch.model.DispatchResult;
@@ -87,6 +89,9 @@ public class CleanOrderEventListener {
@Resource
private TrafficActiveOrderRedisDAO trafficActiveOrderRedisDAO;
@Resource
private IotDeviceControlApi iotDeviceControlApi;
// ==================== 工单创建事件 ====================
/**
@@ -184,7 +189,7 @@ public class CleanOrderEventListener {
break;
case COMPLETED:
handleCompleted(event);
clearTrafficActiveOrder(event);
clearTrafficActiveOrderOnComplete(event);
break;
case CANCELLED:
@@ -733,22 +738,80 @@ public class CleanOrderEventListener {
}
/**
* 工单终态时,清除 Redis 中的活跃工单标记
* 工单完成时,先重置 IoT 客流计数器,再清除活跃标记
* <p>
* 仅处理客流触发的工单。清除后下次客流达标将创建新工单(新周期
* 顺序至关重要:先重置计数器确保设备归零,再清除活跃标记开放新周期。
* 如果先清标记后重置,存在竞态窗口——已发出的 MQ 消息在标记清除后到达,
* 此时计数器尚未归零,会误创建基于残留计数的工单。
* <p>
* 如果计数器重置失败,仍然清除活跃标记(降级处理),避免区域永远被锁死。
*/
private void clearTrafficActiveOrderOnComplete(OrderStateChangedEvent event) {
try {
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
if (order != null && "IOT_TRAFFIC".equals(order.getTriggerSource()) && order.getAreaId() != null) {
// 1. 先重置 IoT 客流计数器(阻塞等待结果)
boolean resetOk = resetTrafficCounterOnComplete(order.getTriggerDeviceId(), order.getAreaId());
// 2. 再清除活跃标记
trafficActiveOrderRedisDAO.removeActive(order.getAreaId());
if (resetOk) {
log.info("[CleanOrderEventListener] 客流工单完成,计数器已重置,活跃标记已清除: areaId={}", order.getAreaId());
} else {
log.warn("[CleanOrderEventListener] 客流工单完成,计数器重置失败但活跃标记已清除(降级): areaId={}", order.getAreaId());
}
}
} catch (Exception e) {
log.warn("[CleanOrderEventListener] 清除客流活跃工单标记失败: orderId={}", event.getOrderId(), e);
}
}
/**
* 工单取消时,仅清除活跃标记,不重置计数器
* <p>
* 取消意味着区域未被清洁,客流计数应保留,以便尽快重新触发工单。
*/
private void clearTrafficActiveOrder(OrderStateChangedEvent event) {
try {
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
if (order != null && "IOT_TRAFFIC".equals(order.getTriggerSource()) && order.getAreaId() != null) {
trafficActiveOrderRedisDAO.removeActive(order.getAreaId());
log.info("[CleanOrderEventListener] 客流工单周期结束,已清除区域{}活跃标记", order.getAreaId());
log.info("[CleanOrderEventListener] 客流工单取消,已清除区域{}活跃标记(计数器保留)", order.getAreaId());
}
} catch (Exception e) {
log.warn("[CleanOrderEventListener] 清除客流活跃工单标记失败: orderId={}", event.getOrderId(), e);
}
}
/**
* 工单完成时重置客流计数器
*
* @return true 重置成功false 重置失败或无设备ID
*/
private boolean resetTrafficCounterOnComplete(Long deviceId, Long areaId) {
if (deviceId == null) {
return false;
}
try {
ResetTrafficCounterReqDTO reqDTO = ResetTrafficCounterReqDTO.builder()
.deviceId(deviceId)
.remark("工单完成后重置计数器,清除作业期间残留计数")
.build();
var result = iotDeviceControlApi.resetTrafficCounter(reqDTO);
if (result.getData() != null && result.getData()) {
log.info("[CleanOrderEventListener] 工单完成,计数器重置成功: deviceId={}, areaId={}", deviceId, areaId);
return true;
} else {
log.warn("[CleanOrderEventListener] 工单完成,计数器重置失败: deviceId={}, areaId={}", deviceId, areaId);
return false;
}
} catch (Exception e) {
log.warn("[CleanOrderEventListener] 工单完成,计数器重置异常: deviceId={}", deviceId, e);
return false;
}
}
/**
* 记录暂停开始时间
*/