Compare commits
5 Commits
3bcdb4119f
...
57f32e56a9
| Author | SHA1 | Date | |
|---|---|---|---|
| 57f32e56a9 | |||
| af1e0c0989 | |||
| 713ae744ac | |||
| a9fd9313cc | |||
| 26c4ce07eb |
@@ -10,6 +10,7 @@ import org.springframework.cloud.openfeign.FeignClient;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@@ -38,4 +39,9 @@ public interface IotDeviceQueryApi {
|
||||
@Parameter(name = "id", description = "设备ID", required = true)
|
||||
CommonResult<IotDeviceSimpleRespDTO> getDevice(@RequestParam("id") Long id);
|
||||
|
||||
@GetMapping(PREFIX + "/batch-get")
|
||||
@Operation(summary = "批量获取设备详情")
|
||||
@Parameter(name = "ids", description = "设备ID列表", required = true, example = "[1,2,3]")
|
||||
CommonResult<List<IotDeviceSimpleRespDTO>> batchGetDevices(@RequestParam("ids") Collection<Long> ids);
|
||||
|
||||
}
|
||||
|
||||
@@ -29,4 +29,16 @@ public class IotDeviceSimpleRespDTO {
|
||||
@Schema(description = "产品名称", example = "客流计数器")
|
||||
private String productName;
|
||||
|
||||
@Schema(description = "设备备注名称", example = "A栋3层客流计数器")
|
||||
private String nickname;
|
||||
|
||||
@Schema(description = "设备序列号", example = "SN20250101001")
|
||||
private String serialNumber;
|
||||
|
||||
@Schema(description = "设备状态(0-未激活 1-在线 2-离线)", example = "1")
|
||||
private Integer state;
|
||||
|
||||
@Schema(description = "设备类型", example = "10")
|
||||
private Integer deviceType;
|
||||
|
||||
}
|
||||
|
||||
@@ -2,9 +2,10 @@ package com.viewsh.module.iot.api.device;
|
||||
|
||||
import com.viewsh.framework.common.pojo.CommonResult;
|
||||
import com.viewsh.module.iot.api.device.dto.IotDeviceSimpleRespDTO;
|
||||
import com.viewsh.module.iot.controller.admin.device.vo.device.IotDeviceRespVO;
|
||||
import com.viewsh.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||
import com.viewsh.module.iot.dal.dataobject.product.IotProductDO;
|
||||
import com.viewsh.module.iot.service.device.IotDeviceService;
|
||||
import com.viewsh.module.iot.service.product.IotProductService;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -14,8 +15,8 @@ import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.viewsh.framework.common.pojo.CommonResult.success;
|
||||
import static com.viewsh.module.iot.api.device.IotDeviceQueryApi.PREFIX;
|
||||
@@ -36,55 +37,74 @@ public class IotDeviceQueryApiImpl implements IotDeviceQueryApi {
|
||||
@Resource
|
||||
private IotDeviceService deviceService;
|
||||
|
||||
@Resource
|
||||
private IotProductService productService;
|
||||
|
||||
@Override
|
||||
@GetMapping(PREFIX + "/simple-list")
|
||||
@Operation(summary = "获取设备精简列表(按类型/产品筛选)")
|
||||
public CommonResult<List<IotDeviceSimpleRespDTO>> getDeviceSimpleList(
|
||||
@RequestParam(value = "deviceType", required = false) Integer deviceType,
|
||||
@RequestParam(value = "productId", required = false) Long productId) {
|
||||
try {
|
||||
List<IotDeviceDO> list = deviceService.getDeviceListByCondition(deviceType, productId);
|
||||
List<IotDeviceSimpleRespDTO> result = list.stream()
|
||||
.map(device -> {
|
||||
IotDeviceSimpleRespDTO dto = new IotDeviceSimpleRespDTO();
|
||||
dto.setId(device.getId());
|
||||
dto.setDeviceName(device.getDeviceName());
|
||||
dto.setProductId(device.getProductId());
|
||||
dto.setProductKey(device.getProductKey());
|
||||
// TODO: 从产品服务获取产品名称
|
||||
dto.setProductName("产品_" + device.getProductKey());
|
||||
return dto;
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
return success(result);
|
||||
} catch (Exception e) {
|
||||
log.error("[getDeviceSimpleList] 查询设备列表失败: deviceType={}, productId={}", deviceType, productId, e);
|
||||
return success(List.of());
|
||||
}
|
||||
List<IotDeviceDO> list = deviceService.getDeviceListByCondition(deviceType, productId);
|
||||
List<IotDeviceSimpleRespDTO> result = list.stream()
|
||||
.map(this::toSimpleDTO)
|
||||
.toList();
|
||||
return success(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
@GetMapping(PREFIX + "/get")
|
||||
@Operation(summary = "获取设备详情")
|
||||
public CommonResult<IotDeviceSimpleRespDTO> getDevice(@RequestParam("id") Long id) {
|
||||
try {
|
||||
IotDeviceDO device = deviceService.getDevice(id);
|
||||
if (device == null) {
|
||||
return success(null);
|
||||
}
|
||||
|
||||
IotDeviceSimpleRespDTO dto = new IotDeviceSimpleRespDTO();
|
||||
dto.setId(device.getId());
|
||||
dto.setDeviceName(device.getDeviceName());
|
||||
dto.setProductId(device.getProductId());
|
||||
dto.setProductKey(device.getProductKey());
|
||||
dto.setProductName("产品_" + device.getProductKey());
|
||||
|
||||
return success(dto);
|
||||
} catch (Exception e) {
|
||||
log.error("[getDevice] 查询设备详情失败: id={}", id, e);
|
||||
IotDeviceDO device = deviceService.getDevice(id);
|
||||
if (device == null) {
|
||||
return success(null);
|
||||
}
|
||||
return success(toSimpleDTO(device));
|
||||
}
|
||||
|
||||
@Override
|
||||
@GetMapping(PREFIX + "/batch-get")
|
||||
@Operation(summary = "批量获取设备详情")
|
||||
public CommonResult<List<IotDeviceSimpleRespDTO>> batchGetDevices(
|
||||
@RequestParam("ids") Collection<Long> ids) {
|
||||
if (ids == null || ids.isEmpty()) {
|
||||
return success(List.of());
|
||||
}
|
||||
List<IotDeviceDO> devices = deviceService.getDeviceList(ids);
|
||||
List<IotDeviceSimpleRespDTO> result = devices.stream()
|
||||
.map(this::toSimpleDTO)
|
||||
.toList();
|
||||
return success(result);
|
||||
}
|
||||
|
||||
private IotDeviceSimpleRespDTO toSimpleDTO(IotDeviceDO device) {
|
||||
IotDeviceSimpleRespDTO dto = new IotDeviceSimpleRespDTO();
|
||||
dto.setId(device.getId());
|
||||
dto.setDeviceName(device.getDeviceName());
|
||||
dto.setProductId(device.getProductId());
|
||||
dto.setProductKey(device.getProductKey());
|
||||
dto.setNickname(device.getNickname());
|
||||
dto.setSerialNumber(device.getSerialNumber());
|
||||
dto.setState(device.getState());
|
||||
dto.setDeviceType(device.getDeviceType());
|
||||
// 从产品缓存获取产品名称
|
||||
dto.setProductName(resolveProductName(device.getProductId()));
|
||||
return dto;
|
||||
}
|
||||
|
||||
private String resolveProductName(Long productId) {
|
||||
if (productId == null) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
IotProductDO product = productService.getProductFromCache(productId);
|
||||
return product != null ? product.getName() : null;
|
||||
} catch (Exception e) {
|
||||
log.warn("[resolveProductName] 获取产品名称失败: productId={}, error={}", productId, e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -191,15 +191,16 @@ public class SignalLossRuleProcessor {
|
||||
long minValidWorkMillis = exitConfig.getMinValidWorkMinutes() * 60000L;
|
||||
|
||||
// 6. 分支处理:有效 vs 无效作业
|
||||
if (durationMs < minValidWorkMillis) {
|
||||
// 作业时长不足,抑制完成
|
||||
handleInvalidWork(deviceId, badgeDeviceKey, areaId,
|
||||
durationMs, minValidWorkMillis, exitConfig);
|
||||
} else {
|
||||
// 作业时长有效,触发完成
|
||||
handleTimeoutComplete(deviceId, badgeDeviceKey, areaId,
|
||||
durationMs, lastLossTime);
|
||||
}
|
||||
// TODO 暂时取消作业时长不足抑制自动完成的逻辑,所有情况均触发完成
|
||||
// if (durationMs < minValidWorkMillis) {
|
||||
// // 作业时长不足,抑制完成
|
||||
// handleInvalidWork(deviceId, badgeDeviceKey, areaId,
|
||||
// durationMs, minValidWorkMillis, exitConfig);
|
||||
// } else {
|
||||
// 作业时长有效,触发完成
|
||||
handleTimeoutComplete(deviceId, badgeDeviceKey, areaId,
|
||||
durationMs, lastLossTime);
|
||||
// }
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -36,8 +36,8 @@ import java.util.concurrent.TimeUnit;
|
||||
* <p>
|
||||
* 客流触发逻辑(周期化):
|
||||
* 1. 无活跃工单 → 创建新工单 → 标记活跃 → 重置阈值
|
||||
* 2. 有未派发工单(PENDING/QUEUED) → 升级优先级一级 → 重置阈值
|
||||
* 3. 有已派发工单(DISPATCHED/CONFIRMED/ARRIVED) → 忽略 → 重置阈值
|
||||
* 2. 有排队中工单(PENDING/QUEUED) → 升级优先级一级 → 重置阈值
|
||||
* 3. 有已派发/已到达工单(DISPATCHED/CONFIRMED/ARRIVED) → 静默处理,不升级不创建 → 重置阈值
|
||||
* 4. 已是 P0 → 不升级,记录审计日志 → 重置阈值
|
||||
* <p>
|
||||
* RocketMQ 配置:
|
||||
@@ -69,9 +69,9 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
|
||||
private static final int DEDUP_TTL_SECONDS = 300;
|
||||
|
||||
/**
|
||||
* 未派发状态集合(可升级优先级)
|
||||
* 可升级优先级的状态集合(仅排队中,尚未派发)
|
||||
*/
|
||||
private static final Set<String> UNDISPATCHED_STATUSES = Set.of(
|
||||
private static final Set<String> UPGRADABLE_STATUSES = Set.of(
|
||||
WorkOrderStatusEnum.PENDING.getStatus(),
|
||||
WorkOrderStatusEnum.QUEUED.getStatus()
|
||||
);
|
||||
@@ -162,8 +162,8 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
|
||||
if (activeOrder != null) {
|
||||
String status = activeOrder.getStatus();
|
||||
|
||||
if (UNDISPATCHED_STATUSES.contains(status)) {
|
||||
// 未派发 → 升级优先级一级
|
||||
if (UPGRADABLE_STATUSES.contains(status)) {
|
||||
// 排队中 → 升级优先级一级
|
||||
PriorityEnum result = cleanOrderService.upgradeOneLevelPriority(
|
||||
activeOrder.getOrderId(), "客流持续达标自动升级");
|
||||
|
||||
@@ -179,9 +179,10 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
|
||||
areaId, activeOrder.getOrderId());
|
||||
}
|
||||
} else {
|
||||
// 已派发(DISPATCHED/CONFIRMED/ARRIVED)→ 忽略
|
||||
log.info("[CleanOrderCreateEventHandler] 区域{}已有已派发工单{}(状态:{}),忽略本次客流触发",
|
||||
areaId, activeOrder.getOrderId(), status);
|
||||
// 已派发/已确认/已到达 → 保洁员已在处理中,静默忽略
|
||||
log.info("[CleanOrderCreateEventHandler] 区域{}保洁员已在处理中(状态:{}),客流触发静默忽略: orderId={}",
|
||||
areaId, status, activeOrder.getOrderId());
|
||||
recordArrivedSilentLog(event, activeOrder.getOrderId());
|
||||
}
|
||||
|
||||
// ★ 所有分支都重置阈值
|
||||
@@ -389,6 +390,33 @@ public class CleanOrderCreateEventHandler implements RocketMQListener<String> {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 记录已派发/已到达静默处理审计日志
|
||||
*/
|
||||
private void recordArrivedSilentLog(CleanOrderCreateEventDTO event, Long orderId) {
|
||||
try {
|
||||
Map<String, Object> extra = new HashMap<>();
|
||||
extra.put("eventId", event.getEventId());
|
||||
extra.put("areaId", event.getAreaId());
|
||||
extra.put("reason", "保洁员已在处理中,客流触发静默忽略");
|
||||
|
||||
eventLogRecorder.record(EventLogRecord.builder()
|
||||
.module("clean")
|
||||
.domain(EventDomain.TRAFFIC)
|
||||
.eventType("ARRIVED_SILENT_IGNORE")
|
||||
.message(String.format("保洁员已在处理中,客流触发静默忽略 [区域:%d]", event.getAreaId()))
|
||||
.targetId(orderId)
|
||||
.targetType("order")
|
||||
.deviceId(event.getTriggerDeviceId())
|
||||
.level(EventLevel.INFO)
|
||||
.payload(extra)
|
||||
.build());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.warn("[CleanOrderCreateEventHandler] 记录静默处理日志失败: orderId={}", orderId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 确定事件域
|
||||
*/
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
package com.viewsh.module.ops.environment.integration.listener;
|
||||
|
||||
import com.viewsh.module.iot.api.device.IotDeviceControlApi;
|
||||
import com.viewsh.module.iot.api.device.dto.ResetTrafficCounterReqDTO;
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueService;
|
||||
import com.viewsh.module.ops.core.dispatch.DispatchEngine;
|
||||
import com.viewsh.module.ops.core.dispatch.model.DispatchResult;
|
||||
@@ -87,6 +89,9 @@ public class CleanOrderEventListener {
|
||||
@Resource
|
||||
private TrafficActiveOrderRedisDAO trafficActiveOrderRedisDAO;
|
||||
|
||||
@Resource
|
||||
private IotDeviceControlApi iotDeviceControlApi;
|
||||
|
||||
// ==================== 工单创建事件 ====================
|
||||
|
||||
/**
|
||||
@@ -184,7 +189,7 @@ public class CleanOrderEventListener {
|
||||
break;
|
||||
case COMPLETED:
|
||||
handleCompleted(event);
|
||||
clearTrafficActiveOrder(event);
|
||||
clearTrafficActiveOrderOnComplete(event);
|
||||
break;
|
||||
case CANCELLED:
|
||||
|
||||
@@ -733,22 +738,80 @@ public class CleanOrderEventListener {
|
||||
}
|
||||
|
||||
/**
|
||||
* 工单终态时,清除 Redis 中的活跃工单标记
|
||||
* 工单完成时,先重置 IoT 客流计数器,再清除活跃标记
|
||||
* <p>
|
||||
* 仅处理客流触发的工单。清除后下次客流达标将创建新工单(新周期)。
|
||||
* 顺序至关重要:先重置计数器确保设备归零,再清除活跃标记开放新周期。
|
||||
* 如果先清标记后重置,存在竞态窗口——已发出的 MQ 消息在标记清除后到达,
|
||||
* 此时计数器尚未归零,会误创建基于残留计数的工单。
|
||||
* <p>
|
||||
* 如果计数器重置失败,仍然清除活跃标记(降级处理),避免区域永远被锁死。
|
||||
*/
|
||||
private void clearTrafficActiveOrderOnComplete(OrderStateChangedEvent event) {
|
||||
try {
|
||||
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
|
||||
if (order != null && "IOT_TRAFFIC".equals(order.getTriggerSource()) && order.getAreaId() != null) {
|
||||
// 1. 先重置 IoT 客流计数器(阻塞等待结果)
|
||||
boolean resetOk = resetTrafficCounterOnComplete(order.getTriggerDeviceId(), order.getAreaId());
|
||||
|
||||
// 2. 再清除活跃标记
|
||||
trafficActiveOrderRedisDAO.removeActive(order.getAreaId());
|
||||
|
||||
if (resetOk) {
|
||||
log.info("[CleanOrderEventListener] 客流工单完成,计数器已重置,活跃标记已清除: areaId={}", order.getAreaId());
|
||||
} else {
|
||||
log.warn("[CleanOrderEventListener] 客流工单完成,计数器重置失败但活跃标记已清除(降级): areaId={}", order.getAreaId());
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[CleanOrderEventListener] 清除客流活跃工单标记失败: orderId={}", event.getOrderId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 工单取消时,仅清除活跃标记,不重置计数器
|
||||
* <p>
|
||||
* 取消意味着区域未被清洁,客流计数应保留,以便尽快重新触发工单。
|
||||
*/
|
||||
private void clearTrafficActiveOrder(OrderStateChangedEvent event) {
|
||||
try {
|
||||
OpsOrderDO order = opsOrderMapper.selectById(event.getOrderId());
|
||||
if (order != null && "IOT_TRAFFIC".equals(order.getTriggerSource()) && order.getAreaId() != null) {
|
||||
trafficActiveOrderRedisDAO.removeActive(order.getAreaId());
|
||||
log.info("[CleanOrderEventListener] 客流工单周期结束,已清除区域{}活跃标记", order.getAreaId());
|
||||
log.info("[CleanOrderEventListener] 客流工单取消,已清除区域{}活跃标记(计数器保留)", order.getAreaId());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[CleanOrderEventListener] 清除客流活跃工单标记失败: orderId={}", event.getOrderId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 工单完成时重置客流计数器
|
||||
*
|
||||
* @return true 重置成功,false 重置失败或无设备ID
|
||||
*/
|
||||
private boolean resetTrafficCounterOnComplete(Long deviceId, Long areaId) {
|
||||
if (deviceId == null) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
ResetTrafficCounterReqDTO reqDTO = ResetTrafficCounterReqDTO.builder()
|
||||
.deviceId(deviceId)
|
||||
.remark("工单完成后重置计数器,清除作业期间残留计数")
|
||||
.build();
|
||||
var result = iotDeviceControlApi.resetTrafficCounter(reqDTO);
|
||||
if (result.getData() != null && result.getData()) {
|
||||
log.info("[CleanOrderEventListener] 工单完成,计数器重置成功: deviceId={}, areaId={}", deviceId, areaId);
|
||||
return true;
|
||||
} else {
|
||||
log.warn("[CleanOrderEventListener] 工单完成,计数器重置失败: deviceId={}, areaId={}", deviceId, areaId);
|
||||
return false;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[CleanOrderEventListener] 工单完成,计数器重置异常: deviceId={}", deviceId, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 记录暂停开始时间
|
||||
*/
|
||||
|
||||
@@ -245,13 +245,13 @@ public class CleanOrderServiceImpl implements CleanOrderService {
|
||||
if (queueDTO != null) {
|
||||
orderQueueService.adjustPriority(queueDTO.getId(), PriorityEnum.P0, reason);
|
||||
|
||||
// 5. 使用新的调度引擎处理P0紧急插队
|
||||
DispatchResult result = dispatchEngine.urgentInterrupt(orderId, queueDTO.getUserId());
|
||||
// 5. 重算等待队列,P0 不再打断当前任务
|
||||
orderQueueService.rebuildWaitingTasksByUserId(queueDTO.getUserId(), order.getAreaId());
|
||||
|
||||
// 6. 发送优先级升级通知
|
||||
cleanOrderEventListener.sendPriorityUpgradeNotification(queueDTO.getUserId(), order.getOrderCode(), orderId);
|
||||
|
||||
return result.isSuccess();
|
||||
return true;
|
||||
}
|
||||
|
||||
return true;
|
||||
@@ -289,11 +289,11 @@ public class CleanOrderServiceImpl implements CleanOrderService {
|
||||
if (queueDTO != null) {
|
||||
orderQueueService.adjustPriority(queueDTO.getId(), newPriority, reason);
|
||||
|
||||
// 6. 如果升级到 P0,触发紧急打断逻辑
|
||||
// 6. 如果升级到 P0,仅重算等待队列,不再触发打断
|
||||
if (newPriority == PriorityEnum.P0) {
|
||||
DispatchResult result = dispatchEngine.urgentInterrupt(orderId, queueDTO.getUserId());
|
||||
orderQueueService.rebuildWaitingTasksByUserId(queueDTO.getUserId(), order.getAreaId());
|
||||
cleanOrderEventListener.sendPriorityUpgradeNotification(queueDTO.getUserId(), order.getOrderCode(), orderId);
|
||||
log.warn("客流升级到P0,触发紧急打断: orderId={}, success={}", orderId, result.isSuccess());
|
||||
log.warn("客流升级到P0,已重算等待队列: orderId={}", orderId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -23,13 +23,13 @@ import java.util.List;
|
||||
* <p>
|
||||
* 职责:怎么派单
|
||||
* <p>
|
||||
* 策略规则:
|
||||
* <ul>
|
||||
* <li>空闲无任务 → DIRECT_DISPATCH(直接派单)</li>
|
||||
* <li>空闲有等待 → PUSH_AND_ENQUEUE(推送等待+新任务入队)</li>
|
||||
* <li>忙碌且非P0 → ENQUEUE_ONLY(仅入队)</li>
|
||||
* <li>忙碌且P0 → INTERRUPT_AND_DISPATCH(打断并派单)</li>
|
||||
* </ul>
|
||||
* 策略规则:
|
||||
* <ul>
|
||||
* <li>空闲无任务 → DIRECT_DISPATCH(直接派单)</li>
|
||||
* <li>空闲有等待 → PUSH_AND_ENQUEUE(推送等待+新任务入队)</li>
|
||||
* <li>忙碌且非P0 → ENQUEUE_ONLY(仅入队)</li>
|
||||
* <li>忙碌且P0 → ENQUEUE_ONLY(不抢断,进入等待队列)</li>
|
||||
* </ul>
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@@ -106,12 +106,12 @@ public class BadgeDeviceScheduleStrategy implements ScheduleStrategy {
|
||||
// P0紧急任务,需要打断
|
||||
if (assigneeStatus.getCurrentTaskCount() > 0) {
|
||||
Long currentOrderId = deviceStatus.getCurrentOpsOrderId();
|
||||
log.warn("决策: INTERRUPT_AND_DISPATCH - P0紧急任务打断当前任务: currentOrderId={}", currentOrderId);
|
||||
return DispatchDecision.interruptAndDispatch(currentOrderId);
|
||||
} else {
|
||||
log.info("决策: DIRECT_DISPATCH - P0紧急任务直接派单");
|
||||
return DispatchDecision.directDispatch();
|
||||
}
|
||||
log.info("决策: ENQUEUE_ONLY - P0工单不再打断当前任务,进入等待队列: currentOrderId={}", currentOrderId);
|
||||
return DispatchDecision.enqueueOnly();
|
||||
} else {
|
||||
log.info("决策: DIRECT_DISPATCH - P0工单在空闲状态下直接派发");
|
||||
return DispatchDecision.directDispatch();
|
||||
}
|
||||
} else {
|
||||
// 非紧急任务,设备忙碌,入队等待
|
||||
log.info("决策: ENQUEUE_ONLY - 设备忙碌,任务入队等待");
|
||||
@@ -133,15 +133,14 @@ public class BadgeDeviceScheduleStrategy implements ScheduleStrategy {
|
||||
|
||||
// P0任务可以打断任何任务
|
||||
if (urgentContext.isUrgent()) {
|
||||
log.warn("允许打断: P0紧急任务可以打断当前任务");
|
||||
return InterruptDecision.allowByDefault();
|
||||
}
|
||||
|
||||
// P1/P2任务不能打断
|
||||
log.info("拒绝打断: 非P0任务不能打断当前任务");
|
||||
return InterruptDecision.deny(
|
||||
"紧急任务优先级不足",
|
||||
"建议等待当前任务完成"
|
||||
);
|
||||
log.info("拒绝打断: 当前调度已改为非抢占式队列派发");
|
||||
return InterruptDecision.deny("当前调度不再支持抢断", "工单将按队列总分在下一轮派发");
|
||||
}
|
||||
|
||||
log.info("拒绝打断: 当前调度已改为非抢占式队列派发");
|
||||
return InterruptDecision.deny(
|
||||
"当前调度不再支持抢断",
|
||||
"工单将按队列总分在下一轮派发"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ import java.util.List;
|
||||
* <li>空闲无任务 → DIRECT_DISPATCH(直接派单)</li>
|
||||
* <li>空闲有等待 → PUSH_AND_ENQUEUE(推送等待+新任务入队)</li>
|
||||
* <li>忙碌且非P0 → ENQUEUE_ONLY(仅入队)</li>
|
||||
* <li>忙碌且P0 → INTERRUPT_AND_DISPATCH(打断并派单)</li>
|
||||
* <li>忙碌且P0 → ENQUEUE_ONLY(不抢断,进入等待队列)</li>
|
||||
* </ul>
|
||||
*
|
||||
* @author lzh
|
||||
@@ -106,10 +106,10 @@ public class CleanerPriorityScheduleStrategy implements ScheduleStrategy {
|
||||
// P0紧急任务,需要打断
|
||||
if (assigneeStatus.getCurrentTaskCount() > 0) {
|
||||
Long currentOrderId = cleanerStatus.getCurrentOpsOrderId();
|
||||
log.warn("决策: INTERRUPT_AND_DISPATCH - P0紧急任务打断当前任务: currentOrderId={}", currentOrderId);
|
||||
return DispatchDecision.interruptAndDispatch(currentOrderId);
|
||||
log.info("决策: ENQUEUE_ONLY - P0工单不再打断当前任务,进入等待队列: currentOrderId={}", currentOrderId);
|
||||
return DispatchDecision.enqueueOnly();
|
||||
} else {
|
||||
log.info("决策: DIRECT_DISPATCH - P0紧急任务直接派单");
|
||||
log.info("决策: DIRECT_DISPATCH - P0工单在空闲状态下直接派发");
|
||||
return DispatchDecision.directDispatch();
|
||||
}
|
||||
} else {
|
||||
@@ -133,15 +133,14 @@ public class CleanerPriorityScheduleStrategy implements ScheduleStrategy {
|
||||
|
||||
// P0任务可以打断任何任务
|
||||
if (urgentContext.isUrgent()) {
|
||||
log.warn("允许打断: P0紧急任务可以打断当前任务");
|
||||
return InterruptDecision.allowByDefault();
|
||||
log.info("拒绝打断: 当前调度已改为非抢占式队列派发");
|
||||
return InterruptDecision.deny("当前调度不再支持抢断", "工单将按队列总分在下一轮派发");
|
||||
}
|
||||
|
||||
// P1/P2任务不能打断
|
||||
log.info("拒绝打断: 非P0任务不能打断当前任务");
|
||||
log.info("拒绝打断: 当前调度已改为非抢占式队列派发");
|
||||
return InterruptDecision.deny(
|
||||
"紧急任务优先级不足",
|
||||
"建议等待当前任务完成"
|
||||
"当前调度不再支持抢断",
|
||||
"工单将按队列总分在下一轮派发"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,10 +9,13 @@ import com.viewsh.module.ops.core.event.OrderEventPublisher;
|
||||
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO;
|
||||
import com.viewsh.module.ops.dal.mysql.area.OpsBusAreaMapper;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.PriorityEnum;
|
||||
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
|
||||
import com.viewsh.module.ops.environment.dal.dataobject.workorder.OpsOrderCleanExtDO;
|
||||
import com.viewsh.module.ops.environment.dal.redis.TrafficActiveOrderRedisDAO;
|
||||
import com.viewsh.module.ops.environment.dal.mysql.workorder.OpsOrderCleanExtMapper;
|
||||
import com.viewsh.module.ops.environment.integration.consumer.*;
|
||||
import com.viewsh.framework.common.pojo.CommonResult;
|
||||
@@ -73,6 +76,8 @@ public class CleanOrderEndToEndTest {
|
||||
@Mock
|
||||
private OpsOrderMapper opsOrderMapper;
|
||||
@Mock
|
||||
private OpsBusAreaMapper opsBusAreaMapper;
|
||||
@Mock
|
||||
private OpsOrderCleanExtMapper cleanExtMapper;
|
||||
@Mock
|
||||
private OrderIdGenerator orderIdGenerator;
|
||||
@@ -94,6 +99,8 @@ public class CleanOrderEndToEndTest {
|
||||
private ValueOperations<String, String> valueOperations;
|
||||
@Mock
|
||||
private VoiceBroadcastService voiceBroadcastService;
|
||||
@Mock
|
||||
private TrafficActiveOrderRedisDAO trafficActiveOrderRedisDAO;
|
||||
|
||||
@Mock
|
||||
private BadgeDeviceStatusService badgeDeviceStatusService;
|
||||
@@ -149,6 +156,7 @@ public class CleanOrderEndToEndTest {
|
||||
|
||||
// 注入 CleanOrderEventListener
|
||||
injectField(cleanOrderService, "cleanOrderEventListener", cleanOrderEventListener);
|
||||
injectField(cleanOrderService, "opsBusAreaMapper", opsBusAreaMapper);
|
||||
|
||||
// 注入 CleanOrderAuditEventHandler 依赖
|
||||
injectField(auditEventHandler, "eventLogRecorder", eventLogRecorder);
|
||||
@@ -156,10 +164,18 @@ public class CleanOrderEndToEndTest {
|
||||
injectField(auditEventHandler, "opsOrderMapper", opsOrderMapper);
|
||||
injectField(auditEventHandler, "stringRedisTemplate", stringRedisTemplate);
|
||||
injectField(auditEventHandler, "objectMapper", objectMapper);
|
||||
injectField(createEventHandler, "trafficActiveOrderRedisDAO", trafficActiveOrderRedisDAO);
|
||||
|
||||
// Stub IotDeviceControlApi for resetTrafficCounter
|
||||
lenient().when(iotDeviceControlApi.resetTrafficCounter(any()))
|
||||
.thenReturn(CommonResult.success(true));
|
||||
lenient().when(opsBusAreaMapper.selectById(anyLong()))
|
||||
.thenAnswer(i -> OpsBusAreaDO.builder()
|
||||
.id(i.getArgument(0))
|
||||
.areaName("测试区域")
|
||||
.parentPath(null)
|
||||
.floorNo(1)
|
||||
.build());
|
||||
}
|
||||
|
||||
// ==========================================
|
||||
@@ -338,7 +354,7 @@ public class CleanOrderEndToEndTest {
|
||||
verify(eventLogRecorder).record(any());
|
||||
|
||||
// 2. TTS sent (orderId can be null for TTS_REQUEST events)
|
||||
verify(voiceBroadcastService).broadcastInOrder(eq(5001L), contains("请回到作业区域"), eq((Long) null));
|
||||
verify(voiceBroadcastService).broadcastDirect(eq(5001L), contains("请回到作业区域"), eq(9), eq((Long) null));
|
||||
}
|
||||
|
||||
// ==========================================
|
||||
@@ -378,9 +394,6 @@ public class CleanOrderEndToEndTest {
|
||||
|
||||
when(opsOrderMapper.selectById(orderId)).thenReturn(order);
|
||||
when(orderQueueService.getByOpsOrderId(orderId)).thenReturn(queueDTO);
|
||||
when(dispatchEngine.urgentInterrupt(eq(orderId), eq(2001L)))
|
||||
.thenReturn(DispatchResult.success("Success", 2001L));
|
||||
|
||||
// Execute
|
||||
boolean result = cleanOrderService.upgradePriorityToP0(orderId, "Manual Upgrade");
|
||||
|
||||
@@ -392,7 +405,7 @@ public class CleanOrderEndToEndTest {
|
||||
assertEquals(PriorityEnum.P0.getPriority(), orderCaptor.getValue().getPriority());
|
||||
|
||||
verify(orderQueueService).adjustPriority(eq(500L), eq(PriorityEnum.P0), anyString());
|
||||
verify(dispatchEngine).urgentInterrupt(orderId, 2001L);
|
||||
verify(orderQueueService).rebuildWaitingTasksByUserId(2001L, order.getAreaId());
|
||||
verify(cleanOrderEventListener).sendPriorityUpgradeNotification(eq(2001L), eq("WO-P2"), eq(orderId));
|
||||
}
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ class CleanerPriorityScheduleStrategyTest {
|
||||
private com.viewsh.module.ops.core.dispatch.DispatchEngine dispatchEngine;
|
||||
|
||||
@Test
|
||||
void testDecide_P0_Interrupt() {
|
||||
void testDecide_P0_EnqueueOnlyWhenBusy() {
|
||||
// Setup
|
||||
OpsCleanerStatusDO c1 = new OpsCleanerStatusDO();
|
||||
c1.setUserId(1L);
|
||||
@@ -54,8 +54,7 @@ class CleanerPriorityScheduleStrategyTest {
|
||||
DispatchDecision decision = strategy.decide(context);
|
||||
|
||||
// Verify
|
||||
assertEquals(DispatchPath.INTERRUPT_AND_DISPATCH, decision.getPath());
|
||||
assertEquals(500L, decision.getInterruptedOrderId());
|
||||
assertEquals(DispatchPath.ENQUEUE_ONLY, decision.getPath());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -40,13 +40,8 @@ public class OrderQueueDTO {
|
||||
|
||||
/**
|
||||
* 队列分数(用于排序)
|
||||
* 计算公式:优先级分数 + 时间戳
|
||||
* - P0: 0 + timestamp
|
||||
* - P1: 1000000 + timestamp
|
||||
* - P2: 2000000 + timestamp
|
||||
* - P3: 3000000 + timestamp
|
||||
*
|
||||
* 用于数据库层面的排序,优先级高的排在前面,同优先级按时间排序
|
||||
* 计算公式:优先级分 + 楼层差分 - 等待老化分
|
||||
* 分数越小越靠前,用于等待队列的动态重排
|
||||
*/
|
||||
private Double queueScore;
|
||||
|
||||
@@ -95,6 +90,31 @@ public class OrderQueueDTO {
|
||||
*/
|
||||
private LocalDateTime updateTime;
|
||||
|
||||
/**
|
||||
* 评分基准楼层
|
||||
*/
|
||||
private Integer baseFloorNo;
|
||||
|
||||
/**
|
||||
* 目标工单楼层
|
||||
*/
|
||||
private Integer targetFloorNo;
|
||||
|
||||
/**
|
||||
* 楼层差
|
||||
*/
|
||||
private Integer floorDiff;
|
||||
|
||||
/**
|
||||
* 等待分钟数
|
||||
*/
|
||||
private Long waitMinutes;
|
||||
|
||||
/**
|
||||
* 分数更新时间
|
||||
*/
|
||||
private LocalDateTime scoreUpdateTime;
|
||||
|
||||
// ========== 兼容旧字段名的getter方法 ==========
|
||||
|
||||
/**
|
||||
|
||||
@@ -164,6 +164,15 @@ public interface OrderQueueService {
|
||||
*/
|
||||
List<OrderQueueDTO> getWaitingTasksByUserIdFromDb(Long userId);
|
||||
|
||||
/**
|
||||
* 按当前上下文重算指定执行人等待队列的总分并返回最新排序结果
|
||||
*
|
||||
* @param userId 执行人ID
|
||||
* @param fallbackAreaId 当没有执行中工单时可使用的楼层基准区域ID
|
||||
* @return 已按最新总分排序的 WAITING 工单列表
|
||||
*/
|
||||
List<OrderQueueDTO> rebuildWaitingTasksByUserId(Long userId, Long fallbackAreaId);
|
||||
|
||||
/**
|
||||
* 获取用户的暂停任务列表(PAUSED状态,按暂停时间排序)
|
||||
*
|
||||
|
||||
@@ -21,5 +21,6 @@ public interface ErrorCodeConstants {
|
||||
ErrorCode DEVICE_ALREADY_BOUND = new ErrorCode(1_020_002_001, "该工牌已绑定至此区域");
|
||||
ErrorCode DEVICE_TYPE_ALREADY_BOUND = new ErrorCode(1_020_002_002, "该区域已绑定{},一个区域只能绑定一个");
|
||||
ErrorCode DEVICE_RELATION_NOT_FOUND = new ErrorCode(1_020_002_003, "设备关联关系不存在");
|
||||
ErrorCode IOT_SERVICE_UNAVAILABLE = new ErrorCode(1_020_002_004, "IoT 设备服务不可用,请稍后重试");
|
||||
|
||||
}
|
||||
|
||||
@@ -134,7 +134,7 @@ public class DispatchEngineImpl implements DispatchEngine {
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public DispatchResult urgentInterrupt(Long urgentOrderId, Long assigneeId) {
|
||||
log.warn("开始P0紧急插队: urgentOrderId={}, assigneeId={}", urgentOrderId, assigneeId);
|
||||
log.warn("处理P0工单派发请求: urgentOrderId={}, assigneeId={}", urgentOrderId, assigneeId);
|
||||
|
||||
// 查询紧急工单
|
||||
OpsOrderDO urgentOrder = orderMapper.selectById(urgentOrderId);
|
||||
@@ -156,23 +156,15 @@ public class DispatchEngineImpl implements DispatchEngine {
|
||||
// 查询执行人当前任务
|
||||
List<OrderQueueDTO> currentTasks = orderQueueService.getTasksByUserId(assigneeId);
|
||||
|
||||
// 如果有正在执行的任务,需要打断
|
||||
// 如果有正在执行的任务,不再打断,重排等待队列后等待下一轮派发
|
||||
OrderQueueDTO currentTask = currentTasks.stream()
|
||||
.filter(t -> OrderQueueStatusEnum.PROCESSING.getStatus().equals(t.getQueueStatus()))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
|
||||
if (currentTask != null && !currentTask.getOpsOrderId().equals(urgentOrderId)) {
|
||||
// 需要打断当前任务
|
||||
log.info("打断当前任务: currentOrderId={}, urgentOrderId={}",
|
||||
currentTask.getOpsOrderId(), urgentOrderId);
|
||||
|
||||
try {
|
||||
orderLifecycleManager.interruptOrder(
|
||||
currentTask.getOpsOrderId(), urgentOrderId, assigneeId);
|
||||
} catch (Exception e) {
|
||||
log.warn("打断任务失败: currentOrderId={}", currentTask.getOpsOrderId(), e);
|
||||
}
|
||||
orderQueueService.rebuildWaitingTasksByUserId(assigneeId, null);
|
||||
return DispatchResult.success("P0工单已入队等待,不再打断当前任务", assigneeId);
|
||||
}
|
||||
|
||||
// 派发紧急任务
|
||||
@@ -182,63 +174,42 @@ public class DispatchEngineImpl implements DispatchEngine {
|
||||
.assigneeId(assigneeId)
|
||||
.operatorType(OperatorTypeEnum.SYSTEM)
|
||||
.operatorId(assigneeId)
|
||||
.reason("P0紧急任务派单")
|
||||
.reason("P0工单直接派发")
|
||||
.build();
|
||||
|
||||
OrderTransitionResult result = orderLifecycleManager.dispatch(request);
|
||||
|
||||
if (result.isSuccess()) {
|
||||
return DispatchResult.success("P0紧急任务已派单", assigneeId);
|
||||
return DispatchResult.success("P0工单已直接派发", assigneeId);
|
||||
} else {
|
||||
return DispatchResult.fail("P0紧急任务派单失败: " + result.getMessage());
|
||||
return DispatchResult.fail("P0工单派发失败: " + result.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public DispatchResult autoDispatchNext(Long completedOrderId, Long assigneeId) {
|
||||
log.info("任务完成后自动调度下一个: completedOrderId={}, assigneeId={}", completedOrderId, assigneeId);
|
||||
log.info("任务完成后自动派发下一单: completedOrderId={}, assigneeId={}", completedOrderId, assigneeId);
|
||||
|
||||
// 1. 优先检查是否有被中断的任务
|
||||
List<OrderQueueDTO> interruptedTasks = orderQueueService.getInterruptedTasksByUserId(assigneeId);
|
||||
|
||||
if (!interruptedTasks.isEmpty()) {
|
||||
// 恢复第一个中断的任务
|
||||
OrderQueueDTO interruptedTask = interruptedTasks.get(0);
|
||||
log.info("恢复中断任务: orderId={}", interruptedTask.getOpsOrderId());
|
||||
|
||||
OrderTransitionRequest request = OrderTransitionRequest.builder()
|
||||
.orderId(interruptedTask.getOpsOrderId())
|
||||
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
|
||||
.queueId(interruptedTask.getId())
|
||||
.assigneeId(assigneeId)
|
||||
.operatorType(OperatorTypeEnum.SYSTEM)
|
||||
.operatorId(assigneeId)
|
||||
.reason("恢复中断任务")
|
||||
.build();
|
||||
|
||||
OrderTransitionResult result = orderLifecycleManager.transition(request);
|
||||
|
||||
if (result.isSuccess()) {
|
||||
return DispatchResult.success("已恢复中断任务", assigneeId);
|
||||
} else {
|
||||
return DispatchResult.fail("恢复中断任务失败: " + result.getMessage());
|
||||
}
|
||||
Long fallbackAreaId = null;
|
||||
OpsOrderDO completedOrder = orderMapper.selectById(completedOrderId);
|
||||
if (completedOrder != null) {
|
||||
fallbackAreaId = completedOrder.getAreaId();
|
||||
}
|
||||
|
||||
// 2. 如果没有中断任务,推送队列中的下一个任务
|
||||
// 注意:这里必须从 MySQL 读取最新数据,确保刚完成的任务不在等待列表中
|
||||
List<OrderQueueDTO> waitingTasks = orderQueueService.getWaitingTasksByUserIdFromDb(assigneeId);
|
||||
List<OrderQueueDTO> waitingTasks = orderQueueService.rebuildWaitingTasksByUserId(assigneeId, fallbackAreaId);
|
||||
|
||||
if (waitingTasks.isEmpty()) {
|
||||
log.info("无等待任务,执行人变为空闲: assigneeId={}", assigneeId);
|
||||
// 发布事件,由业务层更新执行人状态
|
||||
return DispatchResult.success("无等待任务,任务完成", assigneeId);
|
||||
return DispatchResult.success("无等待工单,执行人保持空闲", assigneeId);
|
||||
}
|
||||
|
||||
// <EFBFBD><EFBFBD>送第一个等待任务
|
||||
// 动态总分重排后,派发得分最低的等待工单
|
||||
OrderQueueDTO nextTask = waitingTasks.get(0);
|
||||
log.info("推送下一个等待任务: taskId={}, orderId={}", nextTask.getId(), nextTask.getOpsOrderId());
|
||||
log.info("派发下一单: queueId={}, orderId={}, score={}, floorDiff={}, waitMinutes={}",
|
||||
nextTask.getId(), nextTask.getOpsOrderId(), nextTask.getQueueScore(),
|
||||
nextTask.getFloorDiff(), nextTask.getWaitMinutes());
|
||||
|
||||
OrderTransitionRequest request = OrderTransitionRequest.builder()
|
||||
.orderId(nextTask.getOpsOrderId())
|
||||
@@ -247,15 +218,15 @@ public class DispatchEngineImpl implements DispatchEngine {
|
||||
.assigneeId(assigneeId)
|
||||
.operatorType(OperatorTypeEnum.SYSTEM)
|
||||
.operatorId(assigneeId)
|
||||
.reason("自动推送下一个任务")
|
||||
.reason("等待队列动态重排后自动派发")
|
||||
.build();
|
||||
|
||||
OrderTransitionResult result = orderLifecycleManager.transition(request);
|
||||
|
||||
if (result.isSuccess()) {
|
||||
return DispatchResult.success("已推送下一个任务", assigneeId);
|
||||
return DispatchResult.success("已按队列总分派发下一单", assigneeId);
|
||||
} else {
|
||||
return DispatchResult.fail("推送下一个任务失败: " + result.getMessage());
|
||||
return DispatchResult.fail("按队列总分派发下一单失败: " + result.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -330,7 +301,7 @@ public class DispatchEngineImpl implements DispatchEngine {
|
||||
OrderDispatchContext urgentContext) {
|
||||
ScheduleStrategy strategy = scheduleStrategyRegistry.get(urgentContext.getBusinessType());
|
||||
if (strategy == null) {
|
||||
log.warn("未找到调度策略,使用默认打断规则: businessType={}", urgentContext.getBusinessType());
|
||||
log.warn("未找到调度策略,使用默认非抢断规则: businessType={}", urgentContext.getBusinessType());
|
||||
return defaultInterruptDecision(urgentContext);
|
||||
}
|
||||
|
||||
@@ -343,14 +314,13 @@ public class DispatchEngineImpl implements DispatchEngine {
|
||||
}
|
||||
|
||||
/**
|
||||
* 默认打断决策
|
||||
* P0任务可以打断任何非P0任务
|
||||
* 默认非抢断决策
|
||||
*/
|
||||
private InterruptDecision defaultInterruptDecision(OrderDispatchContext urgentContext) {
|
||||
if (urgentContext.isUrgent()) {
|
||||
return InterruptDecision.allowByDefault();
|
||||
return InterruptDecision.deny("当前调度不再支持抢断", "工单将按队列总分在下一轮派发");
|
||||
}
|
||||
return InterruptDecision.deny("紧急任务优先级不足", "建议等待当前任务完成");
|
||||
return InterruptDecision.deny("当前调度不再支持抢断", "建议等待当前任务完成后按队列总分派发");
|
||||
}
|
||||
|
||||
// ==================== 私有方法 ====================
|
||||
@@ -502,41 +472,12 @@ public class DispatchEngineImpl implements DispatchEngine {
|
||||
}
|
||||
|
||||
/**
|
||||
* 打断并派单
|
||||
* 历史抢断入口已废弃,统一降级为仅入队等待下一轮动态派发
|
||||
*/
|
||||
private DispatchResult executeInterruptAndDispatch(OrderDispatchContext context, Long assigneeId,
|
||||
Long interruptedOrderId) {
|
||||
log.warn("执行打断并派单: orderId={}, assigneeId={}, interruptedOrderId={}",
|
||||
log.warn("检测到过期抢断路径,降级为仅入队: orderId={}, assigneeId={}, interruptedOrderId={}",
|
||||
context.getOrderId(), assigneeId, interruptedOrderId);
|
||||
|
||||
// 先打断当前任务
|
||||
if (interruptedOrderId != null) {
|
||||
orderLifecycleManager.interruptOrder(interruptedOrderId, context.getOrderId(), assigneeId);
|
||||
}
|
||||
|
||||
// 派发紧急任务
|
||||
OrderTransitionRequest request = OrderTransitionRequest.builder()
|
||||
.orderId(context.getOrderId())
|
||||
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
|
||||
.assigneeId(assigneeId)
|
||||
.assigneeName(context.getRecommendedAssigneeName())
|
||||
.operatorType(OperatorTypeEnum.SYSTEM)
|
||||
.operatorId(assigneeId)
|
||||
.reason("P0紧急任务派单")
|
||||
.build();
|
||||
|
||||
OrderTransitionResult result = orderLifecycleManager.dispatch(request);
|
||||
|
||||
if (result.isSuccess()) {
|
||||
return DispatchResult.success(
|
||||
"P0紧急任务已派单",
|
||||
assigneeId,
|
||||
null,
|
||||
DispatchPath.INTERRUPT_AND_DISPATCH,
|
||||
result.getQueueId()
|
||||
);
|
||||
} else {
|
||||
return DispatchResult.fail("P0紧急任务派单失败: " + result.getMessage());
|
||||
}
|
||||
return executeEnqueueOnly(context, assigneeId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,13 +62,8 @@ public class OpsOrderQueueDO extends BaseDO {
|
||||
|
||||
/**
|
||||
* 队列分数(用于排序)
|
||||
* 计算公式:优先级分数 + 时间戳
|
||||
* - P0: 0 + timestamp
|
||||
* - P1: 1000000 + timestamp
|
||||
* - P2: 2000000 + timestamp
|
||||
* - P3: 3000000 + timestamp
|
||||
*
|
||||
* 用于数据库层面的排序,优先级高的排在前面,同优先级按时间排序
|
||||
* 计算公式:优先级分 + 楼层差分 - 等待老化分
|
||||
* 分数越小越靠前,用于数据库与 Redis 的一致排序
|
||||
*/
|
||||
@TableField("queue_score")
|
||||
private Double queueScore;
|
||||
|
||||
@@ -33,6 +33,18 @@ public class AreaDeviceRelationRespVO {
|
||||
@Schema(description = "设备名称", example = "客流计数器001")
|
||||
private String deviceName;
|
||||
|
||||
@Schema(description = "设备备注名称", example = "A栋3层客流计数器")
|
||||
private String nickname;
|
||||
|
||||
@Schema(description = "设备序列号", example = "SN20250101001")
|
||||
private String serialNumber;
|
||||
|
||||
@Schema(description = "设备状态(0-未激活 1-在线 2-离线)", example = "1")
|
||||
private Integer deviceState;
|
||||
|
||||
@Schema(description = "设备类型", example = "10")
|
||||
private Integer deviceType;
|
||||
|
||||
@Schema(description = "产品ID", example = "10")
|
||||
private Long productId;
|
||||
|
||||
|
||||
@@ -18,9 +18,9 @@ import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.viewsh.module.ops.enums.AreaTypeEnum.*;
|
||||
|
||||
@@ -34,17 +34,17 @@ import static com.viewsh.module.ops.enums.AreaTypeEnum.*;
|
||||
@Slf4j
|
||||
public class AreaDeviceRelationServiceImpl implements AreaDeviceRelationService {
|
||||
|
||||
@Resource
|
||||
private OpsAreaDeviceRelationMapper opsAreaDeviceRelationMapper;
|
||||
|
||||
@Resource
|
||||
private OpsBusAreaMapper opsBusAreaMapper;
|
||||
|
||||
@Resource
|
||||
private IotDeviceQueryApi iotDeviceQueryApi;
|
||||
|
||||
@Resource
|
||||
private AreaDeviceService areaDeviceService;
|
||||
@Resource
|
||||
private OpsAreaDeviceRelationMapper opsAreaDeviceRelationMapper;
|
||||
|
||||
@Resource
|
||||
private OpsBusAreaMapper opsBusAreaMapper;
|
||||
|
||||
@Resource
|
||||
private IotDeviceQueryApi iotDeviceQueryApi;
|
||||
|
||||
@Resource
|
||||
private AreaDeviceService areaDeviceService;
|
||||
|
||||
private static final String TYPE_TRAFFIC_COUNTER = "TRAFFIC_COUNTER";
|
||||
private static final String TYPE_BEACON = "BEACON";
|
||||
@@ -53,9 +53,19 @@ public class AreaDeviceRelationServiceImpl implements AreaDeviceRelationService
|
||||
@Override
|
||||
public List<AreaDeviceRelationRespVO> listByAreaId(Long areaId) {
|
||||
List<OpsAreaDeviceRelationDO> list = opsAreaDeviceRelationMapper.selectListByAreaId(areaId);
|
||||
if (list == null || list.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// 批量查询设备信息,避免 N+1 RPC 调用
|
||||
Set<Long> deviceIds = list.stream()
|
||||
.map(OpsAreaDeviceRelationDO::getDeviceId)
|
||||
.collect(Collectors.toSet());
|
||||
Map<Long, IotDeviceSimpleRespDTO> deviceMap = batchGetDeviceMap(deviceIds);
|
||||
|
||||
return list.stream()
|
||||
.map(this::convertToRespVO)
|
||||
.collect(java.util.stream.Collectors.toList());
|
||||
.map(relation -> convertToRespVO(relation, deviceMap.get(relation.getDeviceId())))
|
||||
.toList();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -87,43 +97,27 @@ public class AreaDeviceRelationServiceImpl implements AreaDeviceRelationService
|
||||
}
|
||||
}
|
||||
|
||||
// 调用 IoT 接口获取设备信息
|
||||
String deviceKey = "DEVICE_" + bindReq.getDeviceId();
|
||||
Long productId = 0L;
|
||||
String productKey = "PRODUCT_DEFAULT";
|
||||
|
||||
try {
|
||||
CommonResult<IotDeviceSimpleRespDTO> result = iotDeviceQueryApi.getDevice(bindReq.getDeviceId());
|
||||
if (result != null && result.getData() != null) {
|
||||
IotDeviceSimpleRespDTO device = result.getData();
|
||||
deviceKey = device.getDeviceName() != null ? device.getDeviceName() : deviceKey;
|
||||
productId = device.getProductId() != null ? device.getProductId() : 0L;
|
||||
productKey = device.getProductKey() != null ? device.getProductKey() : productKey;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[bindDevice] 调用 IoT 接口获取设备信息失败: deviceId={}, error={}",
|
||||
bindReq.getDeviceId(), e.getMessage());
|
||||
// 降级:使用默认值
|
||||
}
|
||||
// 调用 IoT 接口获取设备信息(失败则阻断绑定)
|
||||
IotDeviceSimpleRespDTO device = getDeviceOrThrow(bindReq.getDeviceId());
|
||||
|
||||
OpsAreaDeviceRelationDO relation = OpsAreaDeviceRelationDO.builder()
|
||||
.areaId(bindReq.getAreaId())
|
||||
.deviceId(bindReq.getDeviceId())
|
||||
.deviceKey(deviceKey)
|
||||
.productId(productId)
|
||||
.productKey(productKey)
|
||||
.deviceKey(device.getDeviceName())
|
||||
.productId(device.getProductId())
|
||||
.productKey(device.getProductKey())
|
||||
.relationType(bindReq.getRelationType())
|
||||
.configData(bindReq.getConfigData() != null ? bindReq.getConfigData() : new HashMap<>())
|
||||
.enabled(true)
|
||||
.build();
|
||||
|
||||
opsAreaDeviceRelationMapper.insert(relation);
|
||||
|
||||
// 清除可能存在的 NULL_CACHE 标记
|
||||
areaDeviceService.evictConfigCache(relation.getAreaId(), relation.getRelationType());
|
||||
|
||||
return relation.getId();
|
||||
}
|
||||
opsAreaDeviceRelationMapper.insert(relation);
|
||||
|
||||
// 清除可能存在的 NULL_CACHE 标记
|
||||
areaDeviceService.evictConfigCache(relation.getAreaId(), relation.getRelationType());
|
||||
|
||||
return relation.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
@@ -141,60 +135,85 @@ public class AreaDeviceRelationServiceImpl implements AreaDeviceRelationService
|
||||
relation.setConfigData(updateReq.getConfigData());
|
||||
}
|
||||
|
||||
// enabled 更新
|
||||
if (updateReq.getEnabled() != null) {
|
||||
relation.setEnabled(updateReq.getEnabled());
|
||||
}
|
||||
|
||||
opsAreaDeviceRelationMapper.updateById(relation);
|
||||
|
||||
// 删缓存以同步 Redis
|
||||
areaDeviceService.evictConfigCache(existing.getAreaId(), existing.getRelationType());
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public Boolean unbindDevice(Long id) {
|
||||
OpsAreaDeviceRelationDO existing = opsAreaDeviceRelationMapper.selectById(id);
|
||||
if (existing == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean deleted = opsAreaDeviceRelationMapper.deleteById(id) > 0;
|
||||
if (deleted) {
|
||||
// 同步 Redis 缓存
|
||||
areaDeviceService.evictConfigCache(existing.getAreaId(), existing.getRelationType());
|
||||
}
|
||||
return deleted;
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为响应 VO
|
||||
* 从 IoT 模块获取设备名称和产品名称(带降级)
|
||||
*/
|
||||
private AreaDeviceRelationRespVO convertToRespVO(OpsAreaDeviceRelationDO relation) {
|
||||
AreaDeviceRelationRespVO respVO = BeanUtil.copyProperties(relation, AreaDeviceRelationRespVO.class);
|
||||
|
||||
// 从 IoT 模块获取设备信息
|
||||
try {
|
||||
CommonResult<IotDeviceSimpleRespDTO> result = iotDeviceQueryApi.getDevice(relation.getDeviceId());
|
||||
if (result != null && result.getData() != null) {
|
||||
IotDeviceSimpleRespDTO device = result.getData();
|
||||
respVO.setDeviceName(device.getDeviceName());
|
||||
respVO.setProductName(device.getProductName());
|
||||
} else {
|
||||
// 降级:使用默认值
|
||||
respVO.setDeviceName("设备_" + relation.getDeviceId());
|
||||
respVO.setProductName("产品_" + relation.getProductKey());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[convertToRespVO] 调用 IoT 接口获取设备信息失败: deviceId={}, error={}",
|
||||
relation.getDeviceId(), e.getMessage());
|
||||
// 降级:使用默认值
|
||||
respVO.setDeviceName("设备_" + relation.getDeviceId());
|
||||
respVO.setProductName("产品_" + relation.getProductKey());
|
||||
// enabled 更新
|
||||
if (updateReq.getEnabled() != null) {
|
||||
relation.setEnabled(updateReq.getEnabled());
|
||||
}
|
||||
|
||||
opsAreaDeviceRelationMapper.updateById(relation);
|
||||
|
||||
// 删缓存以同步 Redis
|
||||
areaDeviceService.evictConfigCache(existing.getAreaId(), existing.getRelationType());
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public Boolean unbindDevice(Long id) {
|
||||
OpsAreaDeviceRelationDO existing = opsAreaDeviceRelationMapper.selectById(id);
|
||||
if (existing == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean deleted = opsAreaDeviceRelationMapper.deleteById(id) > 0;
|
||||
if (deleted) {
|
||||
// 同步 Redis 缓存
|
||||
areaDeviceService.evictConfigCache(existing.getAreaId(), existing.getRelationType());
|
||||
}
|
||||
return deleted;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量查询设备信息,返回 deviceId → DTO 映射
|
||||
*/
|
||||
private Map<Long, IotDeviceSimpleRespDTO> batchGetDeviceMap(Set<Long> deviceIds) {
|
||||
if (deviceIds.isEmpty()) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
try {
|
||||
CommonResult<List<IotDeviceSimpleRespDTO>> result = iotDeviceQueryApi.batchGetDevices(deviceIds);
|
||||
if (result != null && result.getData() != null) {
|
||||
return result.getData().stream()
|
||||
.collect(Collectors.toMap(IotDeviceSimpleRespDTO::getId, Function.identity()));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("[batchGetDeviceMap] 批量查询设备信息失败: deviceIds={}, error={}", deviceIds, e.getMessage());
|
||||
}
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取单个设备信息,失败则抛出异常阻断操作
|
||||
*/
|
||||
private IotDeviceSimpleRespDTO getDeviceOrThrow(Long deviceId) {
|
||||
try {
|
||||
CommonResult<IotDeviceSimpleRespDTO> result = iotDeviceQueryApi.getDevice(deviceId);
|
||||
if (result != null && result.getData() != null) {
|
||||
return result.getData();
|
||||
}
|
||||
throw ServiceExceptionUtil.exception(ErrorCodeConstants.DEVICE_NOT_FOUND);
|
||||
} catch (com.viewsh.framework.common.exception.ServiceException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
log.error("[getDeviceOrThrow] 调用 IoT 接口获取设备信息失败: deviceId={}, error={}",
|
||||
deviceId, e.getMessage());
|
||||
throw ServiceExceptionUtil.exception(ErrorCodeConstants.IOT_SERVICE_UNAVAILABLE);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为响应 VO(使用预查询的设备信息,避免 N+1)
|
||||
*/
|
||||
private AreaDeviceRelationRespVO convertToRespVO(OpsAreaDeviceRelationDO relation,
|
||||
IotDeviceSimpleRespDTO device) {
|
||||
AreaDeviceRelationRespVO respVO = BeanUtil.copyProperties(relation, AreaDeviceRelationRespVO.class);
|
||||
if (device != null) {
|
||||
respVO.setDeviceName(device.getDeviceName());
|
||||
respVO.setProductName(device.getProductName());
|
||||
respVO.setNickname(device.getNickname());
|
||||
respVO.setSerialNumber(device.getSerialNumber());
|
||||
respVO.setDeviceState(device.getState());
|
||||
respVO.setDeviceType(device.getDeviceType());
|
||||
}
|
||||
return respVO;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,21 +1,26 @@
|
||||
package com.viewsh.module.ops.service.queue;
|
||||
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueService;
|
||||
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
|
||||
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueService;
|
||||
import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO;
|
||||
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.area.OpsBusAreaMapper;
|
||||
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
|
||||
import com.viewsh.module.ops.enums.PriorityEnum;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.support.TransactionSynchronization;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -35,27 +40,27 @@ import java.util.stream.Collectors;
|
||||
@Primary
|
||||
public class OrderQueueServiceEnhanced implements OrderQueueService {
|
||||
|
||||
/**
|
||||
* Score 计算公式:优先级分数 + 时间戳
|
||||
* 优先级分数:P0=0, P1=1000000, P2=2000000, P3=3000000
|
||||
* 时间戳:秒级时间戳
|
||||
* 结果:优先级高的排在前面,同优先级按时间排序
|
||||
*/
|
||||
private static final Map<Integer, Long> PRIORITY_SCORES = Map.of(
|
||||
0, 0L, // P0: 0
|
||||
1, 1000000L, // P1: 1,000,000
|
||||
2, 2000000L, // P2: 2,000,000
|
||||
3, 3000000L // P3: 3,000,000
|
||||
);
|
||||
|
||||
@Resource
|
||||
private OpsOrderQueueMapper orderQueueMapper;
|
||||
|
||||
@Resource
|
||||
private RedisOrderQueueService redisQueueService;
|
||||
|
||||
@Resource
|
||||
private QueueSyncService queueSyncService;
|
||||
/**
|
||||
* 队列总分由 QueueScoreCalculator 统一计算:
|
||||
* 优先级分 + 楼层差分 - 等待老化分。
|
||||
*/
|
||||
@Resource
|
||||
private OpsOrderQueueMapper orderQueueMapper;
|
||||
|
||||
@Resource
|
||||
private OpsOrderMapper orderMapper;
|
||||
|
||||
@Resource
|
||||
private OpsBusAreaMapper areaMapper;
|
||||
|
||||
@Resource
|
||||
private RedisOrderQueueService redisQueueService;
|
||||
|
||||
@Resource
|
||||
private QueueSyncService queueSyncService;
|
||||
|
||||
@Resource
|
||||
private QueueScoreCalculator queueScoreCalculator;
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
@@ -69,7 +74,11 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
|
||||
|
||||
// 2. 计算队列分数
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
double queueScore = calculateQueueScore(priority.getPriority(), now);
|
||||
double queueScore = queueScoreCalculator.calculate(QueueScoreContext.builder()
|
||||
.priority(priority.getPriority())
|
||||
.enqueueTime(now)
|
||||
.now(now)
|
||||
.build()).getTotalScore();
|
||||
|
||||
// 3. 创建队列记录(MySQL)
|
||||
OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder()
|
||||
@@ -361,22 +370,20 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
|
||||
queueDO.setQueueIndex(calculateNextQueueIndex(newPriority));
|
||||
// 重新计算队列分数(使用原入队时间保持时间戳不变)
|
||||
LocalDateTime enqueueTime = queueDO.getEnqueueTime() != null ? queueDO.getEnqueueTime() : LocalDateTime.now();
|
||||
double newQueueScore = calculateQueueScore(newPriority.getPriority(), enqueueTime);
|
||||
double newQueueScore = queueScoreCalculator.calculate(QueueScoreContext.builder()
|
||||
.priority(newPriority.getPriority())
|
||||
.enqueueTime(enqueueTime)
|
||||
.now(LocalDateTime.now())
|
||||
.build()).getTotalScore();
|
||||
queueDO.setQueueScore(newQueueScore);
|
||||
queueDO.setEventMessage("优先级调整: " + oldPriority + " -> " + newPriority + ", 原因: " + reason);
|
||||
|
||||
int updated = orderQueueMapper.updateById(queueDO);
|
||||
|
||||
// 异步更新 Redis
|
||||
if (updated > 0) {
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
redisQueueService.updatePriority(queueId, newPriority.getPriority());
|
||||
} catch (Exception e) {
|
||||
log.error("Redis 更新优先级失败: queueId={}", queueId, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
// 事务提交后重算并同步 Redis,避免在事务未提交前读到旧数据
|
||||
if (updated > 0) {
|
||||
triggerQueueRebuildAfterCommit(queueDO.getUserId(), null);
|
||||
}
|
||||
|
||||
log.info("优先级已调整: queueId={}, opsOrderId={}, oldPriority={}, newPriority={}, reason={}, newScore={}",
|
||||
queueId, queueDO.getOpsOrderId(), oldPriority, newPriority, reason, newQueueScore);
|
||||
@@ -487,38 +494,58 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OrderQueueDTO> getWaitingTasksByUserIdFromDb(Long userId) {
|
||||
// 直接从 MySQL 获取所有任务
|
||||
List<OpsOrderQueueDO> mysqlList = orderQueueMapper.selectListByUserId(userId);
|
||||
List<OrderQueueDTO> allTasks = convertToDTO(mysqlList);
|
||||
|
||||
// 过滤出 WAITING 状态的任务,并按队列分数排序
|
||||
return filterAndSortWaitingTasks(allTasks);
|
||||
}
|
||||
|
||||
private List<OrderQueueDTO> filterAndSortWaitingTasks(List<OrderQueueDTO> allTasks) {
|
||||
return allTasks.stream()
|
||||
.filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus()))
|
||||
.sorted((a, b) -> {
|
||||
// 优先使用队列分数 score 排序(已在入队时计算好)
|
||||
if (a.getQueueScore() != null && b.getQueueScore() != null) {
|
||||
return Double.compare(a.getQueueScore(), b.getQueueScore());
|
||||
}
|
||||
|
||||
// 兜底:如果 score 为空,则按优先级+时间排序
|
||||
// 按优先级排序(P0 > P1 > P2 > P3)
|
||||
int priorityCompare = Integer.compare(
|
||||
b.getPriority() != null ? b.getPriority() : 999,
|
||||
a.getPriority() != null ? a.getPriority() : 999
|
||||
);
|
||||
if (priorityCompare != 0) {
|
||||
return priorityCompare;
|
||||
}
|
||||
// 优先级相同,按入队时间排序
|
||||
return a.getEnqueueTime().compareTo(b.getEnqueueTime());
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
public List<OrderQueueDTO> getWaitingTasksByUserIdFromDb(Long userId) {
|
||||
return rebuildWaitingTasksByUserId(userId, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public List<OrderQueueDTO> rebuildWaitingTasksByUserId(Long userId, Long fallbackAreaId) {
|
||||
List<OpsOrderQueueDO> waitingQueues = orderQueueMapper.selectListByUserIdAndStatus(
|
||||
userId, OrderQueueStatusEnum.WAITING.getStatus());
|
||||
if (waitingQueues == null || waitingQueues.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
Long baselineAreaId = resolveBaselineAreaId(userId, fallbackAreaId);
|
||||
Integer baseFloorNo = resolveFloorNo(baselineAreaId);
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
|
||||
List<OrderQueueDTO> rebuiltTasks = new ArrayList<>(waitingQueues.size());
|
||||
for (OpsOrderQueueDO queueDO : waitingQueues) {
|
||||
OrderQueueDTO dto = convertToDTO(queueDO);
|
||||
Integer targetFloorNo = resolveFloorNo(resolveOrderAreaId(queueDO.getOpsOrderId()));
|
||||
QueueScoreResult result = queueScoreCalculator.calculate(QueueScoreContext.builder()
|
||||
.priority(queueDO.getPriority())
|
||||
.baseFloorNo(baseFloorNo)
|
||||
.targetFloorNo(targetFloorNo)
|
||||
.enqueueTime(queueDO.getEnqueueTime())
|
||||
.now(now)
|
||||
.build());
|
||||
|
||||
queueDO.setQueueScore(result.getTotalScore());
|
||||
orderQueueMapper.updateById(queueDO);
|
||||
|
||||
dto.setQueueScore(result.getTotalScore());
|
||||
dto.setBaseFloorNo(result.getBaseFloorNo());
|
||||
dto.setTargetFloorNo(result.getTargetFloorNo());
|
||||
dto.setFloorDiff(result.getFloorDiff());
|
||||
dto.setWaitMinutes(result.getWaitMinutes());
|
||||
dto.setScoreUpdateTime(now);
|
||||
rebuiltTasks.add(dto);
|
||||
}
|
||||
|
||||
rebuiltTasks.sort(this::compareByDynamicScore);
|
||||
syncUserQueueToRedis(userId, rebuiltTasks);
|
||||
return rebuiltTasks;
|
||||
}
|
||||
|
||||
private List<OrderQueueDTO> filterAndSortWaitingTasks(List<OrderQueueDTO> allTasks) {
|
||||
return allTasks.stream()
|
||||
.filter(task -> OrderQueueStatusEnum.WAITING.getStatus().equals(task.getQueueStatus()))
|
||||
.sorted(this::compareByDynamicScore)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OrderQueueDTO> getInterruptedTasksByUserId(Long userId) {
|
||||
@@ -620,32 +647,9 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
|
||||
|
||||
// ========== 私有方法 ==========
|
||||
|
||||
/**
|
||||
* 计算队列分数(用于排序)
|
||||
* 公式:优先级分数 + 时间戳
|
||||
*
|
||||
* @param priority 优先级(0=P0, 1=P1, 2=P2, 3=P3)
|
||||
* @param enqueueTime 入队时间
|
||||
* @return 队列分数
|
||||
*/
|
||||
private double calculateQueueScore(Integer priority, LocalDateTime enqueueTime) {
|
||||
// 获取优先级分数
|
||||
long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L);
|
||||
|
||||
// 计算时间戳(秒级)
|
||||
long timestamp;
|
||||
if (enqueueTime != null) {
|
||||
timestamp = enqueueTime.atZone(ZoneId.systemDefault()).toEpochSecond();
|
||||
} else {
|
||||
timestamp = System.currentTimeMillis() / 1000;
|
||||
}
|
||||
|
||||
return priorityScore + timestamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* 计算下一个队列顺序号
|
||||
*/
|
||||
/**
|
||||
* 计算下一个队列顺序号
|
||||
*/
|
||||
private Integer calculateNextQueueIndex(PriorityEnum priority) {
|
||||
// TODO: 查询当前优先级的最大 queueIndex,然后 +1
|
||||
// 这里简化处理,返回默认值
|
||||
@@ -709,9 +713,106 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
|
||||
/**
|
||||
* 批量转换为 DTO
|
||||
*/
|
||||
private List<OrderQueueDTO> convertToDTO(List<OpsOrderQueueDO> list) {
|
||||
return list.stream()
|
||||
.map(this::convertToDTO)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
private List<OrderQueueDTO> convertToDTO(List<OpsOrderQueueDO> list) {
|
||||
return list.stream()
|
||||
.map(this::convertToDTO)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private int compareByDynamicScore(OrderQueueDTO a, OrderQueueDTO b) {
|
||||
int scoreCompare = Double.compare(
|
||||
a.getQueueScore() != null ? a.getQueueScore() : Double.MAX_VALUE,
|
||||
b.getQueueScore() != null ? b.getQueueScore() : Double.MAX_VALUE
|
||||
);
|
||||
if (scoreCompare != 0) {
|
||||
return scoreCompare;
|
||||
}
|
||||
if (a.getEnqueueTime() != null && b.getEnqueueTime() != null) {
|
||||
int enqueueCompare = a.getEnqueueTime().compareTo(b.getEnqueueTime());
|
||||
if (enqueueCompare != 0) {
|
||||
return enqueueCompare;
|
||||
}
|
||||
}
|
||||
return Long.compare(
|
||||
a.getId() != null ? a.getId() : Long.MAX_VALUE,
|
||||
b.getId() != null ? b.getId() : Long.MAX_VALUE
|
||||
);
|
||||
}
|
||||
|
||||
private Long resolveBaselineAreaId(Long userId, Long fallbackAreaId) {
|
||||
OpsOrderQueueDO processingQueue = orderQueueMapper.selectCurrentExecutingByUserId(userId);
|
||||
if (processingQueue != null) {
|
||||
Long processingAreaId = resolveOrderAreaId(processingQueue.getOpsOrderId());
|
||||
if (processingAreaId != null) {
|
||||
return processingAreaId;
|
||||
}
|
||||
}
|
||||
return fallbackAreaId;
|
||||
}
|
||||
|
||||
private Long resolveOrderAreaId(Long orderId) {
|
||||
OpsOrderDO order = orderMapper.selectById(orderId);
|
||||
return order != null ? order.getAreaId() : null;
|
||||
}
|
||||
|
||||
private Integer resolveFloorNo(Long areaId) {
|
||||
if (areaId == null) {
|
||||
return null;
|
||||
}
|
||||
OpsBusAreaDO area = areaMapper.selectById(areaId);
|
||||
return area != null ? area.getFloorNo() : null;
|
||||
}
|
||||
|
||||
private void syncUserQueueToRedis(Long userId, List<OrderQueueDTO> rebuiltWaitingTasks) {
|
||||
List<OpsOrderQueueDO> queues = orderQueueMapper.selectListByUserId(userId);
|
||||
if (queues == null || queues.isEmpty()) {
|
||||
redisQueueService.clearQueue(userId);
|
||||
return;
|
||||
}
|
||||
|
||||
Map<Long, OrderQueueDTO> rebuiltTaskMap = rebuiltWaitingTasks == null
|
||||
? Collections.emptyMap()
|
||||
: rebuiltWaitingTasks.stream()
|
||||
.filter(dto -> dto.getId() != null)
|
||||
.collect(Collectors.toMap(OrderQueueDTO::getId, dto -> dto));
|
||||
|
||||
List<OrderQueueDTO> queueDTOs = queues.stream()
|
||||
.map(this::convertToDTO)
|
||||
.map(dto -> rebuiltTaskMap.getOrDefault(dto.getId(), dto))
|
||||
.sorted((a, b) -> {
|
||||
if (Objects.equals(a.getUserId(), b.getUserId())) {
|
||||
return compareByDynamicScore(a, b);
|
||||
}
|
||||
return Long.compare(
|
||||
a.getUserId() != null ? a.getUserId() : Long.MAX_VALUE,
|
||||
b.getUserId() != null ? b.getUserId() : Long.MAX_VALUE
|
||||
);
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
redisQueueService.clearQueue(userId);
|
||||
redisQueueService.batchEnqueue(queueDTOs);
|
||||
}
|
||||
|
||||
private void triggerQueueRebuildAfterCommit(Long userId, Long fallbackAreaId) {
|
||||
Runnable rebuildAction = () -> {
|
||||
try {
|
||||
rebuildWaitingTasksByUserId(userId, fallbackAreaId);
|
||||
} catch (Exception e) {
|
||||
log.error("等待队列重算失败: userId={}", userId, e);
|
||||
}
|
||||
};
|
||||
|
||||
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
|
||||
rebuildAction.run();
|
||||
return;
|
||||
}
|
||||
|
||||
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
|
||||
@Override
|
||||
public void afterCommit() {
|
||||
rebuildAction.run();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
package com.viewsh.module.ops.service.queue;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Component
|
||||
public class QueueScoreCalculator {
|
||||
|
||||
static final int PRIORITY_WEIGHT = 1500;
|
||||
static final int FLOOR_WEIGHT = 20;
|
||||
static final int AGING_WEIGHT = 5;
|
||||
static final int MAX_FLOOR_DIFF = 10;
|
||||
static final int MAX_AGING_MINUTES = 240;
|
||||
|
||||
public QueueScoreResult calculate(QueueScoreContext context) {
|
||||
LocalDateTime now = context.getNow() != null ? context.getNow() : LocalDateTime.now();
|
||||
int priorityRank = context.getPriority() != null ? context.getPriority() : 3;
|
||||
|
||||
Integer baseFloorNo = context.getBaseFloorNo();
|
||||
Integer targetFloorNo = context.getTargetFloorNo();
|
||||
Integer floorDiff = null;
|
||||
int floorDiffScore = 0;
|
||||
if (baseFloorNo != null && targetFloorNo != null) {
|
||||
floorDiff = Math.abs(targetFloorNo - baseFloorNo);
|
||||
floorDiffScore = Math.min(floorDiff, MAX_FLOOR_DIFF) * FLOOR_WEIGHT;
|
||||
} else if (baseFloorNo != null) {
|
||||
floorDiffScore = MAX_FLOOR_DIFF * FLOOR_WEIGHT;
|
||||
}
|
||||
|
||||
long waitMinutes = 0;
|
||||
if (context.getEnqueueTime() != null) {
|
||||
waitMinutes = Math.max(0, Duration.between(context.getEnqueueTime(), now).toMinutes());
|
||||
}
|
||||
long agingScore = (long) Math.min(waitMinutes, MAX_AGING_MINUTES) * AGING_WEIGHT;
|
||||
|
||||
long priorityScore = (long) priorityRank * PRIORITY_WEIGHT;
|
||||
double totalScore = priorityScore + floorDiffScore - agingScore;
|
||||
|
||||
return QueueScoreResult.builder()
|
||||
.totalScore(totalScore)
|
||||
.baseFloorNo(baseFloorNo)
|
||||
.targetFloorNo(targetFloorNo)
|
||||
.floorDiff(floorDiff)
|
||||
.waitMinutes(waitMinutes)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
package com.viewsh.module.ops.service.queue;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
public class QueueScoreContext {
|
||||
|
||||
private Integer priority;
|
||||
|
||||
private Integer baseFloorNo;
|
||||
|
||||
private Integer targetFloorNo;
|
||||
|
||||
private LocalDateTime enqueueTime;
|
||||
|
||||
private LocalDateTime now;
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package com.viewsh.module.ops.service.queue;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
public class QueueScoreResult {
|
||||
|
||||
private double totalScore;
|
||||
|
||||
private Integer baseFloorNo;
|
||||
|
||||
private Integer targetFloorNo;
|
||||
|
||||
private Integer floorDiff;
|
||||
|
||||
private long waitMinutes;
|
||||
}
|
||||
@@ -232,6 +232,7 @@ public class QueueSyncService {
|
||||
dto.setUserId(queueDO.getUserId());
|
||||
dto.setQueueIndex(queueDO.getQueueIndex());
|
||||
dto.setPriority(queueDO.getPriority());
|
||||
dto.setQueueScore(queueDO.getQueueScore());
|
||||
dto.setQueueStatus(queueDO.getQueueStatus());
|
||||
dto.setEnqueueTime(queueDO.getEnqueueTime());
|
||||
dto.setDequeueTime(queueDO.getDequeueTime());
|
||||
|
||||
@@ -9,7 +9,6 @@ import org.springframework.data.redis.core.script.DefaultRedisScript;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -27,27 +26,14 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
/**
|
||||
* Score 计算公式:优先级分数 + 时间戳
|
||||
* 优先级分数:P0=0, P1=1000000, P2=2000000, P3=3000000
|
||||
* 时间戳:毫秒级时间戳
|
||||
* 结果:优先级高的排在前面,同优先级按时间排序
|
||||
*/
|
||||
private static final Map<Integer, Long> PRIORITY_SCORES = Map.of(
|
||||
0, 0L, // P0: 0
|
||||
1, 1000000L, // P1: 1,000,000
|
||||
2, 2000000L, // P2: 2,000,000
|
||||
3, 3000000L // P3: 3,000,000
|
||||
);
|
||||
|
||||
@Override
|
||||
public boolean enqueue(OrderQueueDTO dto) {
|
||||
try {
|
||||
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
|
||||
String infoKey = INFO_KEY_PREFIX + dto.getId();
|
||||
|
||||
// 1. 计算分数(优先级 + 时间戳)
|
||||
double score = calculateScore(dto.getPriority(), dto.getEnqueueTime());
|
||||
// Redis 仅持久化服务层已计算好的最终总分,不再本地兜底重算
|
||||
double score = requireQueueScore(dto, "enqueue");
|
||||
dto.setQueueScore(score);
|
||||
|
||||
// 2. 添加到 Sorted Set(使用 queueId 作为 member,而非 JSON)
|
||||
@@ -83,8 +69,8 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
byte[] queueKey = (QUEUE_KEY_PREFIX + dto.getUserId()).getBytes();
|
||||
byte[] infoKey = (INFO_KEY_PREFIX + dto.getId()).getBytes();
|
||||
|
||||
// 计算分数并设置到 DTO
|
||||
double score = calculateScore(dto.getPriority(), dto.getEnqueueTime());
|
||||
// Redis 仅持久化服务层已计算好的最终总分,不再本地兜底重算
|
||||
double score = requireQueueScore(dto, "batchEnqueue");
|
||||
dto.setQueueScore(score);
|
||||
|
||||
// 添加到 Sorted Set(使用 queueId 作为 member)
|
||||
@@ -214,9 +200,17 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
public long clearQueue(Long cleanerId) {
|
||||
try {
|
||||
String queueKey = QUEUE_KEY_PREFIX + cleanerId;
|
||||
Set<String> queueIds = stringRedisTemplate.opsForZSet().range(queueKey, 0, -1);
|
||||
if (queueIds != null && !queueIds.isEmpty()) {
|
||||
List<String> infoKeys = queueIds.stream()
|
||||
.map(queueId -> INFO_KEY_PREFIX + queueId)
|
||||
.collect(Collectors.toList());
|
||||
stringRedisTemplate.delete(infoKeys);
|
||||
}
|
||||
stringRedisTemplate.delete(queueKey);
|
||||
log.info("Redis 清空队列成功: cleanerId={}", cleanerId);
|
||||
return 0; // Redis 不返回删除数量
|
||||
long removedCount = queueIds != null ? queueIds.size() : 0;
|
||||
log.info("Redis 清空队列成功: cleanerId={}, removedCount={}", cleanerId, removedCount);
|
||||
return removedCount;
|
||||
} catch (Exception e) {
|
||||
log.error("Redis 清空队列失败: cleanerId={}", cleanerId, e);
|
||||
return 0;
|
||||
@@ -326,7 +320,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
|
||||
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
|
||||
dto.setPriority(newPriority);
|
||||
double newScore = calculateScore(newPriority, dto.getEnqueueTime());
|
||||
double newScore = requireQueueScore(dto, "updatePriority");
|
||||
|
||||
// 使用 Lua 脚本原子性更新 Hash 和 Sorted Set
|
||||
String script =
|
||||
@@ -451,7 +445,15 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
if (opsOrderIdObj != null) {
|
||||
Long opsOrderId = Long.parseLong(opsOrderIdObj.toString());
|
||||
if (opsOrderId.equals(orderId)) {
|
||||
return mapToDto(infoMap);
|
||||
OrderQueueDTO dto = mapToDto(infoMap);
|
||||
if (dto == null || dto.getUserId() == null || dto.getId() == null) {
|
||||
continue;
|
||||
}
|
||||
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
|
||||
Double score = stringRedisTemplate.opsForZSet().score(queueKey, dto.getId().toString());
|
||||
if (score != null) {
|
||||
return dto;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -532,23 +534,6 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
|
||||
// ========== 私有方法 ==========
|
||||
|
||||
/**
|
||||
* 计算分数(优先级 + 时间戳)
|
||||
*/
|
||||
private double calculateScore(Integer priority, LocalDateTime enqueueTime) {
|
||||
long priorityScore = PRIORITY_SCORES.getOrDefault(priority, 3000000L);
|
||||
long timestamp;
|
||||
if (enqueueTime != null) {
|
||||
timestamp = enqueueTime
|
||||
.atZone(ZoneId.systemDefault())
|
||||
.toEpochSecond();
|
||||
} else {
|
||||
timestamp = System.currentTimeMillis() / 1000;
|
||||
}
|
||||
|
||||
return priorityScore + timestamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* 将 DTO 转换为 Map(用于 Hash 存储,所有值显式转 String,确保跨路径序列化一致)
|
||||
*/
|
||||
@@ -566,6 +551,21 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
if (dto.getEnqueueTime() != null) {
|
||||
map.put("enqueueTime", dto.getEnqueueTime().toString());
|
||||
}
|
||||
if (dto.getBaseFloorNo() != null) {
|
||||
map.put("baseFloorNo", String.valueOf(dto.getBaseFloorNo()));
|
||||
}
|
||||
if (dto.getTargetFloorNo() != null) {
|
||||
map.put("targetFloorNo", String.valueOf(dto.getTargetFloorNo()));
|
||||
}
|
||||
if (dto.getFloorDiff() != null) {
|
||||
map.put("floorDiff", String.valueOf(dto.getFloorDiff()));
|
||||
}
|
||||
if (dto.getWaitMinutes() != null) {
|
||||
map.put("waitMinutes", String.valueOf(dto.getWaitMinutes()));
|
||||
}
|
||||
if (dto.getScoreUpdateTime() != null) {
|
||||
map.put("scoreUpdateTime", dto.getScoreUpdateTime().toString());
|
||||
}
|
||||
|
||||
return map;
|
||||
}
|
||||
@@ -587,6 +587,21 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
if (dto.getEnqueueTime() != null) {
|
||||
map.put("enqueueTime".getBytes(), dto.getEnqueueTime().toString().getBytes());
|
||||
}
|
||||
if (dto.getBaseFloorNo() != null) {
|
||||
map.put("baseFloorNo".getBytes(), String.valueOf(dto.getBaseFloorNo()).getBytes());
|
||||
}
|
||||
if (dto.getTargetFloorNo() != null) {
|
||||
map.put("targetFloorNo".getBytes(), String.valueOf(dto.getTargetFloorNo()).getBytes());
|
||||
}
|
||||
if (dto.getFloorDiff() != null) {
|
||||
map.put("floorDiff".getBytes(), String.valueOf(dto.getFloorDiff()).getBytes());
|
||||
}
|
||||
if (dto.getWaitMinutes() != null) {
|
||||
map.put("waitMinutes".getBytes(), String.valueOf(dto.getWaitMinutes()).getBytes());
|
||||
}
|
||||
if (dto.getScoreUpdateTime() != null) {
|
||||
map.put("scoreUpdateTime".getBytes(), dto.getScoreUpdateTime().toString().getBytes());
|
||||
}
|
||||
|
||||
return map;
|
||||
}
|
||||
@@ -618,14 +633,9 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
if (queueScoreObj != null) {
|
||||
dto.setQueueScore(Double.parseDouble(queueScoreObj.toString()));
|
||||
} else {
|
||||
// 如果没有存储,则根据 priority 和 enqueueTime 计算
|
||||
Integer priority = dto.getPriority();
|
||||
LocalDateTime enqueueTime = null;
|
||||
Object enqueueTimeObj = map.get("enqueueTime");
|
||||
if (enqueueTimeObj != null) {
|
||||
enqueueTime = LocalDateTime.parse(enqueueTimeObj.toString());
|
||||
}
|
||||
dto.setQueueScore(calculateScore(priority, enqueueTime));
|
||||
// 历史数据兼容:没有总分时将其视为最低优先级,避免旧模型再次参与排序
|
||||
log.warn("Redis 队列记录缺少 queueScore,按最低优先级处理: queueId={}", dto.getId());
|
||||
dto.setQueueScore(Double.MAX_VALUE);
|
||||
}
|
||||
|
||||
// 字符串转换为 LocalDateTime
|
||||
@@ -633,6 +643,26 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
if (enqueueTimeObj != null) {
|
||||
dto.setEnqueueTime(LocalDateTime.parse(enqueueTimeObj.toString()));
|
||||
}
|
||||
Object baseFloorNoObj = map.get("baseFloorNo");
|
||||
if (baseFloorNoObj != null) {
|
||||
dto.setBaseFloorNo(Integer.parseInt(baseFloorNoObj.toString()));
|
||||
}
|
||||
Object targetFloorNoObj = map.get("targetFloorNo");
|
||||
if (targetFloorNoObj != null) {
|
||||
dto.setTargetFloorNo(Integer.parseInt(targetFloorNoObj.toString()));
|
||||
}
|
||||
Object floorDiffObj = map.get("floorDiff");
|
||||
if (floorDiffObj != null) {
|
||||
dto.setFloorDiff(Integer.parseInt(floorDiffObj.toString()));
|
||||
}
|
||||
Object waitMinutesObj = map.get("waitMinutes");
|
||||
if (waitMinutesObj != null) {
|
||||
dto.setWaitMinutes(Long.parseLong(waitMinutesObj.toString()));
|
||||
}
|
||||
Object scoreUpdateTimeObj = map.get("scoreUpdateTime");
|
||||
if (scoreUpdateTimeObj != null) {
|
||||
dto.setScoreUpdateTime(LocalDateTime.parse(scoreUpdateTimeObj.toString()));
|
||||
}
|
||||
|
||||
return dto;
|
||||
} catch (Exception e) {
|
||||
@@ -640,4 +670,11 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private double requireQueueScore(OrderQueueDTO dto, String operation) {
|
||||
if (dto == null || dto.getQueueScore() == null) {
|
||||
throw new IllegalArgumentException("queueScore is required for Redis " + operation);
|
||||
}
|
||||
return dto.getQueueScore();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,103 @@
|
||||
package com.viewsh.module.ops.service.queue;
|
||||
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
|
||||
import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO;
|
||||
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.area.OpsBusAreaMapper;
|
||||
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Spy;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class OrderQueueServiceEnhancedTest {
|
||||
|
||||
@Mock
|
||||
private OpsOrderQueueMapper orderQueueMapper;
|
||||
@Mock
|
||||
private OpsOrderMapper orderMapper;
|
||||
@Mock
|
||||
private OpsBusAreaMapper areaMapper;
|
||||
@Mock
|
||||
private RedisOrderQueueService redisQueueService;
|
||||
@Mock
|
||||
private QueueSyncService queueSyncService;
|
||||
@Spy
|
||||
private QueueScoreCalculator queueScoreCalculator = new QueueScoreCalculator();
|
||||
|
||||
@InjectMocks
|
||||
private OrderQueueServiceEnhanced orderQueueService;
|
||||
|
||||
@Test
|
||||
void shouldRebuildWaitingTasksAndAvoidStarvation() {
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
Long userId = 2001L;
|
||||
|
||||
OpsOrderQueueDO olderFarTask = OpsOrderQueueDO.builder()
|
||||
.id(11L)
|
||||
.userId(userId)
|
||||
.opsOrderId(101L)
|
||||
.priority(1)
|
||||
.queueScore(0D)
|
||||
.queueStatus(OrderQueueStatusEnum.WAITING.getStatus())
|
||||
.enqueueTime(now.minusMinutes(80))
|
||||
.build();
|
||||
OpsOrderQueueDO newerNearTask = OpsOrderQueueDO.builder()
|
||||
.id(12L)
|
||||
.userId(userId)
|
||||
.opsOrderId(102L)
|
||||
.priority(1)
|
||||
.queueScore(0D)
|
||||
.queueStatus(OrderQueueStatusEnum.WAITING.getStatus())
|
||||
.enqueueTime(now.minusMinutes(5))
|
||||
.build();
|
||||
OpsOrderQueueDO currentTask = OpsOrderQueueDO.builder()
|
||||
.id(13L)
|
||||
.userId(userId)
|
||||
.opsOrderId(900L)
|
||||
.queueStatus(OrderQueueStatusEnum.PROCESSING.getStatus())
|
||||
.build();
|
||||
|
||||
when(orderQueueMapper.selectListByUserIdAndStatus(userId, OrderQueueStatusEnum.WAITING.getStatus()))
|
||||
.thenReturn(List.of(olderFarTask, newerNearTask));
|
||||
when(orderQueueMapper.selectCurrentExecutingByUserId(userId)).thenReturn(currentTask);
|
||||
when(orderQueueMapper.selectListByUserId(userId)).thenReturn(List.of(olderFarTask, newerNearTask, currentTask));
|
||||
when(orderMapper.selectById(900L)).thenReturn(OpsOrderDO.builder().id(900L).areaId(501L).build());
|
||||
when(orderMapper.selectById(101L)).thenReturn(OpsOrderDO.builder().id(101L).areaId(503L).build());
|
||||
when(orderMapper.selectById(102L)).thenReturn(OpsOrderDO.builder().id(102L).areaId(502L).build());
|
||||
when(areaMapper.selectById(501L)).thenReturn(OpsBusAreaDO.builder().id(501L).floorNo(5).build());
|
||||
when(areaMapper.selectById(502L)).thenReturn(OpsBusAreaDO.builder().id(502L).floorNo(6).build());
|
||||
when(areaMapper.selectById(503L)).thenReturn(OpsBusAreaDO.builder().id(503L).floorNo(8).build());
|
||||
when(orderQueueMapper.updateById(any(OpsOrderQueueDO.class))).thenReturn(1);
|
||||
|
||||
List<OrderQueueDTO> rebuiltTasks = orderQueueService.rebuildWaitingTasksByUserId(userId, null);
|
||||
|
||||
assertEquals(2, rebuiltTasks.size());
|
||||
assertEquals(101L, rebuiltTasks.get(0).getOpsOrderId());
|
||||
assertEquals(3, rebuiltTasks.get(0).getFloorDiff());
|
||||
assertTrue(rebuiltTasks.get(0).getWaitMinutes() >= 79);
|
||||
assertEquals(102L, rebuiltTasks.get(1).getOpsOrderId());
|
||||
assertEquals(1, rebuiltTasks.get(1).getFloorDiff());
|
||||
assertTrue(rebuiltTasks.get(0).getQueueScore() < rebuiltTasks.get(1).getQueueScore());
|
||||
|
||||
verify(orderQueueMapper, times(2)).updateById(any(OpsOrderQueueDO.class));
|
||||
verify(redisQueueService).clearQueue(userId);
|
||||
verify(redisQueueService).batchEnqueue(any());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package com.viewsh.module.ops.service.queue;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
class QueueScoreCalculatorTest {
|
||||
|
||||
private final QueueScoreCalculator calculator = new QueueScoreCalculator();
|
||||
|
||||
@Test
|
||||
void shouldPreferSmallerFloorDiffWhenPrioritySame() {
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
|
||||
QueueScoreResult near = calculator.calculate(QueueScoreContext.builder()
|
||||
.priority(1)
|
||||
.baseFloorNo(5)
|
||||
.targetFloorNo(6)
|
||||
.enqueueTime(now.minusMinutes(5))
|
||||
.now(now)
|
||||
.build());
|
||||
|
||||
QueueScoreResult far = calculator.calculate(QueueScoreContext.builder()
|
||||
.priority(1)
|
||||
.baseFloorNo(5)
|
||||
.targetFloorNo(9)
|
||||
.enqueueTime(now.minusMinutes(5))
|
||||
.now(now)
|
||||
.build());
|
||||
|
||||
assertTrue(near.getTotalScore() < far.getTotalScore());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldAllowOlderOrderToGainAgingAdvantage() {
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
|
||||
QueueScoreResult older = calculator.calculate(QueueScoreContext.builder()
|
||||
.priority(1)
|
||||
.baseFloorNo(5)
|
||||
.targetFloorNo(8)
|
||||
.enqueueTime(now.minusMinutes(80))
|
||||
.now(now)
|
||||
.build());
|
||||
|
||||
QueueScoreResult newer = calculator.calculate(QueueScoreContext.builder()
|
||||
.priority(1)
|
||||
.baseFloorNo(5)
|
||||
.targetFloorNo(6)
|
||||
.enqueueTime(now.minusMinutes(5))
|
||||
.now(now)
|
||||
.build());
|
||||
|
||||
assertTrue(older.getTotalScore() < newer.getTotalScore());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldDegradeGracefullyWhenBaseFloorMissing() {
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
|
||||
QueueScoreResult result = calculator.calculate(QueueScoreContext.builder()
|
||||
.priority(2)
|
||||
.baseFloorNo(null)
|
||||
.targetFloorNo(6)
|
||||
.enqueueTime(now.minusMinutes(10))
|
||||
.now(now)
|
||||
.build());
|
||||
|
||||
assertTrue(result.getTotalScore() < QueueScoreCalculator.PRIORITY_WEIGHT * 2);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
package com.viewsh.module.ops.service.queue;
|
||||
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
|
||||
import com.viewsh.module.ops.dal.dataobject.queue.OpsOrderQueueDO;
|
||||
import com.viewsh.module.ops.dal.mysql.queue.OpsOrderQueueMapper;
|
||||
import com.viewsh.module.ops.enums.OrderQueueStatusEnum;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class QueueSyncServiceTest {
|
||||
|
||||
@Mock
|
||||
private OpsOrderQueueMapper orderQueueMapper;
|
||||
@Mock
|
||||
private RedisOrderQueueService redisQueueService;
|
||||
|
||||
@InjectMocks
|
||||
private QueueSyncService queueSyncService;
|
||||
|
||||
@Test
|
||||
void shouldCarryPersistedQueueScoreWhenSyncUserQueueToRedis() {
|
||||
Long userId = 1001L;
|
||||
OpsOrderQueueDO queueDO = OpsOrderQueueDO.builder()
|
||||
.id(11L)
|
||||
.opsOrderId(22L)
|
||||
.userId(userId)
|
||||
.queueIndex(1)
|
||||
.priority(0)
|
||||
.queueScore(-120D)
|
||||
.queueStatus(OrderQueueStatusEnum.WAITING.getStatus())
|
||||
.enqueueTime(LocalDateTime.now().minusMinutes(10))
|
||||
.build();
|
||||
|
||||
when(orderQueueMapper.selectList(any())).thenReturn(List.of(queueDO));
|
||||
when(redisQueueService.clearQueue(userId)).thenReturn(1L);
|
||||
when(redisQueueService.batchEnqueue(any())).thenReturn(1L);
|
||||
|
||||
queueSyncService.syncUserQueueToRedis(userId);
|
||||
|
||||
ArgumentCaptor<List<OrderQueueDTO>> captor = ArgumentCaptor.forClass(List.class);
|
||||
verify(redisQueueService).batchEnqueue(captor.capture());
|
||||
List<OrderQueueDTO> syncedTasks = captor.getValue();
|
||||
assertNotNull(syncedTasks);
|
||||
assertEquals(1, syncedTasks.size());
|
||||
assertEquals(-120D, syncedTasks.get(0).getQueueScore());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user