fix(ops): 补齐状态转换审计闭环,回滚场景也留痕到 bus_log

问题:ops_order_event 在主事务内写,事务 rollback 则整段记录消失;
若状态机转换抛异常或并发冲突被拒,线上只有控制台日志而无数据库审计,
运维难以追溯"是谁、在什么时候、尝试做了什么转换、为什么失败"。

设计:中央事件发布 + TransactionalEventListener 双阶段落盘

1. OrderTransitionAttemptedEvent(新)
   覆盖 transition 成功、失败、FOR UPDATE 被拒三种情况,携带 orderId、
   fromStatus、targetStatus、errorCode、errorMessage、causeSummary 等。

2. OrderLifecycleManagerImpl
   - transition 成功分支:publishAttempt(success=true)
   - transition 失败分支(context.hasError):publishAttempt(success=false,
     errorCode=INVALID_TRANSITION, cause=摘要)
   - dispatch FOR UPDATE 命中分支:publishAttempt(success=false,
     errorCode=ASSIGNEE_HAS_ACTIVE_ORDER)
   publishAttempt 内部 try/catch,审计失败不影响主流程。

3. OrderTransitionAuditListener(新)
   - @TransactionalEventListener(AFTER_COMMIT, fallbackExecution=true)
     主事务已提交,按事件本身的 success 写 bus_log;INFO 级。
   - @TransactionalEventListener(AFTER_ROLLBACK) + @Transactional(REQUIRES_NEW)
     主事务已回滚,事件里声称的 success 强制视为失败;独立事务写 bus_log
     避免因主事务回滚而日志同样丢失。
   - errorCode、fromStatus、targetStatus、reason、cause 全部落 payload。
   - 冲突(ASSIGNEE_HAS_ACTIVE_ORDER)→ WARN;其他失败 → ERROR。

4. LogType 新增 TRANSITION_FAILED、DISPATCH_REJECTED。
5. EventLogRecorder 接口补 recordSync(实现类已有同名方法)。

运维查询:按 eventDomain='dispatch' + eventLevel IN ('WARN','ERROR')
即可一眼看出所有"尝试但未成功"的状态转换。errorCode 留在 payload JSON 内,
未升级为一等字段(后续如需聚合统计再迁移)。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
lzh
2026-04-20 13:11:28 +08:00
parent b534d79434
commit 3e248fee8c
6 changed files with 346 additions and 2 deletions

View File

@@ -0,0 +1,85 @@
package com.viewsh.module.ops.core.event;
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
import com.viewsh.module.ops.enums.OperatorTypeEnum;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
 * Domain event describing a single <em>attempt</em> to transition a work order's status.
 * <p>
 * How it differs from {@link OrderStateChangedEvent}:
 * <ul>
 * <li>{@code OrderStateChangedEvent} is published only after a transition <b>succeeds</b>
 * (by {@code EventPublishHandler}); its subscribers are business-layer listeners
 * (TTS announcements, device status synchronisation, and so on).</li>
 * <li>{@code OrderTransitionAttemptedEvent} is published for every transition attempt:
 * success, failure, and rejection by the FOR UPDATE check. Its subscriber is the audit log,
 * which closes the audit gap caused by transaction rollback (on rollback
 * {@code ops_order_event} keeps no row, so {@code bus_log} has to be written in a
 * separate transaction).</li>
 * </ul>
 * Transaction boundaries:
 * <ul>
 * <li>The publisher calls {@code publishEvent} inside the main transaction, so Spring attaches
 * the event to the current transaction's synchronization.</li>
 * <li>Subscribers use {@code @TransactionalEventListener(AFTER_COMMIT)} or {@code AFTER_ROLLBACK}
 * to handle the commit and rollback outcomes respectively, so both outcomes leave a trace.</li>
 * </ul>
 *
 * @author lzh
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderTransitionAttemptedEvent {
/** Work order ID. */
private Long orderId;
/** Work order type (CLEAN / SECURITY / REPAIR / SERVICE). */
private String orderType;
/** Work order code (redundant copy, kept to make log searches easier). */
private String orderCode;
/** Source status (the current status as read when the attempt was made). */
private WorkOrderStatusEnum fromStatus;
/** Target status. */
private WorkOrderStatusEnum targetStatus;
/** Assignee ID. */
private Long assigneeId;
/** Operator type. */
private OperatorTypeEnum operatorType;
/** Operator ID. */
private Long operatorId;
/** Reason / remark. */
private String reason;
/**
 * The outcome as <em>declared at publish time</em>.
 * <p>
 * Note: this reflects the judgement at the instant of publishing. If a later handler throws
 * and the whole transaction rolls back, the listener must treat the attempt as failed in its
 * {@code AFTER_ROLLBACK} phase regardless of this flag.
 */
private boolean success;
/** Failure error code (set when success=false). */
private TransitionErrorCode errorCode;
/** Short failure message (set when success=false). */
private String errorMessage;
/** Exception summary (set when success=false and an exception exists; class + message only, no stack trace). */
private String causeSummary;
/** Time of the attempt. */
private LocalDateTime attemptedAt;
}

