diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java
index d9425ac..5951d25 100644
--- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java
+++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/queue/OrderQueueServiceEnhanced.java
@@ -1,689 +1,690 @@
-package com.viewsh.module.ops.service.queue;
-
-import com.viewsh.module.ops.api.queue.OrderQueueDTO;
-import com.viewsh.module.ops.api.queue.OrderQueueService;
-import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
-import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
-import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
-import com.viewsh.module.ops.enums.PriorityEnum;
-import jakarta.annotation.Resource;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.BeanUtils;
-import org.springframework.context.annotation.Primary;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-
-/**
- * 工单队列管理服务实现
- *
- * 架构说明:
- * 1. 写入:先写 MySQL(持久化),再异步写 Redis(高性能)
- * 2. 读取:优先读 Redis,未命中则读 MySQL 并同步到 Redis
- * 3. 同步:定时任务将 MySQL 数据同步到 Redis
- * 4. 容灾:Redis 宕机时降级到纯 MySQL 模式
- *
- * @author lzh
- */
-@Slf4j
+package com.viewsh.module.ops.service.queue;
+
+import com.viewsh.module.ops.api.queue.OrderQueueDTO;
+import com.viewsh.module.ops.api.queue.OrderQueueService;
+import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
+import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
+import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
+import com.viewsh.module.ops.enums.PriorityEnum;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeanUtils;
+import org.springframework.context.annotation.Primary;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * 工单队列管理服务实现
+ *
+ * 架构说明:
+ * 1. 写入:先写 MySQL(持久化),再异步写 Redis(高性能)
+ * 2. 读取:优先读 Redis,未命中则读 MySQL 并同步到 Redis
+ * 3. 同步:定时任务将 MySQL 数据同步到 Redis
+ * 4. 容灾:Redis 宕机时降级到纯 MySQL 模式
+ *
+ * @author lzh
+ */
+@Slf4j
@Service
+@Primary
public class OrderQueueServiceEnhanced implements OrderQueueService {
-
- /**
- * Score 计算公式:优先级分数 + 时间戳
- * 优先级分数:P0=0, P1=1000000, P2=2000000, P3=3000000
- * 时间戳:秒级时间戳
- * 结果:优先级高的排在前面,同优先级按时间排序
- */
- private static final Map PRIORITY_SCORES = Map.of(
- 0, 0L, // P0: 0
- 1, 1000000L, // P1: 1,000,000
- 2, 2000000L, // P2: 2,000,000
- 3, 3000000L // P3: 3,000,000
- );
-
- @Resource
- private OpsOrderQueueMapper orderQueueMapper;
-
- @Resource
- private RedisOrderQueueService redisQueueService;
-
- @Resource
- private QueueSyncService queueSyncService;
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public Long enqueue(Long opsOrderId, Long userId, PriorityEnum priority, Integer queueIndex) {
- // 1. 检查工单是否已在队列中(MySQL)
- OpsOrderQueueDO existingQueue = orderQueueMapper.selectByOpsOrderId(opsOrderId);
- if (existingQueue != null) {
- log.warn("工单已在队列中: opsOrderId={}, userId={}", opsOrderId, userId);
- return existingQueue.getId();
- }
-
- // 2. 计算队列分数
- LocalDateTime now = LocalDateTime.now();
- double queueScore = calculateQueueScore(priority.getPriority(), now);
-
- // 3. 创建队列记录(MySQL)
- OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder()
- .opsOrderId(opsOrderId)
- .userId(userId)
- .queueIndex(queueIndex != null ? queueIndex : calculateNextQueueIndex(priority))
- .priority(priority.getPriority())
- .queueScore(queueScore)
- .queueStatus(OrderQueueStatusEnum.WAITING.getStatus())
- .enqueueTime(now)
- .pausedDuration(0)
- .eventMessage("工单入队,等待派单")
- .build();
-
- orderQueueMapper.insert(queueDO);
-
- log.info("工单已入队(MySQL): opsOrderId={}, userId={}, priority={}, queueId={}",
- opsOrderId, userId, priority, queueDO.getId());
-
- // 3. 异步写入 Redis(失败不影响主流程)
- OrderQueueDTO dto = convertToDTO(queueDO);
- CompletableFuture.runAsync(() -> {
- try {
- redisQueueService.enqueue(dto);
- log.debug("工单已入队(Redis): queueId={}", dto.getId());
- } catch (Exception e) {
- log.error("Redis 入队失败,依赖定时同步任务补偿: queueId={}", dto.getId(), e);
- }
- });
-
- // 4. 如果是P0紧急任务,记录警告日志
- if (priority.isUrgent()) {
- log.warn("检测到P0紧急任务入队,需立即处理: opsOrderId={}", opsOrderId);
- // TODO: 触发紧急派单流程(在派单引擎中实现)
- }
-
- return queueDO.getId();
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean dequeue(Long queueId) {
- // 1. MySQL 删除
- OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
- if (queueDO == null) {
- log.warn("队列记录不存在: queueId={}", queueId);
- return false;
- }
-
- // 只允许删除终态的记录
- if (!isTerminalStatus(queueDO.getQueueStatus())) {
- log.warn("只能删除终态的队列记录: queueId={}, status={}", queueId, queueDO.getQueueStatus());
- return false;
- }
-
- int deleted = orderQueueMapper.deleteById(queueId);
-
- // 2. 异步删除 Redis
- if (deleted > 0) {
- CompletableFuture.runAsync(() -> {
- try {
- redisQueueService.remove(queueId);
- log.debug("Redis 删除队列记录成功: queueId={}", queueId);
- } catch (Exception e) {
- log.error("Redis 删除队列记录失败: queueId={}", queueId, e);
- }
- });
- }
-
- log.info("队列记录已删除: queueId={}, deleted={}", queueId, deleted > 0);
- return deleted > 0;
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean dequeueByOpsOrderId(Long opsOrderId) {
- OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId);
- if (queueDO == null) {
- log.warn("工单不在队列中: opsOrderId={}", opsOrderId);
- return false;
- }
-
- return dequeue(queueDO.getId());
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean updateStatus(Long queueId, OrderQueueStatusEnum newStatus) {
- // 1. MySQL 更新
- OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
- if (queueDO == null) {
- log.warn("队列记录不存在: queueId={}", queueId);
- return false;
- }
-
- String oldStatus = queueDO.getQueueStatus();
-
- // 状态流转校验
- if (!validateStatusTransition(oldStatus, newStatus.getStatus())) {
- log.warn("非法的状态流转: queueId={}, oldStatus={}, newStatus={}",
- queueId, oldStatus, newStatus.getStatus());
- return false;
- }
-
- queueDO.setQueueStatus(newStatus.getStatus().toUpperCase());
- updateStatusTimestamp(queueDO, newStatus);
-
- int updated = orderQueueMapper.updateById(queueDO);
-
- // 2. 异步更新 Redis
- if (updated > 0) {
- OrderQueueDTO dto = convertToDTO(queueDO);
- CompletableFuture.runAsync(() -> {
- try {
- redisQueueService.updateStatus(queueId, newStatus.getStatus());
- log.debug("Redis 更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus());
- } catch (Exception e) {
- log.error("Redis 更新状态失败: queueId={}", queueId, e);
- }
- });
- }
-
- log.info("队列状态已更新: queueId={}, opsOrderId={}, oldStatus={}, newStatus={}",
- queueId, queueDO.getOpsOrderId(), oldStatus, newStatus.getStatus());
-
- return updated > 0;
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean startExecution(Long queueId) {
- OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
- if (queueDO == null) {
- log.warn("队列记录不存在: queueId={}", queueId);
- return false;
- }
-
- if (!OrderQueueStatusEnum.WAITING.getStatus().equals(queueDO.getQueueStatus())) {
- log.warn("只有WAITING状态的任务可以开始执行: queueId={}, status={}",
- queueId, queueDO.getQueueStatus());
- return false;
- }
-
- // 队列状态流转:WAITING → PROCESSING
- queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus());
- queueDO.setDequeueTime(LocalDateTime.now());
- queueDO.setEventMessage("派单成功,已分配给执行人员");
-
- int updated = orderQueueMapper.updateById(queueDO);
-
- // 异步更新 Redis
- if (updated > 0) {
- CompletableFuture.runAsync(() -> {
- try {
- redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus());
- } catch (Exception e) {
- log.error("Redis 更新状态失败: queueId={}", queueId, e);
- }
- });
- }
-
- log.info("派单成功: queueId={}, opsOrderId={}, assigneeId={}",
- queueId, queueDO.getOpsOrderId(), queueDO.getUserId());
-
- return updated > 0;
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean pauseTask(Long queueId) {
- OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
- if (queueDO == null) {
- log.warn("队列记录不存在: queueId={}", queueId);
- return false;
- }
-
- // 允许 PROCESSING 状态的任务可以暂停
- String currentStatus = queueDO.getQueueStatus();
- if (!OrderQueueStatusEnum.PROCESSING.getStatus().equals(currentStatus)) {
- log.warn("只有PROCESSING状态的任务可以暂停: queueId={}, status={}",
- queueId, currentStatus);
- return false;
- }
-
- // 队列状态流转:PROCESSING → PAUSED
- queueDO.setQueueStatus(OrderQueueStatusEnum.PAUSED.getStatus());
-
- // 计算暂停时长
- if (queueDO.getDequeueTime() != null) {
- long pausedSeconds = java.time.Duration.between(
- queueDO.getDequeueTime(),
- LocalDateTime.now()
- ).getSeconds();
- queueDO.addPausedDuration((int) pausedSeconds);
- }
-
- queueDO.setEventMessage("任务已暂停");
- int updated = orderQueueMapper.updateById(queueDO);
-
- // 异步更新 Redis
- if (updated > 0) {
- CompletableFuture.runAsync(() -> {
- try {
- redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PAUSED.getStatus());
- } catch (Exception e) {
- log.error("Redis 更新状态失败: queueId={}", queueId, e);
- }
- });
- }
-
- log.info("任务已暂停: queueId={}, opsOrderId={}, pausedDuration={}",
- queueId, queueDO.getOpsOrderId(), queueDO.getPausedDuration());
-
- return updated > 0;
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean resumeTask(Long queueId) {
- OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
- if (queueDO == null) {
- log.warn("队列记录不存在: queueId={}", queueId);
- return false;
- }
-
- // 只有 PAUSED 状态的任务可以恢复
- String currentStatus = queueDO.getQueueStatus();
- if (!OrderQueueStatusEnum.PAUSED.getStatus().equals(currentStatus)) {
- log.warn("只有PAUSED状态的任务可以恢复: queueId={}, status={}",
- queueId, currentStatus);
- return false;
- }
-
- // 队列状态流转:PAUSED → PROCESSING
- queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus());
- queueDO.setDequeueTime(LocalDateTime.now()); // 重置出队时间
- queueDO.setEventMessage("任务已恢复执行");
-
- int updated = orderQueueMapper.updateById(queueDO);
-
- // 异步更新 Redis
- if (updated > 0) {
- CompletableFuture.runAsync(() -> {
- try {
- redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus());
- } catch (Exception e) {
- log.error("Redis 更新状态失败: queueId={}", queueId, e);
- }
- });
- }
-
- log.info("任务已恢复执行: queueId={}, opsOrderId={}", queueId, queueDO.getOpsOrderId());
-
- return updated > 0;
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean adjustPriority(Long queueId, PriorityEnum newPriority, String reason) {
- OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
- if (queueDO == null) {
- log.warn("队列记录不存在: queueId={}", queueId);
- return false;
- }
-
- Integer oldPriority = queueDO.getPriority();
-
- // 如果优先级没有变化,直接返回
- if (oldPriority.equals(newPriority.getPriority())) {
- log.info("优先级未变化: queueId={}, priority={}", queueId, newPriority);
- return true;
- }
-
- queueDO.setPriorityEnum(newPriority);
- // 重新计算队列顺序
- queueDO.setQueueIndex(calculateNextQueueIndex(newPriority));
- // 重新计算队列分数(使用原入队时间保持时间戳不变)
- LocalDateTime enqueueTime = queueDO.getEnqueueTime() != null ? queueDO.getEnqueueTime() : LocalDateTime.now();
- double newQueueScore = calculateQueueScore(newPriority.getPriority(), enqueueTime);
- queueDO.setQueueScore(newQueueScore);
- queueDO.setEventMessage("优先级调整: " + oldPriority + " -> " + newPriority + ", 原因: " + reason);
-
- int updated = orderQueueMapper.updateById(queueDO);
-
- // 异步更新 Redis
- if (updated > 0) {
- CompletableFuture.runAsync(() -> {
- try {
- redisQueueService.updatePriority(queueId, newPriority.getPriority());
- } catch (Exception e) {
- log.error("Redis 更新优先级失败: queueId={}", queueId, e);
- }
- });
- }
-
- log.info("优先级已调整: queueId={}, opsOrderId={}, oldPriority={}, newPriority={}, reason={}, newScore={}",
- queueId, queueDO.getOpsOrderId(), oldPriority, newPriority, reason, newQueueScore);
-
- // 如果升级为P0,触发紧急派单
- if (newPriority.isUrgent()) {
- log.warn("任务升级为P0紧急,需立即处理: queueId={}, opsOrderId={}",
- queueId, queueDO.getOpsOrderId());
- // TODO: 触发紧急派单流程
- }
-
- return updated > 0;
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean adjustPriorityByOpsOrderId(Long opsOrderId, PriorityEnum newPriority, String reason) {
- OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId);
- if (queueDO == null) {
- log.warn("工单不在队列中: opsOrderId={}", opsOrderId);
- return false;
- }
-
- return adjustPriority(queueDO.getId(), newPriority, reason);
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean upgradeToUrgent(Long opsOrderId, String reason) {
- return adjustPriorityByOpsOrderId(opsOrderId, PriorityEnum.P0, reason);
- }
-
- @Override
- public List getWaitingQueue() {
- // 优先从 Redis 获取
- // TODO: 实现全局等待队列(需要聚合所有用户的队列)
- // 这里暂时使用 MySQL
- List list = orderQueueMapper.selectListWaiting();
- return convertToDTO(list);
- }
-
- @Override
- public List getUrgentOrders() {
- // TODO: 优先从 Redis 获取
- List list = orderQueueMapper.selectUrgentOrders();
- return convertToDTO(list);
- }
-
- @Override
- public OrderQueueDTO getCurrentTaskByUserId(Long userId) {
- // 1. 优先从 Redis 获取
- List redisTasks = redisQueueService.peekTasks(userId, 1);
- if (redisTasks != null && !redisTasks.isEmpty()) {
- return redisTasks.get(0);
- }
-
- // 2. Redis 未命中,从 MySQL 获取并同步到 Redis
- OpsOrderQueueDO queueDO = orderQueueMapper.selectCurrentExecutingByUserId(userId);
- if (queueDO != null) {
- // 同步到 Redis
- OrderQueueDTO dto = convertToDTO(queueDO);
- CompletableFuture.runAsync(() -> {
- try {
- redisQueueService.enqueue(dto);
- } catch (Exception e) {
- log.error("同步到 Redis 失败: queueId={}", dto.getId(), e);
- }
- });
- return dto;
- }
-
- return null;
- }
-
- @Override
- public List getTasksByUserId(Long userId) {
- // 1. 优先从 Redis 获取
- List redisTasks = redisQueueService.getTasksByUserId(userId);
- if (redisTasks != null && !redisTasks.isEmpty()) {
- return redisTasks;
- }
-
- // 2. Redis 未命中,从 MySQL 获取并同步到 Redis
- List mysqlList = orderQueueMapper.selectListByUserId(userId);
- if (mysqlList != null && !mysqlList.isEmpty()) {
- // 同步到 Redis
- List dtoList = convertToDTO(mysqlList);
- CompletableFuture.runAsync(() -> {
- try {
- redisQueueService.batchEnqueue(dtoList);
- } catch (Exception e) {
- log.error("批量同步到 Redis 失败: userId={}", userId, e);
- }
- });
- return dtoList;
- }
-
- return Collections.emptyList();
- }
-
- @Override
- public List getWaitingTasksByUserId(Long userId) {
- // 获取所有任务
- List allTasks = getTasksByUserId(userId);
-
- // 过滤出 WAITING 状态的任务,并按队列分数排序
- return allTasks.stream()
- .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus()))
- .sorted((a, b) -> {
- // 优先使用队列分数 score 排序(已在入队时计算好)
- if (a.getQueueScore() != null && b.getQueueScore() != null) {
- return Double.compare(a.getQueueScore(), b.getQueueScore());
- }
-
- // 兜底:如果 score 为空,则按优先级+时间排序
- // 按优先级排序(P0 > P1 > P2 > P3)
- int priorityCompare = Integer.compare(
- b.getPriority() != null ? b.getPriority() : 999,
- a.getPriority() != null ? a.getPriority() : 999
- );
- if (priorityCompare != 0) {
- return priorityCompare;
- }
- // 优先级相同,按入队时间排序
- return a.getEnqueueTime().compareTo(b.getEnqueueTime());
- })
- .collect(Collectors.toList());
- }
-
- @Override
- public List getInterruptedTasksByUserId(Long userId) {
- // 获取所有任务
- List allTasks = getTasksByUserId(userId);
-
- // 过滤出 PAUSED 状态的任务,并按中断时间排序
- return allTasks.stream()
- .filter(task -> OrderQueueStatusEnum.PAUSED.getStatus().equals(task.getQueueStatus()))
- .sorted((a, b) -> {
- // 按中断时间排序(最早中断的排在前面)
- // 如果没有中断时间字段,则按入队时间排序
- if (a.getEnqueueTime() != null && b.getEnqueueTime() != null) {
- return a.getEnqueueTime().compareTo(b.getEnqueueTime());
- }
- return 0;
- })
- .collect(Collectors.toList());
- }
-
- @Override
- public OrderQueueDTO getById(Long queueId) {
- // 1. 优先从 Redis Hash 获取
- OrderQueueDTO redisDTO = redisQueueService.getByQueueId(queueId);
- if (redisDTO != null) {
- return redisDTO;
- }
-
- // 2. Redis 未命中,从 MySQL 获取
- OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
- if (queueDO != null) {
- OrderQueueDTO dto = convertToDTO(queueDO);
- // 同步到 Redis
- CompletableFuture.runAsync(() -> {
- try {
- redisQueueService.enqueue(dto);
- } catch (Exception e) {
- log.error("同步到 Redis 失败: queueId={}", dto.getId(), e);
- }
- });
- return dto;
- }
-
- return null;
- }
-
- @Override
- public OrderQueueDTO getByOpsOrderId(Long opsOrderId) {
- // 1. 优先从 Redis 获取(遍历所有用户队列,性能较差,慎用)
- OrderQueueDTO redisDTO = redisQueueService.getByOrderId(opsOrderId);
- if (redisDTO != null) {
- return redisDTO;
- }
-
- // 2. Redis 未命中,从 MySQL 获取
- OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId);
- if (queueDO != null) {
- return convertToDTO(queueDO);
- }
-
- return null;
- }
-
- @Override
- public Long countWaiting() {
- // 优先从 Redis 获取(需要聚合所有用户)
- // TODO: 实现分布式计数器
- return orderQueueMapper.countWaiting();
- }
-
- @Override
- public Long countByUserId(Long userId) {
- // 优先从 Redis 获取
- long redisCount = redisQueueService.getQueueSize(userId);
- if (redisCount > 0) {
- return redisCount;
- }
-
- // Redis 未命中,从 MySQL 获取
- return orderQueueMapper.countByUserId(userId);
- }
-
- @Override
- @Transactional(rollbackFor = Exception.class)
- public boolean updateEventMessage(Long queueId, String eventMessage) {
- OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
- if (queueDO == null) {
- log.warn("队列记录不存在: queueId={}", queueId);
- return false;
- }
-
- queueDO.setEventMessage(eventMessage);
- int updated = orderQueueMapper.updateById(queueDO);
-
- log.info("事件消息已更新: queueId={}, eventMessage={}", queueId, eventMessage);
-
- return updated > 0;
- }
-
- // ========== 私有方法 ==========
-
- /**
- * 计算队列分数(用于排序)
- * 公式:优先级分数 + 时间戳
- *
- * @param priority 优先级(0=P0, 1=P1, 2=P2, 3=P3)
- * @param enqueueTime 入队时间
- * @return 队列分数
- */
- private double calculateQueueScore(Integer priority, LocalDateTime enqueueTime) {
- // 获取优先级分数
- long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L);
-
- // 计算时间戳(秒级)
- long timestamp;
- if (enqueueTime != null) {
- timestamp = enqueueTime.atZone(ZoneId.systemDefault()).toEpochSecond();
- } else {
- timestamp = System.currentTimeMillis() / 1000;
- }
-
- return priorityScore + timestamp;
- }
-
- /**
- * 计算下一个队列顺序号
- */
- private Integer calculateNextQueueIndex(PriorityEnum priority) {
- // TODO: 查询当前优先级的最大 queueIndex,然后 +1
- // 这里简化处理,返回默认值
- return priority.getQueueOrder() * 1000;
- }
-
- /**
- * 判断是否为终态
- */
- private boolean isTerminalStatus(String status) {
- return "CANCELLED".equals(status) || "COMPLETED".equals(status)
- || "REMOVED".equals(status) || "FAILED".equals(status);
- }
-
- /**
- * 校验状态流转是否合法
- */
- private boolean validateStatusTransition(String oldStatus, String newStatus) {
- // 终态不能再变更
- if (isTerminalStatus(oldStatus)) {
- return false;
- }
-
- // 允许的状态流转(支持新旧状态)
- return switch (oldStatus.toUpperCase()) {
- case "PENDING" -> "WAITING".equals(newStatus) || "CANCELLED".equals(newStatus);
- case "WAITING" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus)
- || "CANCELLED".equals(newStatus);
- case "PROCESSING", "DISPATCHED" -> "PAUSED".equals(newStatus)
- || "REMOVED".equals(newStatus) || "COMPLETED".equals(newStatus);
- case "PAUSED" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus);
- default -> false;
- };
- }
-
- /**
- * 根据状态更新时间戳
- */
- private void updateStatusTimestamp(OpsOrderQueueDO queueDO, OrderQueueStatusEnum status) {
- switch (status.getStatus().toUpperCase()) {
- case "DISPATCHED" -> {
- if (queueDO.getDequeueTime() == null) {
- queueDO.setDequeueTime(LocalDateTime.now());
- }
- }
- case "REMOVED", "COMPLETED" -> {
- // 可以记录完成时间(如果表有该字段)
- }
- }
- }
-
- /**
- * 转换为 DTO
- */
- private OrderQueueDTO convertToDTO(OpsOrderQueueDO queueDO) {
- OrderQueueDTO dto = new OrderQueueDTO();
- BeanUtils.copyProperties(queueDO, dto);
- return dto;
- }
-
- /**
- * 批量转换为 DTO
- */
- private List convertToDTO(List list) {
- return list.stream()
- .map(this::convertToDTO)
- .collect(Collectors.toList());
- }
-}
+
+ /**
+ * Score 计算公式:优先级分数 + 时间戳
+ * 优先级分数:P0=0, P1=1000000, P2=2000000, P3=3000000
+ * 时间戳:秒级时间戳
+ * 结果:优先级高的排在前面,同优先级按时间排序
+ */
+ private static final Map<Integer, Long> PRIORITY_SCORES = Map.of(
+ 0, 0L, // P0: 0
+ 1, 1000000L, // P1: 1,000,000
+ 2, 2000000L, // P2: 2,000,000
+ 3, 3000000L // P3: 3,000,000
+ );
+
+ @Resource
+ private OpsOrderQueueMapper orderQueueMapper;
+
+ @Resource
+ private RedisOrderQueueService redisQueueService;
+
+ @Resource
+ private QueueSyncService queueSyncService;
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public Long enqueue(Long opsOrderId, Long userId, PriorityEnum priority, Integer queueIndex) {
+ // 1. 检查工单是否已在队列中(MySQL)
+ OpsOrderQueueDO existingQueue = orderQueueMapper.selectByOpsOrderId(opsOrderId);
+ if (existingQueue != null) {
+ log.warn("工单已在队列中: opsOrderId={}, userId={}", opsOrderId, userId);
+ return existingQueue.getId();
+ }
+
+ // 2. 计算队列分数
+ LocalDateTime now = LocalDateTime.now();
+ double queueScore = calculateQueueScore(priority.getPriority(), now);
+
+ // 3. 创建队列记录(MySQL)
+ OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder()
+ .opsOrderId(opsOrderId)
+ .userId(userId)
+ .queueIndex(queueIndex != null ? queueIndex : calculateNextQueueIndex(priority))
+ .priority(priority.getPriority())
+ .queueScore(queueScore)
+ .queueStatus(OrderQueueStatusEnum.WAITING.getStatus())
+ .enqueueTime(now)
+ .pausedDuration(0)
+ .eventMessage("工单入队,等待派单")
+ .build();
+
+ orderQueueMapper.insert(queueDO);
+
+ log.info("工单已入队(MySQL): opsOrderId={}, userId={}, priority={}, queueId={}",
+ opsOrderId, userId, priority, queueDO.getId());
+
+ // 4. 异步写入 Redis(失败不影响主流程)
+ OrderQueueDTO dto = convertToDTO(queueDO);
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.enqueue(dto);
+ log.debug("工单已入队(Redis): queueId={}", dto.getId());
+ } catch (Exception e) {
+ log.error("Redis 入队失败,依赖定时同步任务补偿: queueId={}", dto.getId(), e);
+ }
+ });
+
+ // 5. 如果是P0紧急任务,记录警告日志
+ if (priority.isUrgent()) {
+ log.warn("检测到P0紧急任务入队,需立即处理: opsOrderId={}", opsOrderId);
+ // TODO: 触发紧急派单流程(在派单引擎中实现)
+ }
+
+ return queueDO.getId();
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean dequeue(Long queueId) {
+ // 1. MySQL 删除
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO == null) {
+ log.warn("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ // 只允许删除终态的记录
+ if (!isTerminalStatus(queueDO.getQueueStatus())) {
+ log.warn("只能删除终态的队列记录: queueId={}, status={}", queueId, queueDO.getQueueStatus());
+ return false;
+ }
+
+ int deleted = orderQueueMapper.deleteById(queueId);
+
+ // 2. 异步删除 Redis
+ if (deleted > 0) {
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.remove(queueId);
+ log.debug("Redis 删除队列记录成功: queueId={}", queueId);
+ } catch (Exception e) {
+ log.error("Redis 删除队列记录失败: queueId={}", queueId, e);
+ }
+ });
+ }
+
+ log.info("队列记录已删除: queueId={}, deleted={}", queueId, deleted > 0);
+ return deleted > 0;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean dequeueByOpsOrderId(Long opsOrderId) {
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId);
+ if (queueDO == null) {
+ log.warn("工单不在队列中: opsOrderId={}", opsOrderId);
+ return false;
+ }
+
+ return dequeue(queueDO.getId());
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean updateStatus(Long queueId, OrderQueueStatusEnum newStatus) {
+ // 1. MySQL 更新
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO == null) {
+ log.warn("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ String oldStatus = queueDO.getQueueStatus();
+
+ // 状态流转校验
+ if (!validateStatusTransition(oldStatus, newStatus.getStatus())) {
+ log.warn("非法的状态流转: queueId={}, oldStatus={}, newStatus={}",
+ queueId, oldStatus, newStatus.getStatus());
+ return false;
+ }
+
+ queueDO.setQueueStatus(newStatus.getStatus().toUpperCase());
+ updateStatusTimestamp(queueDO, newStatus);
+
+ int updated = orderQueueMapper.updateById(queueDO);
+
+ // 2. 异步更新 Redis
+ if (updated > 0) {
+ OrderQueueDTO dto = convertToDTO(queueDO);
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.updateStatus(queueId, newStatus.getStatus());
+ log.debug("Redis 更新状态成功: queueId={}, status={}", queueId, newStatus.getStatus());
+ } catch (Exception e) {
+ log.error("Redis 更新状态失败: queueId={}", queueId, e);
+ }
+ });
+ }
+
+ log.info("队列状态已更新: queueId={}, opsOrderId={}, oldStatus={}, newStatus={}",
+ queueId, queueDO.getOpsOrderId(), oldStatus, newStatus.getStatus());
+
+ return updated > 0;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean startExecution(Long queueId) {
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO == null) {
+ log.warn("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ if (!OrderQueueStatusEnum.WAITING.getStatus().equals(queueDO.getQueueStatus())) {
+ log.warn("只有WAITING状态的任务可以开始执行: queueId={}, status={}",
+ queueId, queueDO.getQueueStatus());
+ return false;
+ }
+
+ // 队列状态流转:WAITING → PROCESSING
+ queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus());
+ queueDO.setDequeueTime(LocalDateTime.now());
+ queueDO.setEventMessage("派单成功,已分配给执行人员");
+
+ int updated = orderQueueMapper.updateById(queueDO);
+
+ // 异步更新 Redis
+ if (updated > 0) {
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus());
+ } catch (Exception e) {
+ log.error("Redis 更新状态失败: queueId={}", queueId, e);
+ }
+ });
+ }
+
+ log.info("派单成功: queueId={}, opsOrderId={}, assigneeId={}",
+ queueId, queueDO.getOpsOrderId(), queueDO.getUserId());
+
+ return updated > 0;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean pauseTask(Long queueId) {
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO == null) {
+ log.warn("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ // 允许 PROCESSING 状态的任务可以暂停
+ String currentStatus = queueDO.getQueueStatus();
+ if (!OrderQueueStatusEnum.PROCESSING.getStatus().equals(currentStatus)) {
+ log.warn("只有PROCESSING状态的任务可以暂停: queueId={}, status={}",
+ queueId, currentStatus);
+ return false;
+ }
+
+ // 队列状态流转:PROCESSING → PAUSED
+ queueDO.setQueueStatus(OrderQueueStatusEnum.PAUSED.getStatus());
+
+ // 计算暂停时长
+ if (queueDO.getDequeueTime() != null) {
+ long pausedSeconds = java.time.Duration.between(
+ queueDO.getDequeueTime(),
+ LocalDateTime.now()
+ ).getSeconds();
+ queueDO.addPausedDuration((int) pausedSeconds);
+ }
+
+ queueDO.setEventMessage("任务已暂停");
+ int updated = orderQueueMapper.updateById(queueDO);
+
+ // 异步更新 Redis
+ if (updated > 0) {
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PAUSED.getStatus());
+ } catch (Exception e) {
+ log.error("Redis 更新状态失败: queueId={}", queueId, e);
+ }
+ });
+ }
+
+ log.info("任务已暂停: queueId={}, opsOrderId={}, pausedDuration={}",
+ queueId, queueDO.getOpsOrderId(), queueDO.getPausedDuration());
+
+ return updated > 0;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean resumeTask(Long queueId) {
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO == null) {
+ log.warn("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ // 只有 PAUSED 状态的任务可以恢复
+ String currentStatus = queueDO.getQueueStatus();
+ if (!OrderQueueStatusEnum.PAUSED.getStatus().equals(currentStatus)) {
+ log.warn("只有PAUSED状态的任务可以恢复: queueId={}, status={}",
+ queueId, currentStatus);
+ return false;
+ }
+
+ // 队列状态流转:PAUSED → PROCESSING
+ queueDO.setQueueStatus(OrderQueueStatusEnum.PROCESSING.getStatus());
+ queueDO.setDequeueTime(LocalDateTime.now()); // 重置出队时间
+ queueDO.setEventMessage("任务已恢复执行");
+
+ int updated = orderQueueMapper.updateById(queueDO);
+
+ // 异步更新 Redis
+ if (updated > 0) {
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.updateStatus(queueId, OrderQueueStatusEnum.PROCESSING.getStatus());
+ } catch (Exception e) {
+ log.error("Redis 更新状态失败: queueId={}", queueId, e);
+ }
+ });
+ }
+
+ log.info("任务已恢复执行: queueId={}, opsOrderId={}", queueId, queueDO.getOpsOrderId());
+
+ return updated > 0;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean adjustPriority(Long queueId, PriorityEnum newPriority, String reason) {
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO == null) {
+ log.warn("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ Integer oldPriority = queueDO.getPriority();
+
+ // 如果优先级没有变化,直接返回
+ if (oldPriority.equals(newPriority.getPriority())) {
+ log.info("优先级未变化: queueId={}, priority={}", queueId, newPriority);
+ return true;
+ }
+
+ queueDO.setPriorityEnum(newPriority);
+ // 重新计算队列顺序
+ queueDO.setQueueIndex(calculateNextQueueIndex(newPriority));
+ // 重新计算队列分数(使用原入队时间保持时间戳不变)
+ LocalDateTime enqueueTime = queueDO.getEnqueueTime() != null ? queueDO.getEnqueueTime() : LocalDateTime.now();
+ double newQueueScore = calculateQueueScore(newPriority.getPriority(), enqueueTime);
+ queueDO.setQueueScore(newQueueScore);
+ queueDO.setEventMessage("优先级调整: " + oldPriority + " -> " + newPriority + ", 原因: " + reason);
+
+ int updated = orderQueueMapper.updateById(queueDO);
+
+ // 异步更新 Redis
+ if (updated > 0) {
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.updatePriority(queueId, newPriority.getPriority());
+ } catch (Exception e) {
+ log.error("Redis 更新优先级失败: queueId={}", queueId, e);
+ }
+ });
+ }
+
+ log.info("优先级已调整: queueId={}, opsOrderId={}, oldPriority={}, newPriority={}, reason={}, newScore={}",
+ queueId, queueDO.getOpsOrderId(), oldPriority, newPriority, reason, newQueueScore);
+
+ // 如果升级为P0,触发紧急派单
+ if (newPriority.isUrgent()) {
+ log.warn("任务升级为P0紧急,需立即处理: queueId={}, opsOrderId={}",
+ queueId, queueDO.getOpsOrderId());
+ // TODO: 触发紧急派单流程
+ }
+
+ return updated > 0;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean adjustPriorityByOpsOrderId(Long opsOrderId, PriorityEnum newPriority, String reason) {
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId);
+ if (queueDO == null) {
+ log.warn("工单不在队列中: opsOrderId={}", opsOrderId);
+ return false;
+ }
+
+ return adjustPriority(queueDO.getId(), newPriority, reason);
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean upgradeToUrgent(Long opsOrderId, String reason) {
+ return adjustPriorityByOpsOrderId(opsOrderId, PriorityEnum.P0, reason);
+ }
+
+ @Override
+ public List<OrderQueueDTO> getWaitingQueue() {
+ // 优先从 Redis 获取
+ // TODO: 实现全局等待队列(需要聚合所有用户的队列)
+ // 这里暂时使用 MySQL
+ List<OpsOrderQueueDO> list = orderQueueMapper.selectListWaiting();
+ return convertToDTO(list);
+ }
+
+ @Override
+ public List<OrderQueueDTO> getUrgentOrders() {
+ // TODO: 优先从 Redis 获取
+ List<OpsOrderQueueDO> list = orderQueueMapper.selectUrgentOrders();
+ return convertToDTO(list);
+ }
+
+ @Override
+ public OrderQueueDTO getCurrentTaskByUserId(Long userId) {
+ // 1. 优先从 Redis 获取
+ List<OrderQueueDTO> redisTasks = redisQueueService.peekTasks(userId, 1);
+ if (redisTasks != null && !redisTasks.isEmpty()) {
+ return redisTasks.get(0);
+ }
+
+ // 2. Redis 未命中,从 MySQL 获取并同步到 Redis
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectCurrentExecutingByUserId(userId);
+ if (queueDO != null) {
+ // 同步到 Redis
+ OrderQueueDTO dto = convertToDTO(queueDO);
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.enqueue(dto);
+ } catch (Exception e) {
+ log.error("同步到 Redis 失败: queueId={}", dto.getId(), e);
+ }
+ });
+ return dto;
+ }
+
+ return null;
+ }
+
+ @Override
+ public List getTasksByUserId(Long userId) {
+ // 1. 优先从 Redis 获取
+ List redisTasks = redisQueueService.getTasksByUserId(userId);
+ if (redisTasks != null && !redisTasks.isEmpty()) {
+ return redisTasks;
+ }
+
+ // 2. Redis 未命中,从 MySQL 获取并同步到 Redis
+ List mysqlList = orderQueueMapper.selectListByUserId(userId);
+ if (mysqlList != null && !mysqlList.isEmpty()) {
+ // 同步到 Redis
+ List dtoList = convertToDTO(mysqlList);
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.batchEnqueue(dtoList);
+ } catch (Exception e) {
+ log.error("批量同步到 Redis 失败: userId={}", userId, e);
+ }
+ });
+ return dtoList;
+ }
+
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List getWaitingTasksByUserId(Long userId) {
+ // 获取所有任务
+ List allTasks = getTasksByUserId(userId);
+
+ // 过滤出 WAITING 状态的任务,并按队列分数排序
+ return allTasks.stream()
+ .filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus()))
+ .sorted((a, b) -> {
+ // 优先使用队列分数 score 排序(已在入队时计算好)
+ if (a.getQueueScore() != null && b.getQueueScore() != null) {
+ return Double.compare(a.getQueueScore(), b.getQueueScore());
+ }
+
+ // 兜底:如果 score 为空,则按优先级+时间排序
+ // 按优先级排序(P0 > P1 > P2 > P3)
+ int priorityCompare = Integer.compare(
+ b.getPriority() != null ? b.getPriority() : 999,
+ a.getPriority() != null ? a.getPriority() : 999
+ );
+ if (priorityCompare != 0) {
+ return priorityCompare;
+ }
+ // 优先级相同,按入队时间排序
+ return a.getEnqueueTime().compareTo(b.getEnqueueTime());
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List getInterruptedTasksByUserId(Long userId) {
+ // 获取所有任务
+ List allTasks = getTasksByUserId(userId);
+
+ // 过滤出 PAUSED 状态的任务,并按中断时间排序
+ return allTasks.stream()
+ .filter(task -> OrderQueueStatusEnum.PAUSED.getStatus().equals(task.getQueueStatus()))
+ .sorted((a, b) -> {
+ // 按中断时间排序(最早中断的排在前面)
+ // 如果没有中断时间字段,则按入队时间排序
+ if (a.getEnqueueTime() != null && b.getEnqueueTime() != null) {
+ return a.getEnqueueTime().compareTo(b.getEnqueueTime());
+ }
+ return 0;
+ })
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public OrderQueueDTO getById(Long queueId) {
+ // 1. 优先从 Redis Hash 获取
+ OrderQueueDTO redisDTO = redisQueueService.getByQueueId(queueId);
+ if (redisDTO != null) {
+ return redisDTO;
+ }
+
+ // 2. Redis 未命中,从 MySQL 获取
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO != null) {
+ OrderQueueDTO dto = convertToDTO(queueDO);
+ // 同步到 Redis
+ CompletableFuture.runAsync(() -> {
+ try {
+ redisQueueService.enqueue(dto);
+ } catch (Exception e) {
+ log.error("同步到 Redis 失败: queueId={}", dto.getId(), e);
+ }
+ });
+ return dto;
+ }
+
+ return null;
+ }
+
+ @Override
+ public OrderQueueDTO getByOpsOrderId(Long opsOrderId) {
+ // 1. 优先从 Redis 获取(遍历所有用户队列,性能较差,慎用)
+ OrderQueueDTO redisDTO = redisQueueService.getByOrderId(opsOrderId);
+ if (redisDTO != null) {
+ return redisDTO;
+ }
+
+ // 2. Redis 未命中,从 MySQL 获取
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectByOpsOrderId(opsOrderId);
+ if (queueDO != null) {
+ return convertToDTO(queueDO);
+ }
+
+ return null;
+ }
+
+ @Override
+ public Long countWaiting() {
+ // 优先从 Redis 获取(需要聚合所有用户)
+ // TODO: 实现分布式计数器
+ return orderQueueMapper.countWaiting();
+ }
+
+ @Override
+ public Long countByUserId(Long userId) {
+ // 优先从 Redis 获取
+ long redisCount = redisQueueService.getQueueSize(userId);
+ if (redisCount > 0) {
+ return redisCount;
+ }
+
+ // Redis 未命中,从 MySQL 获取
+ return orderQueueMapper.countByUserId(userId);
+ }
+
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public boolean updateEventMessage(Long queueId, String eventMessage) {
+ OpsOrderQueueDO queueDO = orderQueueMapper.selectById(queueId);
+ if (queueDO == null) {
+ log.warn("队列记录不存在: queueId={}", queueId);
+ return false;
+ }
+
+ queueDO.setEventMessage(eventMessage);
+ int updated = orderQueueMapper.updateById(queueDO);
+
+ log.info("事件消息已更新: queueId={}, eventMessage={}", queueId, eventMessage);
+
+ return updated > 0;
+ }
+
+ // ========== 私有方法 ==========
+
+ /**
+ * 计算队列分数(用于排序)
+ * 公式:优先级分数 + 时间戳
+ *
+ * @param priority 优先级(0=P0, 1=P1, 2=P2, 3=P3)
+ * @param enqueueTime 入队时间
+ * @return 队列分数
+ */
+ private double calculateQueueScore(Integer priority, LocalDateTime enqueueTime) {
+ // 获取优先级分数
+ long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L);
+
+ // 计算时间戳(秒级)
+ long timestamp;
+ if (enqueueTime != null) {
+ timestamp = enqueueTime.atZone(ZoneId.systemDefault()).toEpochSecond();
+ } else {
+ timestamp = System.currentTimeMillis() / 1000;
+ }
+
+ return priorityScore + timestamp;
+ }
+
+ /**
+ * 计算下一个队列顺序号
+ */
+ private Integer calculateNextQueueIndex(PriorityEnum priority) {
+ // TODO: 查询当前优先级的最大 queueIndex,然后 +1
+ // 这里简化处理,返回默认值
+ return priority.getQueueOrder() * 1000;
+ }
+
+ /**
+ * 判断是否为终态
+ */
+ private boolean isTerminalStatus(String status) {
+ return "CANCELLED".equals(status) || "COMPLETED".equals(status)
+ || "REMOVED".equals(status) || "FAILED".equals(status);
+ }
+
+ /**
+ * 校验状态流转是否合法
+ */
+ private boolean validateStatusTransition(String oldStatus, String newStatus) {
+ // 终态不能再变更
+ if (isTerminalStatus(oldStatus)) {
+ return false;
+ }
+
+ // 允许的状态流转(支持新旧状态)
+ return switch (oldStatus.toUpperCase()) {
+ case "PENDING" -> "WAITING".equals(newStatus) || "CANCELLED".equals(newStatus);
+ case "WAITING" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus)
+ || "CANCELLED".equals(newStatus);
+ case "PROCESSING", "DISPATCHED" -> "PAUSED".equals(newStatus)
+ || "REMOVED".equals(newStatus) || "COMPLETED".equals(newStatus);
+ case "PAUSED" -> "PROCESSING".equals(newStatus) || "DISPATCHED".equals(newStatus);
+ default -> false;
+ };
+ }
+
+ /**
+ * 根据状态更新时间戳
+ */
+ private void updateStatusTimestamp(OpsOrderQueueDO queueDO, OrderQueueStatusEnum status) {
+ switch (status.getStatus().toUpperCase()) {
+ case "DISPATCHED" -> {
+ if (queueDO.getDequeueTime() == null) {
+ queueDO.setDequeueTime(LocalDateTime.now());
+ }
+ }
+ case "REMOVED", "COMPLETED" -> {
+ // 可以记录完成时间(如果表有该字段)
+ }
+ }
+ }
+
+ /**
+ * 转换为 DTO
+ */
+ private OrderQueueDTO convertToDTO(OpsOrderQueueDO queueDO) {
+ OrderQueueDTO dto = new OrderQueueDTO();
+ BeanUtils.copyProperties(queueDO, dto);
+ return dto;
+ }
+
+ /**
+ * 批量转换为 DTO
+ */
+ private List convertToDTO(List list) {
+ return list.stream()
+ .map(this::convertToDTO)
+ .collect(Collectors.toList());
+ }
+}
diff --git a/viewsh-module-ops/viewsh-module-ops-server/pom.xml b/viewsh-module-ops/viewsh-module-ops-server/pom.xml
index 7af5d6e..1ec8495 100644
--- a/viewsh-module-ops/viewsh-module-ops-server/pom.xml
+++ b/viewsh-module-ops/viewsh-module-ops-server/pom.xml
@@ -1,172 +1,176 @@
-
-
- viewsh-module-ops
- com.viewsh
- ${revision}
-
- 4.0.0
- jar
-
- viewsh-module-ops-server
-
- ${project.artifactId}
-
- [宿主层] 独立微服务入口,聚合上述所有模块
- 内部依赖流向(由低向高):ops-api → ops-biz → *-biz → ops-server
-
-
-
-
-
- com.viewsh
- viewsh-spring-boot-starter-env
-
-
-
-
- com.viewsh
- viewsh-module-system-api
- ${revision}
-
-
- com.viewsh
- viewsh-module-infra-api
- ${revision}
-
-
- com.viewsh
- viewsh-module-ops-api
- ${revision}
-
-
-
-
- com.viewsh
- viewsh-module-ops-biz
- ${revision}
-
-
-
-
- com.viewsh
- viewsh-module-environment-biz
- ${revision}
-
-
- com.viewsh
- viewsh-module-security-biz
- ${revision}
-
-
- com.viewsh
- viewsh-module-facilities-biz
- ${revision}
-
-
- com.viewsh
- viewsh-module-service-biz
- ${revision}
-
-
-
-
-
- com.viewsh
- viewsh-spring-boot-starter-biz-data-permission
-
-
- com.viewsh
- viewsh-spring-boot-starter-biz-tenant
-
-
-
-
- com.viewsh
- viewsh-spring-boot-starter-web
-
-
-
- com.viewsh
- viewsh-spring-boot-starter-security
-
-
-
-
- com.viewsh
- viewsh-spring-boot-starter-mybatis
-
-
-
- com.viewsh
- viewsh-spring-boot-starter-redis
-
-
-
-
- com.viewsh
- viewsh-spring-boot-starter-rpc
-
-
-
-
- com.alibaba.cloud
- spring-cloud-starter-alibaba-nacos-discovery
-
-
-
-
- com.alibaba.cloud
- spring-cloud-starter-alibaba-nacos-config
-
-
-
-
- com.viewsh
- viewsh-spring-boot-starter-job
-
-
+
+
+ viewsh-module-ops
+ com.viewsh
+ ${revision}
+
+ 4.0.0
+ jar
+
+ viewsh-module-ops-server
+
+ ${project.artifactId}
+
+ [宿主层] 独立微服务入口,聚合上述所有模块
+ 内部依赖流向(由低向高):ops-api → ops-biz → *-biz → ops-server
+
+
+
+
+
+ com.viewsh
+ viewsh-spring-boot-starter-env
+
+
+
+
+ com.viewsh
+ viewsh-module-system-api
+ ${revision}
+
+
+ com.viewsh
+ viewsh-module-infra-api
+ ${revision}
+
+
+ com.viewsh
+ viewsh-module-ops-api
+ ${revision}
+
+
+
+
+ com.viewsh
+ viewsh-module-ops-biz
+ ${revision}
+
+
+
+
+ com.viewsh
+ viewsh-module-environment-biz
+ ${revision}
+
+
+ com.viewsh
+ viewsh-module-security-biz
+ ${revision}
+
+
+ com.viewsh
+ viewsh-module-facilities-biz
+ ${revision}
+
+
+ com.viewsh
+ viewsh-module-service-biz
+ ${revision}
+
+
+
+
+
+ com.viewsh
+ viewsh-spring-boot-starter-biz-data-permission
+
+
+ com.viewsh
+ viewsh-spring-boot-starter-biz-tenant
+
+
+
+
+ com.viewsh
+ viewsh-spring-boot-starter-web
+
+
+
+ com.viewsh
+ viewsh-spring-boot-starter-security
+
+
+
+
+ com.viewsh
+ viewsh-spring-boot-starter-mybatis
+
+
+
+ com.viewsh
+ viewsh-spring-boot-starter-redis
+
+
+
+
+ com.viewsh
+ viewsh-spring-boot-starter-rpc
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-alibaba-nacos-discovery
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-alibaba-nacos-config
+
+
+
+
+ com.viewsh
+ viewsh-spring-boot-starter-job
+
+
com.viewsh
viewsh-spring-boot-starter-mq
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+
-
- com.viewsh
- viewsh-spring-boot-starter-test
-
-
-
-
- com.viewsh
- viewsh-spring-boot-starter-excel
-
-
-
-
- com.viewsh
- viewsh-spring-boot-starter-monitor
-
-
-
-
-
- ${project.artifactId}
-
-
-
- org.springframework.boot
- spring-boot-maven-plugin
-
-
-
- repackage
-
-
-
-
-
-
-
+
+ com.viewsh
+ viewsh-spring-boot-starter-test
+
+
+
+
+ com.viewsh
+ viewsh-spring-boot-starter-excel
+
+
+
+
+ com.viewsh
+ viewsh-spring-boot-starter-monitor
+
+
+
+
+
+ ${project.artifactId}
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+ repackage
+
+
+
+
+
+
+
diff --git a/viewsh-module-ops/viewsh-module-ops-server/src/main/java/com/viewsh/module/ops/framework/rpc/config/RpcConfiguration.java b/viewsh-module-ops/viewsh-module-ops-server/src/main/java/com/viewsh/module/ops/framework/rpc/config/RpcConfiguration.java
new file mode 100644
index 0000000..9d76038
--- /dev/null
+++ b/viewsh-module-ops/viewsh-module-ops-server/src/main/java/com/viewsh/module/ops/framework/rpc/config/RpcConfiguration.java
@@ -0,0 +1,14 @@
+package com.viewsh.module.ops.framework.rpc.config;
+
+import com.viewsh.module.iot.api.device.IotDeviceControlApi;
+import com.viewsh.module.system.api.notify.NotifyMessageSendApi;
+import org.springframework.cloud.openfeign.EnableFeignClients;
+import org.springframework.context.annotation.Configuration;
+
/**
 * OPS 模块的 RPC(Feign)配置:注册本模块需要远程调用的 API 客户端。
 * 当前声明了通知消息发送 API 与 IoT 设备控制 API 两个 Feign 客户端。
 */
