feat(ops): 新增 OpsRedisKeyBuilder 统一管理 Redis Key 租户隔离
新建 OpsRedisKeyBuilder 集中式工具类,所有 Ops 模块 Redis Key 统一使用
:t{tenantId} 格式实现多租户隔离。迁移以下服务的 Key 构建:
- RedisOrderQueueServiceImpl(派单队列/信息/锁)
- UserDispatchStatusServiceImpl(调度状态)
- BadgeDeviceStatusServiceImpl(工牌状态)
- TrafficActiveOrderRedisDAO(客流活跃工单)
- TtsQueueConsumer(TTS 队列/锁/循环)
- OrderCodeGenerator(工单编码序号)
- AreaDeviceServiceImpl(区域设备配置缓存)
- TrafficStatisticsPersistJob(持久化锁)
- BadgeDeviceStatusRedisDAO(IoT 侧工牌状态)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package com.viewsh.module.iot.dal.redis.clean;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
package com.viewsh.module.iot.dal.redis.clean;
|
||||
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
@@ -29,8 +30,6 @@ public class BadgeDeviceStatusRedisDAO {
|
||||
* <p>
|
||||
* 格式:ops:badge:status:{deviceId}
|
||||
*/
|
||||
private static final String BADGE_STATUS_KEY_PATTERN = "ops:badge:status:%s";
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@@ -115,9 +114,9 @@ public class BadgeDeviceStatusRedisDAO {
|
||||
/**
|
||||
* 格式化 Redis Key
|
||||
*/
|
||||
private static String formatKey(Long deviceId) {
|
||||
return String.format(BADGE_STATUS_KEY_PATTERN, deviceId);
|
||||
}
|
||||
private static String formatKey(Long deviceId) {
|
||||
return "ops:badge:status:t" + TenantContextHolder.getRequiredTenantId() + ":" + deviceId;
|
||||
}
|
||||
|
||||
/**
|
||||
* 工单信息(精简 DTO)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.viewsh.module.ops.environment.dal.redis;
|
||||
|
||||
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
@@ -12,7 +13,7 @@ import java.util.Map;
|
||||
* <p>
|
||||
* 用于标记某区域是否有客流触发的活跃工单,避免重复创建。
|
||||
* <p>
|
||||
* Redis Key: ops:clean:traffic:active-order:{areaId}
|
||||
* Redis Key: ops:clean:traffic:active-order:t{tenantId}:{areaId}
|
||||
* Value: Hash { orderId, status, priority }
|
||||
* TTL: 无(由终态主动删除)
|
||||
*
|
||||
@@ -22,8 +23,6 @@ import java.util.Map;
|
||||
@Repository
|
||||
public class TrafficActiveOrderRedisDAO {
|
||||
|
||||
private static final String KEY_PATTERN = "ops:clean:traffic:active-order:%s";
|
||||
|
||||
private static final String FIELD_ORDER_ID = "orderId";
|
||||
private static final String FIELD_STATUS = "status";
|
||||
private static final String FIELD_PRIORITY = "priority";
|
||||
@@ -101,6 +100,6 @@ public class TrafficActiveOrderRedisDAO {
|
||||
}
|
||||
|
||||
private String buildKey(Long areaId) {
|
||||
return String.format(KEY_PATTERN, areaId);
|
||||
return OpsRedisKeyBuilder.trafficActiveOrder(areaId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
package com.viewsh.module.ops.environment.service.badge;
|
||||
|
||||
import com.viewsh.module.ops.api.badge.BadgeDeviceStatusDTO;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum;
|
||||
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
|
||||
import com.viewsh.module.ops.service.area.AreaDeviceService;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum;
|
||||
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
|
||||
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
|
||||
import com.viewsh.module.ops.service.area.AreaDeviceService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
@@ -54,8 +55,6 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
/**
|
||||
* Redis Key 前缀
|
||||
*/
|
||||
private static final String BADGE_STATUS_KEY_PREFIX = "ops:badge:status:";
|
||||
|
||||
/**
|
||||
* 状态过期时间(小时)
|
||||
*/
|
||||
@@ -76,7 +75,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
|
||||
try {
|
||||
String key = BADGE_STATUS_KEY_PREFIX + deviceId;
|
||||
String key = OpsRedisKeyBuilder.badgeStatus(deviceId);
|
||||
|
||||
// 获取当前状态
|
||||
BadgeDeviceStatusDTO currentStatus = getBadgeStatus(deviceId);
|
||||
@@ -151,7 +150,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
|
||||
try {
|
||||
String key = BADGE_STATUS_KEY_PREFIX + deviceId;
|
||||
String key = OpsRedisKeyBuilder.badgeStatus(deviceId);
|
||||
Map<Object, Object> map = stringRedisTemplate.opsForHash().entries(key);
|
||||
|
||||
if (map.isEmpty()) {
|
||||
@@ -233,7 +232,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
@Override
|
||||
public List<BadgeDeviceStatusDTO> listActiveBadges() {
|
||||
try {
|
||||
Set<String> keys = stringRedisTemplate.keys(BADGE_STATUS_KEY_PREFIX + "*");
|
||||
Set<String> keys = stringRedisTemplate.keys(OpsRedisKeyBuilder.badgeStatusPattern());
|
||||
|
||||
if (keys == null || keys.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
@@ -241,7 +240,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
|
||||
return keys.stream()
|
||||
.map(key -> {
|
||||
String deviceIdStr = key.substring(BADGE_STATUS_KEY_PREFIX.length());
|
||||
String deviceIdStr = extractIdFromKey(key);
|
||||
return getBadgeStatus(Long.parseLong(deviceIdStr));
|
||||
})
|
||||
.filter(Objects::nonNull)
|
||||
@@ -264,7 +263,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
|
||||
try {
|
||||
String key = BADGE_STATUS_KEY_PREFIX + deviceId;
|
||||
String key = OpsRedisKeyBuilder.badgeStatus(deviceId);
|
||||
|
||||
// 获取当前状态
|
||||
Map<Object, Object> currentMap = stringRedisTemplate.opsForHash().entries(key);
|
||||
@@ -376,7 +375,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
|
||||
try {
|
||||
String key = BADGE_STATUS_KEY_PREFIX + deviceId;
|
||||
String key = OpsRedisKeyBuilder.badgeStatus(deviceId);
|
||||
stringRedisTemplate.opsForHash().put(key, "currentOpsOrderId", String.valueOf(orderId));
|
||||
log.debug("设置工牌设备当前工单: deviceId={}, orderId={}", deviceId, orderId);
|
||||
} catch (Exception e) {
|
||||
@@ -391,7 +390,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
|
||||
try {
|
||||
String key = BADGE_STATUS_KEY_PREFIX + deviceId;
|
||||
String key = OpsRedisKeyBuilder.badgeStatus(deviceId);
|
||||
|
||||
// 使用 Redis Pipeline 保证多个字段的原子性设置
|
||||
stringRedisTemplate.opsForHash().put(key, "currentOpsOrderId", String.valueOf(orderId));
|
||||
@@ -422,7 +421,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
|
||||
try {
|
||||
String key = BADGE_STATUS_KEY_PREFIX + deviceId;
|
||||
String key = OpsRedisKeyBuilder.badgeStatus(deviceId);
|
||||
stringRedisTemplate.opsForHash().put(key, "currentOrderStatus", orderStatus);
|
||||
log.debug("更新工单状态: deviceId={}, orderStatus={}", deviceId, orderStatus);
|
||||
} catch (Exception e) {
|
||||
@@ -437,7 +436,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
|
||||
try {
|
||||
String key = BADGE_STATUS_KEY_PREFIX + deviceId;
|
||||
String key = OpsRedisKeyBuilder.badgeStatus(deviceId);
|
||||
// 更新工单状态和信标MAC(orderId 和 areaId 已在 DISPATCHED 时设置,不需要重复)
|
||||
stringRedisTemplate.opsForHash().put(key, "currentOrderStatus", orderStatus);
|
||||
if (beaconMac != null) {
|
||||
@@ -457,7 +456,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
|
||||
try {
|
||||
String key = BADGE_STATUS_KEY_PREFIX + deviceId;
|
||||
String key = OpsRedisKeyBuilder.badgeStatus(deviceId);
|
||||
// 清除工单相关字段:currentOpsOrderId、currentOrderStatus、currentAreaId、currentAreaName、beaconMac
|
||||
stringRedisTemplate.opsForHash().delete(key, "currentOpsOrderId", "currentOrderStatus", "currentAreaId", "currentAreaName", "beaconMac");
|
||||
log.info("清除工牌设备当前工单: deviceId={}", deviceId);
|
||||
@@ -469,7 +468,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
@Override
|
||||
public List<BadgeDeviceStatusDTO> listBadgesWithCurrentOrder() {
|
||||
try {
|
||||
Set<String> keys = stringRedisTemplate.keys(BADGE_STATUS_KEY_PREFIX + "*");
|
||||
Set<String> keys = stringRedisTemplate.keys(OpsRedisKeyBuilder.badgeStatusPattern());
|
||||
|
||||
if (keys == null || keys.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
@@ -477,7 +476,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
|
||||
return keys.stream()
|
||||
.map(key -> {
|
||||
String deviceIdStr = key.substring(BADGE_STATUS_KEY_PREFIX.length());
|
||||
String deviceIdStr = extractIdFromKey(key);
|
||||
return getBadgeStatus(Long.parseLong(deviceIdStr));
|
||||
})
|
||||
.filter(Objects::nonNull)
|
||||
@@ -520,7 +519,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
|
||||
// 直接写 status 字段,避免 updateBadgeStatus 内部回写已清除的 currentOpsOrderId
|
||||
if (deviceStatus.getStatus() != null && deviceStatus.getStatus() != BadgeDeviceStatusEnum.IDLE) {
|
||||
String key = BADGE_STATUS_KEY_PREFIX + deviceId;
|
||||
String key = OpsRedisKeyBuilder.badgeStatus(deviceId);
|
||||
stringRedisTemplate.opsForHash().put(key, "status", BadgeDeviceStatusEnum.IDLE.getCode());
|
||||
stringRedisTemplate.opsForHash().put(key, "statusChangeTime", LocalDateTime.now().toString());
|
||||
}
|
||||
@@ -539,7 +538,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
|
||||
try {
|
||||
String key = BADGE_STATUS_KEY_PREFIX + deviceId;
|
||||
String key = OpsRedisKeyBuilder.badgeStatus(deviceId);
|
||||
|
||||
Map<String, String> statusMap = new HashMap<>();
|
||||
if (areaId != null) {
|
||||
@@ -568,7 +567,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
|
||||
try {
|
||||
String key = BADGE_STATUS_KEY_PREFIX + deviceId;
|
||||
String key = OpsRedisKeyBuilder.badgeStatus(deviceId);
|
||||
stringRedisTemplate.delete(key);
|
||||
log.info("删除工牌设备状态: deviceId={}", deviceId);
|
||||
} catch (Exception e) {
|
||||
@@ -579,7 +578,7 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
@Override
|
||||
public void clearOfflineBadges() {
|
||||
try {
|
||||
Set<String> keys = stringRedisTemplate.keys(BADGE_STATUS_KEY_PREFIX + "*");
|
||||
Set<String> keys = stringRedisTemplate.keys(OpsRedisKeyBuilder.badgeStatusPattern());
|
||||
|
||||
if (keys == null || keys.isEmpty()) {
|
||||
return;
|
||||
@@ -655,9 +654,9 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
}
|
||||
|
||||
private Integer getInteger(Object value) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
private Integer getInteger(Object value) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value instanceof Integer) {
|
||||
return (Integer) value;
|
||||
@@ -666,8 +665,12 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
return Integer.parseInt(value.toString());
|
||||
} catch (Exception e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String extractIdFromKey(String key) {
|
||||
return key.substring(key.lastIndexOf(':') + 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* 工牌设备离线事件
|
||||
|
||||
@@ -20,7 +20,7 @@ import java.util.stream.Collectors;
|
||||
/**
|
||||
* 保洁工牌服务实现
|
||||
* <p>
|
||||
* 工牌状态数据来源:Redis (ops:badge:status:{deviceId})
|
||||
* 工牌状态数据来源:Redis (ops:badge:status:t{tenantId}:{deviceId})
|
||||
* 设备基本信息来源:iot_device 表
|
||||
* 区域关联来源:ops_area_device_relation 表
|
||||
*
|
||||
|
||||
@@ -4,11 +4,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.viewsh.module.iot.api.device.IotDeviceControlApi;
|
||||
import com.viewsh.module.iot.api.device.dto.IotDeviceServiceInvokeReqDTO;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.EventLevel;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.LogType;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.EventLevel;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.LogType;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
|
||||
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
@@ -45,12 +46,6 @@ import java.util.concurrent.TimeUnit;
|
||||
@Service
|
||||
public class TtsQueueConsumer {
|
||||
|
||||
private static final String QUEUE_KEY_PREFIX = "ops:tts:queue:";
|
||||
|
||||
private static final String LOCK_KEY_PREFIX = "ops:tts:lock:";
|
||||
|
||||
private static final String LOOP_KEY_PREFIX = "ops:tts:loop:";
|
||||
|
||||
@Value("${ops.tts.queue.enabled:true}")
|
||||
private boolean queueEnabled;
|
||||
|
||||
@@ -86,8 +81,8 @@ public class TtsQueueConsumer {
|
||||
/**
|
||||
* 获取队列 key
|
||||
*/
|
||||
public String getQueueKey(Long deviceId) {
|
||||
return QUEUE_KEY_PREFIX + deviceId;
|
||||
public String getQueueKey(Long deviceId) {
|
||||
return OpsRedisKeyBuilder.ttsQueue(deviceId);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -150,7 +145,7 @@ public class TtsQueueConsumer {
|
||||
*/
|
||||
public boolean processSingleQueue(String queueKey) {
|
||||
// 从 key 中提取 deviceId
|
||||
String deviceIdStr = queueKey.substring(QUEUE_KEY_PREFIX.length());
|
||||
String deviceIdStr = extractIdFromKey(queueKey);
|
||||
Long deviceId;
|
||||
try {
|
||||
deviceId = Long.parseLong(deviceIdStr);
|
||||
@@ -159,7 +154,7 @@ public class TtsQueueConsumer {
|
||||
return false;
|
||||
}
|
||||
|
||||
String lockKey = LOCK_KEY_PREFIX + deviceId;
|
||||
String lockKey = OpsRedisKeyBuilder.ttsLock(deviceId);
|
||||
|
||||
try {
|
||||
// 尝试获取播报间隔锁(SETNX + TTL,原子操作)
|
||||
@@ -260,7 +255,7 @@ public class TtsQueueConsumer {
|
||||
|
||||
try {
|
||||
// 获取所有队列 key
|
||||
String pattern = QUEUE_KEY_PREFIX + "*";
|
||||
String pattern = OpsRedisKeyBuilder.ttsQueuePattern();
|
||||
Set<String> keys = redisTemplate.keys(pattern);
|
||||
|
||||
if (keys == null || keys.isEmpty()) {
|
||||
@@ -375,7 +370,7 @@ public class TtsQueueConsumer {
|
||||
* @param orderId 工单ID
|
||||
*/
|
||||
public void startLoop(Long deviceId, Long orderId) {
|
||||
String loopKey = LOOP_KEY_PREFIX + deviceId;
|
||||
String loopKey = OpsRedisKeyBuilder.ttsLoop(deviceId);
|
||||
redisTemplate.opsForValue().set(loopKey, String.valueOf(orderId), 1, TimeUnit.HOURS);
|
||||
log.info("[TTS队列] 启动循环播报: deviceId={}, orderId={}", deviceId, orderId);
|
||||
}
|
||||
@@ -388,10 +383,10 @@ public class TtsQueueConsumer {
|
||||
* @param deviceId 设备ID
|
||||
*/
|
||||
public void stopLoop(Long deviceId) {
|
||||
String loopKey = LOOP_KEY_PREFIX + deviceId;
|
||||
String loopKey = OpsRedisKeyBuilder.ttsLoop(deviceId);
|
||||
Boolean deleted = redisTemplate.delete(loopKey);
|
||||
// 清除播报间隔锁,使后续播报(如地点播报)可以立即发送
|
||||
String lockKey = LOCK_KEY_PREFIX + deviceId;
|
||||
String lockKey = OpsRedisKeyBuilder.ttsLock(deviceId);
|
||||
redisTemplate.delete(lockKey);
|
||||
// 从队列中移除循环消息,保留非循环消息(如取消播报、待办播报等)
|
||||
int removed = removeLoopMessages(deviceId);
|
||||
@@ -456,7 +451,7 @@ public class TtsQueueConsumer {
|
||||
* @return 是否激活
|
||||
*/
|
||||
public boolean isLoopActive(Long deviceId) {
|
||||
String loopKey = LOOP_KEY_PREFIX + deviceId;
|
||||
String loopKey = OpsRedisKeyBuilder.ttsLoop(deviceId);
|
||||
return Boolean.TRUE.equals(redisTemplate.hasKey(loopKey));
|
||||
}
|
||||
|
||||
@@ -485,7 +480,7 @@ public class TtsQueueConsumer {
|
||||
* 清空所有队列
|
||||
*/
|
||||
public void clearAllQueues() {
|
||||
String pattern = QUEUE_KEY_PREFIX + "*";
|
||||
String pattern = OpsRedisKeyBuilder.ttsQueuePattern();
|
||||
Set<String> keys = redisTemplate.keys(pattern);
|
||||
if (keys != null && !keys.isEmpty()) {
|
||||
redisTemplate.delete(keys);
|
||||
@@ -500,14 +495,14 @@ public class TtsQueueConsumer {
|
||||
public Map<String, Object> getQueueStatus() {
|
||||
Map<String, Object> status = new ConcurrentHashMap<>();
|
||||
|
||||
String pattern = QUEUE_KEY_PREFIX + "*";
|
||||
String pattern = OpsRedisKeyBuilder.ttsQueuePattern();
|
||||
Set<String> keys = redisTemplate.keys(pattern);
|
||||
|
||||
if (keys != null && !keys.isEmpty()) {
|
||||
for (String key : keys) {
|
||||
String deviceIdStr = key.substring(QUEUE_KEY_PREFIX.length());
|
||||
Long size = redisTemplate.opsForList().size(key);
|
||||
status.put(deviceIdStr, size != null ? size : 0);
|
||||
String deviceIdStr = extractIdFromKey(key);
|
||||
Long size = redisTemplate.opsForList().size(key);
|
||||
status.put(deviceIdStr, size != null ? size : 0);
|
||||
}
|
||||
status.put("totalDevices", keys.size());
|
||||
} else {
|
||||
@@ -523,9 +518,13 @@ public class TtsQueueConsumer {
|
||||
/**
|
||||
* 检查是否启用队列
|
||||
*/
|
||||
public boolean isEnabled() {
|
||||
return queueEnabled;
|
||||
}
|
||||
public boolean isEnabled() {
|
||||
return queueEnabled;
|
||||
}
|
||||
|
||||
private String extractIdFromKey(String key) {
|
||||
return key.substring(key.lastIndexOf(':') + 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务关闭时清理资源
|
||||
|
||||
@@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
|
||||
/**
|
||||
* 人员调度状态 DTO
|
||||
* <p>
|
||||
* 对应 Redis Hash: ops:user:dispatch:{userId}
|
||||
* 对应 Redis Hash: ops:user:dispatch:t{tenantId}:{userId}
|
||||
* <p>
|
||||
* 适用于安保、工程、客服等人员维度的业务线。
|
||||
* 保洁使用设备维度的 {@code BadgeDeviceStatusDTO}。
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.viewsh.module.ops.infrastructure.code;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -26,8 +27,7 @@ import java.util.concurrent.TimeUnit;
|
||||
@Service
|
||||
public class OrderCodeGenerator {
|
||||
|
||||
private static final String KEY_PREFIX = "ops:order:code:";
|
||||
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||
private static final long DEFAULT_EXPIRE_DAYS = 7; // key 默认保留7天
|
||||
|
||||
@Resource
|
||||
@@ -45,7 +45,7 @@ public class OrderCodeGenerator {
|
||||
}
|
||||
|
||||
String dateStr = LocalDate.now().format(DATE_FORMATTER);
|
||||
String key = KEY_PREFIX + businessType + ":" + dateStr;
|
||||
String key = OpsRedisKeyBuilder.orderCode(businessType, dateStr);
|
||||
|
||||
// Redis 自增并获取新值
|
||||
Long seq = stringRedisTemplate.opsForValue().increment(key);
|
||||
@@ -74,7 +74,7 @@ public class OrderCodeGenerator {
|
||||
*/
|
||||
public long getCurrentSeq(String businessType) {
|
||||
String dateStr = LocalDate.now().format(DATE_FORMATTER);
|
||||
String key = KEY_PREFIX + businessType + ":" + dateStr;
|
||||
String key = OpsRedisKeyBuilder.orderCode(businessType, dateStr);
|
||||
String value = stringRedisTemplate.opsForValue().get(key);
|
||||
return value == null ? 0 : Long.parseLong(value);
|
||||
}
|
||||
@@ -86,7 +86,7 @@ public class OrderCodeGenerator {
|
||||
*/
|
||||
public void reset(String businessType) {
|
||||
String dateStr = LocalDate.now().format(DATE_FORMATTER);
|
||||
String key = KEY_PREFIX + businessType + ":" + dateStr;
|
||||
String key = OpsRedisKeyBuilder.orderCode(businessType, dateStr);
|
||||
stringRedisTemplate.delete(key);
|
||||
log.warn("重置工单编号序号: businessType={}, date={}", businessType, dateStr);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,175 @@
|
||||
package com.viewsh.module.ops.infrastructure.redis;
|
||||
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
|
||||
/**
|
||||
* Ops 模块 Redis Key 统一构建器。
|
||||
*/
|
||||
public final class OpsRedisKeyBuilder {
|
||||
|
||||
private static final String ORDER_QUEUE_PREFIX = "ops:order:queue";
|
||||
private static final String ORDER_QUEUE_INFO_PREFIX = "ops:order:queue:info";
|
||||
private static final String ORDER_QUEUE_LOCK_PREFIX = "ops:order:queue:lock";
|
||||
private static final String USER_DISPATCH_PREFIX = "ops:user:dispatch";
|
||||
private static final String BADGE_STATUS_PREFIX = "ops:badge:status";
|
||||
private static final String TRAFFIC_ACTIVE_ORDER_PREFIX = "ops:clean:traffic:active-order";
|
||||
private static final String TTS_QUEUE_PREFIX = "ops:tts:queue";
|
||||
private static final String TTS_LOCK_PREFIX = "ops:tts:lock";
|
||||
private static final String TTS_LOOP_PREFIX = "ops:tts:loop";
|
||||
private static final String ORDER_CODE_PREFIX = "ops:order:code";
|
||||
private static final String AREA_DEVICE_CONFIG_PREFIX = "ops:area";
|
||||
private static final String EVENT_DEDUP_PREFIX = "ops:dedup";
|
||||
private static final String BADGE_DEDUP_PREFIX = "ops:badge:dedup";
|
||||
private static final String TRAFFIC_PERSIST_LOCK_PREFIX = "ops:traffic:persist:lock";
|
||||
|
||||
private OpsRedisKeyBuilder() {
|
||||
}
|
||||
|
||||
public static String orderQueue(Long userId) {
|
||||
return orderQueue(TenantContextHolder.getRequiredTenantId(), userId);
|
||||
}
|
||||
|
||||
public static String orderQueue(Long tenantId, Long userId) {
|
||||
return tenantKey(ORDER_QUEUE_PREFIX, tenantId) + ":" + userId;
|
||||
}
|
||||
|
||||
public static String orderQueuePattern() {
|
||||
return orderQueuePattern(TenantContextHolder.getRequiredTenantId());
|
||||
}
|
||||
|
||||
public static String orderQueuePattern(Long tenantId) {
|
||||
return tenantKey(ORDER_QUEUE_PREFIX, tenantId) + ":*";
|
||||
}
|
||||
|
||||
public static String orderQueueInfo(Long queueId) {
|
||||
return orderQueueInfo(TenantContextHolder.getRequiredTenantId(), queueId);
|
||||
}
|
||||
|
||||
public static String orderQueueInfo(Long tenantId, Long queueId) {
|
||||
return tenantKey(ORDER_QUEUE_INFO_PREFIX, tenantId) + ":" + queueId;
|
||||
}
|
||||
|
||||
public static String orderQueueInfoPattern() {
|
||||
return orderQueueInfoPattern(TenantContextHolder.getRequiredTenantId());
|
||||
}
|
||||
|
||||
public static String orderQueueInfoPattern(Long tenantId) {
|
||||
return tenantKey(ORDER_QUEUE_INFO_PREFIX, tenantId) + ":*";
|
||||
}
|
||||
|
||||
public static String orderQueueLock(Long cleanerId) {
|
||||
return orderQueueLock(TenantContextHolder.getRequiredTenantId(), cleanerId);
|
||||
}
|
||||
|
||||
public static String orderQueueLock(Long tenantId, Long cleanerId) {
|
||||
return tenantKey(ORDER_QUEUE_LOCK_PREFIX, tenantId) + ":" + cleanerId;
|
||||
}
|
||||
|
||||
public static String userDispatchStatus(Long userId) {
|
||||
return userDispatchStatus(TenantContextHolder.getRequiredTenantId(), userId);
|
||||
}
|
||||
|
||||
public static String userDispatchStatus(Long tenantId, Long userId) {
|
||||
return tenantKey(USER_DISPATCH_PREFIX, tenantId) + ":" + userId;
|
||||
}
|
||||
|
||||
public static String badgeStatus(Long deviceId) {
|
||||
return badgeStatus(TenantContextHolder.getRequiredTenantId(), deviceId);
|
||||
}
|
||||
|
||||
public static String badgeStatus(Long tenantId, Long deviceId) {
|
||||
return tenantKey(BADGE_STATUS_PREFIX, tenantId) + ":" + deviceId;
|
||||
}
|
||||
|
||||
public static String badgeStatusPattern() {
|
||||
return badgeStatusPattern(TenantContextHolder.getRequiredTenantId());
|
||||
}
|
||||
|
||||
public static String badgeStatusPattern(Long tenantId) {
|
||||
return tenantKey(BADGE_STATUS_PREFIX, tenantId) + ":*";
|
||||
}
|
||||
|
||||
public static String trafficActiveOrder(Long areaId) {
|
||||
return trafficActiveOrder(TenantContextHolder.getRequiredTenantId(), areaId);
|
||||
}
|
||||
|
||||
public static String trafficActiveOrder(Long tenantId, Long areaId) {
|
||||
return tenantKey(TRAFFIC_ACTIVE_ORDER_PREFIX, tenantId) + ":" + areaId;
|
||||
}
|
||||
|
||||
public static String ttsQueue(Long deviceId) {
|
||||
return ttsQueue(TenantContextHolder.getRequiredTenantId(), deviceId);
|
||||
}
|
||||
|
||||
public static String ttsQueue(Long tenantId, Long deviceId) {
|
||||
return tenantKey(TTS_QUEUE_PREFIX, tenantId) + ":" + deviceId;
|
||||
}
|
||||
|
||||
public static String ttsQueuePattern() {
|
||||
return ttsQueuePattern(TenantContextHolder.getRequiredTenantId());
|
||||
}
|
||||
|
||||
public static String ttsQueuePattern(Long tenantId) {
|
||||
return tenantKey(TTS_QUEUE_PREFIX, tenantId) + ":*";
|
||||
}
|
||||
|
||||
public static String ttsLock(Long deviceId) {
|
||||
return ttsLock(TenantContextHolder.getRequiredTenantId(), deviceId);
|
||||
}
|
||||
|
||||
public static String ttsLock(Long tenantId, Long deviceId) {
|
||||
return tenantKey(TTS_LOCK_PREFIX, tenantId) + ":" + deviceId;
|
||||
}
|
||||
|
||||
public static String ttsLoop(Long deviceId) {
|
||||
return ttsLoop(TenantContextHolder.getRequiredTenantId(), deviceId);
|
||||
}
|
||||
|
||||
public static String ttsLoop(Long tenantId, Long deviceId) {
|
||||
return tenantKey(TTS_LOOP_PREFIX, tenantId) + ":" + deviceId;
|
||||
}
|
||||
|
||||
public static String orderCode(String businessType, String dateStr) {
|
||||
return orderCode(TenantContextHolder.getRequiredTenantId(), businessType, dateStr);
|
||||
}
|
||||
|
||||
public static String orderCode(Long tenantId, String businessType, String dateStr) {
|
||||
return tenantKey(ORDER_CODE_PREFIX, tenantId) + ":" + businessType + ":" + dateStr;
|
||||
}
|
||||
|
||||
public static String areaDeviceConfig(Long areaId, String relationType) {
|
||||
return areaDeviceConfig(TenantContextHolder.getRequiredTenantId(), areaId, relationType);
|
||||
}
|
||||
|
||||
public static String areaDeviceConfig(Long tenantId, Long areaId, String relationType) {
|
||||
return tenantKey(AREA_DEVICE_CONFIG_PREFIX, tenantId) + ":" + areaId + ":type:" + relationType;
|
||||
}
|
||||
|
||||
public static String eventDedup(String type, String eventId) {
|
||||
return eventDedup(TenantContextHolder.getRequiredTenantId(), type, eventId);
|
||||
}
|
||||
|
||||
public static String eventDedup(Long tenantId, String type, String eventId) {
|
||||
return tenantKey(EVENT_DEDUP_PREFIX, tenantId) + ":" + type + ":" + eventId;
|
||||
}
|
||||
|
||||
public static String badgeDedup(String eventId) {
|
||||
return badgeDedup(TenantContextHolder.getRequiredTenantId(), eventId);
|
||||
}
|
||||
|
||||
public static String badgeDedup(Long tenantId, String eventId) {
|
||||
return tenantKey(BADGE_DEDUP_PREFIX, tenantId) + ":status:" + eventId;
|
||||
}
|
||||
|
||||
public static String trafficPersistLock() {
|
||||
return trafficPersistLock(TenantContextHolder.getRequiredTenantId());
|
||||
}
|
||||
|
||||
public static String trafficPersistLock(Long tenantId) {
|
||||
return tenantKey(TRAFFIC_PERSIST_LOCK_PREFIX, tenantId);
|
||||
}
|
||||
|
||||
private static String tenantKey(String prefix, Long tenantId) {
|
||||
return prefix + ":t" + tenantId;
|
||||
}
|
||||
}
|
||||
@@ -33,7 +33,7 @@ public interface AreaDeviceService {
|
||||
/**
|
||||
* 根据区域ID和关联类型查询单个启用的关联关系(带缓存)
|
||||
* <p>
|
||||
* 缓存Key: ops:area:{areaId}:type:{relationType}
|
||||
* 缓存Key: ops:area:t{tenantId}:{areaId}:type:{relationType}
|
||||
*
|
||||
* @param areaId 区域ID
|
||||
* @param relationType 关联类型
|
||||
@@ -85,7 +85,7 @@ public interface AreaDeviceService {
|
||||
/**
|
||||
* 初始化区域设备配置缓存
|
||||
* <p>
|
||||
* 预热 ops:area:{areaId}:type:{relationType} 缓存
|
||||
* 预热 ops:area:t{tenantId}:{areaId}:type:{relationType} 缓存
|
||||
*/
|
||||
void initConfigCache();
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.viewsh.framework.common.util.json.JsonUtils;
|
||||
import com.viewsh.module.ops.api.area.AreaDeviceDTO;
|
||||
import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
|
||||
import com.viewsh.module.ops.dal.mysql.area.OpsAreaDeviceRelationMapper;
|
||||
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
@@ -18,7 +19,7 @@ import java.util.stream.Collectors;
|
||||
* 区域设备关联服务实现
|
||||
* <p>
|
||||
* 缓存设计:
|
||||
* - 唯一缓存Key: ops:area:{areaId}:type:{relationType}
|
||||
* - 唯一缓存Key: ops:area:t{tenantId}:{areaId}:type:{relationType}
|
||||
* - 用途: 存储区域+类型的设备配置(1对1绑定设备)
|
||||
* - N对N设备(如工牌): 先查DB获取关联区域,再用此Key获取配置
|
||||
*
|
||||
@@ -35,12 +36,10 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
/**
|
||||
* 缓存Key前缀: ops:area:{areaId}:type:{relationType}
|
||||
* 缓存Key前缀: ops:area:t{tenantId}:{areaId}:type:{relationType}
|
||||
* <p>
|
||||
* 唯一的区域设备配置缓存,适用于所有设备类型
|
||||
*/
|
||||
private static final String CONFIG_CACHE_KEY_PREFIX = "ops:area:%s:type:%s";
|
||||
|
||||
/**
|
||||
* 空值缓存标记
|
||||
*/
|
||||
@@ -72,7 +71,7 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea
|
||||
return null;
|
||||
}
|
||||
|
||||
String cacheKey = String.format(CONFIG_CACHE_KEY_PREFIX, areaId, relationType);
|
||||
String cacheKey = OpsRedisKeyBuilder.areaDeviceConfig(areaId, relationType);
|
||||
|
||||
// 1. 先读缓存
|
||||
try {
|
||||
@@ -180,8 +179,8 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea
|
||||
continue;
|
||||
}
|
||||
|
||||
String cacheKey = String.format(CONFIG_CACHE_KEY_PREFIX,
|
||||
relation.getAreaId(), relation.getRelationType());
|
||||
String cacheKey = OpsRedisKeyBuilder.areaDeviceConfig(
|
||||
relation.getTenantId(), relation.getAreaId(), relation.getRelationType());
|
||||
|
||||
try {
|
||||
AreaDeviceDTO dto = toAreaDeviceDTO(relation);
|
||||
@@ -217,7 +216,7 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea
|
||||
}
|
||||
|
||||
try {
|
||||
String cacheKey = String.format(CONFIG_CACHE_KEY_PREFIX, areaId, relationType);
|
||||
String cacheKey = OpsRedisKeyBuilder.areaDeviceConfig(areaId, relationType);
|
||||
stringRedisTemplate.delete(cacheKey);
|
||||
log.info("[AreaDevice] 清除配置缓存:areaId={}, type={}", areaId, relationType);
|
||||
} catch (Exception e) {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.viewsh.module.ops.service.dispatch;
|
||||
|
||||
import com.viewsh.module.ops.api.dispatch.UserDispatchStatusDTO;
|
||||
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -13,7 +14,7 @@ import java.util.*;
|
||||
/**
|
||||
* 通用人员调度状态服务 - Redis 实现
|
||||
* <p>
|
||||
* Redis Key: ops:user:dispatch:{userId}
|
||||
* Redis Key: ops:user:dispatch:t{tenantId}:{userId}
|
||||
* Redis Type: Hash
|
||||
* TTL: 24h(每次写入刷新)
|
||||
* <p>
|
||||
@@ -25,7 +26,6 @@ import java.util.*;
|
||||
@Service
|
||||
public class UserDispatchStatusServiceImpl implements UserDispatchStatusService {
|
||||
|
||||
private static final String KEY_PREFIX = "ops:user:dispatch:";
|
||||
private static final long TTL_SECONDS = 24 * 3600; // 24h
|
||||
|
||||
@Resource
|
||||
@@ -256,7 +256,7 @@ public class UserDispatchStatusServiceImpl implements UserDispatchStatusService
|
||||
// ==================== 私有方法 ====================
|
||||
|
||||
private String keyOf(Long userId) {
|
||||
return KEY_PREFIX + userId;
|
||||
return OpsRedisKeyBuilder.userDispatchStatus(userId);
|
||||
}
|
||||
|
||||
private UserDispatchStatusDTO mapToDto(Long userId, Map<Object, Object> map) {
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
package com.viewsh.module.ops.service.job;
|
||||
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.viewsh.framework.tenant.core.util.TenantUtils;
|
||||
import com.viewsh.module.ops.dal.dataobject.statistics.OpsTrafficStatisticsDO;
|
||||
import com.viewsh.module.ops.dal.mysql.statistics.OpsTrafficStatisticsMapper;
|
||||
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
|
||||
import com.viewsh.module.ops.service.area.OpsBusAreaService;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import jakarta.annotation.Resource;
|
||||
@@ -36,7 +38,7 @@ import java.util.concurrent.TimeUnit;
|
||||
public class TrafficStatisticsPersistJob {
|
||||
|
||||
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||
private static final String LOCK_KEY = "ops:traffic:persist:lock";
|
||||
private static final String LOCK_KEY = "ops:traffic:persist:lock"; // 全局锁(该 Job 不分租户执行)
|
||||
private static final int LOCK_TTL_SECONDS = 300; // 5分钟锁超时
|
||||
|
||||
@Resource
|
||||
@@ -61,8 +63,9 @@ public class TrafficStatisticsPersistJob {
|
||||
log.info("[TrafficStatisticsPersistJob] 开始执行客流统计持久化任务");
|
||||
|
||||
// P0修复1: 获取分布式锁,防止并发执行导致双重计数
|
||||
String lockKey = getLockKey();
|
||||
Boolean locked = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(LOCK_KEY, String.valueOf(System.currentTimeMillis()),
|
||||
.setIfAbsent(lockKey, String.valueOf(System.currentTimeMillis()),
|
||||
LOCK_TTL_SECONDS, TimeUnit.SECONDS);
|
||||
|
||||
if (Boolean.FALSE.equals(locked)) {
|
||||
@@ -108,7 +111,7 @@ public class TrafficStatisticsPersistJob {
|
||||
return "执行失败: " + e.getMessage();
|
||||
} finally {
|
||||
// 释放分布式锁
|
||||
stringRedisTemplate.delete(LOCK_KEY);
|
||||
stringRedisTemplate.delete(lockKey);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -326,6 +329,16 @@ public class TrafficStatisticsPersistJob {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取分布式锁 Key。
|
||||
* <p>
|
||||
* 该 Job 不使用 @TenantJob,而是全局扫描 IoT Redis Key 后按 tenantId 分发写入。
|
||||
* 因此使用全局锁防止多节点并发执行。
|
||||
*/
|
||||
private String getLockKey() {
|
||||
return LOCK_KEY;
|
||||
}
|
||||
|
||||
/**
|
||||
* 持久化结果枚举
|
||||
*/
|
||||
|
||||
@@ -11,13 +11,6 @@ import java.util.List;
|
||||
*/
|
||||
public interface RedisOrderQueueService {
|
||||
|
||||
/**
|
||||
* Redis Key 前缀
|
||||
*/
|
||||
String QUEUE_KEY_PREFIX = "ops:order:queue:";
|
||||
String INFO_KEY_PREFIX = "ops:order:queue:info:";
|
||||
String LOCK_KEY_PREFIX = "ops:order:queue:lock:";
|
||||
|
||||
// ========== 队列操作 ==========
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.viewsh.module.ops.service.queue;
|
||||
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
|
||||
import com.viewsh.module.ops.infrastructure.redis.OpsRedisKeyBuilder;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
@@ -29,8 +30,8 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
@Override
|
||||
public boolean enqueue(OrderQueueDTO dto) {
|
||||
try {
|
||||
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
|
||||
String infoKey = INFO_KEY_PREFIX + dto.getId();
|
||||
String queueKey = OpsRedisKeyBuilder.orderQueue(dto.getUserId());
|
||||
String infoKey = OpsRedisKeyBuilder.orderQueueInfo(dto.getId());
|
||||
|
||||
// Redis 仅持久化服务层已计算好的最终总分,不再本地兜底重算
|
||||
double score = requireQueueScore(dto, "enqueue");
|
||||
@@ -66,8 +67,8 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
stringRedisTemplate.executePipelined((org.springframework.data.redis.core.RedisCallback<Object>) connection -> {
|
||||
dtos.forEach(dto -> {
|
||||
try {
|
||||
byte[] queueKey = (QUEUE_KEY_PREFIX + dto.getUserId()).getBytes();
|
||||
byte[] infoKey = (INFO_KEY_PREFIX + dto.getId()).getBytes();
|
||||
byte[] queueKey = OpsRedisKeyBuilder.orderQueue(dto.getUserId()).getBytes();
|
||||
byte[] infoKey = OpsRedisKeyBuilder.orderQueueInfo(dto.getId()).getBytes();
|
||||
|
||||
// Redis 仅持久化服务层已计算好的最终总分,不再本地兜底重算
|
||||
double score = requireQueueScore(dto, "batchEnqueue");
|
||||
@@ -102,7 +103,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
@Override
|
||||
public OrderQueueDTO dequeue(Long cleanerId) {
|
||||
try {
|
||||
String queueKey = QUEUE_KEY_PREFIX + cleanerId;
|
||||
String queueKey = OpsRedisKeyBuilder.orderQueue(cleanerId);
|
||||
|
||||
// 使用 ZPOPMIN 原子性出队(获取并移除最高优先级任务)
|
||||
// popMin 返回单个 TypedTuple<String>,值为 queueId
|
||||
@@ -122,7 +123,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
Long queueId = Long.parseLong(queueIdStr);
|
||||
|
||||
// 从 Hash 获取详细信息
|
||||
String infoKey = INFO_KEY_PREFIX + queueId;
|
||||
String infoKey = OpsRedisKeyBuilder.orderQueueInfo(queueId);
|
||||
Map<Object, Object> infoMap = stringRedisTemplate.opsForHash().entries(infoKey);
|
||||
|
||||
if (infoMap == null || infoMap.isEmpty()) {
|
||||
@@ -149,7 +150,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
@Override
|
||||
public List<OrderQueueDTO> peekTasks(Long cleanerId, int count) {
|
||||
try {
|
||||
String queueKey = QUEUE_KEY_PREFIX + cleanerId;
|
||||
String queueKey = OpsRedisKeyBuilder.orderQueue(cleanerId);
|
||||
|
||||
// 1. 查询前 N 个 queueId(按 score 排序)
|
||||
Set<String> queueIds = stringRedisTemplate.opsForZSet().range(queueKey, 0, count - 1);
|
||||
@@ -163,7 +164,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
.map(queueId -> {
|
||||
try {
|
||||
String idStr = queueId.toString();
|
||||
String infoKey = INFO_KEY_PREFIX + idStr;
|
||||
String infoKey = OpsRedisKeyBuilder.orderQueueInfo(Long.parseLong(idStr));
|
||||
Map<Object, Object> infoMap = stringRedisTemplate.opsForHash().entries(infoKey);
|
||||
if (infoMap != null && !infoMap.isEmpty()) {
|
||||
return mapToDto(infoMap);
|
||||
@@ -187,7 +188,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
@Override
|
||||
public long getQueueSize(Long cleanerId) {
|
||||
try {
|
||||
String queueKey = QUEUE_KEY_PREFIX + cleanerId;
|
||||
String queueKey = OpsRedisKeyBuilder.orderQueue(cleanerId);
|
||||
Long size = stringRedisTemplate.opsForZSet().size(queueKey);
|
||||
return size != null ? size : 0;
|
||||
} catch (Exception e) {
|
||||
@@ -199,11 +200,11 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
@Override
|
||||
public long clearQueue(Long cleanerId) {
|
||||
try {
|
||||
String queueKey = QUEUE_KEY_PREFIX + cleanerId;
|
||||
String queueKey = OpsRedisKeyBuilder.orderQueue(cleanerId);
|
||||
Set<String> queueIds = stringRedisTemplate.opsForZSet().range(queueKey, 0, -1);
|
||||
if (queueIds != null && !queueIds.isEmpty()) {
|
||||
List<String> infoKeys = queueIds.stream()
|
||||
.map(queueId -> INFO_KEY_PREFIX + queueId)
|
||||
.map(queueId -> OpsRedisKeyBuilder.orderQueueInfo(Long.parseLong(queueId)))
|
||||
.collect(Collectors.toList());
|
||||
stringRedisTemplate.delete(infoKeys);
|
||||
}
|
||||
@@ -220,7 +221,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
@Override
|
||||
public boolean updateStatus(Long queueId, String newStatus) {
|
||||
try {
|
||||
String infoKey = INFO_KEY_PREFIX + queueId;
|
||||
String infoKey = OpsRedisKeyBuilder.orderQueueInfo(queueId);
|
||||
|
||||
// 先从 Hash 获取 userId
|
||||
Map<Object, Object> infoMap = stringRedisTemplate.opsForHash().entries(infoKey);
|
||||
@@ -236,7 +237,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
}
|
||||
|
||||
Long userId = Long.parseLong(userIdObj.toString());
|
||||
String queueKey = QUEUE_KEY_PREFIX + userId;
|
||||
String queueKey = OpsRedisKeyBuilder.orderQueue(userId);
|
||||
|
||||
// 如果状态是 REMOVED,从 Sorted Set 中移除并删除 Hash
|
||||
if ("REMOVED".equalsIgnoreCase(newStatus)) {
|
||||
@@ -303,7 +304,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
@Override
|
||||
public boolean updatePriority(Long queueId, Integer newPriority) {
|
||||
try {
|
||||
String infoKey = INFO_KEY_PREFIX + queueId;
|
||||
String infoKey = OpsRedisKeyBuilder.orderQueueInfo(queueId);
|
||||
|
||||
// 从 Hash 中获取数据
|
||||
Map<Object, Object> infoMap = stringRedisTemplate.opsForHash().entries(infoKey);
|
||||
@@ -318,7 +319,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
return false;
|
||||
}
|
||||
|
||||
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
|
||||
String queueKey = OpsRedisKeyBuilder.orderQueue(dto.getUserId());
|
||||
dto.setPriority(newPriority);
|
||||
double newScore = requireQueueScore(dto, "updatePriority");
|
||||
|
||||
@@ -364,7 +365,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
@Override
|
||||
public boolean remove(Long queueId) {
|
||||
try {
|
||||
String infoKey = INFO_KEY_PREFIX + queueId;
|
||||
String infoKey = OpsRedisKeyBuilder.orderQueueInfo(queueId);
|
||||
|
||||
// 从 Hash 中获取 userId
|
||||
Map<Object, Object> infoMap = stringRedisTemplate.opsForHash().entries(infoKey);
|
||||
@@ -378,7 +379,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
}
|
||||
|
||||
Long userId = Long.parseLong(userIdObj.toString());
|
||||
String queueKey = QUEUE_KEY_PREFIX + userId;
|
||||
String queueKey = OpsRedisKeyBuilder.orderQueue(userId);
|
||||
|
||||
// 使用 Lua 脚本原子性删除 Sorted Set 和 Hash
|
||||
String script =
|
||||
@@ -413,7 +414,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
@Override
|
||||
public OrderQueueDTO getByQueueId(Long queueId) {
|
||||
try {
|
||||
String infoKey = INFO_KEY_PREFIX + queueId;
|
||||
String infoKey = OpsRedisKeyBuilder.orderQueueInfo(queueId);
|
||||
|
||||
Map<Object, Object> infoMap = stringRedisTemplate.opsForHash().entries(infoKey);
|
||||
if (infoMap.isEmpty()) {
|
||||
@@ -432,7 +433,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
public OrderQueueDTO getByOrderId(Long orderId) {
|
||||
try {
|
||||
// 查询所有 Hash 信息键(效率较低,慎用)
|
||||
Set<String> keys = stringRedisTemplate.keys(INFO_KEY_PREFIX + "*");
|
||||
Set<String> keys = stringRedisTemplate.keys(OpsRedisKeyBuilder.orderQueueInfoPattern());
|
||||
if (keys == null || keys.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
@@ -449,7 +450,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
if (dto == null || dto.getUserId() == null || dto.getId() == null) {
|
||||
continue;
|
||||
}
|
||||
String queueKey = QUEUE_KEY_PREFIX + dto.getUserId();
|
||||
String queueKey = OpsRedisKeyBuilder.orderQueue(dto.getUserId());
|
||||
Double score = stringRedisTemplate.opsForZSet().score(queueKey, dto.getId().toString());
|
||||
if (score != null) {
|
||||
return dto;
|
||||
@@ -477,7 +478,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
@Override
|
||||
public boolean tryLock(Long cleanerId, String lockValue, long expireTime) {
|
||||
try {
|
||||
String lockKey = LOCK_KEY_PREFIX + cleanerId;
|
||||
String lockKey = OpsRedisKeyBuilder.orderQueueLock(cleanerId);
|
||||
|
||||
Boolean acquired = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS);
|
||||
@@ -495,7 +496,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
@Override
|
||||
public boolean unlock(Long cleanerId, String lockValue) {
|
||||
try {
|
||||
String lockKey = LOCK_KEY_PREFIX + cleanerId;
|
||||
String lockKey = OpsRedisKeyBuilder.orderQueueLock(cleanerId);
|
||||
|
||||
// 使用 Lua 脚本确保只删除自己的锁
|
||||
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
|
||||
@@ -520,7 +521,7 @@ public class RedisOrderQueueServiceImpl implements RedisOrderQueueService {
|
||||
@Override
|
||||
public boolean forceUnlock(Long cleanerId) {
|
||||
try {
|
||||
String lockKey = LOCK_KEY_PREFIX + cleanerId;
|
||||
String lockKey = OpsRedisKeyBuilder.orderQueueLock(cleanerId);
|
||||
stringRedisTemplate.delete(lockKey);
|
||||
|
||||
log.warn("强制释放分布式锁: cleanerId={}", cleanerId);
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
package com.viewsh.module.ops.infrastructure.code;
|
||||
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
@@ -37,6 +39,7 @@ class OrderCodeGeneratorTest {
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
TenantContextHolder.setTenantId(1L);
|
||||
orderCodeGenerator = new OrderCodeGenerator();
|
||||
ReflectionTestUtils.setField(orderCodeGenerator, "stringRedisTemplate", stringRedisTemplate);
|
||||
|
||||
@@ -44,6 +47,11 @@ class OrderCodeGeneratorTest {
|
||||
lenient().when(stringRedisTemplate.opsForValue()).thenReturn(valueOperations);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void tearDown() {
|
||||
TenantContextHolder.clear();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGenerate_FirstOrderOfToday() {
|
||||
// Given
|
||||
|
||||
Reference in New Issue
Block a user