5 Commits

Author SHA1 Message Date
lzh
57f32e56a9 fix(ops): 收口队列 Redis 分数来源
Some checks failed
Java CI with Maven / build (11) (push) Has been cancelled
Java CI with Maven / build (17) (push) Has been cancelled
Java CI with Maven / build (8) (push) Has been cancelled
- 移除 Redis 队列层基于优先级和时间戳的本地兜底算分逻辑

- 强制 enqueue、batchEnqueue、updatePriority 使用服务层预先计算的 queueScore

- 兼容历史缺少 queueScore 的 Redis 记录,按最低优先级处理避免旧模型重新参与排序

- 补齐 QueueSyncService 的 queueScore 映射,确保 MySQL 同步到 Redis 时保留总分

- 新增 QueueSyncServiceTest 覆盖同步链路携带 queueScore 的行为
2026-03-07 22:44:09 +08:00
lzh
af1e0c0989 fix(iot): 暂时取消作业时长不足抑制自动完成逻辑
信号丢失超时后不再校验最小有效作业时长,所有情况均直接触发自动完成。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 22:32:16 +08:00
lzh
713ae744ac feat(ops): 客流阈值触发静默处理,工单完成时重置计数器防竞态
1. 已派发/已到达(DISPATCHED/CONFIRMED/ARRIVED)状态静默忽略客流触发,
   仅排队中(PENDING/QUEUED)状态才升级优先级
2. 工单完成时先重置IoT客流计数器再清除活跃标记,防止残留计数
   和MQ消息延迟导致的竞态误创建工单
3. 工单取消时仅清除活跃标记不重置计数器,保留客流数据以便尽快
   重新触发

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 22:28:11 +08:00
lzh
a9fd9313cc feat(ops): 重构派单队列评分逻辑,支持楼层差与等待老化综合排序
- 新增 QueueScoreCalculator/QueueScoreContext/QueueScoreResult,统一按优先级分 + 楼层差分 - 等待老化分计算队列总分,并将 PRIORITY_WEIGHT 调整为 1500
- OrderQueueService 新增 rebuildWaitingTasksByUserId 接口,OrderQueueServiceEnhanced 支持按执行人重算 WAITING 队列、以当前执行工单楼层为基准动态重排,并在事务提交后同步刷新 Redis
- RedisOrderQueueServiceImpl 支持持久化 baseFloorNo、targetFloorNo、floorDiff、waitMinutes、scoreUpdateTime 等评分明细,清队列时同时清理关联 Hash,避免脏数据残留
- DispatchEngineImpl、CleanerPriorityScheduleStrategy、BadgeDeviceScheduleStrategy 调整为非抢占式派单:P0 忙碌时仅入队等待,空闲时直接派发,自动派单前按总分重排并派发下一单
- CleanOrderServiceImpl 取消 P0 自动打断链路,升级到 P0 后仅重算等待队列并发送通知;补充 QueueScoreCalculatorTest、OrderQueueServiceEnhancedTest、CleanerPriorityScheduleStrategyTest、CleanOrderEndToEndTest 覆盖新行为
2026-03-07 21:15:10 +08:00
lzh
26c4ce07eb feat(iot,ops): 区域设备关联接口返回更多设备信息,修复 N+1 和代码质量问题
- IotDeviceSimpleRespDTO 新增 nickname、serialNumber、state、deviceType 字段
- IotDeviceQueryApi 新增 batchGetDevices 批量查询接口
- IotDeviceQueryApiImpl 提取 toSimpleDTO 统一转换、通过产品缓存解析 productName、
  移除 blanket try-catch 让异常正确传播、删除无用 import
- AreaDeviceRelationRespVO 新增 nickname、serialNumber、deviceState、deviceType 字段
- AreaDeviceRelationServiceImpl.listByAreaId 改为批量查询避免 N+1 RPC、
  增加 null 防护;bindDevice 改为 fail-fast 不再存脏数据
- ErrorCodeConstants 新增 IOT_SERVICE_UNAVAILABLE 错误码

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 21:06:10 +08:00
27 changed files with 1059 additions and 457 deletions

View File

@@ -10,6 +10,7 @@ import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.Collection;
import java.util.List;
/**
@@ -38,4 +39,9 @@ public interface IotDeviceQueryApi {
@Parameter(name = "id", description = "设备ID", required = true)
CommonResult<IotDeviceSimpleRespDTO> getDevice(@RequestParam("id") Long id);
@GetMapping(PREFIX + "/batch-get")
@Operation(summary = "批量获取设备详情")
@Parameter(name = "ids", description = "设备ID列表", required = true, example = "[1,2,3]")
CommonResult<List<IotDeviceSimpleRespDTO>> batchGetDevices(@RequestParam("ids") Collection<Long> ids);
}

View File

@@ -29,4 +29,16 @@ public class IotDeviceSimpleRespDTO {
@Schema(description = "产品名称", example = "客流计数器")
private String productName;
@Schema(description = "设备备注名称", example = "A栋3层客流计数器")
private String nickname;
@Schema(description = "设备序列号", example = "SN20250101001")
private String serialNumber;
@Schema(description = "设备状态0-未激活 1-在线 2-离线)", example = "1")
private Integer state;
@Schema(description = "设备类型", example = "10")
private Integer deviceType;
}

View File

@@ -2,9 +2,10 @@ package com.viewsh.module.iot.api.device;
import com.viewsh.framework.common.pojo.CommonResult;
import com.viewsh.module.iot.api.device.dto.IotDeviceSimpleRespDTO;
import com.viewsh.module.iot.controller.admin.device.vo.device.IotDeviceRespVO;
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
import com.viewsh.module.iot.dal.dataobject.product.IotProductDO;
import com.viewsh.module.iot.service.device.IotDeviceService;
import com.viewsh.module.iot.service.product.IotProductService;
import io.swagger.v3.oas.annotations.Operation;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
@@ -14,8 +15,8 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import static com.viewsh.framework.common.pojo.CommonResult.success;
import static com.viewsh.module.iot.api.device.IotDeviceQueryApi.PREFIX;
@@ -36,55 +37,74 @@ public class IotDeviceQueryApiImpl implements IotDeviceQueryApi {
@Resource
private IotDeviceService deviceService;
@Resource
private IotProductService productService;
@Override
@GetMapping(PREFIX + "/simple-list")
@Operation(summary = "获取设备精简列表(按类型/产品筛选)")
public CommonResult<List<IotDeviceSimpleRespDTO>> getDeviceSimpleList(
@RequestParam(value = "deviceType", required = false) Integer deviceType,
@RequestParam(value = "productId", required = false) Long productId) {
try {
List<IotDeviceDO> list = deviceService.getDeviceListByCondition(deviceType, productId);
List<IotDeviceSimpleRespDTO> result = list.stream()
.map(device -> {
IotDeviceSimpleRespDTO dto = new IotDeviceSimpleRespDTO();
dto.setId(device.getId());
dto.setDeviceName(device.getDeviceName());
dto.setProductId(device.getProductId());
dto.setProductKey(device.getProductKey());
// TODO: 从产品服务获取产品名称
dto.setProductName("产品_" + device.getProductKey());
return dto;
})
.collect(Collectors.toList());
return success(result);
} catch (Exception e) {
log.error("[getDeviceSimpleList] 查询设备列表失败: deviceType={}, productId={}", deviceType, productId, e);
return success(List.of());
}
List<IotDeviceDO> list = deviceService.getDeviceListByCondition(deviceType, productId);
List<IotDeviceSimpleRespDTO> result = list.stream()
.map(this::toSimpleDTO)
.toList();
return success(result);
}
@Override
@GetMapping(PREFIX + "/get")
@Operation(summary = "获取设备详情")
public CommonResult<IotDeviceSimpleRespDTO> getDevice(@RequestParam("id") Long id) {
try {
IotDeviceDO device = deviceService.getDevice(id);
if (device == null) {
return success(null);
}
IotDeviceSimpleRespDTO dto = new IotDeviceSimpleRespDTO();
dto.setId(device.getId());
dto.setDeviceName(device.getDeviceName());
dto.setProductId(device.getProductId());
dto.setProductKey(device.getProductKey());
dto.setProductName("产品_" + device.getProductKey());
return success(dto);
} catch (Exception e) {
log.error("[getDevice] 查询设备详情失败: id={}", id, e);
IotDeviceDO device = deviceService.getDevice(id);
if (device == null) {
return success(null);
}
return success(toSimpleDTO(device));
}
@Override
@GetMapping(PREFIX + "/batch-get")
@Operation(summary = "批量获取设备详情")
public CommonResult<List<IotDeviceSimpleRespDTO>> batchGetDevices(
@RequestParam("ids") Collection<Long> ids) {
if (ids == null || ids.isEmpty()) {
return success(List.of());
}
List<IotDeviceDO> devices = deviceService.getDeviceList(ids);
List<IotDeviceSimpleRespDTO> result = devices.stream()
.map(this::toSimpleDTO)
.toList();
return success(result);
}
private IotDeviceSimpleRespDTO toSimpleDTO(IotDeviceDO device) {
IotDeviceSimpleRespDTO dto = new IotDeviceSimpleRespDTO();
dto.setId(device.getId());
dto.setDeviceName(device.getDeviceName());
dto.setProductId(device.getProductId());
dto.setProductKey(device.getProductKey());
dto.setNickname(device.getNickname());
dto.setSerialNumber(device.getSerialNumber());
dto.setState(device.getState());
dto.setDeviceType(device.getDeviceType());
// 从产品缓存获取产品名称
dto.setProductName(resolveProductName(device.getProductId()));
return dto;
}
private String resolveProductName(Long productId) {
if (productId == null) {
return null;
}
try {
IotProductDO product = productService.getProductFromCache(productId);
return product != null ? product.getName() : null;
} catch (Exception e) {
log.warn("[resolveProductName] 获取产品名称失败: productId={}, error={}", productId, e.getMessage());
return null;
}
}
}

View File

@@ -191,15 +191,16 @@ public class SignalLossRuleProcessor {
long minValidWorkMillis = exitConfig.getMinValidWorkMinutes() * 60000L;
// 6. 分支处理:有效 vs 无效作业
if (durationMs < minValidWorkMillis) {
// 作业时长不足,抑制完成
handleInvalidWork(deviceId, badgeDeviceKey, areaId,
durationMs, minValidWorkMillis, exitConfig);
} else {
// 作业时长有效,触发完成
handleTimeoutComplete(deviceId, badgeDeviceKey, areaId,
durationMs, lastLossTime);
}
// TODO 暂时取消作业时长不足抑制自动完成的逻辑,所有情况均触发完成
// if (durationMs < minValidWorkMillis) {
// // 作业时长不足,抑制完成
// handleInvalidWork(deviceId, badgeDeviceKey, areaId,
// durationMs, minValidWorkMillis, exitConfig);
// } else {
// 作业时长有效,触发完成
handleTimeoutComplete(deviceId, badgeDeviceKey, areaId,
durationMs, lastLossTime);
// }
}
/**

View File

@@ -36,8 +36,8 @@ import java.util.concurrent.TimeUnit;
* <p>
* 客流触发逻辑(周期化):
* 1. 无活跃工单 → 创建新工单 → 标记活跃 → 重置阈值
* 2. 有未派发工单(PENDING/QUEUED) → 升级优先级一级 → 重置阈值
* 3. 有已派发工单(DISPATCHED/CONFIRMED/ARRIVED) → 忽略 → 重置阈值
* 2. 有排队中工单(PENDING/QUEUED) → 升级优先级一级 → 重置阈值
* 3. 有已派发/已到达工单(DISPATCHED/CONFIRMED/ARRIVED) → 静默处理,不升级不创建 → 重置阈值
* 4. 已是 P0 → 不升级,记录审计日志 → 重置阈值
* <p>
* RocketMQ 配置:
@@ -69,9 +69,9 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
private static final int DEDUP_TTL_SECONDS = 300;
/**
* 未派发状态集合(可升级优先级)
* 可升级优先级的状态集合(仅排队中,尚未派发
*/
private static final Set<String> UNDISPATCHED_STATUSES = Set.of(
private static final Set<String> UPGRADABLE_STATUSES = Set.of(
WorkOrderStatusEnum.PENDING.getStatus(),
WorkOrderStatusEnum.QUEUED.getStatus()
);
@@ -162,8 +162,8 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
if (activeOrder != null) {
String status = activeOrder.getStatus();
if (UNDISPATCHED_STATUSES.contains(status)) {
// 未派发 → 升级优先级一级
if (UPGRADABLE_STATUSES.contains(status)) {
// 排队中 → 升级优先级一级
PriorityEnum result = cleanOrderService.upgradeOneLevelPriority(
activeOrder.getOrderId(), "客流持续达标自动升级");
@@ -179,9 +179,10 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
areaId, activeOrder.getOrderId());
}
} else {
// 已派发DISPATCHED/CONFIRMED/ARRIVED忽略
log.info("[CleanOrderCreateEventHandler] 区域{}已有已派发工单{}(状态:{}),忽略本次客流触发",
areaId, activeOrder.getOrderId(), status);
// 已派发/已确认/已到达 → 保洁员已在处理中,静默忽略
log.info("[CleanOrderCreateEventHandler] 区域{}保洁员已在处理中(状态:{}),客流触发静默忽略: orderId={}",
areaId, status, activeOrder.getOrderId());
recordArrivedSilentLog(event, activeOrder.getOrderId());
}
// ★ 所有分支都重置阈值
@@ -389,6 +390,33 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
}
}
/**
* 记录已派发/已到达静默处理审计日志
*/
private void recordArrivedSilentLog(CleanOrderCreateEventDTO event, Long orderId) {
try {
Map<String, Object> extra = new HashMap<>();
extra.put("eventId", event.getEventId());
extra.put("areaId", event.getAreaId());
extra.put("reason", "保洁员已在处理中,客流触发静默忽略");
eventLogRecorder.record(EventLogRecord.builder()
.module("clean")
.domain(EventDomain.TRAFFIC)
.eventType("ARRIVED_SILENT_IGNORE")
.message(String.format("保洁员已在处理中,客流触发静默忽略 [区域:%d]", event.getAreaId()))
.targetId(orderId)
.targetType("order")
.deviceId(event.getTriggerDeviceId())
.level(EventLevel.INFO)
.payload(extra)
.build());
} catch (Exception e) {
log.warn("[CleanOrderCreateEventHandler] 记录静默处理日志失败: orderId={}", orderId, e);
}
}
/**
* 确定事件域
*/

