Files
aiot-platform-cloud/docs/ops-architecture/part4-关键技术方案.md

38 KiB
Raw Blame History

Part 4: 关键技术方案

本文档详细阐述 Ops 业务运营模块的核心技术方案,包括设计思路、实现细节、技术要点和优缺点分析。


4.1 状态机实现方案

4.1.1 设计思路

状态机是工单管理的核心负责管理<EFBFBD><EFBFBD><EFBFBD>单的生命周期状态转换。设计遵循以下原则

  1. 单一职责:状态机只负责状态转换规则验证和状态管理,不包含业务逻辑
  2. 事件驱动:状态变更后发布事件,业务方通过 @EventListener 订阅处理
  3. 事务一致性:所有状态变更在同一事务中完成
  4. 可追溯性:所有状态转换记录到事件表

4.1.2 状态定义

工单状态枚举WorkOrderStatusEnum

状态 说明 可转换到
PENDING 待接单 QUEUED, DISPATCHED, CANCELLED
QUEUED 已入队 DISPATCHED, CANCELLED
DISPATCHED 已派单 CONFIRMED, ARRIVED, CANCELLED
CONFIRMED 已确认 ARRIVED, PAUSED, CANCELLED
ARRIVED 已到岗 PAUSED, COMPLETED, CANCELLED
PAUSED 已暂停 ARRIVED, CANCELLED
COMPLETED 已完成 [终态] -
CANCELLED 已取消 [终态] -

4.1.3 核心实现

状态转换规则

@Service
public class OrderStateMachine {

    private static final Map<WorkOrderStatusEnum, Set<WorkOrderStatusEnum>> TRANSITIONS = Map.of(
        WorkOrderStatusEnum.PENDING, Set.of(
            WorkOrderStatusEnum.QUEUED,      // 保洁业务:入队
            WorkOrderStatusEnum.DISPATCHED,   // 通用业务:直接派单
            WorkOrderStatusEnum.CANCELLED
        ),
        WorkOrderStatusEnum.QUEUED, Set.of(
            WorkOrderStatusEnum.DISPATCHED,
            WorkOrderStatusEnum.CANCELLED
        ),
        WorkOrderStatusEnum.DISPATCHED, Set.of(
            WorkOrderStatusEnum.CONFIRMED,
            WorkOrderStatusEnum.ARRIVED,
            WorkOrderStatusEnum.CANCELLED
        ),
        WorkOrderStatusEnum.CONFIRMED, Set.of(
            WorkOrderStatusEnum.ARRIVED,
            WorkOrderStatusEnum.PAUSED,
            WorkOrderStatusEnum.CANCELLED
        ),
        WorkOrderStatusEnum.ARRIVED, Set.of(
            WorkOrderStatusEnum.PAUSED,
            WorkOrderStatusEnum.COMPLETED,
            WorkOrderStatusEnum.CANCELLED
        ),
        WorkOrderStatusEnum.PAUSED, Set.of(
            WorkOrderStatusEnum.ARRIVED,
            WorkOrderStatusEnum.CANCELLED
        ),
        WorkOrderStatusEnum.COMPLETED, Collections.emptySet(),
        WorkOrderStatusEnum.CANCELLED, Collections.emptySet()
    );

    /**
     * 状态转换核心方法
     */
    @Transactional(rollbackFor = Exception.class)
    public void transition(OpsOrderDO order, WorkOrderStatusEnum newStatus,
                          OperatorTypeEnum operatorType, Long operatorId, String remark) {
        // 1. 参数校验
        Assert.notNull(order, "工单不能为空");
        Assert.notNull(newStatus, "目标状态不能为空");

        // 2. 获取当前状态
        WorkOrderStatusEnum currentStatus = order.getStatus();
        if (currentStatus == newStatus) {
            log.warn("工单状态已是目标状态,无需转换: orderId={}, status={}", order.getId(), newStatus);
            return;
        }

        // 3. 校验状态转换合法性
        Set<WorkOrderStatusEnum> allowedTargets = TRANSITIONS.get(currentStatus);
        if (allowedTargets == null || !allowedTargets.contains(newStatus)) {
            throw new IllegalStateException(
                String.format("非法的状态转换: %s → %s", currentStatus, newStatus));
        }

        // 4. 更新工单状态和相关字段
        WorkOrderStatusEnum oldStatus = order.getStatus();
        order.setStatus(newStatus);
        updateStatusFields(order, newStatus);  // 更新时间戳等字段

        // 5. 记录事件流
        OpsOrderEventDO event = buildEvent(order, oldStatus, newStatus, operatorType, operatorId, remark);
        orderEventMapper.insert(event);

        // 6. 发布状态变更事件
        publishStateChangedEvent(order, oldStatus, newStatus, operatorType, operatorId);
    }