View File

@@ -1,5 +1,6 @@
package com.viewsh.module.ops.core.lifecycle;
import com.viewsh.module.ops.core.event.OrderTransitionAttemptedEvent;
import com.viewsh.module.ops.core.lifecycle.handler.EventPublishHandler;
import com.viewsh.module.ops.core.lifecycle.handler.QueueSyncHandler;
import com.viewsh.module.ops.core.lifecycle.handler.StateTransitionHandler;
@@ -8,6 +9,9 @@ import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult;
import com.viewsh.module.ops.core.lifecycle.model.TransitionContext;
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
import org.springframework.context.ApplicationEventPublisher;
import java.time.LocalDateTime;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OperatorTypeEnum;
@@ -63,6 +67,9 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
@Resource
private EventLogRecorder eventLogRecorder;
@Resource
private ApplicationEventPublisher applicationEventPublisher;
/**
* 责任链处理器
*/
@@ -102,10 +109,15 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
// 4. 检查结果
if (context.hasError()) {
log.error("状态转换失败: orderId={}, error={}", order.getId(), context.getErrorMessage());
publishAttempt(order, oldStatus, request, false,
TransitionErrorCode.INVALID_TRANSITION,
context.getErrorMessage(),
summarizeThrowable(context.getCause()));
return OrderTransitionResult.fail(order.getId(), context.getErrorMessage());
}
log.info("状态转换成功: orderId={}, {} -> {}", order.getId(), oldStatus, request.getTargetStatus());
publishAttempt(order, oldStatus, request, true, null, null, null);
return OrderTransitionResult.builder()
.success(true)
.orderId(order.getId())
@@ -153,12 +165,21 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
request.getAssigneeId(), request.getOrderId());
if (!activeOrders.isEmpty()) {
OpsOrderDO head = activeOrders.get(0);
String msg = "执行人已有活跃工单: orderId=" + head.getId() + ", status=" + head.getStatus();
log.warn("派发被拒:执行人已有活跃工单: assigneeId={}, requestOrderId={}, activeCount={}, sampleOrderId={}, sampleStatus={}",
request.getAssigneeId(), request.getOrderId(),
activeOrders.size(), head.getId(), head.getStatus());
// 审计:记录"派发被拒"尝试AFTER_COMMIT 监听器会写 bus_log
OpsOrderDO subject = opsOrderMapper.selectById(request.getOrderId());
WorkOrderStatusEnum fromStatus = subject != null
? WorkOrderStatusEnum.valueOf(subject.getStatus()) : null;
publishAttempt(subject != null ? subject : head, fromStatus, request, false,
TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER, msg, null);
return OrderTransitionResult.fail(
request.getOrderId(),
"执行人已有活跃工单: orderId=" + head.getId() + ", status=" + head.getStatus(),
msg,
TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER);
}
}
@@ -430,4 +451,49 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
|| WorkOrderStatusEnum.ARRIVED == status;
}
/**
 * Publishes an {@code OrderTransitionAttemptedEvent} for one transition attempt.
 * <p>
 * Used for all three outcomes: success, ordinary failure, and concurrency conflict.
 * {@code OrderTransitionAuditListener} consumes the event in its AFTER_COMMIT /
 * AFTER_ROLLBACK phases and writes bus_log, so the audit trail survives a rollback.
 * <p>
 * This is best-effort: any exception raised while building or publishing the event is
 * logged and swallowed so that auditing can never break the main flow.
 *
 * @param order        the order being transitioned; may be {@code null}, in which case the
 *                     order id is taken from {@code request} and type/code are left empty
 * @param fromStatus   status of the order before the attempt; may be {@code null}
 * @param request      the transition request that triggered the attempt
 * @param success      outcome declared by the caller at publish time
 * @param errorCode    failure code, or {@code null} on success
 * @param errorMessage short failure message, or {@code null} on success
 * @param causeSummary summary of the underlying exception, or {@code null} if none
 */