View File

@@ -1,5 +1,7 @@
package com.viewsh.module.ops.environment.integration.listener;
import com.viewsh.module.iot.api.device.IotDeviceControlApi;
import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO;
import com.viewsh.module.ops.api.queue.OrderQueueService;
import com.viewsh.module.ops.core.dispatch.DispatchEngine;
import com.viewsh.module.ops.core.dispatch.model.DispatchResult;
@@ -87,6 +89,9 @@ public class CleanOrderEventListener {
@Resource
private TrafficActiveOrderRedisDAO trafficActiveOrderRedisDAO;
@Resource
private IotDeviceControlApi iotDeviceControlApi;
// ==================== 工单创建事件 ====================
/**
@@ -184,7 +189,7 @@ public class CleanOrderEventListener {
break;
case COMPLETED:
handleCompleted(event);
clearTrafficActiveOrder(event);
clearTrafficActiveOrderOnComplete(event);
break;
case CANCELLED:
@@ -733,22 +738,80 @@ public class CleanOrderEventListener {
}
/**
* 工单终态时,清除 Redis 中的活跃工单标记
* 工单完成时,先重置 IoT 客流计数器,再清除活跃标记
* <p>
* 仅处理客流触发的工单。清除后下次客流达标将创建新工单(新周期
* 顺序至关重要:先重置计数器确保设备归零,再清除活跃标记开放新周期。
* 如果先清标记后重置,存在竞态窗口——已发出的 MQ 消息在标记清除后到达,
* 此时计数器尚未归零,会误创建基于残留计数的工单。
* <p>
* 如果计数器重置失败,仍然清除活跃标记(降级处理),避免区域永远被锁死。
*/
private void clearTrafficActiveOrderOnComplete(OrderStateChangedEvent event) {
try {
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
if (order != null && "IOT_TRAFFIC".equals(order.getTriggerSource()) && order.getAreaId() != null) {
// 1. 先重置 IoT 客流计数器(阻塞等待结果)
boolean resetOk = resetTrafficCounterOnComplete(order.getTriggerDeviceId(), order.getAreaId());
// 2. 再清除活跃标记
trafficActiveOrderRedisDAO.removeActive(order.getAreaId());
if (resetOk) {
log.info("[CleanOrderEventListener] 客流工单完成,计数器已重置,活跃标记已清除: areaId={}", order.getAreaId());
} else {
log.warn("[CleanOrderEventListener] 客流工单完成,计数器重置失败但活跃标记已清除(降级): areaId={}", order.getAreaId());
}
}
} catch (Exception e) {
log.warn("[CleanOrderEventListener] 清除客流活跃工单标记失败: orderId={}", event.getOrderId(), e);
}
}
/**
* 工单取消时,仅清除活跃标记,不重置计数器
* <p>
* 取消意味着区域未被清洁,客流计数应保留,以便尽快重新触发工单。
*/
private void clearTrafficActiveOrder(OrderStateChangedEvent event) {
try {
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
if (order != null && "IOT_TRAFFIC".equals(order.getTriggerSource()) && order.getAreaId() != null) {
trafficActiveOrderRedisDAO.removeActive(order.getAreaId());
log.info("[CleanOrderEventListener] 客流工单周期结束,已清除区域{}活跃标记", order.getAreaId());
log.info("[CleanOrderEventListener] 客流工单取消,已清除区域{}活跃标记(计数器保留)", order.getAreaId());
}
} catch (Exception e) {
log.warn("[CleanOrderEventListener] 清除客流活跃工单标记失败: orderId={}", event.getOrderId(), e);
}
}
/**
* 工单完成时重置客流计数器
*
* @return true 重置成功false 重置失败或无设备ID
*/
private boolean resetTrafficCounterOnComplete(Long deviceId, Long areaId) {
if (deviceId == null) {
return false;
}
try {
ResetTrafficCounterReqDTO reqDTO = ResetTrafficCounterReqDTO.builder()
.deviceId(deviceId)
.remark("工单完成后重置计数器,清除作业期间残留计数")
.build();
var result = iotDeviceControlApi.resetTrafficCounter(reqDTO);
if (result.getData() != null && result.getData()) {
log.info("[CleanOrderEventListener] 工单完成,计数器重置成功: deviceId={}, areaId={}", deviceId, areaId);
return true;
} else {
log.warn("[CleanOrderEventListener] 工单完成,计数器重置失败: deviceId={}, areaId={}", deviceId, areaId);
return false;
}
} catch (Exception e) {
log.warn("[CleanOrderEventListener] 工单完成,计数器重置异常: deviceId={}", deviceId, e);
return false;
}
}
/**
* 记录暂停开始时间
*/

View File

@@ -245,13 +245,13 @@ public class CleanOrderServiceImpl implements CleanOrderService {
if (queueDTO != null) {
orderQueueService.adjustPriority(queueDTO.getId(), PriorityEnum.P0, reason);
// 5. 使用新的调度引擎处理P0紧急插队
DispatchResult result = dispatchEngine.urgentInterrupt(orderId, queueDTO.getUserId());
// 5. 重算等待队列P0 不再打断当前任务
orderQueueService.rebuildWaitingTasksByUserId(queueDTO.getUserId(), order.getAreaId());
// 6. 发送优先级升级通知
cleanOrderEventListener.sendPriorityUpgradeNotification(queueDTO.getUserId(), order.getOrderCode(), orderId);
return result.isSuccess();
return true;
}
return true;
@@ -289,11 +289,11 @@ public class CleanOrderServiceImpl implements CleanOrderService {
if (queueDTO != null) {
orderQueueService.adjustPriority(queueDTO.getId(), newPriority, reason);
// 6. 如果升级到 P0触发紧急打断逻辑
// 6. 如果升级到 P0仅重算等待队列,不再触发打断
if (newPriority == PriorityEnum.P0) {
DispatchResult result = dispatchEngine.urgentInterrupt(orderId, queueDTO.getUserId());
orderQueueService.rebuildWaitingTasksByUserId(queueDTO.getUserId(), order.getAreaId());
cleanOrderEventListener.sendPriorityUpgradeNotification(queueDTO.getUserId(), order.getOrderCode(), orderId);
log.warn("客流升级到P0触发紧急打断: orderId={}, success={}", orderId, result.isSuccess());
log.warn("客流升级到P0已重算等待队列: orderId={}", orderId);
}
}

View File

@@ -23,13 +23,13 @@ import java.util.List;
* <p>
* 职责:怎么派单
* <p>
* 策略规则:
* <ul>
* <li>空闲无任务 → DIRECT_DISPATCH直接派单</li>
* <li>空闲有等待 → PUSH_AND_ENQUEUE推送等待+新任务入队)</li>
* <li>忙碌且非P0 → ENQUEUE_ONLY仅入队</li>
* <li>忙碌且P0 → INTERRUPT_AND_DISPATCH打断并派单</li>
* </ul>
* 策略规则:
* <ul>
* <li>空闲无任务 → DIRECT_DISPATCH直接派单</li>
* <li>空闲有等待 → PUSH_AND_ENQUEUE推送等待+新任务入队)</li>
* <li>忙碌且非P0 → ENQUEUE_ONLY仅入队</li>
* <li>忙碌且P0 → ENQUEUE_ONLY不抢断进入等待队列</li>
* </ul>
*
* @author lzh
*/
@@ -106,12 +106,12 @@ public class BadgeDeviceScheduleStrategy implements ScheduleStrategy {
// P0紧急任务需要打断
if (assigneeStatus.getCurrentTaskCount() > 0) {
Long currentOrderId = deviceStatus.getCurrentOpsOrderId();
log.warn("决策: INTERRUPT_AND_DISPATCH - P0紧急任务打断当前任务: currentOrderId={}", currentOrderId);
return DispatchDecision.interruptAndDispatch(currentOrderId);
} else {
log.info("决策: DIRECT_DISPATCH - P0紧急任务直接派");
return DispatchDecision.directDispatch();
}
log.info("决策: ENQUEUE_ONLY - P0工单不再打断当前任务,进入等待队列: currentOrderId={}", currentOrderId);
return DispatchDecision.enqueueOnly();
} else {
log.info("决策: DIRECT_DISPATCH - P0工单在空闲状态下直接派");
return DispatchDecision.directDispatch();
}
} else {
// 非紧急任务,设备忙碌,入队等待
log.info("决策: ENQUEUE_ONLY - 设备忙碌,任务入队等待");
@@ -133,15 +133,14 @@ public class BadgeDeviceScheduleStrategy implements ScheduleStrategy {
// P0任务可以打断任何任务
if (urgentContext.isUrgent()) {
log.warn("允许打断: P0紧急任务可以打断当前任务");
return InterruptDecision.allowByDefault();
}
// P1/P2任务不能打断
log.info("拒绝打断: 非P0任务不能打断当前任务");
return InterruptDecision.deny(
"紧急任务优先级不足",
"建议等待当前任务完成"
);
log.info("拒绝打断: 当前调度已改为非抢占式队列派发");
return InterruptDecision.deny("当前调度不再支持抢断", "工单将按队列总分在下一轮派发");
}
log.info("拒绝打断: 当前调度已改为非抢占式队列派发");
return InterruptDecision.deny(
"当前调度不再支持抢断",
"工单将按队列总分在下一轮派发"
);
}
}