    /**
     * 根据状态更新相关字段
     */
    private void updateStatusFields(OpsOrderDO order, WorkOrderStatusEnum newStatus) {
        LocalDateTime now = LocalDateTime.now();
        switch (newStatus) {
            case DISPATCHED:
                order.setDispatchTime(now);
                break;
            case ARRIVED:
                order.setArriveTime(now);
                break;
            case COMPLETED:
                order.setCompleteTime(now);
                break;
            case CANCELLED:
                order.setCancelTime(now);
                break;
            default:
                break;
        }
    }

    /**
     * 发布状态变更事件
     */
    private void publishStateChangedEvent(OpsOrderDO order, WorkOrderStatusEnum oldStatus,
                                          WorkOrderStatusEnum newStatus,
                                          OperatorTypeEnum operatorType, Long operatorId) {
        try {
            OrderStateChangedEvent event = new OrderStateChangedEvent(
                order.getId(), oldStatus, newStatus, operatorType, operatorId
            );
            applicationEventPublisher.publishEvent(event);
        } catch (Exception e) {
            // 事件发布失败不影响主流程
            log.error("发布状态变更事件失败: orderId={}", order.getId(), e);
        }
    }
}

4.1.4 状态流转图

                    ┌─────────┐
                    │ PENDING │ (待接单)
                    └────┬────┘
                         │
         ┌───────────────┼───────────────┐
         ↓               ↓               ↓
   ┌─────────┐     ┌─────────┐     ┌──────────┐
   │ QUEUED  │────▶│DISPATCHED│     │CANCELLED │
   └────┬────┘     └────┬────┘     └──────────┘
        │               │
        └───────────────┼───────────────┐
                        ↓               ↓
                 ┌──────────┐     ┌──────────┐
                 │ CONFIRMED│────▶│ ARRIVED  │
                 └──────────┘     └────┬─────┘
                                        │
                        ┌───────────────┼───────────────┐
                        ↓               ↓               ↓
                 ┌──────────┐     ┌──────────┐     ┌──────────┐
                 │  PAUSED  │────▶│ ARRIVED  │     │COMPLETED │
                 └──────────┘     └──────────┘     └──────────┘

任何状态 ─────────────────────────────────────────▶ CANCELLED (终态)

4.1.5 技术要点

  1. 状态转换验证:使用 Map 定义清晰的状态转换规则,防止非法状态转换
  2. 事务一致性:所有状态变更在同一事务中完成,确保数据一致性
  3. 事件发布失败处理:事件发布失败不影响主流程,记录错误日志
  4. 状态字段自动更新:根据新状态自动更新相关时间戳字段(派单时间、到达时间、完成时间等)

4.1.6 优缺点分析

优点

  • 状态转换规则清晰可见,易于维护和扩展
  • 事件驱动架构解耦了状态管理与业务逻辑
  • 事务保证确保数据一致性
  • 完整的事件溯源支持审计和问题追溯

缺点

  • 状态机相对简单,复杂业务逻辑需要额外处理
  • 事件发布异步可能导致时序问题
  • 状态枚举需要提前定义,新增状态需要修改代码

4.2 优先级队列实现方案

4.2.1 设计思路

采用 Redis + MySQL 混合队列 架构:

  • Redis Sorted Set:高性能队列,支持实时排序和快速出队
  • MySQL:持久化存储,保证数据不丢失
  • 写入策略:先写入 MySQL保证持久化再异步写入 Redis提高性能
  • 读取策略:优先从 Redis 读取Redis 不可用时降级到 MySQL

4.2.2 优先级定义

优先级 说明 响应时间要求 打断能力
P0 紧急任务 < 3分钟 可打断任何任务
P1 重要任务 < 15分钟 不可打断
P2 普通任务 < 2小时 不可打断

4.2.3 队列状态

队列状态枚举OrderQueueStatusEnum

状态 说明 可转换到
WAITING 等待中 PROCESSING, PAUSED, REMOVED
PROCESSING 处理中 PAUSED, REMOVED
PAUSED 暂停中 PROCESSING, REMOVED
REMOVED 已移除 [终态] -

4.2.4 核心实现

分数计算公式

@Service
public class OrderQueueService {

    // 优先级分数映射
    private static final Map<Integer, Long> PRIORITY_SCORES = Map.of(
        0, 0L,           // P0: 0
        1, 1000000L,     // P1: 1,000,000
        2, 2000000L,     // P2: 2,000,000
        3, 3000000L      // P3: 3,000,000
    );