private void publishAttempt(OpsOrderDO order, WorkOrderStatusEnum fromStatus,
        OrderTransitionRequest request, boolean success,
        TransitionErrorCode errorCode, String errorMessage,
        String causeSummary) {
    try {
        // Identity fields come from the order when we have one; otherwise only the id is known.
        Long resolvedOrderId = order != null ? order.getId() : request.getOrderId();
        String resolvedOrderType = order != null ? order.getOrderType() : null;
        String resolvedOrderCode = order != null ? order.getOrderCode() : null;

        OrderTransitionAttemptedEvent attempt = OrderTransitionAttemptedEvent.builder()
                .orderId(resolvedOrderId)
                .orderType(resolvedOrderType)
                .orderCode(resolvedOrderCode)
                .fromStatus(fromStatus)
                .targetStatus(request.getTargetStatus())
                .assigneeId(request.getAssigneeId())
                .operatorType(request.getOperatorType())
                .operatorId(request.getOperatorId())
                .reason(request.getReason())
                .success(success)
                .errorCode(errorCode)
                .errorMessage(errorMessage)
                .causeSummary(causeSummary)
                .attemptedAt(LocalDateTime.now())
                .build();

        applicationEventPublisher.publishEvent(attempt);
    } catch (Exception publishFailure) {
        // A failure to publish the audit event must not affect the main flow.
        log.error("发布转换尝试事件失败: orderId={}, targetStatus={}",
                request.getOrderId(), request.getTargetStatus(), publishFailure);
    }
}
/**
 * Summarises a throwable as {@code "ClassName: message"} without the stack trace, so the
 * audit payload stays small.
 * <p>
 * Anonymous classes report an empty simple name; in that case the binary class name is used
 * instead so the summary never loses the exception type.
 *
 * @param t the throwable to summarise; may be {@code null}
 * @return {@code null} when {@code t} is {@code null}; otherwise the class name, followed by
 *         {@code ": " + message} when the throwable carries a message
 */
private static String summarizeThrowable(Throwable t) {
    if (t == null) {
        return null;
    }
    Class<?> type = t.getClass();
    String simpleName = type.getSimpleName();
    // getSimpleName() is "" for anonymous classes; fall back to the binary name.
    String typeName = simpleName.isEmpty() ? type.getName() : simpleName;
    String msg = t.getMessage();
    return msg != null ? typeName + ": " + msg : typeName;
}
}

View File