View File

@@ -28,7 +28,7 @@ import java.util.List;
* <li>空闲无任务 → DIRECT_DISPATCH直接派单</li>
* <li>空闲有等待 → PUSH_AND_ENQUEUE推送等待+新任务入队)</li>
* <li>忙碌且非P0 → ENQUEUE_ONLY仅入队</li>
* <li>忙碌且P0 → INTERRUPT_AND_DISPATCH打断并派单</li>
* <li>忙碌且P0 → ENQUEUE_ONLY不抢断进入等待队列</li>
* </ul>
*
* @author lzh
@@ -106,10 +106,10 @@ public class CleanerPriorityScheduleStrategy implements ScheduleStrategy {
// P0紧急任务需要打断
if (assigneeStatus.getCurrentTaskCount() > 0) {
Long currentOrderId = cleanerStatus.getCurrentOpsOrderId();
log.warn("决策: INTERRUPT_AND_DISPATCH - P0紧急任务打断当前任务: currentOrderId={}", currentOrderId);
return DispatchDecision.interruptAndDispatch(currentOrderId);
log.info("决策: ENQUEUE_ONLY - P0工单不再打断当前任务,进入等待队列: currentOrderId={}", currentOrderId);
return DispatchDecision.enqueueOnly();
} else {
log.info("决策: DIRECT_DISPATCH - P0紧急任务直接派");
log.info("决策: DIRECT_DISPATCH - P0工单在空闲状态下直接派");
return DispatchDecision.directDispatch();
}
} else {
@@ -133,15 +133,14 @@ public class CleanerPriorityScheduleStrategy implements ScheduleStrategy {
// P0任务可以打断任何任务
if (urgentContext.isUrgent()) {
log.warn("允许打断: P0紧急任务可以打断当前任务");
return InterruptDecision.allowByDefault();
log.info("拒绝打断: 当前调度已改为非抢占式队列派发");
return InterruptDecision.deny("当前调度不再支持抢断", "工单将按队列总分在下一轮派发");
}
// P1/P2任务不能打断
log.info("拒绝打断: 非P0任务不能打断当前任务");
log.info("拒绝打断: 当前调度已改为非抢占式队列派发");
return InterruptDecision.deny(
"紧急任务优先级不足",
"建议等待当前任务完成"
"当前调度不再支持抢断",
"工单将按队列总分在下一轮派发"
);
}
}

View File