@Configuration(value = "opsRpcConfiguration", proxyBeanMethods = false)
@EnableFeignClients(clients = {
    NotifyMessageSendApi.class,
    IotDeviceControlApi.class
})
public class RpcConfiguration {
}
diff --git a/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application-local.yaml b/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application-local.yaml
index a5a3060..4bad768 100644
--- a/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application-local.yaml
+++ b/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application-local.yaml
@@ -1,139 +1,154 @@
---- #################### 注册中心 + 配置中心相关配置 ####################
-
-spring:
- cloud:
- nacos:
- server-addr: 124.221.55.225:8848 # Nacos 服务器地址
- username: nacos # Nacos 账号
- password: 9oDxX~}e7DeP # Nacos 密码
- discovery: # 【配置中心】配置项
- namespace: dev # 命名空间。这里使用 dev 开发环境
- group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
- metadata:
- version: 1.0.0 # 服务实例的版本号,可用于灰度发布
- server-identity: nacosAuthKey # 身份验证密钥键
- server-identity-key: 8fG4s7J2kL9pQ3dR6xT1vZ0bW5nC8mE7hY2jU4qA0rS9tV6wB3fD1gH5kL8pN2 # 密钥值
- token: Z1xC9vT6pM3qL7rF2sW8bH0kD5nJ4aY9eV6uG1oR3tB8mN2wQ7cK5xS0jP4hL1 # 身份验证令牌
- config: # 【注册中心】配置项
- namespace: dev # 命名空间。这里使用 dev 开发环境
- group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
-
---- #################### 数据库相关配置 ####################
-spring:
- # 数据源配置项
- autoconfigure:
- # noinspection SpringBootApplicationYaml
- exclude:
- - com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure # 排除 Druid 的自动配置,使用 dynamic-datasource-spring-boot-starter 配置多数据源
- datasource:
- druid: # Druid 【监控】相关的全局配置
- web-stat-filter:
- enabled: true
- stat-view-servlet:
- enabled: true
- allow: # 设置白名单,不填则允许所有访问
- url-pattern: /druid/*
- login-username: # 控制台管理用户名和密码
- login-password:
- filter:
- stat:
- enabled: true
- log-slow-sql: true # 慢 SQL 记录
- slow-sql-millis: 100
- merge-sql: true
- wall:
- config:
- multi-statement-allow: true
- dynamic: # 多数据源配置
- druid: # Druid 【连接池】相关的全局配置
- initial-size: 1 # 初始连接数
- min-idle: 1 # 最小连接池数量
- max-active: 20 # 最大连接池数量
- max-wait: 60000 # 配置获取连接等待超时的时间,单位:毫秒(1 分钟)
- time-between-eviction-runs-millis: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位:毫秒(1 分钟)
- min-evictable-idle-time-millis: 600000 # 配置一个连接在池中最小生存的时间,单位:毫秒(10 分钟)
- max-evictable-idle-time-millis: 1800000 # 配置一个连接在池中最大生存的时间,单位:毫秒(30 分钟)
- validation-query: SELECT 1 FROM DUAL # 配置检测连接是否有效
- test-while-idle: true
- test-on-borrow: false
- test-on-return: false
- pool-prepared-statements: true # 是否开启 PreparedStatement 缓存
- max-pool-prepared-statement-per-connection-size: 20 # 每个连接缓存的 PreparedStatement 数量
- primary: master
- datasource:
- master:
- url: jdbc:mysql://${MYSQL_HOST:124.221.55.225}:${MYSQL_PORT:3306}/${MYSQL_DATABASE:aiot-platform}?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true&rewriteBatchedStatements=true
- username: ${MYSQL_USERNAME:root}
- password: ${MYSQL_PASSWORD:65p^VTPi9Qd+}
- # username: sa # SQL Server 连接的示例
- # password: JSm:g(*%lU4ZAkz06cd52KqT3)i1?H7W # SQL Server 连接的示例
- # username: SYSDBA # DM 连接的示例
- # password: SYSDBA # DM 连接的示例
- slave: # 模拟从库,可根据自己需要修改
- lazy: true # 开启懒加载,保证启动速度
- url: jdbc:mysql://${MYSQL_HOST:124.221.55.225}:${MYSQL_PORT:3306}/${MYSQL_DATABASE:aiot-platform}?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&rewriteBatchedStatements=true&nullCatalogMeansCurrent=true
- username: ${MYSQL_USERNAME:root}
- password: ${MYSQL_PASSWORD:65p^VTPi9Qd+}
- # Redis 配置。Redisson 默认的配置足够使用,一般不需要进行调优
- data:
- redis:
- host: 127.0.0.1 # 地址
- port: 6379 # 端口
- database: 0 # 数据库索引
-# password: 123456 # 密码,建议生产环境开启
-
---- #################### MQ 消息队列相关配置 ####################
-
---- #################### 定时任务相关配置 ####################
-
-xxl:
- job:
- enabled: false # 暂时禁用 xxl-job
- admin:
- addresses: http://127.0.0.1:9090/xxl-job-admin # 调度中心部署跟地址
-
---- #################### 服务保障相关配置 ####################
-
-# Lock4j 配置项
-lock4j:
- acquire-timeout: 3000 # 获取分布式锁超时时间,默认为 3000 毫秒
- expire: 30000 # 分布式锁的超时时间,默认为 30 毫秒
-
---- #################### 监控相关配置 ####################
-
-# Actuator 监控端点的配置项
-management:
- endpoints:
- web:
- base-path: /actuator # Actuator 提供的 API 接口的根目录。默认为 /actuator
- exposure:
- include: '*' # 需要开放的端点。默认值只打开 health 和 info 两个端点。通过设置 * ,可以开放所有端点。
-
-# Spring Boot Admin 配置项
-spring:
- boot:
- admin:
- # Spring Boot Admin Client 客户端的相关配置
- client:
- instance:
- service-host-type: IP # 注册实例时,优先使用 IP [IP, HOST_NAME, CANONICAL_HOST_NAME]
- username: admin
- password: admin
-
-# 日志文件配置
-logging:
- level:
- # 配置自己写的 MyBatis Mapper 打印日志
- com.viewsh.module.ops.dal.mysql: debug
- org.springframework.context.support.PostProcessorRegistrationDelegate: ERROR # TODO 芋艿:先禁用,Spring Boot 3.X 存在部分错误的 WARN 提示
-
---- #################### 芋道相关配置 ####################
-
-# 芋道配置项,设置当前项目所有自定义的配置
-viewsh:
- env: # 多环境的配置项
- tag: ${HOSTNAME}
- security:
- mock-enable: true
- access-log: # 访问日志的配置项
- enable: false
+--- #################### 注册中心 + 配置中心相关配置 ####################
+
+spring:
+ cloud:
+ nacos:
+ server-addr: 124.221.55.225:8848 # Nacos 服务器地址
+ username: nacos # Nacos 账号
+ password: 9oDxX~}e7DeP # Nacos 密码
+      discovery: # 【注册中心】配置项
+ namespace: dev # 命名空间。这里使用 dev 开发环境
+ group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
+ metadata:
+ version: 1.0.0 # 服务实例的版本号,可用于灰度发布
+ server-identity: nacosAuthKey # 身份验证密钥键
+ server-identity-key: 8fG4s7J2kL9pQ3dR6xT1vZ0bW5nC8mE7hY2jU4qA0rS9tV6wB3fD1gH5kL8pN2 # 密钥值
+ token: Z1xC9vT6pM3qL7rF2sW8bH0kD5nJ4aY9eV6uG1oR3tB8mN2wQ7cK5xS0jP4hL1 # 身份验证令牌
+      config: # 【配置中心】配置项
+ namespace: dev # 命名空间。这里使用 dev 开发环境
+ group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
+
+--- #################### 数据库相关配置 ####################
+spring:
+ # 数据源配置项
+ autoconfigure:
+ # noinspection SpringBootApplicationYaml
+ exclude:
+ - com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure # 排除 Druid 的自动配置,使用 dynamic-datasource-spring-boot-starter 配置多数据源
+ datasource:
+ druid: # Druid 【监控】相关的全局配置
+ web-stat-filter:
+ enabled: true
+ stat-view-servlet:
+ enabled: true
+ allow: # 设置白名单,不填则允许所有访问
+ url-pattern: /druid/*
+ login-username: # 控制台管理用户名和密码
+ login-password:
+ filter:
+ stat:
+ enabled: true
+ log-slow-sql: true # 慢 SQL 记录
+ slow-sql-millis: 100
+ merge-sql: true
+ wall:
+ config:
+ multi-statement-allow: true
+ dynamic: # 多数据源配置
+ druid: # Druid 【连接池】相关的全局配置
+ initial-size: 1 # 初始连接数
+ min-idle: 1 # 最小连接池数量
+ max-active: 20 # 最大连接池数量
+ max-wait: 60000 # 配置获取连接等待超时的时间,单位:毫秒(1 分钟)
+ time-between-eviction-runs-millis: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位:毫秒(1 分钟)
+ min-evictable-idle-time-millis: 600000 # 配置一个连接在池中最小生存的时间,单位:毫秒(10 分钟)
+ max-evictable-idle-time-millis: 1800000 # 配置一个连接在池中最大生存的时间,单位:毫秒(30 分钟)
+ validation-query: SELECT 1 FROM DUAL # 配置检测连接是否有效
+ test-while-idle: true
+ test-on-borrow: false
+ test-on-return: false
+ pool-prepared-statements: true # 是否开启 PreparedStatement 缓存
+ max-pool-prepared-statement-per-connection-size: 20 # 每个连接缓存的 PreparedStatement 数量
+ primary: master
+ datasource:
+ master:
+ url: jdbc:mysql://${MYSQL_HOST:124.221.55.225}:${MYSQL_PORT:3306}/${MYSQL_DATABASE:aiot-platform}?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&nullCatalogMeansCurrent=true&rewriteBatchedStatements=true
+ username: ${MYSQL_USERNAME:root}
+ password: ${MYSQL_PASSWORD:65p^VTPi9Qd+}
+ # username: sa # SQL Server 连接的示例
+ # password: JSm:g(*%lU4ZAkz06cd52KqT3)i1?H7W # SQL Server 连接的示例
+ # username: SYSDBA # DM 连接的示例
+ # password: SYSDBA # DM 连接的示例
+ slave: # 模拟从库,可根据自己需要修改
+ lazy: true # 开启懒加载,保证启动速度
+ url: jdbc:mysql://${MYSQL_HOST:124.221.55.225}:${MYSQL_PORT:3306}/${MYSQL_DATABASE:aiot-platform}?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&rewriteBatchedStatements=true&nullCatalogMeansCurrent=true
+ username: ${MYSQL_USERNAME:root}
+ password: ${MYSQL_PASSWORD:65p^VTPi9Qd+}
+ # Redis 配置。Redisson 默认的配置足够使用,一般不需要进行调优
+ data:
+ redis:
+ host: 127.0.0.1 # 地址
+ port: 6379 # 端口
+ database: 0 # 数据库索引
+# password: 123456 # 密码,建议生产环境开启
+
+--- #################### MQ 消息队列相关配置 ####################
+
+# rocketmq 配置项,对应 RocketMQProperties 配置类
+rocketmq:
+ name-server: 124.221.55.225:9876 # RocketMQ Namesrv
+
+spring:
+ # RabbitMQ 配置项,对应 RabbitProperties 配置类
+ rabbitmq:
+ host: 127.0.0.1 # RabbitMQ 服务的地址
+ port: 5672 # RabbitMQ 服务的端口
+ username: guest # RabbitMQ 服务的账号
+ password: guest # RabbitMQ 服务的密码
+ # Kafka 配置项,对应 KafkaProperties 配置类
+ kafka:
+ bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
+
+--- #################### 定时任务相关配置 ####################
+
+xxl:
+ job:
+ enabled: false # 暂时禁用 xxl-job
+ admin:
+      addresses: http://127.0.0.1:9090/xxl-job-admin # 调度中心部署根地址
+
+--- #################### 服务保障相关配置 ####################
+
+# Lock4j 配置项
+lock4j:
+ acquire-timeout: 3000 # 获取分布式锁超时时间,默认为 3000 毫秒
+  expire: 30000 # 分布式锁的超时时间,默认为 30000 毫秒(即 30 秒)
+
+--- #################### 监控相关配置 ####################
+
+# Actuator 监控端点的配置项
+management:
+ endpoints:
+ web:
+ base-path: /actuator # Actuator 提供的 API 接口的根目录。默认为 /actuator
+ exposure:
+ include: '*' # 需要开放的端点。默认值只打开 health 和 info 两个端点。通过设置 * ,可以开放所有端点。
+
+# Spring Boot Admin 配置项
+spring:
+ boot:
+ admin:
+ # Spring Boot Admin Client 客户端的相关配置
+ client:
+ instance:
+ service-host-type: IP # 注册实例时,优先使用 IP [IP, HOST_NAME, CANONICAL_HOST_NAME]
+ username: admin
+ password: admin
+
+# 日志文件配置
+logging:
+ level:
+ # 配置自己写的 MyBatis Mapper 打印日志
+ com.viewsh.module.ops.dal.mysql: debug
+ org.springframework.context.support.PostProcessorRegistrationDelegate: ERROR # TODO 芋艿:先禁用,Spring Boot 3.X 存在部分错误的 WARN 提示
+
+--- #################### 芋道相关配置 ####################
+
+# 芋道配置项,设置当前项目所有自定义的配置
+viewsh:
+ env: # 多环境的配置项
+ tag: ${HOSTNAME}
+ security:
+ mock-enable: true
+ access-log: # 访问日志的配置项
+ enable: false
diff --git a/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application.yaml b/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application.yaml
index a20b2e6..846ce7e 100644
--- a/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application.yaml
+++ b/viewsh-module-ops/viewsh-module-ops-server/src/main/resources/application.yaml
@@ -1,119 +1,143 @@
-spring:
- application:
- name: ops-server
-
- profiles:
- active: local
-
- main:
- allow-circular-references: true # 允许循环依赖,因为项目是三层架构,无法避免这个情况。
- allow-bean-definition-overriding: true # 允许 Bean 覆盖,例如说 Feign 等会存在重复定义的服务
-
- config:
- import:
- - optional:classpath:application-${spring.profiles.active}.yaml # 加载【本地】配置
- - optional:nacos:${spring.application.name}-${spring.profiles.active}.yaml # 加载【Nacos】的配置
-
- # Servlet 配置
- servlet:
- # 文件上传相关配置项
- multipart:
- max-file-size: 16MB # 单个文件大小
- max-request-size: 32MB # 设置总上传的文件大小
-
- # Jackson 配置项
- jackson:
- serialization:
- write-dates-as-timestamps: true # 设置 LocalDateTime 的格式,使用时间戳
- write-date-timestamps-as-nanoseconds: false # 设置不使用 nanoseconds 的格式。例如说 1611460870.401,而是直接 1611460870401
- write-durations-as-timestamps: true # 设置 Duration 的格式,使用时间戳
- fail-on-empty-beans: false # 允许序列化无属性的 Bean
-
- # Cache 配置项
- cache:
- type: REDIS
- redis:
- time-to-live: 1h # 设置过期时间为 1 小时
-
-server:
- port: 48092
-
-logging:
- file:
- name: ${user.home}/logs/${spring.application.name}.log # 日志文件名,全路径
-
---- #################### 接口文档配置 ####################
-
-springdoc:
- api-docs:
- enabled: true # 1. 是否开启 Swagger 接文档的元数据
- path: /v3/api-docs
- swagger-ui:
- enabled: true # 2.1 是否开启 Swagger 文档的官方 UI 界面
- path: /swagger-ui.html
- default-flat-param-object: true # 参见 https://doc.xiaominfo.com/docs/faq/v4/knife4j-parameterobject-flat-param 文档
-
-knife4j:
- enable: true # 2.2 是否开启 Swagger 文档的 Knife4j UI 界面
- setting:
- language: zh_cn
-
-# MyBatis Plus 的配置项
-mybatis-plus:
- configuration:
- map-underscore-to-camel-case: true # 虽然默认为 true ,但是还是显示去指定下。
- global-config:
- db-config:
- id-type: NONE # “智能”模式,基于 IdTypeEnvironmentPostProcessor + 数据源的类型,自动适配成 AUTO、INPUT 模式。
- # id-type: AUTO # 自增 ID,适合 MySQL 等直接自增的数据库
- # id-type: INPUT # 用户输入 ID,适合 Oracle、PostgreSQL、Kingbase、DB2、H2 数据库
- # id-type: ASSIGN_ID # 分配 ID,默认使用雪花算法。注意,Oracle、PostgreSQL、Kingbase、DB2、H2 数据库时,需要去除实体类上的 @KeySequence 注解
- logic-delete-value: 1 # 逻辑已删除值(默认为 1)
- logic-not-delete-value: 0 # 逻辑未删除值(默认为 0)
- banner: false # 关闭控制台的 Banner 打印
- type-aliases-package: ${viewsh.info.base-package}.module.*.dal.dataobject
- encryptor:
- password: XDV71a+xqStEA3WH # 加解密的秘钥,可使用 https://www.imaegoo.com/2020/aes-key-generator/ 网站生成
-
-mybatis-plus-join:
- banner: false # 关闭控制台的 Banner 打印
-
-# VO 转换(数据翻译)相关
-easy-trans:
- is-enable-global: false # 【默认禁用,对性能确认压力大】启用全局翻译(拦截所有 SpringMVC ResponseBody 进行自动翻译 )。如果对于性能要求很高可关闭此配置,或通过 @IgnoreTrans 忽略某个接口
-
---- #################### RPC 远程调用相关配置 ####################
-
---- #################### MQ 消息队列相关配置 ####################
-
---- #################### 定时任务相关配置 ####################
-
-xxl:
- job:
- executor:
- appname: ${spring.application.name} # 执行器 AppName
- logpath: ${user.home}/logs/xxl-job/${spring.application.name} # 执行器运行日志文件存储磁盘路径
- accessToken: default_token # 执行器通讯TOKEN
-
---- #################### 芋道相关配置 ####################
-
-viewsh:
- info:
- version: 1.0.0
- base-package: com.viewsh.module.ops
- web:
- admin-ui:
- url: http://dashboard.viewsh.iocoder.cn # Admin 管理后台 UI 的地址
- xss:
- enable: false
- exclude-urls: # 如下两个 url,仅仅是为了演示,去掉配置也没关系
- - ${spring.boot.admin.context-path}/** # 不处理 Spring Boot Admin 的请求
- - ${management.endpoints.web.base-path}/** # 不处理 Actuator 的请求
- swagger:
- title: 管理后台
- description: 提供管理员管理的所有功能
- version: ${viewsh.info.version}
- tenant: # 多租户相关配置项
- enable: true
-
-debug: false
+spring:
+ application:
+ name: ops-server
+
+ profiles:
+ active: local
+
+ main:
+ allow-circular-references: true # 允许循环依赖,因为项目是三层架构,无法避免这个情况。
+ allow-bean-definition-overriding: true # 允许 Bean 覆盖,例如说 Feign 等会存在重复定义的服务
+
+ config:
+ import:
+ - optional:classpath:application-${spring.profiles.active}.yaml # 加载【本地】配置
+ - optional:nacos:${spring.application.name}-${spring.profiles.active}.yaml # 加载【Nacos】的配置
+
+ # Servlet 配置
+ servlet:
+ # 文件上传相关配置项
+ multipart:
+ max-file-size: 16MB # 单个文件大小
+ max-request-size: 32MB # 设置总上传的文件大小
+
+ # Jackson 配置项
+ jackson:
+ serialization:
+ write-dates-as-timestamps: true # 设置 LocalDateTime 的格式,使用时间戳
+ write-date-timestamps-as-nanoseconds: false # 设置不使用 nanoseconds 的格式。例如说 1611460870.401,而是直接 1611460870401
+ write-durations-as-timestamps: true # 设置 Duration 的格式,使用时间戳
+ fail-on-empty-beans: false # 允许序列化无属性的 Bean
+
+ # Cache 配置项
+ cache:
+ type: REDIS
+ redis:
+ time-to-live: 1h # 设置过期时间为 1 小时
+
+server:
+ port: 48092
+
+logging:
+ file:
+ name: ${user.home}/logs/${spring.application.name}.log # 日志文件名,全路径
+
+--- #################### 接口文档配置 ####################
+
+springdoc:
+ api-docs:
+    enabled: true # 1. 是否开启 Swagger 接口文档的元数据
+ path: /v3/api-docs
+ swagger-ui:
+ enabled: true # 2.1 是否开启 Swagger 文档的官方 UI 界面
+ path: /swagger-ui.html
+ default-flat-param-object: true # 参见 https://doc.xiaominfo.com/docs/faq/v4/knife4j-parameterobject-flat-param 文档
+
+knife4j:
+ enable: true # 2.2 是否开启 Swagger 文档的 Knife4j UI 界面
+ setting:
+ language: zh_cn
+
+# MyBatis Plus 的配置项
+mybatis-plus:
+ configuration:
+ map-underscore-to-camel-case: true # 虽然默认为 true ,但是还是显示去指定下。
+ global-config:
+ db-config:
+ id-type: NONE # “智能”模式,基于 IdTypeEnvironmentPostProcessor + 数据源的类型,自动适配成 AUTO、INPUT 模式。
+ # id-type: AUTO # 自增 ID,适合 MySQL 等直接自增的数据库
+ # id-type: INPUT # 用户输入 ID,适合 Oracle、PostgreSQL、Kingbase、DB2、H2 数据库
+ # id-type: ASSIGN_ID # 分配 ID,默认使用雪花算法。注意,Oracle、PostgreSQL、Kingbase、DB2、H2 数据库时,需要去除实体类上的 @KeySequence 注解
+ logic-delete-value: 1 # 逻辑已删除值(默认为 1)
+ logic-not-delete-value: 0 # 逻辑未删除值(默认为 0)
+ banner: false # 关闭控制台的 Banner 打印
+ type-aliases-package: ${viewsh.info.base-package}.module.*.dal.dataobject
+ encryptor:
+ password: XDV71a+xqStEA3WH # 加解密的秘钥,可使用 https://www.imaegoo.com/2020/aes-key-generator/ 网站生成
+
+mybatis-plus-join:
+ banner: false # 关闭控制台的 Banner 打印
+
+# VO 转换(数据翻译)相关
+easy-trans:
+ is-enable-global: false # 【默认禁用,对性能确认压力大】启用全局翻译(拦截所有 SpringMVC ResponseBody 进行自动翻译 )。如果对于性能要求很高可关闭此配置,或通过 @IgnoreTrans 忽略某个接口
+
+--- #################### RPC 远程调用相关配置 ####################
+
+--- #################### 消息队列相关 ####################
+
+# rocketmq 配置项,对应 RocketMQProperties 配置类
+rocketmq:
+ # Producer 配置项
+ producer:
+ group: ${spring.application.name}_PRODUCER # 生产者分组
+
+spring:
+ # Kafka 配置项,对应 KafkaProperties 配置类
+ kafka:
+ # Kafka Producer 配置项
+ producer:
+ acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
+ retries: 3 # 发送失败时,重试发送的次数
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化
+ # Kafka Consumer 配置项
+ consumer:
+ auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
+ value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+ properties:
+ spring.json.trusted.packages: '*'
+ # Kafka Consumer Listener 监听器配置
+ listener:
+ missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错
+
+--- #################### 定时任务相关配置 ####################
+
+xxl:
+ job:
+ executor:
+ appname: ${spring.application.name} # 执行器 AppName
+ logpath: ${user.home}/logs/xxl-job/${spring.application.name} # 执行器运行日志文件存储磁盘路径
+ accessToken: default_token # 执行器通讯TOKEN
+
+--- #################### 芋道相关配置 ####################
+
+viewsh:
+ info:
+ version: 1.0.0
+ base-package: com.viewsh.module.ops
+ web:
+ admin-ui:
+ url: http://dashboard.viewsh.iocoder.cn # Admin 管理后台 UI 的地址
+ xss:
+ enable: false
+ exclude-urls: # 如下两个 url,仅仅是为了演示,去掉配置也没关系
+ - ${spring.boot.admin.context-path}/** # 不处理 Spring Boot Admin 的请求
+ - ${management.endpoints.web.base-path}/** # 不处理 Actuator 的请求
+ swagger:
+ title: 管理后台
+ description: 提供管理员管理的所有功能
+ version: ${viewsh.info.version}
+ tenant: # 多租户相关配置项
+ enable: true
+
+debug: false