    /**
     * 计算队列分数
     * 公式priorityScore + timestamp
     * - priorityScore保证高优先级任务排在前面
     * - timestamp保证同优先级任务按时间 FIFO 排序
     */
    private double calculateScore(int priority, long timestamp) {
        long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 2000000L);
        return priorityScore + (timestamp / 1000.0);  // 转换为秒避免精度问题
    }

    /**
     * 入队
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean enqueue(OrderQueueDTO dto) {
        // 1. 参数校验
        Assert.notNull(dto, "入队参数不能为空");
        Assert.notNull(dto.getOrderId(), "工单ID不能为空");
        Assert.notNull(dto.getUserId(), "用户ID不能为空");

        // 2. 写入 MySQL持久化
        OpsOrderQueueDO queueDO = new OpsOrderQueueDO();
        queueDO.setOrderId(dto.getOrderId());
        queueDO.setUserId(dto.getUserId());
        queueDO.setStatus(OrderQueueStatusEnum.WAITING.getCode());
        queueDO.setPriority(dto.getPriority());
        queueDO.setEnqueueTime(LocalDateTime.now());
        orderQueueMapper.insert(queueDO);

        // 3. 异步写入 Redis高性能
        CompletableFuture.runAsync(() -> {
            try {
                String queueKey = buildQueueKey(dto.getUserId());
                String infoKey = buildInfoKey(dto.getOrderId());

                // 计算分数
                double score = calculateScore(dto.getPriority(),
                    dto.getEnqueueTime() != null ? dto.getEnqueueTime().getTime() : System.currentTimeMillis());

                // 添加到 Sorted Set
                String json = JsonUtils.toJson(queueDO);
                redisTemplate.opsForZSet().add(queueKey, json, score);

                // 存储详细信息到 Hash
                Map<String, Object> infoMap = new HashMap<>();
                infoMap.put("orderId", dto.getOrderId());
                infoMap.put("priority", dto.getPriority());
                infoMap.put("status", OrderQueueStatusEnum.WAITING.getCode());
                infoMap.put("enqueueTime", queueDO.getEnqueueTime().toString());
                redisTemplate.opsForHash().putAll(infoKey, infoMap);

                // 设置过期时间24小时
                redisTemplate.expire(queueKey, 24, TimeUnit.HOURS);
                redisTemplate.expire(infoKey, 24, TimeUnit.HOURS);
            } catch (Exception e) {
                log.error("Redis入队失败: orderId={}", dto.getOrderId(), e);
            }
        });

        return true;
    }

    /**
     * 出队(原子性操作)
     */
    public OpsOrderQueueDO dequeue(Long userId) {
        String queueKey = buildQueueKey(userId);

        // 使用 ZPOPMIN 实现原子性出队
        Set<Object> results = redisTemplate.opsForZSet().popMin(queueKey);

        if (results == null || results.isEmpty()) {
            // Redis 为空,从 MySQL 读取(降级)
            return dequeueFromDB(userId);
        }

        // 解析结果
        Object result = results.iterator().next();
        OrderQueueDTO dto = JsonUtils.fromJson(result.toString(), OrderQueueDTO.class);

        // 更新 MySQL 状态
        orderQueueMapper.updateStatus(dto.getOrderId(), OrderQueueStatusEnum.REMOVED.getCode());

        // 清理 Hash
        String infoKey = buildInfoKey(dto.getOrderId());
        redisTemplate.delete(infoKey);

        return dto.toDO();
    }

    /**
     * 开始执行WAITING → PROCESSING
     */
    @Transactional(rollbackFor = Exception.class)
    public void startExecution(Long orderId, Long userId) {
        // 1. 更新 MySQL
        orderQueueMapper.updateStatus(orderId, OrderQueueStatusEnum.PROCESSING.getCode());

        // 2. 更新 Redis
        String infoKey = buildInfoKey(orderId);
        redisTemplate.opsForHash().put(infoKey, "status", OrderQueueStatusEnum.PROCESSING.getCode());
    }

    /**
     * 暂停任务PROCESSING → PAUSED
     */
    @Transactional(rollbackFor = Exception.class)
    public void pauseTask(Long orderId, Long userId) {
        // 1. 更新 MySQL
        orderQueueMapper.updateStatus(orderId, OrderQueueStatusEnum.PAUSED.getCode());

        // 2. 更新 Redis
        String infoKey = buildInfoKey(orderId);
        redisTemplate.opsForHash().put(infoKey, "status", OrderQueueStatusEnum.PAUSED.getCode());
    }

    /**
     * 恢复任务PAUSED → PROCESSING
     */
    @Transactional(rollbackFor = Exception.class)
    public void resumeTask(Long orderId, Long userId) {
        // 1. 更新 MySQL
        orderQueueMapper.updateStatus(orderId, OrderQueueStatusEnum.PROCESSING.getCode());

        // 2. 更新 Redis
        String infoKey = buildInfoKey(orderId);
        redisTemplate.opsForHash().put(infoKey, "status", OrderQueueStatusEnum.PROCESSING.getCode());
    }
}

