fix(ops): 修复区域设备索引缓存问题并优化查询性能
1. 修复 listAvailableBadges() 读穿透 bug - 改用 areaDeviceService.getDeviceIdsByArea() 获取设备列表 - 缓存未命中时自动从数据库重建 2. 优化 N+1 查询问题 - listBadgesByArea() 和 listAvailableBadges() 使用 batchGetBadgeStatus() 批量查询 3. 简化 BadgeDeviceStatusServiceImpl - 移除重复的 AREA_BADGES_KEY_PREFIX 常量 - 区域索引操作委托给 AreaDeviceService 处理 4. 增强缓存可靠性 - getDeviceIdsByArea() 支持读穿透缓存 - 缓存 TTL 从 30 分钟延长到 24 小时 Co-Authored-By: Claude (MiniMax-M2.1) <noreply@anthropic.com>
This commit is contained in:
@@ -7,7 +7,6 @@ import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
@@ -23,13 +22,11 @@ import java.util.stream.Collectors;
|
||||
* 职责:
|
||||
* 1. 设备状态管理(IDLE/BUSY/PAUSED/OFFLINE)
|
||||
* 2. 设备与工单关联管理
|
||||
* 3. 区域设备索引管理
|
||||
* 4. 心跳超时检查
|
||||
* 3. 区域设备索引查询转发
|
||||
* <p>
|
||||
* 设计说明:
|
||||
* - 状态变更事件由
|
||||
* {@link com.viewsh.module.ops.environment.integration.listener.BadgeDeviceStatusEventListener}
|
||||
* 处理
|
||||
* - 状态变更由 IoT 事件驱动或定时对账任务触发
|
||||
* - 区域索引维护委托给 {@link AreaDeviceService}
|
||||
* - 本类只提供基础的服务方法
|
||||
*
|
||||
* @author lzh
|
||||
@@ -48,12 +45,6 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
* Redis Key 前缀
|
||||
*/
|
||||
private static final String BADGE_STATUS_KEY_PREFIX = "ops:badge:status:";
|
||||
private static final String AREA_BADGES_KEY_PREFIX = "ops:area:badges:";
|
||||
|
||||
/**
|
||||
* 心跳超时时间(分钟)
|
||||
*/
|
||||
private static final int HEARTBEAT_TIMEOUT_MINUTES = 30;
|
||||
|
||||
/**
|
||||
* 状态过期时间(小时)
|
||||
@@ -172,16 +163,13 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
|
||||
try {
|
||||
String areaKey = AREA_BADGES_KEY_PREFIX + areaId;
|
||||
Set<Object> deviceIds = redisTemplate.opsForSet().members(areaKey);
|
||||
|
||||
if (deviceIds == null || deviceIds.isEmpty()) {
|
||||
Set<Long> deviceIds = areaDeviceService.getDeviceIdsByArea(areaId);
|
||||
if (deviceIds.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
return deviceIds.stream()
|
||||
.map(id -> Long.parseLong(id.toString()))
|
||||
.map(this::getBadgeStatus)
|
||||
// 使用批量查询,避免 N+1
|
||||
return batchGetBadgeStatus(new ArrayList<>(deviceIds)).stream()
|
||||
.filter(Objects::nonNull)
|
||||
.filter(dto -> dto.getStatus() != null && dto.getStatus().isActive())
|
||||
.sorted(Comparator.comparing(BadgeDeviceStatusDTO::getStatusChangeTime))
|
||||
@@ -200,16 +188,14 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
}
|
||||
|
||||
try {
|
||||
String areaKey = AREA_BADGES_KEY_PREFIX + areaId;
|
||||
Set<Object> deviceIds = redisTemplate.opsForSet().members(areaKey);
|
||||
|
||||
if (deviceIds == null || deviceIds.isEmpty()) {
|
||||
// 使用读穿透方式获取设备ID列表,缓存未命中时会从数据库重建
|
||||
Set<Long> deviceIds = areaDeviceService.getDeviceIdsByArea(areaId);
|
||||
if (deviceIds.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
return deviceIds.stream()
|
||||
.map(id -> Long.parseLong(id.toString()))
|
||||
.map(this::getBadgeStatus)
|
||||
// 使用批量查询,避免 N+1
|
||||
return batchGetBadgeStatus(new ArrayList<>(deviceIds)).stream()
|
||||
.filter(Objects::nonNull)
|
||||
.filter(dto -> dto.getStatus() == BadgeDeviceStatusEnum.IDLE)
|
||||
.sorted(Comparator.comparing(BadgeDeviceStatusDTO::getLastHeartbeatTime).reversed())
|
||||
@@ -274,13 +260,11 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
statusMap.put("lastHeartbeatTime", now);
|
||||
}
|
||||
|
||||
// 更新区域信息
|
||||
// 更新实时物理区域信息 (Key2)
|
||||
if (areaId != null) {
|
||||
statusMap.put("currentAreaId", areaId);
|
||||
// 更新区域索引
|
||||
addToAreaIndex(deviceId, areaId);
|
||||
} else {
|
||||
// 保持现有区域信息
|
||||
// 保持现有实时物理区域信息
|
||||
Object existingAreaId = currentMap.get("currentAreaId");
|
||||
if (existingAreaId != null) {
|
||||
statusMap.put("currentAreaId", existingAreaId);
|
||||
@@ -474,32 +458,12 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I
|
||||
|
||||
@Override
|
||||
public void addToAreaIndex(Long deviceId, Long areaId) {
|
||||
if (deviceId == null || areaId == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
String areaKey = AREA_BADGES_KEY_PREFIX + areaId;
|
||||
redisTemplate.opsForSet().add(areaKey, deviceId.toString());
|
||||
log.debug("添加设备到区域索引: deviceId={}, areaId={}", deviceId, areaId);
|
||||
} catch (Exception e) {
|
||||
log.error("添加设备到区域索引失败: deviceId={}, areaId={}", deviceId, areaId, e);
|
||||
}
|
||||
areaDeviceService.addToAreaIndex(deviceId, areaId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeFromAreaIndex(Long deviceId, Long areaId) {
|
||||
if (deviceId == null || areaId == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
String areaKey = AREA_BADGES_KEY_PREFIX + areaId;
|
||||
redisTemplate.opsForSet().remove(areaKey, deviceId.toString());
|
||||
log.debug("从区域索引移除设备: deviceId={}, areaId={}", deviceId, areaId);
|
||||
} catch (Exception e) {
|
||||
log.error("从区域索引移除设备失败: deviceId={}, areaId={}", deviceId, areaId, e);
|
||||
}
|
||||
areaDeviceService.removeFromAreaIndex(deviceId, areaId);
|
||||
}
|
||||
|
||||
// ==================== 设备管理 ====================
|
||||
|
||||
@@ -64,6 +64,14 @@ public interface AreaDeviceService {
|
||||
*/
|
||||
void refreshAreaDeviceIndex();
|
||||
|
||||
/**
|
||||
* 获取区域下的设备ID列表(带读穿透缓存)
|
||||
*
|
||||
* @param areaId 区域ID
|
||||
* @return 设备ID集合
|
||||
*/
|
||||
java.util.Set<Long> getDeviceIdsByArea(Long areaId);
|
||||
|
||||
/**
|
||||
* 添加设备到区域索引
|
||||
*
|
||||
|
||||
@@ -14,6 +14,7 @@ import org.springframework.stereotype.Service;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 区域设备关联服务实现
|
||||
@@ -63,9 +64,10 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea
|
||||
private static final String NULL_CACHE = "NULL";
|
||||
|
||||
/**
|
||||
* 缓存 TTL(30 分钟)
|
||||
* 缓存 TTL(24 小时)
|
||||
* 区域设备关系相对静态,可以设置较长过期时间
|
||||
*/
|
||||
private static final int CACHE_TTL_MINUTES = 30;
|
||||
private static final int CACHE_TTL_HOURS = 24;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() {
|
||||
@@ -108,7 +110,8 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea
|
||||
}
|
||||
|
||||
// 从数据库查询
|
||||
List<OpsAreaDeviceRelationDO> relations = relationMapper.selectListByAreaIdAndRelationType(areaId, relationType);
|
||||
List<OpsAreaDeviceRelationDO> relations = relationMapper.selectListByAreaIdAndRelationType(areaId,
|
||||
relationType);
|
||||
|
||||
// 返回第一个启用的
|
||||
OpsAreaDeviceRelationDO relation = relations.stream()
|
||||
@@ -123,9 +126,8 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea
|
||||
stringRedisTemplate.opsForValue().set(
|
||||
cacheKey,
|
||||
JsonUtils.toJsonString(dto),
|
||||
CACHE_TTL_MINUTES,
|
||||
TimeUnit.MINUTES
|
||||
);
|
||||
CACHE_TTL_HOURS,
|
||||
TimeUnit.HOURS);
|
||||
} else {
|
||||
// 空值缓存,防止穿透
|
||||
stringRedisTemplate.opsForValue().set(cacheKey, NULL_CACHE, 1, TimeUnit.MINUTES);
|
||||
@@ -169,9 +171,8 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea
|
||||
stringRedisTemplate.opsForValue().set(
|
||||
cacheKey,
|
||||
JsonUtils.toJsonString(dto),
|
||||
CACHE_TTL_MINUTES,
|
||||
TimeUnit.MINUTES
|
||||
);
|
||||
CACHE_TTL_HOURS,
|
||||
TimeUnit.HOURS);
|
||||
} else {
|
||||
// 空值缓存,防止穿透
|
||||
stringRedisTemplate.opsForValue().set(cacheKey, NULL_CACHE, 1, TimeUnit.MINUTES);
|
||||
@@ -213,6 +214,45 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea
|
||||
initAreaDeviceIndex();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Long> getDeviceIdsByArea(Long areaId) {
|
||||
if (areaId == null) {
|
||||
return Set.of();
|
||||
}
|
||||
|
||||
String areaKey = AREA_BADGES_KEY_PREFIX + areaId;
|
||||
try {
|
||||
Set<Object> members = redisTemplate.opsForSet().members(areaKey);
|
||||
if (members != null && !members.isEmpty()) {
|
||||
return members.stream()
|
||||
.map(m -> Long.parseLong(m.toString()))
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
// 缓存未命中,读穿透从数据库重构
|
||||
log.info("[AreaDevice] 区域索引缓存未命中,从数据库重建:areaId={}", areaId);
|
||||
List<OpsAreaDeviceRelationDO> relations = relationMapper.selectListByAreaIdAndRelationType(areaId, "BADGE");
|
||||
|
||||
Set<Long> deviceIds = relations.stream()
|
||||
.filter(OpsAreaDeviceRelationDO::getEnabled)
|
||||
.map(OpsAreaDeviceRelationDO::getDeviceId)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
if (!deviceIds.isEmpty()) {
|
||||
// 写入缓存
|
||||
String[] idStrings = deviceIds.stream().map(Object::toString).toArray(String[]::new);
|
||||
redisTemplate.opsForSet().add(areaKey, (Object[]) idStrings);
|
||||
redisTemplate.expire(areaKey, CACHE_TTL_HOURS, TimeUnit.HOURS);
|
||||
}
|
||||
|
||||
return deviceIds;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[AreaDevice] 获取区域设备索引失败:areaId={}", areaId, e);
|
||||
return Set.of();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addToAreaIndex(Long deviceId, Long areaId) {
|
||||
if (deviceId == null || areaId == null) {
|
||||
@@ -222,7 +262,8 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea
|
||||
try {
|
||||
String areaKey = AREA_BADGES_KEY_PREFIX + areaId;
|
||||
redisTemplate.opsForSet().add(areaKey, deviceId.toString());
|
||||
redisTemplate.expire(areaKey, CACHE_TTL_MINUTES, TimeUnit.MINUTES);
|
||||
// 延长过期时间到 24 小时
|
||||
redisTemplate.expire(areaKey, CACHE_TTL_HOURS, TimeUnit.HOURS);
|
||||
log.debug("[AreaDevice] 添加设备到区域索引:deviceId={}, areaId={}", deviceId, areaId);
|
||||
} catch (Exception e) {
|
||||
log.error("[AreaDevice] 添加设备到区域索引失败:deviceId={}, areaId={}", deviceId, areaId, e);
|
||||
|
||||
Reference in New Issue
Block a user