@@ -9,10 +9,13 @@ import com.viewsh.module.ops.core.event.OrderEventPublisher;
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO;
import com.viewsh.module.ops.dal.mysql.area.OpsBusAreaMapper;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.PriorityEnum;
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
import com.viewsh.module.ops.environment.dal.dataobject.workorder.OpsOrderCleanExtDO;
import com.viewsh.module.ops.environment.dal.redis.TrafficActiveOrderRedisDAO;
import com.viewsh.module.ops.environment.dal.mysql.workorder.OpsOrderCleanExtMapper;
import com.viewsh.module.ops.environment.integration.consumer.*;
import com.viewsh.framework.common.pojo.CommonResult;
@@ -73,6 +76,8 @@ public class CleanOrderEndToEndTest {
@Mock
private OpsOrderMapper opsOrderMapper;
@Mock
private OpsBusAreaMapper opsBusAreaMapper;
@Mock
private OpsOrderCleanExtMapper cleanExtMapper;
@Mock
private OrderIdGenerator orderIdGenerator;
@@ -94,6 +99,8 @@ public class CleanOrderEndToEndTest {
private ValueOperations<String, String> valueOperations;
@Mock
private VoiceBroadcastService voiceBroadcastService;
@Mock
private TrafficActiveOrderRedisDAO trafficActiveOrderRedisDAO;
@Mock
private BadgeDeviceStatusService badgeDeviceStatusService;
@@ -149,6 +156,7 @@ public class CleanOrderEndToEndTest {
// 注入 CleanOrderEventListener
injectField(cleanOrderService, "cleanOrderEventListener", cleanOrderEventListener);
injectField(cleanOrderService, "opsBusAreaMapper", opsBusAreaMapper);
// 注入 CleanOrderAuditEventHandler 依赖
injectField(auditEventHandler, "eventLogRecorder", eventLogRecorder);
@@ -156,10 +164,18 @@ public class CleanOrderEndToEndTest {
injectField(auditEventHandler, "opsOrderMapper", opsOrderMapper);
injectField(auditEventHandler, "stringRedisTemplate", stringRedisTemplate);
injectField(auditEventHandler, "objectMapper", objectMapper);
injectField(createEventHandler, "trafficActiveOrderRedisDAO", trafficActiveOrderRedisDAO);
// Stub IotDeviceControlApi for resetTrafficCounter
lenient().when(iotDeviceControlApi.resetTrafficCounter(any()))
.thenReturn(CommonResult.success(true));
lenient().when(opsBusAreaMapper.selectById(anyLong()))
.thenAnswer(i -> OpsBusAreaDO.builder()
.id(i.getArgument(0))
.areaName("测试区域")
.parentPath(null)
.floorNo(1)
.build());
}
// ==========================================
@@ -338,7 +354,7 @@ public class CleanOrderEndToEndTest {
verify(eventLogRecorder).record(any());
// 2. TTS sent (orderId can be null for TTS_REQUEST events)
verify(voiceBroadcastService).broadcastInOrder(eq(5001L), contains("请回到作业区域"), eq((Long) null));
verify(voiceBroadcastService).broadcastDirect(eq(5001L), contains("请回到作业区域"), eq(9), eq((Long) null));
}
// ==========================================
@@ -378,9 +394,6 @@ public class CleanOrderEndToEndTest {
when(opsOrderMapper.selectById(orderId)).thenReturn(order);
when(orderQueueService.getByOpsOrderId(orderId)).thenReturn(queueDTO);
when(dispatchEngine.urgentInterrupt(eq(orderId), eq(2001L)))
.thenReturn(DispatchResult.success("Success", 2001L));
// Execute
boolean result = cleanOrderService.upgradePriorityToP0(orderId, "Manual Upgrade");
@@ -392,7 +405,7 @@ public class CleanOrderEndToEndTest {
assertEquals(PriorityEnum.P0.getPriority(), orderCaptor.getValue().getPriority());
verify(orderQueueService).adjustPriority(eq(500L), eq(PriorityEnum.P0), anyString());
verify(dispatchEngine).urgentInterrupt(orderId, 2001L);
verify(orderQueueService).rebuildWaitingTasksByUserId(2001L, order.getAreaId());
verify(cleanOrderEventListener).sendPriorityUpgradeNotification(eq(2001L), eq("WO-P2"), eq(orderId));
}

View File

@@ -36,7 +36,7 @@ class CleanerPriorityScheduleStrategyTest {
private com.viewsh.module.ops.core.dispatch.DispatchEngine dispatchEngine;
@Test
void testDecide_P0_Interrupt() {
void testDecide_P0_EnqueueOnlyWhenBusy() {
// Setup
OpsCleanerStatusDO c1 = new OpsCleanerStatusDO();
c1.setUserId(1L);
@@ -54,8 +54,7 @@ class CleanerPriorityScheduleStrategyTest {
DispatchDecision decision = strategy.decide(context);
// Verify
assertEquals(DispatchPath.INTERRUPT_AND_DISPATCH, decision.getPath());
assertEquals(500L, decision.getInterruptedOrderId());
assertEquals(DispatchPath.ENQUEUE_ONLY, decision.getPath());
}
@Test

View File

@@ -40,13 +40,8 @@ public class OrderQueueDTO {
/**
* 队列分数(用于排序)
* 计算公式:优先级分 + 时间戳
* - P0: 0 + timestamp
* - P1: 1000000 + timestamp
* - P2: 2000000 + timestamp
* - P3: 3000000 + timestamp
*
* 用于数据库层面的排序,优先级高的排在前面,同优先级按时间排序
* 计算公式:优先级分 + 楼层差分 - 等待老化分
* 分数越小越靠前,用于等待队列的动态重排
*/
private Double queueScore;
@@ -95,6 +90,31 @@ public class OrderQueueDTO {
*/
private LocalDateTime updateTime;
/**
* 评分基准楼层
*/
private Integer baseFloorNo;
/**
* 目标工单楼层
*/
private Integer targetFloorNo;
/**
* 楼层差
*/
private Integer floorDiff;
/**
* 等待分钟数
*/
private Long waitMinutes;
/**
* 分数更新时间
*/
private LocalDateTime scoreUpdateTime;
// ========== 兼容旧字段名的getter方法 ==========
/**

View File

@@ -164,6 +164,15 @@ public interface OrderQueueService {
*/
List<OrderQueueDTO> getWaitingTasksByUserIdFromDb(Long userId);
/**
* 按当前上下文重算指定执行人等待队列的总分并返回最新排序结果
*
* @param userId 执行人ID
* @param fallbackAreaId 当没有执行中工单时可使用的楼层基准区域ID
* @return 已按最新总分排序的 WAITING 工单列表
*/
List<OrderQueueDTO> rebuildWaitingTasksByUserId(Long userId, Long fallbackAreaId);
/**
* 获取用户的暂停任务列表PAUSED状态按暂停时间排序
*

View File

@@ -21,5 +21,6 @@ public interface ErrorCodeConstants {
ErrorCode DEVICE_ALREADY_BOUND = new ErrorCode(1_020_002_001, "该工牌已绑定至此区域");
ErrorCode DEVICE_TYPE_ALREADY_BOUND = new ErrorCode(1_020_002_002, "该区域已绑定{},一个区域只能绑定一个");
ErrorCode DEVICE_RELATION_NOT_FOUND = new ErrorCode(1_020_002_003, "设备关联关系不存在");
ErrorCode IOT_SERVICE_UNAVAILABLE = new ErrorCode(1_020_002_004, "IoT 设备服务不可用,请稍后重试");
}

View File

@@ -134,7 +134,7 @@ public class DispatchEngineImpl implements DispatchEngine {
@Override
@Transactional(rollbackFor = Exception.class)
public DispatchResult urgentInterrupt(Long urgentOrderId, Long assigneeId) {
log.warn("开始P0紧急插队: urgentOrderId={}, assigneeId={}", urgentOrderId, assigneeId);
log.warn("处理P0工单派发请求: urgentOrderId={}, assigneeId={}", urgentOrderId, assigneeId);
// 查询紧急工单
OpsOrderDO urgentOrder = orderMapper.selectById(urgentOrderId);
@@ -156,23 +156,15 @@ public class DispatchEngineImpl implements DispatchEngine {
// 查询执行人当前任务
List<OrderQueueDTO> currentTasks = orderQueueService.getTasksByUserId(assigneeId);
// 如果有正在执行的任务,需要打断
// 如果有正在执行的任务,不再打断,重排等待队列后等待下一轮派发
OrderQueueDTO currentTask = currentTasks.stream()
.filter(t -> OrderQueueStatusEnum.PROCESSING.getStatus().equals(t.getQueueStatus()))
.findFirst()
.orElse(null);
if (currentTask != null && !currentTask.getOpsOrderId().equals(urgentOrderId)) {
// 需要打断当前任务
log.info("打断当前任务: currentOrderId={}, urgentOrderId={}",
currentTask.getOpsOrderId(), urgentOrderId);
try {
orderLifecycleManager.interruptOrder(
currentTask.getOpsOrderId(), urgentOrderId, assigneeId);
} catch (Exception e) {
log.warn("打断任务失败: currentOrderId={}", currentTask.getOpsOrderId(), e);
}
orderQueueService.rebuildWaitingTasksByUserId(assigneeId, null);
return DispatchResult.success("P0工单已入队等待不再打断当前任务", assigneeId);
}
// 派发紧急任务
@@ -182,63 +174,42 @@ public class DispatchEngineImpl implements DispatchEngine {
.assigneeId(assigneeId)
.operatorType(OperatorTypeEnum.SYSTEM)
.operatorId(assigneeId)
.reason("P0紧急任务派单")
.reason("P0工单直接派发")
.build();
OrderTransitionResult result = orderLifecycleManager.dispatch(request);
if (result.isSuccess()) {
return DispatchResult.success("P0紧急任务已派单", assigneeId);
return DispatchResult.success("P0工单已直接派发", assigneeId);
} else {
return DispatchResult.fail("P0紧急任务派单失败: " + result.getMessage());
return DispatchResult.fail("P0工单派发失败: " + result.getMessage());
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public DispatchResult autoDispatchNext(Long completedOrderId, Long assigneeId) {
log.info("任务完成后自动调度下一: completedOrderId={}, assigneeId={}", completedOrderId, assigneeId);
log.info("任务完成后自动派发下一: completedOrderId={}, assigneeId={}", completedOrderId, assigneeId);
// 1. 优先检查是否有被中断的任务
List<OrderQueueDTO> interruptedTasks = orderQueueService.getInterruptedTasksByUserId(assigneeId);
if (!interruptedTasks.isEmpty()) {
// 恢复第一个中断的任务
OrderQueueDTO interruptedTask = interruptedTasks.get(0);
log.info("恢复中断任务: orderId={}", interruptedTask.getOpsOrderId());
OrderTransitionRequest request = OrderTransitionRequest.builder()
.orderId(interruptedTask.getOpsOrderId())
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
.queueId(interruptedTask.getId())
.assigneeId(assigneeId)
.operatorType(OperatorTypeEnum.SYSTEM)
.operatorId(assigneeId)
.reason("恢复中断任务")
.build();
OrderTransitionResult result = orderLifecycleManager.transition(request);
if (result.isSuccess()) {
return DispatchResult.success("已恢复中断任务", assigneeId);
} else {
return DispatchResult.fail("恢复中断任务失败: " + result.getMessage());
}
Long fallbackAreaId = null;
OpsOrderDO completedOrder = orderMapper.selectById(completedOrderId);
if (completedOrder != null) {
fallbackAreaId = completedOrder.getAreaId();
}
// 2. 如果没有中断任务,推送队列中的下一个任务
// 注意:这里必须从 MySQL 读取最新数据,确保刚完成的任务不在等待列表中
List<OrderQueueDTO> waitingTasks = orderQueueService.getWaitingTasksByUserIdFromDb(assigneeId);
List<OrderQueueDTO> waitingTasks = orderQueueService.rebuildWaitingTasksByUserId(assigneeId, fallbackAreaId);
if (waitingTasks.isEmpty()) {
log.info("无等待任务,执行人变为空闲: assigneeId={}", assigneeId);
// 发布事件,由业务层更新执行人状态
return DispatchResult.success("无等待任务,任务完成", assigneeId);
return DispatchResult.success("无等待工单,执行人保持空闲", assigneeId);
}
// <EFBFBD><EFBFBD>送第一个等待任务
// 动态总分重排后,派发得分最低的等待工单
OrderQueueDTO nextTask = waitingTasks.get(0);
log.info("推送下一个等待任务: taskId={}, orderId={}", nextTask.getId(), nextTask.getOpsOrderId());
log.info("派发下一单: queueId={}, orderId={}, score={}, floorDiff={}, waitMinutes={}",
nextTask.getId(), nextTask.getOpsOrderId(), nextTask.getQueueScore(),
nextTask.getFloorDiff(), nextTask.getWaitMinutes());
OrderTransitionRequest request = OrderTransitionRequest.builder()
.orderId(nextTask.getOpsOrderId())
@@ -247,15 +218,15 @@ public class DispatchEngineImpl implements DispatchEngine {
.assigneeId(assigneeId)
.operatorType(OperatorTypeEnum.SYSTEM)
.operatorId(assigneeId)
.reason("自动推送下一个任务")
.reason("等待队列动态重排后自动派发")
.build();
OrderTransitionResult result = orderLifecycleManager.transition(request);
if (result.isSuccess()) {
return DispatchResult.success("推送下一个任务", assigneeId);
return DispatchResult.success("按队列总分派发下一单", assigneeId);
} else {
return DispatchResult.fail("推送下一个任务失败: " + result.getMessage());
return DispatchResult.fail("按队列总分派发下一单失败: " + result.getMessage());
}
}
@@ -330,7 +301,7 @@ public class DispatchEngineImpl implements DispatchEngine {
OrderDispatchContext urgentContext) {
ScheduleStrategy strategy = scheduleStrategyRegistry.get(urgentContext.getBusinessType());
if (strategy == null) {
log.warn("未找到调度策略,使用默认断规则: businessType={}", urgentContext.getBusinessType());
log.warn("未找到调度策略,使用默认非抢断规则: businessType={}", urgentContext.getBusinessType());
return defaultInterruptDecision(urgentContext);
}
@@ -343,14 +314,13 @@ public class DispatchEngineImpl implements DispatchEngine {
}
/**
* 默认断决策
* P0任务可以打断任何非P0任务
* 默认非抢断决策
*/
private InterruptDecision defaultInterruptDecision(OrderDispatchContext urgentContext) {
if (urgentContext.isUrgent()) {
return InterruptDecision.allowByDefault();
return InterruptDecision.deny("当前调度不再支持抢断", "工单将按队列总分在下一轮派发");
}
return InterruptDecision.deny("紧急任务优先级不足", "建议等待当前任务完成");
return InterruptDecision.deny("当前调度不再支持抢断", "建议等待当前任务完成后按队列总分派发");
}
// ==================== 私有方法 ====================
@@ -502,41 +472,12 @@ public class DispatchEngineImpl implements DispatchEngine {
}
/**
* 打断并派单
* 历史抢断入口已废弃,统一降级为仅入队等待下一轮动态派发
*/
private DispatchResult executeInterruptAndDispatch(OrderDispatchContext context, Long assigneeId,
Long interruptedOrderId) {
log.warn("执行打断并派单: orderId={}, assigneeId={}, interruptedOrderId={}",
log.warn("检测到过期抢断路径,降级为仅入队: orderId={}, assigneeId={}, interruptedOrderId={}",
context.getOrderId(), assigneeId, interruptedOrderId);
// 先打断当前任务
if (interruptedOrderId != null) {
orderLifecycleManager.interruptOrder(interruptedOrderId, context.getOrderId(), assigneeId);
}
// 派发紧急任务
OrderTransitionRequest request = OrderTransitionRequest.builder()
.orderId(context.getOrderId())
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
.assigneeId(assigneeId)
.assigneeName(context.getRecommendedAssigneeName())
.operatorType(OperatorTypeEnum.SYSTEM)
.operatorId(assigneeId)
.reason("P0紧急任务派单")
.build();
OrderTransitionResult result = orderLifecycleManager.dispatch(request);
if (result.isSuccess()) {
return DispatchResult.success(
"P0紧急任务已派单",
assigneeId,
null,
DispatchPath.INTERRUPT_AND_DISPATCH,
result.getQueueId()
);
} else {
return DispatchResult.fail("P0紧急任务派单失败: " + result.getMessage());
}
return executeEnqueueOnly(context, assigneeId);
}
}

View File

@@ -62,13 +62,8 @@ public class OpsOrderQueueDO extends BaseDO {
/**
* 队列分数(用于排序)
* 计算公式:优先级分 + 时间戳
* - P0: 0 + timestamp
* - P1: 1000000 + timestamp
* - P2: 2000000 + timestamp
* - P3: 3000000 + timestamp
*
* 用于数据库层面的排序,优先级高的排在前面,同优先级按时间排序
* 计算公式:优先级分 + 楼层差分 - 等待老化分
* 分数越小越靠前,用于数据库与 Redis 的一致排序
*/
@TableField("queue_score")
private Double queueScore;

View File

@@ -33,6 +33,18 @@ public class AreaDeviceRelationRespVO {
@Schema(description = "设备名称", example = "客流计数器001")
private String deviceName;
@Schema(description = "设备备注名称", example = "A栋3层客流计数器")
private String nickname;
@Schema(description = "设备序列号", example = "SN20250101001")
private String serialNumber;
@Schema(description = "设备状态0-未激活 1-在线 2-离线)", example = "1")
private Integer deviceState;
@Schema(description = "设备类型", example = "10")
private Integer deviceType;
@Schema(description = "产品ID", example = "10")
private Long productId;

View File

@@ -18,9 +18,9 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.viewsh.module.ops.enums.AreaTypeEnum.*;
@@ -34,17 +34,17 @@ import static com.viewsh.module.ops.enums.AreaTypeEnum.*;
@Slf4j
public class AreaDeviceRelationServiceImpl implements AreaDeviceRelationService {
@Resource
private OpsAreaDeviceRelationMapper opsAreaDeviceRelationMapper;
@Resource
private OpsBusAreaMapper opsBusAreaMapper;
@Resource
private IotDeviceQueryApi iotDeviceQueryApi;
@Resource
private AreaDeviceService areaDeviceService;
@Resource
private OpsAreaDeviceRelationMapper opsAreaDeviceRelationMapper;
@Resource
private OpsBusAreaMapper opsBusAreaMapper;
@Resource
private IotDeviceQueryApi iotDeviceQueryApi;
@Resource
private AreaDeviceService areaDeviceService;
private static final String TYPE_TRAFFIC_COUNTER = "TRAFFIC_COUNTER";
private static final String TYPE_BEACON = "BEACON";
@@ -53,9 +53,19 @@ public class AreaDeviceRelationServiceImpl implements AreaDeviceRelationService
@Override
public List<AreaDeviceRelationRespVO> listByAreaId(Long areaId) {
List<OpsAreaDeviceRelationDO> list = opsAreaDeviceRelationMapper.selectListByAreaId(areaId);
if (list == null || list.isEmpty()) {
return Collections.emptyList();
}
// 批量查询设备信息,避免 N+1 RPC 调用
Set<Long> deviceIds = list.stream()
.map(OpsAreaDeviceRelationDO::getDeviceId)
.collect(Collectors.toSet());
Map<Long, IotDeviceSimpleRespDTO> deviceMap = batchGetDeviceMap(deviceIds);
return list.stream()
.map(this::convertToRespVO)
.collect(java.util.stream.Collectors.toList());
.map(relation -> convertToRespVO(relation, deviceMap.get(relation.getDeviceId())))
.toList();
}
@Override
@@ -87,43 +97,27 @@ public class AreaDeviceRelationServiceImpl implements AreaDeviceRelationService
}
}
// 调用 IoT 接口获取设备信息
String deviceKey = "DEVICE_" + bindReq.getDeviceId();
Long productId = 0L;
String productKey = "PRODUCT_DEFAULT";
try {
CommonResult<IotDeviceSimpleRespDTO> result = iotDeviceQueryApi.getDevice(bindReq.getDeviceId());
if (result != null && result.getData() != null) {
IotDeviceSimpleRespDTO device = result.getData();
deviceKey = device.getDeviceName() != null ? device.getDeviceName() : deviceKey;
productId = device.getProductId() != null ? device.getProductId() : 0L;
productKey = device.getProductKey() != null ? device.getProductKey() : productKey;
}
} catch (Exception e) {
log.warn("[bindDevice] 调用 IoT 接口获取设备信息失败: deviceId={}, error={}",
bindReq.getDeviceId(), e.getMessage());
// 降级:使用默认值
}
// 调用 IoT 接口获取设备信息(失败则阻断绑定)
IotDeviceSimpleRespDTO device = getDeviceOrThrow(bindReq.getDeviceId());
OpsAreaDeviceRelationDO relation = OpsAreaDeviceRelationDO.builder()
.areaId(bindReq.getAreaId())
.deviceId(bindReq.getDeviceId())
.deviceKey(deviceKey)
.productId(productId)
.productKey(productKey)
.deviceKey(device.getDeviceName())
.productId(device.getProductId())
.productKey(device.getProductKey())
.relationType(bindReq.getRelationType())
.configData(bindReq.getConfigData() != null ? bindReq.getConfigData() : new HashMap<>())
.enabled(true)
.build();
opsAreaDeviceRelationMapper.insert(relation);
// 清除可能存在的 NULL_CACHE 标记
areaDeviceService.evictConfigCache(relation.getAreaId(), relation.getRelationType());
return relation.getId();
}
opsAreaDeviceRelationMapper.insert(relation);
// 清除可能存在的 NULL_CACHE 标记
areaDeviceService.evictConfigCache(relation.getAreaId(), relation.getRelationType());
return relation.getId();
}
@Override
@Transactional(rollbackFor = Exception.class)
@@ -141,60 +135,85 @@ public class AreaDeviceRelationServiceImpl implements AreaDeviceRelationService
relation.setConfigData(updateReq.getConfigData());
}
// enabled 更新
if (updateReq.getEnabled() != null) {
relation.setEnabled(updateReq.getEnabled());
}
opsAreaDeviceRelationMapper.updateById(relation);
// 删缓存以同步 Redis
areaDeviceService.evictConfigCache(existing.getAreaId(), existing.getRelationType());
}
@Override
@Transactional(rollbackFor = Exception.class)
public Boolean unbindDevice(Long id) {
OpsAreaDeviceRelationDO existing = opsAreaDeviceRelationMapper.selectById(id);
if (existing == null) {
return false;
}
boolean deleted = opsAreaDeviceRelationMapper.deleteById(id) > 0;
if (deleted) {
// 同步 Redis 缓存
areaDeviceService.evictConfigCache(existing.getAreaId(), existing.getRelationType());
}
return deleted;
}
/**
* 转换为响应 VO
* 从 IoT 模块获取设备名称和产品名称(带降级)
*/
private AreaDeviceRelationRespVO convertToRespVO(OpsAreaDeviceRelationDO relation) {
AreaDeviceRelationRespVO respVO = BeanUtil.copyProperties(relation, AreaDeviceRelationRespVO.class);
// 从 IoT 模块获取设备信息
try {
CommonResult<IotDeviceSimpleRespDTO> result = iotDeviceQueryApi.getDevice(relation.getDeviceId());
if (result != null && result.getData() != null) {
IotDeviceSimpleRespDTO device = result.getData();
respVO.setDeviceName(device.getDeviceName());
respVO.setProductName(device.getProductName());
} else {
// 降级:使用默认值
respVO.setDeviceName("设备_" + relation.getDeviceId());
respVO.setProductName("产品_" + relation.getProductKey());
}
} catch (Exception e) {
log.warn("[convertToRespVO] 调用 IoT 接口获取设备信息失败: deviceId={}, error={}",
relation.getDeviceId(), e.getMessage());
// 降级:使用默认值
respVO.setDeviceName("设备_" + relation.getDeviceId());
respVO.setProductName("产品_" + relation.getProductKey());
// enabled 更新
if (updateReq.getEnabled() != null) {
relation.setEnabled(updateReq.getEnabled());
}
opsAreaDeviceRelationMapper.updateById(relation);
// 删缓存以同步 Redis
areaDeviceService.evictConfigCache(existing.getAreaId(), existing.getRelationType());
}
@Override
@Transactional(rollbackFor = Exception.class)
public Boolean unbindDevice(Long id) {
OpsAreaDeviceRelationDO existing = opsAreaDeviceRelationMapper.selectById(id);
if (existing == null) {
return false;
}
boolean deleted = opsAreaDeviceRelationMapper.deleteById(id) > 0;
if (deleted) {
// 同步 Redis 缓存
areaDeviceService.evictConfigCache(existing.getAreaId(), existing.getRelationType());
}
return deleted;
}
/**
* 批量查询设备信息,返回 deviceId → DTO 映射
*/
private Map<Long, IotDeviceSimpleRespDTO> batchGetDeviceMap(Set<Long> deviceIds) {
if (deviceIds.isEmpty()) {
return Collections.emptyMap();
}
try {
CommonResult<List<IotDeviceSimpleRespDTO>> result = iotDeviceQueryApi.batchGetDevices(deviceIds);
if (result != null && result.getData() != null) {
return result.getData().stream()
.collect(Collectors.toMap(IotDeviceSimpleRespDTO::getId, Function.identity()));
}
} catch (Exception e) {
log.warn("[batchGetDeviceMap] 批量查询设备信息失败: deviceIds={}, error={}", deviceIds, e.getMessage());
}
return Collections.emptyMap();
}
/**
* 获取单个设备信息,失败则抛出异常阻断操作
*/
private IotDeviceSimpleRespDTO getDeviceOrThrow(Long deviceId) {
try {
CommonResult<IotDeviceSimpleRespDTO> result = iotDeviceQueryApi.getDevice(deviceId);
if (result != null && result.getData() != null) {
return result.getData();
}
throw ServiceExceptionUtil.exception(ErrorCodeConstants.DEVICE_NOT_FOUND);
} catch (com.viewsh.framework.common.exception.ServiceException e) {
throw e;
} catch (Exception e) {
log.error("[getDeviceOrThrow] 调用 IoT 接口获取设备信息失败: deviceId={}, error={}",
deviceId, e.getMessage());
throw ServiceExceptionUtil.exception(ErrorCodeConstants.IOT_SERVICE_UNAVAILABLE);
}
}
/**
* 转换为响应 VO使用预查询的设备信息避免 N+1
*/
private AreaDeviceRelationRespVO convertToRespVO(OpsAreaDeviceRelationDO relation,
IotDeviceSimpleRespDTO device) {
AreaDeviceRelationRespVO respVO = BeanUtil.copyProperties(relation, AreaDeviceRelationRespVO.class);
if (device != null) {
respVO.setDeviceName(device.getDeviceName());
respVO.setProductName(device.getProductName());
respVO.setNickname(device.getNickname());
respVO.setSerialNumber(device.getSerialNumber());
respVO.setDeviceState(device.getState());
respVO.setDeviceType(device.getDeviceType());
}
return respVO;
}

View File

@@ -1,21 +1,26 @@
package com.viewsh.module.ops.service.queue;
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
import com.viewsh.module.ops.api.queue.OrderQueueService;
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
import com.viewsh.module.ops.api.queue.OrderQueueService;
import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO;
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.area.OpsBusAreaMapper;
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
import com.viewsh.module.ops.enums.PriorityEnum;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.beans.BeanUtils;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -35,27 +40,27 @@ import java.util.stream.Collectors;
@Primary
public class OrderQueueServiceEnhanced implements OrderQueueService {
/**
* Score 计算公式:优先级分数 + 时间戳
* 优先级分P0=0, P1=1000000, P2=2000000, P3=3000000
* 时间戳:秒级时间戳
* 结果:优先级高的排在前面,同优先级按时间排序
*/
private static final Map<Integer, Long> PRIORITY_SCORES = Map.of(
0, 0L, // P0: 0
1, 1000000L, // P1: 1,000,000
2, 2000000L, // P2: 2,000,000
3, 3000000L // P3: 3,000,000
);
@Resource
private OpsOrderQueueMapper orderQueueMapper;
@Resource
private RedisOrderQueueService redisQueueService;
@Resource
private QueueSyncService queueSyncService;
/**
* 队列总分由 QueueScoreCalculator 统一计算:
* 优先级分 + 楼层差分 - 等待老化分。
*/
@Resource
private OpsOrderQueueMapper orderQueueMapper;
@Resource
private OpsOrderMapper orderMapper;
@Resource
private OpsBusAreaMapper areaMapper;
@Resource
private RedisOrderQueueService redisQueueService;
@Resource
private QueueSyncService queueSyncService;
@Resource
private QueueScoreCalculator queueScoreCalculator;
@Override
@Transactional(rollbackFor = Exception.class)
@@ -69,7 +74,11 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
// 2. 计算队列分数
LocalDateTime now = LocalDateTime.now();
double queueScore = calculateQueueScore(priority.getPriority(), now);
double queueScore = queueScoreCalculator.calculate(QueueScoreContext.builder()
.priority(priority.getPriority())
.enqueueTime(now)
.now(now)
.build()).getTotalScore();
// 3. 创建队列记录MySQL
OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder()
@@ -361,22 +370,20 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
queueDO.setQueueIndex(calculateNextQueueIndex(newPriority));
// 重新计算队列分数(使用原入队时间保持时间戳不变)
LocalDateTime enqueueTime = queueDO.getEnqueueTime() != null ? queueDO.getEnqueueTime() : LocalDateTime.now();
double newQueueScore = calculateQueueScore(newPriority.getPriority(), enqueueTime);
double newQueueScore = queueScoreCalculator.calculate(QueueScoreContext.builder()
.priority(newPriority.getPriority())
.enqueueTime(enqueueTime)
.now(LocalDateTime.now())
.build()).getTotalScore();
queueDO.setQueueScore(newQueueScore);
queueDO.setEventMessage("优先级调整: " + oldPriority + " -> " + newPriority + ", 原因: " + reason);
int updated = orderQueueMapper.updateById(queueDO);
// 异步更新 Redis
if (updated > 0) {
CompletableFuture.runAsync(() -> {
try {
redisQueueService.updatePriority(queueId, newPriority.getPriority());
} catch (Exception e) {
log.error("Redis 更新优先级失败: queueId={}", queueId, e);
}
});
}
// 事务提交后重算并同步 Redis避免在事务未提交前读到旧数据
if (updated > 0) {
triggerQueueRebuildAfterCommit(queueDO.getUserId(), null);
}
log.info("优先级已调整: queueId={}, opsOrderId={}, oldPriority={}, newPriority={}, reason={}, newScore={}",
queueId, queueDO.getOpsOrderId(), oldPriority, newPriority, reason, newQueueScore);
@@ -487,38 +494,58 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
}
@Override
public List<OrderQueueDTO> getWaitingTasksByUserIdFromDb(Long userId) {
// 直接从 MySQL 获取所有任务
List<OpsOrderQueueDO> mysqlList = orderQueueMapper.selectListByUserId(userId);
List<OrderQueueDTO> allTasks = convertToDTO(mysqlList);
// 过滤出 WAITING 状态的任务,并按队列分数排序
return filterAndSortWaitingTasks(allTasks);
}
private List<OrderQueueDTO> filterAndSortWaitingTasks(List<OrderQueueDTO> allTasks) {
return allTasks.stream()
.filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus()))
.sorted((a, b) -> {
// 优先使用队列分数 score 排序(已在入队时计算好)
if (a.getQueueScore() != null && b.getQueueScore() != null) {
return Double.compare(a.getQueueScore(), b.getQueueScore());
}
// 兜底:如果 score 为空,则按优先级+时间排序
// 按优先级排序P0 > P1 > P2 > P3
int priorityCompare = Integer.compare(
b.getPriority() != null ? b.getPriority() : 999,
a.getPriority() != null ? a.getPriority() : 999
);
if (priorityCompare != 0) {
return priorityCompare;
}
// 优先级相同,按入队时间排序
return a.getEnqueueTime().compareTo(b.getEnqueueTime());
})
.collect(Collectors.toList());
}
public List<OrderQueueDTO> getWaitingTasksByUserIdFromDb(Long userId) {
return rebuildWaitingTasksByUserId(userId, null);
}
@Override
@Transactional(rollbackFor = Exception.class)
public List<OrderQueueDTO> rebuildWaitingTasksByUserId(Long userId, Long fallbackAreaId) {
List<OpsOrderQueueDO> waitingQueues = orderQueueMapper.selectListByUserIdAndStatus(
userId, OrderQueueStatusEnum.WAITING.getStatus());
if (waitingQueues == null || waitingQueues.isEmpty()) {
return Collections.emptyList();
}
Long baselineAreaId = resolveBaselineAreaId(userId, fallbackAreaId);
Integer baseFloorNo = resolveFloorNo(baselineAreaId);
LocalDateTime now = LocalDateTime.now();
List<OrderQueueDTO> rebuiltTasks = new ArrayList<>(waitingQueues.size());
for (OpsOrderQueueDO queueDO : waitingQueues) {
OrderQueueDTO dto = convertToDTO(queueDO);
Integer targetFloorNo = resolveFloorNo(resolveOrderAreaId(queueDO.getOpsOrderId()));
QueueScoreResult result = queueScoreCalculator.calculate(QueueScoreContext.builder()
.priority(queueDO.getPriority())
.baseFloorNo(baseFloorNo)
.targetFloorNo(targetFloorNo)
.enqueueTime(queueDO.getEnqueueTime())
.now(now)
.build());
queueDO.setQueueScore(result.getTotalScore());
orderQueueMapper.updateById(queueDO);
dto.setQueueScore(result.getTotalScore());
dto.setBaseFloorNo(result.getBaseFloorNo());
dto.setTargetFloorNo(result.getTargetFloorNo());
dto.setFloorDiff(result.getFloorDiff());
dto.setWaitMinutes(result.getWaitMinutes());
dto.setScoreUpdateTime(now);
rebuiltTasks.add(dto);
}
rebuiltTasks.sort(this::compareByDynamicScore);
syncUserQueueToRedis(userId, rebuiltTasks);
return rebuiltTasks;
}
private List<OrderQueueDTO> filterAndSortWaitingTasks(List<OrderQueueDTO> allTasks) {
return allTasks.stream()
.filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus()))
.sorted(this::compareByDynamicScore)
.collect(Collectors.toList());
}
@Override
public List<OrderQueueDTO> getInterruptedTasksByUserId(Long userId) {
@@ -620,32 +647,9 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
// ========== 私有方法 ==========
/**
* 计算队列分数(用于排序)
* 公式:优先级分数 + 时间戳
*
* @param priority 优先级0=P0, 1=P1, 2=P2, 3=P3
* @param enqueueTime 入队时间
* @return 队列分数
*/
private double calculateQueueScore(Integer priority, LocalDateTime enqueueTime) {
// 获取优先级分数
long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L);
// 计算时间戳(秒级)
long timestamp;
if (enqueueTime != null) {
timestamp = enqueueTime.atZone(ZoneId.systemDefault()).toEpochSecond();
} else {
timestamp = System.currentTimeMillis() / 1000;
}
return priorityScore + timestamp;
}
/**
* 计算下一个队列顺序号
*/
/**
* 计算下一个队列顺序号
*/
private Integer calculateNextQueueIndex(PriorityEnum priority) {
// TODO: 查询当前优先级的最大 queueIndex然后 +1
// 这里简化处理,返回默认值
@@ -709,9 +713,106 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
/**
* 批量转换为 DTO
*/
private List<OrderQueueDTO> convertToDTO(List<OpsOrderQueueDO> list) {
return list.stream()
.map(this::convertToDTO)
.collect(Collectors.toList());
}
}
private List<OrderQueueDTO> convertToDTO(List<OpsOrderQueueDO> list) {
return list.stream()
.map(this::convertToDTO)
.collect(Collectors.toList());
}
private int compareByDynamicScore(OrderQueueDTO a, OrderQueueDTO b) {
int scoreCompare = Double.compare(
a.getQueueScore() != null ? a.getQueueScore() : Double.MAX_VALUE,
b.getQueueScore() != null ? b.getQueueScore() : Double.MAX_VALUE
);
if (scoreCompare != 0) {
return scoreCompare;
}
if (a.getEnqueueTime() != null && b.getEnqueueTime() != null) {
int enqueueCompare = a.getEnqueueTime().compareTo(b.getEnqueueTime());
if (enqueueCompare != 0) {
return enqueueCompare;
}
}
return Long.compare(
a.getId() != null ? a.getId() : Long.MAX_VALUE,
b.getId() != null ? b.getId() : Long.MAX_VALUE
);
}
private Long resolveBaselineAreaId(Long userId, Long fallbackAreaId) {
OpsOrderQueueDO processingQueue = orderQueueMapper.selectCurrentExecutingByUserId(userId);
if (processingQueue != null) {
Long processingAreaId = resolveOrderAreaId(processingQueue.getOpsOrderId());
if (processingAreaId != null) {
return processingAreaId;
}
}
return fallbackAreaId;
}
private Long resolveOrderAreaId(Long orderId) {
OpsOrderDO order = orderMapper.selectById(orderId);
return order != null ? order.getAreaId() : null;
}
private Integer resolveFloorNo(Long areaId) {
if (areaId == null) {
return null;
}
OpsBusAreaDO area = areaMapper.selectById(areaId);
return area != null ? area.getFloorNo() : null;
}
private void syncUserQueueToRedis(Long userId, List<OrderQueueDTO> rebuiltWaitingTasks) {
List<OpsOrderQueueDO> queues = orderQueueMapper.selectListByUserId(userId);
if (queues == null || queues.isEmpty()) {
redisQueueService.clearQueue(userId);
return;
}
Map<Long, OrderQueueDTO> rebuiltTaskMap = rebuiltWaitingTasks == null
? Collections.emptyMap()
: rebuiltWaitingTasks.stream()
.filter(dto -> dto.getId() != null)
.collect(Collectors.toMap(OrderQueueDTO::getId, dto -> dto));
List<OrderQueueDTO> queueDTOs = queues.stream()
.map(this::convertToDTO)
.map(dto -> rebuiltTaskMap.getOrDefault(dto.getId(), dto))
.sorted((a, b) -> {
if (Objects.equals(a.getUserId(), b.getUserId())) {
return compareByDynamicScore(a, b);
}
return Long.compare(
a.getUserId() != null ? a.getUserId() : Long.MAX_VALUE,
b.getUserId() != null ? b.getUserId() : Long.MAX_VALUE
);
})
.collect(Collectors.toList());
redisQueueService.clearQueue(userId);
redisQueueService.batchEnqueue(queueDTOs);
}
private void triggerQueueRebuildAfterCommit(Long userId, Long fallbackAreaId) {
Runnable rebuildAction = () -> {
try {
rebuildWaitingTasksByUserId(userId, fallbackAreaId);
} catch (Exception e) {
log.error("等待队列重算失败: userId={}", userId, e);
}
};
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
rebuildAction.run();
return;
}
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
rebuildAction.run();
}
});
}
}