4.2.5 数据同步策略

写入流程

工单入队请求
    ↓
写入 MySQL同步保证持久化
    ↓
异步写入 Redis后台任务提高性能
    ↓
返回成功

读取流程

工单出队请求
    ↓
尝试从 Redis 读取
    ↓
Redis 为空?
    ├─ 是 → 从 MySQL 读取(降级)
    └─ 否 → 返回 Redis 数据

容灾机制

  • Redis 宕机:自动降级到纯 MySQL 模式
  • 定时同步:每 5 分钟从 MySQL 同步数据到 Redis
  • 数据一致性:以 MySQL 为准Redis 作为缓存

4.2.6 技术要点

  1. 分数计算priorityScore + timestamp,确保高优先级任务在前,同优先级任务按 FIFO 排序
  2. 原子性操作:使用 ZPOPMIN 实现原子性出队,避免并发问题
  3. 容灾机制Redis 宕机时降级到纯 MySQL 模式
  4. 批量操作:使用 Pipeline 提高批量写入性能

4.2.7 优缺点分析

优点

  • 高性能Redis Sorted Set 提供 O(log N) 的插入和删除性能
  • 可靠性MySQL 持久化保证数据不丢失
  • 扩展性:支持横向扩展
  • 容灾能力Redis 故障时自动降级

缺点

  • 架构复杂:需要维护两套存储
  • 数据一致性:异步同步可能导致短暂不一致
  • 内存占用Redis 需要存储大量队列数据
  • 运维成本:需要同时维护 Redis 和 MySQL

4.3 智能派单引擎

4.3.1 设计思路

派单引擎采用 策略模式,将派单决策抽象为两层:

  1. 分配策略AssignStrategy推荐最合适的执行人员
  2. 调度策略ScheduleStrategy决策派单路径直接派单、入队、打断

核心原则

  • 纯决策层:引擎只负责"决策",不负责状态管理和设备通知
  • 业务解耦:通用逻辑下沉,业务特定逻辑通过事件监听实现
  • 可扩展性:新增业务线只需实现策略接口

4.3.2 策略接口定义

分配策略接口

public interface AssignStrategy {

    /**
     * 策略类型
     */
    String getType();

    /**
     * 策略优先级(数字越小优先级越高)
     */
    int getPriority();

    /**
     * 是否支持该业务类型
     */
    boolean supports(WorkOrderTypeEnum orderType);

    /**
     * 推荐执行人员
     */
    AssigneeRecommendation recommend(OrderDispatchContext context);
}

调度策略接口

public interface ScheduleStrategy {

    /**
     * 策略类型
     */
    String getType();

    /**
     * 是否支持该业务类型
     */
    boolean supports(WorkOrderTypeEnum orderType);

    /**
     * 决策派单路径
     */
    DispatchDecision decide(OrderDispatchContext context);
}

4.3.3 推荐算法

保洁区域优先策略CleanerAreaPriorityStrategy

@Component
public class CleanerAreaPriorityStrategy implements AssignStrategy {

    @Autowired
    private CleanerStatusService cleanerStatusService;

    @Autowired
    private BusAreaService busAreaService;

    @Override
    public String getType() {
        return "CLEANER_AREA_PRIORITY";
    }

    @Override
    public int getPriority() {
        return 0;  // 最高优先级
    }

    @Override
    public boolean supports(WorkOrderTypeEnum orderType) {
        return WorkOrderTypeEnum.CLEAN.equals(orderType);
    }

    @Override
    public AssigneeRecommendation recommend(OrderDispatchContext context) {
        // 1. 查询该区域的保洁员
        List<OpsCleanerStatusDO> cleaners = cleanerStatusService.listCleanersByArea(context.getAreaId());

        if (cleaners.isEmpty()) {
            return AssigneeRecommendation.notFound("该区域无保洁员");
        }

        // 2. 过滤在线的保洁员
        List<OpsCleanerStatusDO> onlineCleaners = cleaners.stream()
            .filter(c -> !CleanerStatusEnum.OFFLINE.equals(c.getStatus()))
            .collect(Collectors.toList());

        if (onlineCleaners.isEmpty()) {
            return AssigneeRecommendation.notFound("该区域无在线保洁员");
        }

        // 3. 选择最佳保洁员
        OpsCleanerStatusDO selectedCleaner = selectBestCleaner(onlineCleaners, context);

        return AssigneeRecommendation.found(selectedCleaner.getUserId(),
            "推荐理由:同区域、状态在线、当前任务数少");
    }

