From 1ca472ea936ab3452952c0471a6c762381997cae Mon Sep 17 00:00:00 2001 From: lzh Date: Thu, 9 Apr 2026 13:59:20 +0800 Subject: [PATCH] =?UTF-8?q?feat(iot):=20=E5=AE=A2=E6=B5=81=E8=AE=A1?= =?UTF-8?q?=E6=95=B0=E5=99=A8=E6=94=AF=E6=8C=81=E7=B4=AF=E8=AE=A1=E5=80=BC?= =?UTF-8?q?=E4=B8=8A=E6=8A=A5=E6=A8=A1=E5=BC=8F=EF=BC=88CUMULATIVE?= =?UTF-8?q?=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TrafficThresholdConfig 新增 reportMode 字段,支持 INCREMENTAL(默认)和 CUMULATIVE 两种模式。 累计值设备通过 Redis 存储上次值自动算差值,处理首次上报跳过和设备重启归零场景。 现有增量设备无需改配置,行为完全兼容。 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../clean/TrafficThresholdConfig.java | 21 ++++ .../redis/clean/TrafficCounterRedisDAO.java | 52 ++++++++++ .../TrafficThresholdRuleProcessor.java | 95 ++++++++++++++----- 3 files changed, 146 insertions(+), 22 deletions(-) diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/dataobject/integration/clean/TrafficThresholdConfig.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/dataobject/integration/clean/TrafficThresholdConfig.java index cb840c9..b2dcd48 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/dataobject/integration/clean/TrafficThresholdConfig.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/dataobject/integration/clean/TrafficThresholdConfig.java @@ -12,6 +12,27 @@ import lombok.Data; @Data public class TrafficThresholdConfig { + /** + * 上报模式:INCREMENTAL / CUMULATIVE + */ + public static final String REPORT_MODE_INCREMENTAL = "INCREMENTAL"; + public static final String REPORT_MODE_CUMULATIVE = "CUMULATIVE"; + + /** + * 设备上报模式 + *

+ * INCREMENTAL(默认):设备上报增量值,直接使用 + * CUMULATIVE:设备上报累计值,需计算差值得到增量 + */ + private String reportMode; + + /** + * 是否累计值模式 + */ + public boolean isCumulative() { + return REPORT_MODE_CUMULATIVE.equalsIgnoreCase(reportMode); + } + /** * 触发阈值 *

diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterRedisDAO.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterRedisDAO.java index 0132edf..6cb4939 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterRedisDAO.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterRedisDAO.java @@ -244,6 +244,58 @@ public class TrafficCounterRedisDAO { return null; } + // ==================== 累计值设备上次值存取 ==================== + + /** + * 累计值设备上次上报值 Key 模式 + *

+ * 格式:iot:clean:traffic:lastvalue:{deviceId} + * Hash Field:people_in / people_out + */ + private static final String LAST_VALUE_KEY_PATTERN = "iot:clean:traffic:lastvalue:%s"; + + /** + * 累计值 TTL(秒)- 7 天,跨天不丢失,设备离线数天后仍能衔接 + */ + private static final int LAST_VALUE_TTL_SECONDS = 604800; + + /** + * 获取设备上次上报的累计值 + * + * @param deviceId 设备ID + * @param identifier 属性标识符(people_in / people_out) + * @return 上次累计值,首次上报时返回 null + */ + public Long getLastCumulativeValue(Long deviceId, String identifier) { + String key = String.format(LAST_VALUE_KEY_PATTERN, deviceId); + Object value = stringRedisTemplate.opsForHash().get(key, identifier); + if (value == null) { + return null; + } + try { + return Long.parseLong(value.toString()); + } catch (NumberFormatException e) { + return null; + } + } + + /** + * 更新设备上次上报的累计值 + * + * @param deviceId 设备ID + * @param identifier 属性标识符(people_in / people_out) + * @param value 当前累计值 + */ + public void setLastCumulativeValue(Long deviceId, String identifier, long value) { + String key = String.format(LAST_VALUE_KEY_PATTERN, deviceId); + stringRedisTemplate.opsForHash().put(key, identifier, String.valueOf(value)); + // 仅在 key 无 TTL 时设置,避免高频上报场景下每次都执行 EXPIRE + Long ttl = stringRedisTemplate.getExpire(key, TimeUnit.SECONDS); + if (ttl == null || ttl == -1) { + stringRedisTemplate.expire(key, LAST_VALUE_TTL_SECONDS, TimeUnit.SECONDS); + } + } + // ==================== 私有方法 ==================== private static String formatThresholdKey(Long deviceId, Long areaId) { diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java index 52e9010..85b3a7b 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java @@ -50,12 +50,13 @@ public class TrafficThresholdRuleProcessor { * - people_in:累加到当日统计 + 阈值计数器(需配置) * - people_out:累加到当日统计 *

- * 当日累积统计(用于报表)与工单触发(需配置)解耦: - * 即使设备未配置工单触发规则,统计数据也会正常采集。 + * 支持两种上报模式(通过 configData.trafficThreshold.reportMode 配置): + * - INCREMENTAL(默认):上报值直接作为增量 + * - CUMULATIVE:上报值为累计值,自动计算差值得到增量 * * @param deviceId 设备ID * @param identifier 属性标识符(people_in 或 people_out) - * @param propertyValue 属性值(周期内增量) + * @param propertyValue 属性值(增量或累计值,取决于 reportMode) */ public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) { // 1. 校验属性类型 @@ -66,15 +67,28 @@ public class TrafficThresholdRuleProcessor { log.debug("[TrafficThreshold] 收到客流属性:deviceId={}, identifier={}, value={}", deviceId, identifier, propertyValue); - // 2. 解析增量值 - Long increment = parseTrafficCount(propertyValue); - if (increment == null || increment <= 0) { - log.debug("[TrafficThreshold] 增量值无效:deviceId={}, identifier={}, value={}", - deviceId, identifier, propertyValue); + // 2. 解析原始值 + Long rawValue = parseTrafficCount(propertyValue); + if (rawValue == null || rawValue <= 0) { return; } - // 3. 无条件累加到当日统计(统计与工单触发解耦) + // 3. 获取配置,判断上报模式 + CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper = getConfigWrapper(deviceId); + TrafficThresholdConfig thresholdConfig = resolveThresholdConfig(configWrapper); + + // 4. 根据上报模式计算增量 + long increment; + if (thresholdConfig != null && thresholdConfig.isCumulative()) { + increment = resolveIncrement(deviceId, identifier, rawValue); + } else { + increment = rawValue; + } + if (increment <= 0) { + return; + } + + // 5. 累加到当日统计(统计与工单触发解耦) LocalDate today = LocalDate.now(); if ("people_in".equals(identifier)) { trafficCounterRedisDAO.incrementDaily(deviceId, today, increment, 0); @@ -84,21 +98,11 @@ public class TrafficThresholdRuleProcessor { log.debug("[TrafficThreshold] 当日统计累加:deviceId={}, identifier={}, increment={}", deviceId, identifier, increment); - // 4. 以下为工单触发逻辑,需要设备配置支持 + // 6. 以下为工单触发逻辑,仅 people_in 参与 if (!"people_in".equals(identifier)) { - return; // people_out 不参与阈值判定 - } - - CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper = - getConfigWrapper(deviceId); - if (configWrapper == null || configWrapper.getConfig() == null) { - log.debug("[TrafficThreshold] 设备无工单触发配置:deviceId={}", deviceId); return; } - - TrafficThresholdConfig thresholdConfig = configWrapper.getConfig().getTrafficThreshold(); - if (thresholdConfig == null || !thresholdConfig.getAutoCreateOrder()) { - log.debug("[TrafficThreshold] 未启用客流阈值触发:deviceId={}", deviceId); + if (thresholdConfig == null || !Boolean.TRUE.equals(thresholdConfig.getAutoCreateOrder())) { return; } @@ -106,10 +110,57 @@ public class TrafficThresholdRuleProcessor { handlePeopleIn(deviceId, areaId, increment, today, thresholdConfig, configWrapper); } + /** + * 从配置包装器中提取客流阈值配置 + * + * @return 阈值配置,无配置时返回 null + */ + private TrafficThresholdConfig resolveThresholdConfig( + CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) { + if (configWrapper == null || configWrapper.getConfig() == null) { + return null; + } + return configWrapper.getConfig().getTrafficThreshold(); + } + + /** + * 累计值转增量 + *

+ * 通过 Redis 存储上次上报的累计值,计算差值得到本次增量。 + * 处理三种场景:首次上报、正常递增、设备重启归零。 + * + * @param deviceId 设备ID + * @param identifier 属性标识符 + * @param currentValue 本次上报的累计值 + * @return 增量值;首次上报返回 0 + */ + private long resolveIncrement(Long deviceId, String identifier, long currentValue) { + Long lastValue = trafficCounterRedisDAO.getLastCumulativeValue(deviceId, identifier); + + // 无论是否能算出增量,都记录当前值 + trafficCounterRedisDAO.setLastCumulativeValue(deviceId, identifier, currentValue); + + if (lastValue == null) { + // 首次上报:无历史基准,不计入统计 + log.info("[TrafficThreshold] 累计值设备首次上报,建立基准:deviceId={}, identifier={}, value={}", + deviceId, identifier, currentValue); + return 0; + } + + if (currentValue >= lastValue) { + return currentValue - lastValue; + } + + // currentValue < lastValue → 设备重启归零 + log.info("[TrafficThreshold] 检测到设备重启:deviceId={}, identifier={}, last={}, current={}", + deviceId, identifier, lastValue, currentValue); + return currentValue; + } + /** * 处理 people_in 增量 */ - private void handlePeopleIn(Long deviceId, Long areaId, Long increment, LocalDate today, + private void handlePeopleIn(Long deviceId, Long areaId, long increment, LocalDate today, TrafficThresholdConfig thresholdConfig, CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) { // 1. 原子累加到阈值计数器,返回累积值(当日统计已在 processPropertyChange 中完成)