View File

@@ -0,0 +1,49 @@
package com.viewsh.module.ops.service.queue;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.LocalDateTime;
@Component
public class QueueScoreCalculator {
static final int PRIORITY_WEIGHT = 1500;
static final int FLOOR_WEIGHT = 20;
static final int AGING_WEIGHT = 5;
static final int MAX_FLOOR_DIFF = 10;
static final int MAX_AGING_MINUTES = 240;
public QueueScoreResult calculate(QueueScoreContext context) {
LocalDateTime now = context.getNow() != null ? context.getNow() : LocalDateTime.now();
int priorityRank = context.getPriority() != null ? context.getPriority() : 3;
Integer baseFloorNo = context.getBaseFloorNo();
Integer targetFloorNo = context.getTargetFloorNo();
Integer floorDiff = null;
int floorDiffScore = 0;
if (baseFloorNo != null && targetFloorNo != null) {
floorDiff = Math.abs(targetFloorNo - baseFloorNo);
floorDiffScore = Math.min(floorDiff, MAX_FLOOR_DIFF) * FLOOR_WEIGHT;
} else if (baseFloorNo != null) {
floorDiffScore = MAX_FLOOR_DIFF * FLOOR_WEIGHT;
}
long waitMinutes = 0;
if (context.getEnqueueTime() != null) {
waitMinutes = Math.max(0, Duration.between(context.getEnqueueTime(), now).toMinutes());
}
long agingScore = (long) Math.min(waitMinutes, MAX_AGING_MINUTES) * AGING_WEIGHT;
long priorityScore = (long) priorityRank * PRIORITY_WEIGHT;
double totalScore = priorityScore + floorDiffScore - agingScore;
return QueueScoreResult.builder()
.totalScore(totalScore)
.baseFloorNo(baseFloorNo)
.targetFloorNo(targetFloorNo)
.floorDiff(floorDiff)
.waitMinutes(waitMinutes)
.build();
}
}