    /**
     * 选择最佳保洁员
     * 综合考虑:状态 > 电量 > 心跳时间 > 当前任务数
     */
    private OpsCleanerStatusDO selectBestCleaner(List<OpsCleanerStatusDO> cleaners,
                                                  OrderDispatchContext context) {
        return cleaners.stream()
            .filter(c -> CleanerStatusEnum.IDLE.equals(c.getStatus()))
            .min(Comparator
                .comparing(OpsCleanerStatusDO::getCurrentTaskCount)  // 任务数少的优先
                .thenComparing(OpsCleanerStatusDO::getBatteryLevel).reversed()  // 电量高的优先
                .thenComparing(OpsCleanerStatusDO::getLastHeartbeatTime))  // 心跳新的优先
            .orElse(cleaners.get(0));
    }
}

4.3.4 调度决策

路径决策策略

@Component
public class DefaultScheduleStrategy implements ScheduleStrategy {

    @Override
    public DispatchDecision decide(OrderDispatchContext context) {
        Long orderId = context.getOrderId();
        PriorityEnum priority = context.getPriority();

        // 获取推荐执行人
        AssigneeRecommendation recommendation = context.getRecommendation();
        if (!recommendation.isFound()) {
            // 无可用执行人,仅入队
            return DispatchDecision.enqueueOnly("无可用执行人,等待分配");
        }

        // 获取执行人状态
        CleanerStatusDO assigneeStatus = cleanerStatusService.getStatus(recommendation.getAssigneeId());

        // 决策流程
        if (assigneeStatus.isIdle() && assigneeStatus.getCurrentTaskCount() == 0) {
            // 执行人空闲且无任务
            Long waitingCount = orderQueueService.getWaitingCount(assigneeStatus.getUserId());

            if (waitingCount > 0) {
                // 有等待任务,推送通知并入队
                return DispatchDecision.pushAndEnqueue("执行人有等待任务,推送通知并入队");
            } else {
                // 直接派单
                return DispatchDecision.directDispatch("执行人空闲,直接派单");
            }
        } else if (priority == PriorityEnum.P0) {
            // P0 紧急任务,打断当前任务
            Long currentOrderId = assigneeStatus.getCurrentOrderId();
            return DispatchDecision.interruptAndDispatch(currentOrderId, "P0紧急任务打断当前任务");
        } else {
            // 普通任务,仅入队
            return DispatchDecision.enqueueOnly("执行人忙碌,入队等待");
        }
    }
}

4.3.5 派单决策类型

决策类型 说明 条件
DIRECT_DISPATCH 直接派单 执行人空闲且无等待任务
PUSH_AND_ENQUEUE 推送并入队 执行人空闲但有等待任务
ENQUEUE_ONLY 仅入队 执行人忙碌或无可用执行人
INTERRUPT_AND_DISPATCH 打断派单 P0紧急任务且执行人忙碌

4.3.6 技术要点

  1. 推荐算法:综合考虑状态、电量、心跳时间、工作量平衡
  2. 调度路径:支持四种派单模式,适应不同场景
  3. 策略注册:通过 @PostConstruct 自动注册策略到引擎
  4. 决策透明:推荐理由和调度决策都有详细日志

4.3.7 优缺点分析

优点

  • 高度可扩展:新增业务线只需实现策略接口
  • 决策透明:推荐理由和调度决策都有详细日志
  • 性能优化:优先级和缓存机制提高推荐效率
  • 业务解耦:纯通用逻辑,业务特定状态通过事件监听实现

缺点

  • 策略耦合:策略实现需要依赖多个服务
  • 调试复杂:多层决策链路增加调试难度
  • 扩展成本:新增策略需要考虑与现有策略的协调
  • 冷启动问题:无历史数据时推荐准确度较低

4.4 事件驱动架构

4.4.1 设计思路

