diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterRedisDAO.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterRedisDAO.java index 58ca275..0132edf 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterRedisDAO.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterRedisDAO.java @@ -1,5 +1,6 @@ package com.viewsh.module.iot.dal.redis.clean; +import com.viewsh.framework.tenant.core.context.TenantContextHolder; import jakarta.annotation.Resource; import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.ScanOptions; @@ -54,6 +55,7 @@ public class TrafficCounterRedisDAO { private static final String FIELD_TOTAL_OUT = "totalOut"; private static final String FIELD_LAST_PERSISTED_IN = "lastPersistedIn"; private static final String FIELD_LAST_PERSISTED_OUT = "lastPersistedOut"; + private static final String FIELD_TENANT_ID = "tenantId"; private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd"); @@ -124,6 +126,8 @@ public class TrafficCounterRedisDAO { /** * 原子递增当日累积统计 + *

+ * 同时记录当前租户ID,供持久化任务使用 * * @param deviceId 设备ID * @param date 日期 @@ -138,6 +142,11 @@ public class TrafficCounterRedisDAO { if (peopleOut > 0) { stringRedisTemplate.opsForHash().increment(key, FIELD_TOTAL_OUT, peopleOut); } + // 记录租户ID(幂等写入,同一设备始终属于同一租户) + Long tenantId = TenantContextHolder.getTenantId(); + if (tenantId != null) { + stringRedisTemplate.opsForHash().putIfAbsent(key, FIELD_TENANT_ID, String.valueOf(tenantId)); + } // 设置 TTL(幂等,每次都设置保证不过期) stringRedisTemplate.expire(key, DAILY_TTL_SECONDS, TimeUnit.SECONDS); } diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java index 38a5243..52e9010 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java @@ -47,8 +47,11 @@ public class TrafficThresholdRuleProcessor { * 处理客流属性上报 *

* 支持 people_in 和 people_out 两个属性: - * - people_in:累加到阈值计数器 + 当日统计 - * - people_out:仅累加到当日统计 + * - people_in:累加到当日统计 + 阈值计数器(需配置) + * - people_out:累加到当日统计 + *

+ * 当日累积统计(用于报表)与工单触发(需配置)解耦: + * 即使设备未配置工单触发规则,统计数据也会正常采集。 * * @param deviceId 设备ID * @param identifier 属性标识符(people_in 或 people_out) @@ -63,12 +66,33 @@ public class TrafficThresholdRuleProcessor { log.debug("[TrafficThreshold] 收到客流属性:deviceId={}, identifier={}, value={}", deviceId, identifier, propertyValue); - // 2. 获取设备关联信息(包含 areaId) + // 2. 解析增量值 + Long increment = parseTrafficCount(propertyValue); + if (increment == null || increment <= 0) { + log.debug("[TrafficThreshold] 增量值无效:deviceId={}, identifier={}, value={}", + deviceId, identifier, propertyValue); + return; + } + + // 3. 无条件累加到当日统计(统计与工单触发解耦) + LocalDate today = LocalDate.now(); + if ("people_in".equals(identifier)) { + trafficCounterRedisDAO.incrementDaily(deviceId, today, increment, 0); + } else { + trafficCounterRedisDAO.incrementDaily(deviceId, today, 0, increment); + } + log.debug("[TrafficThreshold] 当日统计累加:deviceId={}, identifier={}, increment={}", + deviceId, identifier, increment); + + // 4. 以下为工单触发逻辑,需要设备配置支持 + if (!"people_in".equals(identifier)) { + return; // people_out 不参与阈值判定 + } + CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper = getConfigWrapper(deviceId); - if (configWrapper == null || configWrapper.getConfig() == null) { - log.debug("[TrafficThreshold] 设备无配置:deviceId={}", deviceId); + log.debug("[TrafficThreshold] 设备无工单触发配置:deviceId={}", deviceId); return; } @@ -78,26 +102,8 @@ public class TrafficThresholdRuleProcessor { return; } - // 3. 解析增量值 - Long increment = parseTrafficCount(propertyValue); - if (increment == null || increment <= 0) { - log.debug("[TrafficThreshold] 增量值无效:deviceId={}, identifier={}, value={}", - deviceId, identifier, propertyValue); - return; - } - Long areaId = configWrapper.getAreaId(); - LocalDate today = LocalDate.now(); - - // 4. 根据属性类型分别处理 - if ("people_in".equals(identifier)) { - handlePeopleIn(deviceId, areaId, increment, today, thresholdConfig, configWrapper); - } else { - // people_out:仅累加到当日统计 - trafficCounterRedisDAO.incrementDaily(deviceId, today, 0, increment); - log.debug("[TrafficThreshold] people_out 累加到当日统计:deviceId={}, areaId={}, increment={}", - deviceId, areaId, increment); - } + handlePeopleIn(deviceId, areaId, increment, today, thresholdConfig, configWrapper); } /** @@ -106,13 +112,10 @@ public class TrafficThresholdRuleProcessor { private void handlePeopleIn(Long deviceId, Long areaId, Long increment, LocalDate today, TrafficThresholdConfig thresholdConfig, CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) { - // 1. 原子累加到阈值计数器,返回累积值 + // 1. 原子累加到阈值计数器,返回累积值(当日统计已在 processPropertyChange 中完成) Long accumulated = trafficCounterRedisDAO.incrementThreshold(deviceId, areaId, increment); - // 2. 原子累加到当日统计 - trafficCounterRedisDAO.incrementDaily(deviceId, today, increment, 0); - - log.debug("[TrafficThreshold] people_in 累加:deviceId={}, areaId={}, increment={}, accumulated={}, threshold={}", + log.debug("[TrafficThreshold] people_in 阈值累加:deviceId={}, areaId={}, increment={}, accumulated={}, threshold={}", deviceId, areaId, increment, accumulated, thresholdConfig.getThreshold()); // 3. 阈值判定 diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsCleanupJob.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsCleanupJob.java index d11162c..4adb4ae 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsCleanupJob.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsCleanupJob.java @@ -1,5 +1,6 @@ package com.viewsh.module.ops.service.job; +import com.viewsh.framework.tenant.core.util.TenantUtils; import com.viewsh.module.ops.dal.mysql.statistics.OpsTrafficStatisticsMapper; import com.xxl.job.core.handler.annotation.XxlJob; import jakarta.annotation.Resource; @@ -36,7 +37,9 @@ public class TrafficStatisticsCleanupJob { try { LocalDateTime beforeTime = LocalDateTime.now().minusDays(30); - int deletedCount = trafficStatisticsMapper.deleteByStatHourBefore(beforeTime); + // 使用 executeIgnore 忽略租户过滤,清理所有租户的过期数据 + int deletedCount = TenantUtils.executeIgnore( + () -> trafficStatisticsMapper.deleteByStatHourBefore(beforeTime)); log.info("[TrafficStatisticsCleanupJob] 客流统计清理完成:删除 {} 条记录(截止时间={})", deletedCount, beforeTime); diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsPersistJob.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsPersistJob.java index c262c0b..b4d1f8f 100644 --- a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsPersistJob.java +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsPersistJob.java @@ -149,6 +149,13 @@ public class TrafficStatisticsPersistJob { long lastPersistedIn = parseLong(data.get("lastPersistedIn")); long lastPersistedOut = parseLong(data.get("lastPersistedOut")); + // 读取租户ID + Long tenantId = parseLongOrNull(data.get("tenantId")); + if (tenantId == null) { + log.warn("[TrafficStatisticsPersistJob] 缺少租户信息,跳过:deviceId={}, date={}", deviceId, date); + return PersistResult.SKIPPED; + } + // 计算本次需要持久化的增量 long deltaIn = totalIn - lastPersistedIn; long deltaOut = totalOut - lastPersistedOut; @@ -179,8 +186,8 @@ public class TrafficStatisticsPersistJob { return PersistResult.ERROR; } - // 获取区域ID(从区域设备关系中查询) - Long areaId = getAreaIdForDevice(deviceId); + // 获取区域ID(在正确的租户上下文中查询) + Long areaId = getAreaIdForDevice(deviceId, tenantId); // P1修复4: 处理缺失区域关联场景 if (areaId == null) { @@ -204,15 +211,17 @@ public class TrafficStatisticsPersistJob { .peopleIn((int) deltaIn) .peopleOut((int) deltaOut) .build(); + record.setTenantId(tenantId); - trafficStatisticsMapper.upsert(record); + // 在正确的租户上下文中执行 upsert,确保租户拦截器正常工作 + TenantUtils.execute(tenantId, () -> trafficStatisticsMapper.upsert(record)); // 更新 Redis 已持久化的值 stringRedisTemplate.opsForHash().put(key, "lastPersistedIn", String.valueOf(totalIn)); stringRedisTemplate.opsForHash().put(key, "lastPersistedOut", String.valueOf(totalOut)); - log.debug("[TrafficStatisticsPersistJob] 持久化成功:deviceId={}, areaId={}, statHour={}, deltaIn={}, deltaOut={}", - deviceId, areaId, statHour, deltaIn, deltaOut); + log.debug("[TrafficStatisticsPersistJob] 持久化成功:deviceId={}, areaId={}, tenantId={}, statHour={}, deltaIn={}, deltaOut={}", + deviceId, areaId, tenantId, statHour, deltaIn, deltaOut); return PersistResult.SUCCESS; } @@ -241,13 +250,12 @@ public class TrafficStatisticsPersistJob { /** * 获取设备关联的区域ID */ - private Long getAreaIdForDevice(Long deviceId) { + private Long getAreaIdForDevice(Long deviceId, Long tenantId) { try { - // 通过 OpsBusAreaService 查询设备关联的区域 - // 使用 executeIgnore 忽略租户过滤,因为 xxl-job 线程无租户上下文 - return TenantUtils.executeIgnore(() -> areaService.getAreaIdByDeviceId(deviceId)); + // 在正确的租户上下文中查询设备关联的区域 + return TenantUtils.execute(tenantId, () -> areaService.getAreaIdByDeviceId(deviceId)); } catch (Exception e) { - log.error("[TrafficStatisticsPersistJob] 查询设备区域失败:deviceId={}", deviceId, e); + log.error("[TrafficStatisticsPersistJob] 查询设备区域失败:deviceId={}, tenantId={}", deviceId, tenantId, e); return null; } } @@ -285,7 +293,7 @@ public class TrafficStatisticsPersistJob { } /** - * 解析 Long 值 + * 解析 Long 值,无数据返回 0 */ private long parseLong(Object value) { if (value == null) { @@ -301,6 +309,23 @@ public class TrafficStatisticsPersistJob { } } + /** + * 解析 Long 值,无数据返回 null + */ + private Long parseLongOrNull(Object value) { + if (value == null) { + return null; + } + if (value instanceof Number) { + return ((Number) value).longValue(); + } + try { + return Long.parseLong(value.toString()); + } catch (NumberFormatException e) { + return null; + } + } + /** * 持久化结果枚举 */