diff --git a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java index fe433d7..0b66f5c 100644 --- a/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java +++ b/viewsh-module-ops/viewsh-module-environment-biz/src/main/java/com/viewsh/module/ops/environment/service/badge/BadgeDeviceStatusServiceImpl.java @@ -7,7 +7,6 @@ import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; import org.springframework.data.redis.core.RedisTemplate; - import org.springframework.stereotype.Service; import java.time.LocalDateTime; @@ -23,13 +22,11 @@ import java.util.stream.Collectors; * 职责: * 1. 设备状态管理(IDLE/BUSY/PAUSED/OFFLINE) * 2. 设备与工单关联管理 - * 3. 区域设备索引管理 - * 4. 心跳超时检查 + * 3. 区域设备索引查询转发 *

* 设计说明: - * - 状态变更事件由 - * {@link com.viewsh.module.ops.environment.integration.listener.BadgeDeviceStatusEventListener} - * 处理 + * - 状态变更由 IoT 事件驱动或定时对账任务触发 + * - 区域索引维护委托给 {@link AreaDeviceService} * - 本类只提供基础的服务方法 * * @author lzh @@ -48,12 +45,6 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I * Redis Key 前缀 */ private static final String BADGE_STATUS_KEY_PREFIX = "ops:badge:status:"; - private static final String AREA_BADGES_KEY_PREFIX = "ops:area:badges:"; - - /** - * 心跳超时时间(分钟) - */ - private static final int HEARTBEAT_TIMEOUT_MINUTES = 30; /** * 状态过期时间(小时) @@ -172,16 +163,13 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I } try { - String areaKey = AREA_BADGES_KEY_PREFIX + areaId; - Set deviceIds = redisTemplate.opsForSet().members(areaKey); - - if (deviceIds == null || deviceIds.isEmpty()) { + Set deviceIds = areaDeviceService.getDeviceIdsByArea(areaId); + if (deviceIds.isEmpty()) { return Collections.emptyList(); } - return deviceIds.stream() - .map(id -> Long.parseLong(id.toString())) - .map(this::getBadgeStatus) + // 使用批量查询,避免 N+1 + return batchGetBadgeStatus(new ArrayList<>(deviceIds)).stream() .filter(Objects::nonNull) .filter(dto -> dto.getStatus() != null && dto.getStatus().isActive()) .sorted(Comparator.comparing(BadgeDeviceStatusDTO::getStatusChangeTime)) @@ -200,16 +188,14 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I } try { - String areaKey = AREA_BADGES_KEY_PREFIX + areaId; - Set deviceIds = redisTemplate.opsForSet().members(areaKey); - - if (deviceIds == null || deviceIds.isEmpty()) { + // 使用读穿透方式获取设备ID列表,缓存未命中时会从数据库重建 + Set deviceIds = areaDeviceService.getDeviceIdsByArea(areaId); + if (deviceIds.isEmpty()) { return Collections.emptyList(); } - return deviceIds.stream() - .map(id -> Long.parseLong(id.toString())) - .map(this::getBadgeStatus) + // 使用批量查询,避免 N+1 + return batchGetBadgeStatus(new ArrayList<>(deviceIds)).stream() .filter(Objects::nonNull) .filter(dto -> dto.getStatus() == BadgeDeviceStatusEnum.IDLE) .sorted(Comparator.comparing(BadgeDeviceStatusDTO::getLastHeartbeatTime).reversed()) @@ -274,13 +260,11 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I statusMap.put("lastHeartbeatTime", now); } - // 更新区域信息 + // 更新实时物理区域信息 (Key2) if (areaId != null) { statusMap.put("currentAreaId", areaId); - // 更新区域索引 - addToAreaIndex(deviceId, areaId); } else { - // 保持现有区域信息 + // 保持现有实时物理区域信息 Object existingAreaId = currentMap.get("currentAreaId"); if (existingAreaId != null) { statusMap.put("currentAreaId", existingAreaId); @@ -474,32 +458,12 @@ public class BadgeDeviceStatusServiceImpl implements BadgeDeviceStatusService, I @Override public void addToAreaIndex(Long deviceId, Long areaId) { - if (deviceId == null || areaId == null) { - return; - } - - try { - String areaKey = AREA_BADGES_KEY_PREFIX + areaId; - redisTemplate.opsForSet().add(areaKey, deviceId.toString()); - log.debug("添加设备到区域索引: deviceId={}, areaId={}", deviceId, areaId); - } catch (Exception e) { - log.error("添加设备到区域索引失败: deviceId={}, areaId={}", deviceId, areaId, e); - } + areaDeviceService.addToAreaIndex(deviceId, areaId); } @Override public void removeFromAreaIndex(Long deviceId, Long areaId) { - if (deviceId == null || areaId == null) { - return; - } - - try { - String areaKey = AREA_BADGES_KEY_PREFIX + areaId; - redisTemplate.opsForSet().remove(areaKey, deviceId.toString()); - log.debug("从区域索引移除设备: deviceId={}, areaId={}", deviceId, areaId); - } catch (Exception e) { - log.error("从区域索引移除设备失败: deviceId={}, areaId={}", deviceId, areaId, e); - } + areaDeviceService.removeFromAreaIndex(deviceId, areaId); } // ==================== 设备管理 ==================== diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/area/AreaDeviceService.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/area/AreaDeviceService.java index 4f66ae7..3328580 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/area/AreaDeviceService.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/area/AreaDeviceService.java @@ -64,6 +64,14 @@ public interface AreaDeviceService { */ void refreshAreaDeviceIndex(); + /** + * 获取区域下的设备ID列表(带读穿透缓存) + * + * @param areaId 区域ID + * @return 设备ID集合 + */ + java.util.Set getDeviceIdsByArea(Long areaId); + /** * 添加设备到区域索引 * diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/area/AreaDeviceServiceImpl.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/area/AreaDeviceServiceImpl.java index 585970e..5794efb 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/area/AreaDeviceServiceImpl.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/area/AreaDeviceServiceImpl.java @@ -14,6 +14,7 @@ import org.springframework.stereotype.Service; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * 区域设备关联服务实现 @@ -63,9 +64,10 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea private static final String NULL_CACHE = "NULL"; /** - * 缓存 TTL(30 分钟) + * 缓存 TTL(24 小时) + * 区域设备关系相对静态,可以设置较长过期时间 */ - private static final int CACHE_TTL_MINUTES = 30; + private static final int CACHE_TTL_HOURS = 24; @Override public void afterPropertiesSet() { @@ -108,7 +110,8 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea } // 从数据库查询 - List relations = relationMapper.selectListByAreaIdAndRelationType(areaId, relationType); + List relations = relationMapper.selectListByAreaIdAndRelationType(areaId, + relationType); // 返回第一个启用的 OpsAreaDeviceRelationDO relation = relations.stream() @@ -123,9 +126,8 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea stringRedisTemplate.opsForValue().set( cacheKey, JsonUtils.toJsonString(dto), - CACHE_TTL_MINUTES, - TimeUnit.MINUTES - ); + CACHE_TTL_HOURS, + TimeUnit.HOURS); } else { // 空值缓存,防止穿透 stringRedisTemplate.opsForValue().set(cacheKey, NULL_CACHE, 1, TimeUnit.MINUTES); @@ -169,9 +171,8 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea stringRedisTemplate.opsForValue().set( cacheKey, JsonUtils.toJsonString(dto), - CACHE_TTL_MINUTES, - TimeUnit.MINUTES - ); + CACHE_TTL_HOURS, + TimeUnit.HOURS); } else { // 空值缓存,防止穿透 stringRedisTemplate.opsForValue().set(cacheKey, NULL_CACHE, 1, TimeUnit.MINUTES); @@ -213,6 +214,45 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea initAreaDeviceIndex(); } + @Override + public Set getDeviceIdsByArea(Long areaId) { + if (areaId == null) { + return Set.of(); + } + + String areaKey = AREA_BADGES_KEY_PREFIX + areaId; + try { + Set members = redisTemplate.opsForSet().members(areaKey); + if (members != null && !members.isEmpty()) { + return members.stream() + .map(m -> Long.parseLong(m.toString())) + .collect(Collectors.toSet()); + } + + // 缓存未命中,读穿透从数据库重构 + log.info("[AreaDevice] 区域索引缓存未命中,从数据库重建:areaId={}", areaId); + List relations = relationMapper.selectListByAreaIdAndRelationType(areaId, "BADGE"); + + Set deviceIds = relations.stream() + .filter(OpsAreaDeviceRelationDO::getEnabled) + .map(OpsAreaDeviceRelationDO::getDeviceId) + .collect(Collectors.toSet()); + + if (!deviceIds.isEmpty()) { + // 写入缓存 + String[] idStrings = deviceIds.stream().map(Object::toString).toArray(String[]::new); + redisTemplate.opsForSet().add(areaKey, (Object[]) idStrings); + redisTemplate.expire(areaKey, CACHE_TTL_HOURS, TimeUnit.HOURS); + } + + return deviceIds; + + } catch (Exception e) { + log.error("[AreaDevice] 获取区域设备索引失败:areaId={}", areaId, e); + return Set.of(); + } + } + @Override public void addToAreaIndex(Long deviceId, Long areaId) { if (deviceId == null || areaId == null) { @@ -222,7 +262,8 @@ public class AreaDeviceServiceImpl implements AreaDeviceService, InitializingBea try { String areaKey = AREA_BADGES_KEY_PREFIX + areaId; redisTemplate.opsForSet().add(areaKey, deviceId.toString()); - redisTemplate.expire(areaKey, CACHE_TTL_MINUTES, TimeUnit.MINUTES); + // 延长过期时间到 24 小时 + redisTemplate.expire(areaKey, CACHE_TTL_HOURS, TimeUnit.HOURS); log.debug("[AreaDevice] 添加设备到区域索引:deviceId={}, areaId={}", deviceId, areaId); } catch (Exception e) { log.error("[AreaDevice] 添加设备到区域索引失败:deviceId={}, areaId={}", deviceId, areaId, e);