通过 领域事件 解耦状态机与业务逻辑:

  1. 事件定义:定义领域事件(如 OrderStateChangedEvent
  2. 事件发布:状态机在状态变更后发布事件
  3. 事件订阅:业务方通过 @EventListener 订阅事件并处理

核心优势

  • 松耦合:状态机与业务逻辑完全解耦
  • 异步性:事件异步处理,提高系统吞吐量
  • 可扩展:新增业务逻辑只需添加事件监听器

4.4.2 事件定义

状态变更事件

@Data
@AllArgsConstructor
public class OrderStateChangedEvent {

    /**
     * 工单ID
     */
    private Long orderId;

    /**
     * 旧状态
     */
    private WorkOrderStatusEnum oldStatus;

    /**
     * 新状态
     */
    private WorkOrderStatusEnum newStatus;

    /**
     * 操作人类型
     */
    private OperatorTypeEnum operatorType;

    /**
     * 操作人ID
     */
    private Long operatorId;

    /**
     * 扩展参数(用于传递业务特定信息)
     */
    private Map<String, Object> payload;

    public OrderStateChangedEvent(Long orderId, WorkOrderStatusEnum oldStatus,
                                   WorkOrderStatusEnum newStatus,
                                   OperatorTypeEnum operatorType, Long operatorId) {
        this.orderId = orderId;
        this.oldStatus = oldStatus;
        this.newStatus = newStatus;
        this.operatorType = operatorType;
        this.operatorId = operatorId;
        this.payload = new HashMap<>();
    }

    /**
     * 添加扩展参数
     */
    public OrderStateChangedEvent addPayload(String key, Object value) {
        this.payload.put(key, value);
        return this;
    }

    /**
     * 获取扩展参数
     */
    @SuppressWarnings("unchecked")
    public <T> T getPayload(String key, Class<T> type) {
        Object value = this.payload.get(key);
        return value != null ? (T) value : null;
    }
}

4.4.3 事件发布

事件发布器

@Component
public class OrderEventPublisher {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    /**
     * 发布状态变更事件
     */
    public void publishStateChanged(OrderStateChangedEvent event) {
        try {
            applicationEventPublisher.publishEvent(event);
            log.info("发布状态变更事件成功: orderId={}, {} → {}",
                event.getOrderId(), event.getOldStatus(), event.getNewStatus());
        } catch (Exception e) {
            // 事件发布失败不影响主流程
            log.error("发布状态变更事件失败: orderId={}", event.getOrderId(), e);
        }
    }

    /**
     * 异步发布事件
     */
    @Async
    public void publishStateChangedAsync(OrderStateChangedEvent event) {
        publishStateChanged(event);
    }
}

4.4.4 事件订阅

保洁业务事件处理器

@Component
public class CleanOrderEventHandler {

    @Autowired
    private CleanerStatusService cleanerStatusService;

    @Autowired
    private OrderQueueService orderQueueService;

    /**
     * 监听工单派单事件
     */
    @EventListener
    @Async
    public void onOrderDispatched(OrderStateChangedEvent event) {
        if (!event.getNewStatus().equals(WorkOrderStatusEnum.DISPATCHED)) {
            return;
        }

        Long orderId = event.getOrderId();
        Long cleanerId = event.getOperatorId();

        log.info("工单已派单: orderId={}, cleanerId={}", orderId, cleanerId);

        // 更新保洁员状态
        cleanerStatusService.updateCurrentTask(cleanerId, orderId);

        // 发送工牌通知
        badgeNotificationService.notifyCleaner(cleanerId, orderId);
    }

    /**
     * 监听工单完成事件
     */
    @EventListener
    @Async
    public void onOrderCompleted(OrderStateChangedEvent event) {
        if (!event.getNewStatus().equals(WorkOrderStatusEnum.COMPLETED)) {
            return;
        }

        Long orderId = event.getOrderId();
        Long cleanerId = event.getOperatorId();

        log.info("工单已完成: orderId={}, cleanerId={}", orderId, cleanerId);

        // 清理保洁员当前任务
        cleanerStatusService.clearCurrentTask(cleanerId);

        // 移除队列记录
        orderQueueService.removeTask(orderId, cleanerId);

        // 自动派发下一个任务
        autoDispatchNext(cleanerId);
    }

    /**
     * 自动派发下一个任务
     */
    private void autoDispatchNext(Long cleanerId) {
        // 从队列中获取下一个任务
        OpsOrderQueueDO nextTask = orderQueueService.dequeue(cleanerId);

        if (nextTask != null) {
            log.info("自动派发下一个任务: taskId={}, cleanerId={}", nextTask.getOrderId(), cleanerId);

            // 派单
            dispatchEngine.autoDispatch(nextTask.getOrderId(), cleanerId);
        }
    }
}

4.4.5 技术要点

  1. 异常隔离:事件发布异常被捕获,不影响主流程
  2. 参数传递:通过 payload 传递业务特定信息,如中断原因
  3. 异步处理:使用 @Async 实现异步事件处理
  4. 事务隔离:事件处理在独立事务中,避免影响主流程

4.4.6 优缺点分析

优点

  • 松耦合:状态机与业务逻辑完全解耦
  • 异步性:事件异步处理,提高系统吞吐量
  • 可扩展:新增业务逻辑只需添加事件监听器
  • 易于测试:事件发布和订阅可以独立测试

缺点

  • 时序问题:异步事件可能导致处理顺序不确定
  • 调试困难:事件链路分散,问题定位复杂
  • 性能开销:频繁的事件发布可能影响性能
  • 一致性挑战:异步处理可能导致数据短暂不一致

4.5 P0 紧急打断机制

4.5.1 设计思路

P0 紧急任务打断机制用于处理紧急情况,保证高优先级任务能够立即获得资源:

核心原则

  1. 优先级特权P0 任务具有最高优先权,可以打断任何非 P0 任务
  2. 状态管理:被打断的任务进入 PAUSED 状态,保留恢复能力
  3. 自动恢复P0 任务完成后自动恢复被打断的任务

4.5.2 打断条件

可以被打断的状态

  • DISPATCHED(已派单)
  • CONFIRMED(已确认)
  • ARRIVED(已到岗)

不可以被打断的状态

  • PENDING(待接单)
  • QUEUED(已入队)
  • PAUSED(已暂停)
  • COMPLETED(已完成)
  • CANCELLED(已取消)

4.5.3 打断流程

P0 紧急工单创建
    ↓
查找附近保洁员
    ↓
保洁员状态为 BUSY
    ├─ 否 → 直接派单
    └─ 是 → 检查当前任务优先级
           ↓
       当前任务 < P0
           ├─ 是 → 暂停当前任务PAUSED
           │        派发 P0 工单
           │        P0 完成后自动恢复
           └─ 否 → P0 入队等待

4.5.4 核心实现

打断操作

@Service
public class OrderLifecycleManager {

    @Autowired
    private OrderStateMachine stateMachine;

    @Autowired
    private OrderQueueService orderQueueService;

    @Autowired
    private CleanerStatusService cleanerStatusService;

    /**
     * 打断当前任务并派发紧急任务
     */
    @Transactional(rollbackFor = Exception.class)
    public void interruptAndDispatch(Long currentOrderId, Long urgentOrderId,
                                     Long assigneeId, Long operatorId) {
        // 1. 参数校验
        OpsOrderDO currentOrder = orderService.getById(currentOrderId);
        if (currentOrder == null) {
            throw new IllegalArgumentException("当前任务不存在");
        }

        OpsOrderDO urgentOrder = orderService.getById(urgentOrderId);
        if (urgentOrder == null) {
            throw new IllegalArgumentException("紧急任务不存在");
        }

        // 2. 检查打断条件
        WorkOrderStatusEnum currentStatus = currentOrder.getStatus();
        if (!canInterrupt(currentStatus)) {
            throw new IllegalStateException(
                String.format("当前任务状态不允许打断: %s", currentStatus));
        }

        // 3. 暂停当前任务
        pauseOrder(currentOrderId, assigneeId, operatorId, "被P0任务打断: " + urgentOrderId);

        // 4. 记录打断关系
        recordInterruptRelation(currentOrderId, urgentOrderId);

        // 5. 派发紧急任务
        assignOrder(urgentOrderId, assigneeId, operatorId);

        log.info("打断并派发成功: currentOrderId={}, urgentOrderId={}, assigneeId={}",
            currentOrderId, urgentOrderId, assigneeId);
    }

    /**
     * 暂停工单(同时暂停工单状态和队列状态)
     */
    @Transactional(rollbackFor = Exception.class)
    public void pauseOrder(Long orderId, Long assigneeId, Long operatorId, String reason) {
        // 1. 暂停工单状态
        OpsOrderDO order = orderService.getById(orderId);
        stateMachine.transition(order, WorkOrderStatusEnum.PAUSED,
            OperatorTypeEnum.SYSTEM, operatorId, reason);

        // 2. 暂停队列状态
        orderQueueService.pauseTask(orderId, assigneeId);

        // 3. 更新保洁员状态
        cleanerStatusService.pauseCurrentTask(assigneeId, orderId);

        log.info("暂停工单成功: orderId={}, assigneeId={}, reason={}", orderId, assigneeId, reason);
    }

    /**
     * 恢复工单(同时恢复工单状态和队列状态)
     */
    @Transactional(rollbackFor = Exception.class)
    public void resumeOrder(Long orderId, Long assigneeId, Long operatorId) {
        // 1. 恢复工单状态
        OpsOrderDO order = orderService.getById(orderId);
        stateMachine.transition(order, WorkOrderStatusEnum.ARRIVED,
            OperatorTypeEnum.SYSTEM, operatorId, "自动恢复被中断的任务");

        // 2. 恢复队列状态
        orderQueueService.resumeTask(orderId, assigneeId);

        // 3. 更新保洁员状态
        cleanerStatusService.resumeCurrentTask(assigneeId, orderId);

        log.info("恢复工单成功: orderId={}, assigneeId={}", orderId, assigneeId);
    }

    /**
     * 检查是否可以打断
     */
    private boolean canInterrupt(WorkOrderStatusEnum status) {
        return Set.of(
            WorkOrderStatusEnum.DISPATCHED,
            WorkOrderStatusEnum.CONFIRMED,
            WorkOrderStatusEnum.ARRIVED
        ).contains(status);
 }

    /**
     * 记录打断关系
     */
    private void recordInterruptRelation(Long interruptedOrderId, Long urgentOrderId) {
        // 存储打断关系到 Redis
        String key = "interrupt:relation:" + urgentOrderId;
        redisTemplate.opsForValue().set(key, interruptedOrderId, 24, TimeUnit.HOURS);
    }
}

自动恢复机制

@Component
public class CleanOrderEventHandler {

    /**
     * 监听 P0 任务完成事件,自动恢复被打断的任务
     */
    @EventListener
    @Async
    public void onUrgentOrderCompleted(OrderStateChangedEvent event) {
        if (!event.getNewStatus().equals(WorkOrderStatusEnum.COMPLETED)) {
            return;
        }

        Long urgentOrderId = event.getOrderId();

        // 检查是否有被打断的任务
        String key = "interrupt:relation:" + urgentOrderId;
        Long interruptedOrderId = (Long) redisTemplate.opsForValue().get(key);

        if (interruptedOrderId != null) {
            log.info("检测到被打断任务,自动恢复: urgentOrderId={}, interruptedOrderId={}",
                urgentOrderId, interruptedOrderId);

            // 恢复被打断的任务
            Long assigneeId = event.getOperatorId();
            orderLifecycleManager.resumeOrder(interruptedOrderId, assigneeId, 0L);

            // 清理打断关系
            redisTemplate.delete(key);
        }
    }
}

4.5.5 技术要点

  1. 打断条件:只有 DISPATCHED、CONFIRMED、ARRIVED 状态的任务可以被打断
  2. 事务保证:打断操作在同一事务中完成状态转换和事件发布
  3. 自动调度P0 任务完成后通过 autoDispatchNext 自动恢复中断任务
  4. 关系记录:使用 Redis 记录打断关系,支持自动恢复

4.5.6 优缺点分析

优点

  • 紧急响应P0 任务可以立即获得资源,满足紧急需求
  • 状态完整:被打断的任务状态完整保留,可以无缝恢复
  • 业务连续性:保证了任务的连续性和完整性
  • 自动化P0 完成后自动恢复被打断任务,无需人工干预

缺点

  • 资源抢占:可能导致其他任务被长时间中断
  • 用户体验:被打断的用户可能需要等待较长时间
  • 实现复杂:需要处理多种状态转换和恢复场景
  • 级联影响:频繁打断可能导致任务积压

4.6 技术方案总结

4.6.1 方案对比

技术方案 核心优势 主要挑战 适用场景
状态机 状态转换清晰、易于维护 复杂业务需要额外处理 工单生命周期管理
优先级队列 高性能、高可靠 架构复杂、一致性挑战 高并发工单调度
智能派单 可扩展、决策透明 策略耦合、调试复杂 多业务线智能调度
事件驱动 松耦合、易扩展 时序问题、调试困难 业务逻辑解耦
P0 打断 紧急响应、自动化 资源抢占、用户体验 紧急任务处理

4.6.2 技术选型建议

小规模部署(< 1000 工单/天)

  • 队列:纯 MySQL 实现,降低架构复杂度
  • 派单:简单的轮询或随机策略
  • 事件:同步处理,降低异步复杂度

中规模部署1000-10000 工单/天)

  • 队列Redis + MySQL 混合架构
  • 派单:区域优先策略
  • 事件:异步处理,提高吞吐量

大规模部署(> 10000 工单/天)

  • 队列Redis Cluster + MySQL 主从
  • 派单:多策略组合(区域优先 + 负载均衡 + 技能匹配)
  • 事件消息队列RocketMQ/Kafka+ 异步处理

4.6.3 性能优化建议

  1. 缓存优化

    • 保洁员状态缓存5分钟过期
    • 区域树缓存(静态数据)
    • 派单策略结果缓存1分钟过期
  2. 批量操作

    • 批量查询保洁员状态
    • 批量更新队列状态
    • Pipeline 批量 Redis 操作
  3. 异步处理

    • 事件异步发布
    • 通知异步发送
    • 统计数据异步计算
  4. 数据库优化

    • 合理设计索引
    • 分库分表(按业务线或时间)
    • 读写分离

下一章Part 5: 数据模型设计