View File

@@ -0,0 +1,21 @@
package com.viewsh.module.ops.service.queue;
import lombok.Builder;
import lombok.Data;
import java.time.LocalDateTime;
@Data
@Builder
public class QueueScoreContext {
private Integer priority;
private Integer baseFloorNo;
private Integer targetFloorNo;
private LocalDateTime enqueueTime;
private LocalDateTime now;
}

View File

@@ -0,0 +1,19 @@
package com.viewsh.module.ops.service.queue;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
public class QueueScoreResult {
private double totalScore;
private Integer baseFloorNo;
private Integer targetFloorNo;
private Integer floorDiff;
private long waitMinutes;
}

View File

@@ -232,6 +232,7 @@ public class QueueSyncService {
dto.setUserId(queueDO.getUserId());
dto.setQueueIndex(queueDO.getQueueIndex());
dto.setPriority(queueDO.getPriority());
dto.setQueueScore(queueDO.getQueueScore());
dto.setQueueStatus(queueDO.getQueueStatus());
dto.setEnqueueTime(queueDO.getEnqueueTime());
dto.setDequeueTime(queueDO.getDequeueTime());

View File

@@ -9,7 +9,6 @@ import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -27,27 +26,14 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* Score 计算公式:优先级分数 + 时间戳
* 优先级分数P0=0, P1=1000000, P2=2000000, P3=3000000
* 时间戳:毫秒级时间戳
* 结果:优先级高的排在前面,同优先级按时间排序
*/
private static final Map<Integer, Long> PRIORITY_SCORES = Map.of(
0, 0L, // P0: 0
1, 1000000L, // P1: 1,000,000
2, 2000000L, // P2: 2,000,000
3, 3000000L // P3: 3,000,000
);
@Override
public boolean enqueue(OrderQueueDTO dto) {
try {
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
String infoKey = INFO_KEY_PREFIX + dto.getId();
// 1. 计算分数(优先级 + 时间戳)
double score = calculateScore(dto.getPriority(), dto.getEnqueueTime());
// Redis 仅持久化服务层已计算好的最终总分,不再本地兜底重算
double score = requireQueueScore(dto, "enqueue");
dto.setQueueScore(score);
// 2. 添加到 Sorted Set使用 queueId 作为 member而非 JSON
@@ -83,8 +69,8 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
byte[] queueKey = (QUEUE_KEY_PREFIX + dto.getUserId()).getBytes();
byte[] infoKey = (INFO_KEY_PREFIX + dto.getId()).getBytes();
// 计算分数并设置到 DTO
double score = calculateScore(dto.getPriority(), dto.getEnqueueTime());
// Redis 仅持久化服务层已计算好的最终总分,不再本地兜底重算
double score = requireQueueScore(dto, "batchEnqueue");
dto.setQueueScore(score);
// 添加到 Sorted Set使用 queueId 作为 member
@@ -214,9 +200,17 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
public long clearQueue(Long cleanerId) {
try {
String queueKey = QUEUE_KEY_PREFIX + cleanerId;
Set<String> queueIds = stringRedisTemplate.opsForZSet().range(queueKey, 0, -1);
if (queueIds != null && !queueIds.isEmpty()) {
List<String> infoKeys = queueIds.stream()
.map(queueId -> INFO_KEY_PREFIX + queueId)
.collect(Collectors.toList());
stringRedisTemplate.delete(infoKeys);
}
stringRedisTemplate.delete(queueKey);
log.info("Redis 清空队列成功: cleanerId={}", cleanerId);
return 0; // Redis 不返回删除数量
long removedCount = queueIds != null ? queueIds.size() : 0;
log.info("Redis 清空队列成功: cleanerId={}, removedCount={}", cleanerId, removedCount);
return removedCount;
} catch (Exception e) {
log.error("Redis 清空队列失败: cleanerId={}", cleanerId, e);
return 0;
@@ -326,7 +320,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
dto.setPriority(newPriority);
double newScore = calculateScore(newPriority, dto.getEnqueueTime());
double newScore = requireQueueScore(dto, "updatePriority");
// 使用 Lua 脚本原子性更新 Hash 和 Sorted Set
String script =
@@ -451,7 +445,15 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
if (opsOrderIdObj != null) {
Long opsOrderId = Long.parseLong(opsOrderIdObj.toString());
if (opsOrderId.equals(orderId)) {
return mapToDto(infoMap);
OrderQueueDTO dto = mapToDto(infoMap);
if (dto == null || dto.getUserId() == null || dto.getId() == null) {
continue;
}
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
Double score = stringRedisTemplate.opsForZSet().score(queueKey, dto.getId().toString());
if (score != null) {
return dto;
}
}
}
}
@@ -532,23 +534,6 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
// ========== 私有方法 ==========
/**
* 计算分数(优先级 + 时间戳)
*/
private double calculateScore(Integer priority, LocalDateTime enqueueTime) {
long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L);
long timestamp;
if (enqueueTime != null) {
timestamp = enqueueTime
.atZone(ZoneId.systemDefault())
.toEpochSecond();
} else {
timestamp = System.currentTimeMillis() / 1000;
}
return priorityScore + timestamp;
}
/**
* 将 DTO 转换为 Map用于 Hash 存储,所有值显式转 String确保跨路径序列化一致
*/
@@ -566,6 +551,21 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
if (dto.getEnqueueTime() != null) {
map.put("enqueueTime", dto.getEnqueueTime().toString());
}
if (dto.getBaseFloorNo() != null) {
map.put("baseFloorNo", String.valueOf(dto.getBaseFloorNo()));
}
if (dto.getTargetFloorNo() != null) {
map.put("targetFloorNo", String.valueOf(dto.getTargetFloorNo()));
}
if (dto.getFloorDiff() != null) {
map.put("floorDiff", String.valueOf(dto.getFloorDiff()));
}
if (dto.getWaitMinutes() != null) {
map.put("waitMinutes", String.valueOf(dto.getWaitMinutes()));
}
if (dto.getScoreUpdateTime() != null) {
map.put("scoreUpdateTime", dto.getScoreUpdateTime().toString());
}
return map;
}
@@ -587,6 +587,21 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
if (dto.getEnqueueTime() != null) {
map.put("enqueueTime".getBytes(), dto.getEnqueueTime().toString().getBytes());
}
if (dto.getBaseFloorNo() != null) {
map.put("baseFloorNo".getBytes(), String.valueOf(dto.getBaseFloorNo()).getBytes());
}
if (dto.getTargetFloorNo() != null) {
map.put("targetFloorNo".getBytes(), String.valueOf(dto.getTargetFloorNo()).getBytes());
}
if (dto.getFloorDiff() != null) {
map.put("floorDiff".getBytes(), String.valueOf(dto.getFloorDiff()).getBytes());
}
if (dto.getWaitMinutes() != null) {
map.put("waitMinutes".getBytes(), String.valueOf(dto.getWaitMinutes()).getBytes());
}
if (dto.getScoreUpdateTime() != null) {
map.put("scoreUpdateTime".getBytes(), dto.getScoreUpdateTime().toString().getBytes());
}
return map;
}
@@ -618,14 +633,9 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
if (queueScoreObj != null) {
dto.setQueueScore(Double.parseDouble(queueScoreObj.toString()));
} else {
// 如果没有存储,则根据 priority 和 enqueueTime 计算
Integer priority = dto.getPriority();
LocalDateTime enqueueTime = null;
Object enqueueTimeObj = map.get("enqueueTime");
if (enqueueTimeObj != null) {
enqueueTime = LocalDateTime.parse(enqueueTimeObj.toString());
}
dto.setQueueScore(calculateScore(priority, enqueueTime));
// 历史数据兼容:没有总分时将其视为最低优先级,避免旧模型再次参与排序
log.warn("Redis 队列记录缺少 queueScore按最低优先级处理: queueId={}", dto.getId());
dto.setQueueScore(Double.MAX_VALUE);
}
// 字符串转换为 LocalDateTime
@@ -633,6 +643,26 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
if (enqueueTimeObj != null) {
dto.setEnqueueTime(LocalDateTime.parse(enqueueTimeObj.toString()));
}
Object baseFloorNoObj = map.get("baseFloorNo");
if (baseFloorNoObj != null) {
dto.setBaseFloorNo(Integer.parseInt(baseFloorNoObj.toString()));
}
Object targetFloorNoObj = map.get("targetFloorNo");
if (targetFloorNoObj != null) {
dto.setTargetFloorNo(Integer.parseInt(targetFloorNoObj.toString()));
}
Object floorDiffObj = map.get("floorDiff");
if (floorDiffObj != null) {
dto.setFloorDiff(Integer.parseInt(floorDiffObj.toString()));
}
Object waitMinutesObj = map.get("waitMinutes");
if (waitMinutesObj != null) {
dto.setWaitMinutes(Long.parseLong(waitMinutesObj.toString()));
}
Object scoreUpdateTimeObj = map.get("scoreUpdateTime");
if (scoreUpdateTimeObj != null) {
dto.setScoreUpdateTime(LocalDateTime.parse(scoreUpdateTimeObj.toString()));
}
return dto;
} catch (Exception e) {
@@ -640,4 +670,11 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
return null;
}
}
private double requireQueueScore(OrderQueueDTO dto, String operation) {
if (dto == null || dto.getQueueScore() == null) {
throw new IllegalArgumentException("queueScore is required for Redis " + operation);
}
return dto.getQueueScore();
}
}

