fix(iot): 修复客流统计持久化无数据和租户隔离问题

1. 解耦统计采集与工单触发:将 incrementDaily() 提前到配置检查之前,
   即使设备未配置工单触发规则,统计数据也能正常写入 Redis
2. 修复租户隔离:Redis Hash 中写入 tenantId,持久化任务读取后在
   正确的租户上下文中执行 upsert 和区域查询
3. 修复清理任务:使用 TenantUtils.executeIgnore() 避免 XXL-Job
   线程无租户上下文导致 NPE

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
lzh
2026-02-10 13:18:48 +08:00
parent 1147ae4503
commit 631612951c
4 changed files with 81 additions and 41 deletions

View File

@@ -1,5 +1,6 @@
package com.viewsh.module.iot.dal.redis.clean;
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
@@ -54,6 +55,7 @@ public class TrafficCounterRedisDAO {
private static final String FIELD_TOTAL_OUT = "totalOut";
private static final String FIELD_LAST_PERSISTED_IN = "lastPersistedIn";
private static final String FIELD_LAST_PERSISTED_OUT = "lastPersistedOut";
private static final String FIELD_TENANT_ID = "tenantId";
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
@@ -124,6 +126,8 @@ public class TrafficCounterRedisDAO {
/**
* 原子递增当日累积统计
* <p>
* 同时记录当前租户ID供持久化任务使用
*
* @param deviceId 设备ID
* @param date 日期
@@ -138,6 +142,11 @@ public class TrafficCounterRedisDAO {
if (peopleOut > 0) {
stringRedisTemplate.opsForHash().increment(key, FIELD_TOTAL_OUT, peopleOut);
}
// 记录租户ID幂等写入同一设备始终属于同一租户
Long tenantId = TenantContextHolder.getTenantId();
if (tenantId != null) {
stringRedisTemplate.opsForHash().putIfAbsent(key, FIELD_TENANT_ID, String.valueOf(tenantId));
}
// 设置 TTL幂等每次都设置保证不过期
stringRedisTemplate.expire(key, DAILY_TTL_SECONDS, TimeUnit.SECONDS);
}

View File

@@ -47,8 +47,11 @@ public class TrafficThresholdRuleProcessor {
* 处理客流属性上报
* <p>
* 支持 people_in 和 people_out 两个属性:
* - people_in累加到阈值计数器 + 当日统计
* - people_out累加到当日统计
* - people_in累加到当日统计 + 阈值计数器(需配置)
* - people_out累加到当日统计
* <p>
* 当日累积统计(用于报表)与工单触发(需配置)解耦:
* 即使设备未配置工单触发规则,统计数据也会正常采集。
*
* @param deviceId 设备ID
* @param identifier 属性标识符people_in 或 people_out
@@ -63,12 +66,33 @@ public class TrafficThresholdRuleProcessor {
log.debug("[TrafficThreshold] 收到客流属性deviceId={}, identifier={}, value={}",
deviceId, identifier, propertyValue);
// 2. 获取设备关联信息(包含 areaId
// 2. 解析增量值
Long increment = parseTrafficCount(propertyValue);
if (increment == null || increment <= 0) {
log.debug("[TrafficThreshold] 增量值无效deviceId={}, identifier={}, value={}",
deviceId, identifier, propertyValue);
return;
}
// 3. 无条件累加到当日统计(统计与工单触发解耦)
LocalDate today = LocalDate.now();
if ("people_in".equals(identifier)) {
trafficCounterRedisDAO.incrementDaily(deviceId, today, increment, 0);
} else {
trafficCounterRedisDAO.incrementDaily(deviceId, today, 0, increment);
}
log.debug("[TrafficThreshold] 当日统计累加deviceId={}, identifier={}, increment={}",
deviceId, identifier, increment);
// 4. 以下为工单触发逻辑,需要设备配置支持
if (!"people_in".equals(identifier)) {
return; // people_out 不参与阈值判定
}
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper =
getConfigWrapper(deviceId);
if (configWrapper == null || configWrapper.getConfig() == null) {
log.debug("[TrafficThreshold] 设备无配置deviceId={}", deviceId);
log.debug("[TrafficThreshold] 设备无工单触发配置deviceId={}", deviceId);
return;
}
@@ -78,26 +102,8 @@ public class TrafficThresholdRuleProcessor {
return;
}
// 3. 解析增量值
Long increment = parseTrafficCount(propertyValue);
if (increment == null || increment <= 0) {
log.debug("[TrafficThreshold] 增量值无效deviceId={}, identifier={}, value={}",
deviceId, identifier, propertyValue);
return;
}
Long areaId = configWrapper.getAreaId();
LocalDate today = LocalDate.now();
// 4. 根据属性类型分别处理
if ("people_in".equals(identifier)) {
handlePeopleIn(deviceId, areaId, increment, today, thresholdConfig, configWrapper);
} else {
// people_out仅累加到当日统计
trafficCounterRedisDAO.incrementDaily(deviceId, today, 0, increment);
log.debug("[TrafficThreshold] people_out 累加到当日统计deviceId={}, areaId={}, increment={}",
deviceId, areaId, increment);
}
handlePeopleIn(deviceId, areaId, increment, today, thresholdConfig, configWrapper);
}
/**
@@ -106,13 +112,10 @@ public class TrafficThresholdRuleProcessor {
private void handlePeopleIn(Long deviceId, Long areaId, Long increment, LocalDate today,
TrafficThresholdConfig thresholdConfig,
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) {
// 1. 原子累加到阈值计数器,返回累积值
// 1. 原子累加到阈值计数器,返回累积值(当日统计已在 processPropertyChange 中完成)
Long accumulated = trafficCounterRedisDAO.incrementThreshold(deviceId, areaId, increment);
// 2. 原子累加到当日统计
trafficCounterRedisDAO.incrementDaily(deviceId, today, increment, 0);
log.debug("[TrafficThreshold] people_in 累加deviceId={}, areaId={}, increment={}, accumulated={}, threshold={}",
log.debug("[TrafficThreshold] people_in 阈值累加deviceId={}, areaId={}, increment={}, accumulated={}, threshold={}",
deviceId, areaId, increment, accumulated, thresholdConfig.getThreshold());
// 3. 阈值判定

View File

@@ -1,5 +1,6 @@
package com.viewsh.module.ops.service.job;
import com.viewsh.framework.tenant.core.util.TenantUtils;
import com.viewsh.module.ops.dal.mysql.statistics.OpsTrafficStatisticsMapper;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
@@ -36,7 +37,9 @@ public class TrafficStatisticsCleanupJob {
try {
LocalDateTime beforeTime = LocalDateTime.now().minusDays(30);
int deletedCount = trafficStatisticsMapper.deleteByStatHourBefore(beforeTime);
// 使用 executeIgnore 忽略租户过滤,清理所有租户的过期数据
int deletedCount = TenantUtils.executeIgnore(
() -> trafficStatisticsMapper.deleteByStatHourBefore(beforeTime));
log.info("[TrafficStatisticsCleanupJob] 客流统计清理完成:删除 {} 条记录(截止时间={}",
deletedCount, beforeTime);

View File

@@ -149,6 +149,13 @@ public class TrafficStatisticsPersistJob {
long lastPersistedIn = parseLong(data.get("lastPersistedIn"));
long lastPersistedOut = parseLong(data.get("lastPersistedOut"));
// 读取租户ID
Long tenantId = parseLongOrNull(data.get("tenantId"));
if (tenantId == null) {
log.warn("[TrafficStatisticsPersistJob] 缺少租户信息跳过deviceId={}, date={}", deviceId, date);
return PersistResult.SKIPPED;
}
// 计算本次需要持久化的增量
long deltaIn = totalIn - lastPersistedIn;
long deltaOut = totalOut - lastPersistedOut;
@@ -179,8 +186,8 @@ public class TrafficStatisticsPersistJob {
return PersistResult.ERROR;
}
// 获取区域ID从区域设备关系中查询)
Long areaId = getAreaIdForDevice(deviceId);
// 获取区域ID在正确的租户上下文中查询)
Long areaId = getAreaIdForDevice(deviceId, tenantId);
// P1修复4: 处理缺失区域关联场景
if (areaId == null) {
@@ -204,15 +211,17 @@ public class TrafficStatisticsPersistJob {
.peopleIn((int) deltaIn)
.peopleOut((int) deltaOut)
.build();
record.setTenantId(tenantId);
trafficStatisticsMapper.upsert(record);
// 在正确的租户上下文中执行 upsert确保租户拦截器正常工作
TenantUtils.execute(tenantId, () -> trafficStatisticsMapper.upsert(record));
// 更新 Redis 已持久化的值
stringRedisTemplate.opsForHash().put(key, "lastPersistedIn", String.valueOf(totalIn));
stringRedisTemplate.opsForHash().put(key, "lastPersistedOut", String.valueOf(totalOut));
log.debug("[TrafficStatisticsPersistJob] 持久化成功deviceId={}, areaId={}, statHour={}, deltaIn={}, deltaOut={}",
deviceId, areaId, statHour, deltaIn, deltaOut);
log.debug("[TrafficStatisticsPersistJob] 持久化成功deviceId={}, areaId={}, tenantId={}, statHour={}, deltaIn={}, deltaOut={}",
deviceId, areaId, tenantId, statHour, deltaIn, deltaOut);
return PersistResult.SUCCESS;
}
@@ -241,13 +250,12 @@ public class TrafficStatisticsPersistJob {
/**
* 获取设备关联的区域ID
*/
private Long getAreaIdForDevice(Long deviceId) {
private Long getAreaIdForDevice(Long deviceId, Long tenantId) {
try {
// 通过 OpsBusAreaService 查询设备关联的区域
// 使用 executeIgnore 忽略租户过滤,因为 xxl-job 线程无租户上下文
return TenantUtils.executeIgnore(() -> areaService.getAreaIdByDeviceId(deviceId));
// 在正确的租户上下文中查询设备关联的区域
return TenantUtils.execute(tenantId, () -> areaService.getAreaIdByDeviceId(deviceId));
} catch (Exception e) {
log.error("[TrafficStatisticsPersistJob] 查询设备区域失败deviceId={}", deviceId, e);
log.error("[TrafficStatisticsPersistJob] 查询设备区域失败deviceId={}, tenantId={}", deviceId, tenantId, e);
return null;
}
}
@@ -285,7 +293,7 @@ public class TrafficStatisticsPersistJob {
}
/**
* 解析 Long 值
* 解析 Long 值,无数据返回 0
*/
private long parseLong(Object value) {
if (value == null) {
@@ -301,6 +309,23 @@ public class TrafficStatisticsPersistJob {
}
}
/**
* 解析 Long 值,无数据返回 null
*/
private Long parseLongOrNull(Object value) {
if (value == null) {
return null;
}
if (value instanceof Number) {
return ((Number) value).longValue();
}
try {
return Long.parseLong(value.toString());
} catch (NumberFormatException e) {
return null;
}
}
/**
* 持久化结果枚举
*/