@@ -0,0 +1,174 @@
package com.viewsh.module.ops.core.lifecycle.audit;
import com.viewsh.module.ops.core.event.OrderTransitionAttemptedEvent;
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain;
import com.viewsh.module.ops.infrastructure.log.enumeration.EventLevel;
import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule;
import com.viewsh.module.ops.infrastructure.log.enumeration.LogType;
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord;
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import java.util.HashMap;
import java.util.Map;
/**
* 工单状态转换尝试审计监听器。
* <p>
* 闭环设计:
* <ul>
* <li><b>AFTER_COMMIT</b>:主事务成功提交,按事件本身的 success 标志写 bus_log。</li>
* <li><b>AFTER_ROLLBACK</b>主事务已回滚——事件里的数据ops_order_event 等)全部消失。
* 此时必须新开一个独立事务写 bus_log否则审计链断裂。</li>
* </ul>
* <p>
* 字段归位:
* <ul>
* <li>{@code eventLevel}:成功=INFO失败=WARN冲突被拒或 ERROR状态机异常</li>
* <li>{@code eventDomain}:统一用 DISPATCH派发域便于运维按域聚合</li>
* <li>{@code eventType}:成功→业务 LogType如 ORDER_DISPATCHED失败→TRANSITION_FAILED
* 或 DISPATCH_REJECTED</li>
* <li>{@code eventPayload}errorCode / fromStatus / targetStatus / operatorType / reason / cause</li>
* </ul>
*
* @author lzh
*/
@Slf4j
@Component
public class OrderTransitionAuditListener {
@Resource
private EventLogRecorder eventLogRecorder;
/**
* 主事务已提交:照事件声明写一条审计日志。
* <p>
* fallbackExecution=true在无事务上下文时也执行如测试、跨线程补写场景
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, fallbackExecution = true)
public void onAfterCommit(OrderTransitionAttemptedEvent event) {
try {
eventLogRecorder.recordSync(toRecord(event, /*rolledBack=*/false));
} catch (Exception e) {
log.error("[TransitionAudit] AFTER_COMMIT 写 bus_log 失败: orderId={}, success={}, errorCode={}",
event.getOrderId(), event.isSuccess(), event.getErrorCode(), e);
}
}
/**
* 主事务已回滚:无论事件里声称 success 与否,这次"尝试"都**实际未落库**。
* 必须开独立事务写 bus_log否则日志也会因同事务回滚而丢失。
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void onAfterRollback(OrderTransitionAttemptedEvent event) {
writeRollbackAudit(event);
}
/**
* 独立事务写入失败审计。
* <p>
* AFTER_ROLLBACK 触发时主事务已结束,此处即便用 REQUIRES_NEW 也不会遇到嵌套锁;
* 失败本身只是单行 insert不再对其他表加锁。
*/
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
public void writeRollbackAudit(OrderTransitionAttemptedEvent event) {
try {
eventLogRecorder.recordSync(toRecord(event, /*rolledBack=*/true));
} catch (Exception e) {
log.error("[TransitionAudit] AFTER_ROLLBACK 写 bus_log 失败: orderId={}, targetStatus={}",
event.getOrderId(), event.getTargetStatus(), e);
}
}
// ==================== 私有映射方法 ====================
private EventLogRecord toRecord(OrderTransitionAttemptedEvent event, boolean rolledBack) {
// rolledBack=true 时强制视为失败:即便发布时声明 success=true
// 事务 rollback 说明写入未真正生效。
boolean success = event.isSuccess() && !rolledBack;
EventLevel level = success ? EventLevel.INFO
: (event.getErrorCode() == TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER
? EventLevel.WARN : EventLevel.ERROR);
String eventTypeCode = resolveEventTypeCode(event, success, rolledBack);
Map<String, Object> payload = new HashMap<>();
payload.put("fromStatus", event.getFromStatus() != null ? event.getFromStatus().getStatus() : null);
payload.put("targetStatus", event.getTargetStatus() != null ? event.getTargetStatus().getStatus() : null);
payload.put("operatorType", event.getOperatorType() != null ? event.getOperatorType().getType() : null);
payload.put("reason", event.getReason());
payload.put("success", success);
payload.put("rolledBack", rolledBack);
if (event.getErrorCode() != null) {
payload.put("errorCode", event.getErrorCode().name());
}
if (event.getErrorMessage() != null) {
payload.put("errorMessage", event.getErrorMessage());
}
if (event.getCauseSummary() != null) {
payload.put("cause", event.getCauseSummary());
}
if (event.getOrderCode() != null) {
payload.put("orderCode", event.getOrderCode());
}
String message = buildMessage(event, success, rolledBack);
return EventLogRecord.builder()
.module(LogModule.fromOrderType(event.getOrderType()))
.domain(EventDomain.DISPATCH)
.eventType(eventTypeCode)
.level(level)
.message(message)
.targetId(event.getOrderId())
.targetType("order")
.deviceId(event.getAssigneeId())
.personId(event.getOperatorId())
.payload(payload)
.eventTime(event.getAttemptedAt())
.build();
}
private String resolveEventTypeCode(OrderTransitionAttemptedEvent event, boolean success, boolean rolledBack) {
if (!success && event.getErrorCode() == TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER) {
return LogType.DISPATCH_REJECTED.getCode();
}
if (!success) {
return LogType.TRANSITION_FAILED.getCode();
}
// 成功场景:按目标状态映射到业务 LogTypeops_order_event 已有时间轴,
// 这里 bus_log 仅作宽表镜像,便于运维按 domain/module 聚合查询。
if (event.getTargetStatus() == null) {
return LogType.SYSTEM_EVENT.getCode();
}
return switch (event.getTargetStatus()) {
case QUEUED -> LogType.ORDER_QUEUED.getCode();
case DISPATCHED -> LogType.ORDER_DISPATCHED.getCode();
case CONFIRMED -> LogType.ORDER_CONFIRM.getCode();
case ARRIVED -> LogType.ORDER_ARRIVED.getCode();
case PAUSED -> LogType.ORDER_PAUSED.getCode();
case COMPLETED -> LogType.ORDER_COMPLETED.getCode();
case CANCELLED -> LogType.ORDER_CANCELLED.getCode();
default -> LogType.SYSTEM_EVENT.getCode();
};
}
private String buildMessage(OrderTransitionAttemptedEvent event, boolean success, boolean rolledBack) {
String from = event.getFromStatus() != null ? event.getFromStatus().getStatus() : "?";
String to = event.getTargetStatus() != null ? event.getTargetStatus().getStatus() : "?";
if (success) {
return String.format("状态转换成功: %s -> %s", from, to);
}
String prefix = rolledBack ? "状态转换回滚" : "状态转换失败";
String detail = event.getErrorMessage() != null ? event.getErrorMessage() : "";
return String.format("%s: %s -> %s %s", prefix, from, to, detail).trim();
}
}