View File

@@ -0,0 +1,103 @@
package com.viewsh.module.ops.service.queue;
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO;
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
import com.viewsh.module.ops.dal.mysql.area.OpsBusAreaMapper;
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import java.time.LocalDateTime;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class OrderQueueServiceEnhancedTest {
@Mock
private OpsOrderQueueMapper orderQueueMapper;
@Mock
private OpsOrderMapper orderMapper;
@Mock
private OpsBusAreaMapper areaMapper;
@Mock
private RedisOrderQueueService redisQueueService;
@Mock
private QueueSyncService queueSyncService;
@Spy
private QueueScoreCalculator queueScoreCalculator = new QueueScoreCalculator();
@InjectMocks
private OrderQueueServiceEnhanced orderQueueService;
@Test
void shouldRebuildWaitingTasksAndAvoidStarvation() {
LocalDateTime now = LocalDateTime.now();
Long userId = 2001L;
OpsOrderQueueDO olderFarTask = OpsOrderQueueDO.builder()
.id(11L)
.userId(userId)
.opsOrderId(101L)
.priority(1)
.queueScore(0D)
.queueStatus(OrderQueueStatusEnum.WAITING.getStatus())
.enqueueTime(now.minusMinutes(80))
.build();
OpsOrderQueueDO newerNearTask = OpsOrderQueueDO.builder()
.id(12L)
.userId(userId)
.opsOrderId(102L)
.priority(1)
.queueScore(0D)
.queueStatus(OrderQueueStatusEnum.WAITING.getStatus())
.enqueueTime(now.minusMinutes(5))
.build();
OpsOrderQueueDO currentTask = OpsOrderQueueDO.builder()
.id(13L)
.userId(userId)
.opsOrderId(900L)
.queueStatus(OrderQueueStatusEnum.PROCESSING.getStatus())
.build();
when(orderQueueMapper.selectListByUserIdAndStatus(userId, OrderQueueStatusEnum.WAITING.getStatus()))
.thenReturn(List.of(olderFarTask, newerNearTask));
when(orderQueueMapper.selectCurrentExecutingByUserId(userId)).thenReturn(currentTask);
when(orderQueueMapper.selectListByUserId(userId)).thenReturn(List.of(olderFarTask, newerNearTask, currentTask));
when(orderMapper.selectById(900L)).thenReturn(OpsOrderDO.builder().id(900L).areaId(501L).build());
when(orderMapper.selectById(101L)).thenReturn(OpsOrderDO.builder().id(101L).areaId(503L).build());
when(orderMapper.selectById(102L)).thenReturn(OpsOrderDO.builder().id(102L).areaId(502L).build());
when(areaMapper.selectById(501L)).thenReturn(OpsBusAreaDO.builder().id(501L).floorNo(5).build());
when(areaMapper.selectById(502L)).thenReturn(OpsBusAreaDO.builder().id(502L).floorNo(6).build());
when(areaMapper.selectById(503L)).thenReturn(OpsBusAreaDO.builder().id(503L).floorNo(8).build());
when(orderQueueMapper.updateById(any(OpsOrderQueueDO.class))).thenReturn(1);
List<OrderQueueDTO> rebuiltTasks = orderQueueService.rebuildWaitingTasksByUserId(userId, null);
assertEquals(2, rebuiltTasks.size());
assertEquals(101L, rebuiltTasks.get(0).getOpsOrderId());
assertEquals(3, rebuiltTasks.get(0).getFloorDiff());
assertTrue(rebuiltTasks.get(0).getWaitMinutes() >= 79);
assertEquals(102L, rebuiltTasks.get(1).getOpsOrderId());
assertEquals(1, rebuiltTasks.get(1).getFloorDiff());
assertTrue(rebuiltTasks.get(0).getQueueScore() < rebuiltTasks.get(1).getQueueScore());
verify(orderQueueMapper, times(2)).updateById(any(OpsOrderQueueDO.class));
verify(redisQueueService).clearQueue(userId);
verify(redisQueueService).batchEnqueue(any());
}
}