View File

@@ -47,7 +47,14 @@ public enum LogType {
COMPLETE_SUPPRESSED_INVALID("COMPLETE_SUPPRESSED_INVALID", "作业时长不足抑制"),
BEACON_COMPLETE_REQUESTED("BEACON_COMPLETE_REQUESTED", "信号丢失自动完成请求"),
TTS_REQUEST("TTS_REQUEST", "语音播报请求"),
ARRIVE_REJECTED("ARRIVE_REJECTED", "到岗请求被拒绝");
ARRIVE_REJECTED("ARRIVE_REJECTED", "到岗请求被拒绝"),
// ========== 状态机转换闭环审计 ==========
/** 状态转换尝试失败状态机异常、handler 抛错等) */
TRANSITION_FAILED("TRANSITION_FAILED", "状态转换失败"),
/** 派发被 FOR UPDATE 拒绝(同 assignee 已有活跃工单) */
DISPATCH_REJECTED("DISPATCH_REJECTED", "派发被拒绝");
private static final Map<String, LogType> CODE_MAP = new HashMap<>();

View File

@@ -25,6 +25,17 @@ public interface EventLogRecorder {
*/
void recordAsync(EventLogRecord record);
/**
 * Records an event log entry synchronously.
 * <p>
 * Intended for callers that need the entry to actually reach the database (for example
 * AFTER_COMMIT auditing, or a compensating write after a transaction rollback).
 * The caller owns the transaction boundary: this method does not start a transaction itself.
 * The MyBatis insert runs in the calling thread's transaction context; when no transaction
 * is active it is committed on its own as a single statement.
 *
 * @param record the log record to persist
 */
void recordSync(EventLogRecord record);
// ==================== 便捷方法:按级别记录 ====================
/**

View File

@@ -58,6 +58,7 @@ public class EventLogRecorderImpl implements EventLogRecorder {
* <p>
* 用于需要确认日志写入成功的场景(如测试、关键业务)
*/
@Override
public void recordSync(EventLogRecord record) {
// Direct call to doRecord (defined elsewhere in this class); nothing is queued or deferred here.
doRecord(record);
}