View File

@@ -0,0 +1,73 @@
package com.viewsh.module.ops.service.queue;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
import static org.junit.jupiter.api.Assertions.assertTrue;
class QueueScoreCalculatorTest {
private final QueueScoreCalculator calculator = new QueueScoreCalculator();
@Test
void shouldPreferSmallerFloorDiffWhenPrioritySame() {
LocalDateTime now = LocalDateTime.now();
QueueScoreResult near = calculator.calculate(QueueScoreContext.builder()
.priority(1)
.baseFloorNo(5)
.targetFloorNo(6)
.enqueueTime(now.minusMinutes(5))
.now(now)
.build());
QueueScoreResult far = calculator.calculate(QueueScoreContext.builder()
.priority(1)
.baseFloorNo(5)
.targetFloorNo(9)
.enqueueTime(now.minusMinutes(5))
.now(now)
.build());
assertTrue(near.getTotalScore() < far.getTotalScore());
}
@Test
void shouldAllowOlderOrderToGainAgingAdvantage() {
LocalDateTime now = LocalDateTime.now();
QueueScoreResult older = calculator.calculate(QueueScoreContext.builder()
.priority(1)
.baseFloorNo(5)
.targetFloorNo(8)
.enqueueTime(now.minusMinutes(80))
.now(now)
.build());
QueueScoreResult newer = calculator.calculate(QueueScoreContext.builder()
.priority(1)
.baseFloorNo(5)
.targetFloorNo(6)
.enqueueTime(now.minusMinutes(5))
.now(now)
.build());
assertTrue(older.getTotalScore() < newer.getTotalScore());
}
@Test
void shouldDegradeGracefullyWhenBaseFloorMissing() {
LocalDateTime now = LocalDateTime.now();
QueueScoreResult result = calculator.calculate(QueueScoreContext.builder()
.priority(2)
.baseFloorNo(null)
.targetFloorNo(6)
.enqueueTime(now.minusMinutes(10))
.now(now)
.build());
assertTrue(result.getTotalScore() < QueueScoreCalculator.PRIORITY_WEIGHT * 2);
}
}

View File

@@ -0,0 +1,61 @@
package com.viewsh.module.ops.service.queue;
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.time.LocalDateTime;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class QueueSyncServiceTest {
@Mock
private OpsOrderQueueMapper orderQueueMapper;
@Mock
private RedisOrderQueueService redisQueueService;
@InjectMocks
private QueueSyncService queueSyncService;
@Test
void shouldCarryPersistedQueueScoreWhenSyncUserQueueToRedis() {
Long userId = 1001L;
OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder()
.id(11L)
.opsOrderId(22L)
.userId(userId)
.queueIndex(1)
.priority(0)
.queueScore(-120D)
.queueStatus(OrderQueueStatusEnum.WAITING.getStatus())
.enqueueTime(LocalDateTime.now().minusMinutes(10))
.build();
when(orderQueueMapper.selectList(any())).thenReturn(List.of(queueDO));
when(redisQueueService.clearQueue(userId)).thenReturn(1L);
when(redisQueueService.batchEnqueue(any())).thenReturn(1L);
queueSyncService.syncUserQueueToRedis(userId);
ArgumentCaptor<List<OrderQueueDTO>> captor = ArgumentCaptor.forClass(List.class);
verify(redisQueueService).batchEnqueue(captor.capture());
List<OrderQueueDTO> syncedTasks = captor.getValue();
assertNotNull(syncedTasks);
assertEquals(1, syncedTasks.size());
assertEquals(-120D, syncedTasks.get(0).getQueueScore());
}
}