feat(iot): 客流计数器支持累计值上报模式(CUMULATIVE)
Some checks failed
Java CI with Maven / build (11) (push) Has been cancelled
Java CI with Maven / build (17) (push) Has been cancelled
Java CI with Maven / build (8) (push) Has been cancelled

TrafficThresholdConfig 新增 reportMode 字段,支持 INCREMENTAL(默认)和 CUMULATIVE 两种模式。
累计值设备通过 Redis 存储上次值自动算差值,处理首次上报跳过和设备重启归零场景。
现有增量设备无需改配置,行为完全兼容。

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
lzh
2026-04-09 13:59:20 +08:00
parent c8ba3e63cb
commit 1ca472ea93
3 changed files with 146 additions and 22 deletions

View File

@@ -12,6 +12,27 @@ import lombok.Data;
@Data
public class TrafficThresholdConfig {
/**
* 上报模式INCREMENTAL / CUMULATIVE
*/
public static final String REPORT_MODE_INCREMENTAL = "INCREMENTAL";
public static final String REPORT_MODE_CUMULATIVE = "CUMULATIVE";
/**
* 设备上报模式
* <p>
* INCREMENTAL默认设备上报增量值直接使用
* CUMULATIVE设备上报累计值需计算差值得到增量
*/
private String reportMode;
/**
* 是否累计值模式
*/
public boolean isCumulative() {
return REPORT_MODE_CUMULATIVE.equalsIgnoreCase(reportMode);
}
/**
* 触发阈值
* <p>

View File

@@ -244,6 +244,58 @@ public class TrafficCounterRedisDAO {
return null;
}
// ==================== 累计值设备上次值存取 ====================
/**
* 累计值设备上次上报值 Key 模式
* <p>
* 格式iot:clean:traffic:lastvalue:{deviceId}
* Hash Fieldpeople_in / people_out
*/
private static final String LAST_VALUE_KEY_PATTERN = "iot:clean:traffic:lastvalue:%s";
/**
* 累计值 TTL- 7 天,跨天不丢失,设备离线数天后仍能衔接
*/
private static final int LAST_VALUE_TTL_SECONDS = 604800;
/**
* 获取设备上次上报的累计值
*
* @param deviceId 设备ID
* @param identifier 属性标识符people_in / people_out
* @return 上次累计值,首次上报时返回 null
*/
public Long getLastCumulativeValue(Long deviceId, String identifier) {
String key = String.format(LAST_VALUE_KEY_PATTERN, deviceId);
Object value = stringRedisTemplate.opsForHash().get(key, identifier);
if (value == null) {
return null;
}
try {
return Long.parseLong(value.toString());
} catch (NumberFormatException e) {
return null;
}
}
/**
* 更新设备上次上报的累计值
*
* @param deviceId 设备ID
* @param identifier 属性标识符people_in / people_out
* @param value 当前累计值
*/
public void setLastCumulativeValue(Long deviceId, String identifier, long value) {
String key = String.format(LAST_VALUE_KEY_PATTERN, deviceId);
stringRedisTemplate.opsForHash().put(key, identifier, String.valueOf(value));
// 仅在 key 无 TTL 时设置,避免高频上报场景下每次都执行 EXPIRE
Long ttl = stringRedisTemplate.getExpire(key, TimeUnit.SECONDS);
if (ttl == null || ttl == -1) {
stringRedisTemplate.expire(key, LAST_VALUE_TTL_SECONDS, TimeUnit.SECONDS);
}
}
// ==================== 私有方法 ====================
private static String formatThresholdKey(Long deviceId, Long areaId) {

View File

@@ -50,12 +50,13 @@ public class TrafficThresholdRuleProcessor {
* - people_in累加到当日统计 + 阈值计数器(需配置)
* - people_out累加到当日统计
* <p>
* 当日累积统计(用于报表)与工单触发(需配置)解耦
* 即使设备未配置工单触发规则,统计数据也会正常采集。
* 支持两种上报模式(通过 configData.trafficThreshold.reportMode 配置):
* - INCREMENTAL默认上报值直接作为增量
* - CUMULATIVE上报值为累计值自动计算差值得到增量
*
* @param deviceId 设备ID
* @param identifier 属性标识符people_in 或 people_out
* @param propertyValue 属性值(周期内增量)
* @param propertyValue 属性值(增量或累计值,取决于 reportMode
*/
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
// 1. 校验属性类型
@@ -66,15 +67,28 @@ public class TrafficThresholdRuleProcessor {
log.debug("[TrafficThreshold] 收到客流属性deviceId={}, identifier={}, value={}",
deviceId, identifier, propertyValue);
// 2. 解析增量
Long increment = parseTrafficCount(propertyValue);
if (increment == null || increment <= 0) {
log.debug("[TrafficThreshold] 增量值无效deviceId={}, identifier={}, value={}",
deviceId, identifier, propertyValue);
// 2. 解析原始
Long rawValue = parseTrafficCount(propertyValue);
if (rawValue == null || rawValue <= 0) {
return;
}
// 3. 无条件累加到当日统计(统计与工单触发解耦)
// 3. 获取配置,判断上报模式
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper = getConfigWrapper(deviceId);
TrafficThresholdConfig thresholdConfig = resolveThresholdConfig(configWrapper);
// 4. 根据上报模式计算增量
long increment;
if (thresholdConfig != null && thresholdConfig.isCumulative()) {
increment = resolveIncrement(deviceId, identifier, rawValue);
} else {
increment = rawValue;
}
if (increment <= 0) {
return;
}
// 5. 累加到当日统计(统计与工单触发解耦)
LocalDate today = LocalDate.now();
if ("people_in".equals(identifier)) {
trafficCounterRedisDAO.incrementDaily(deviceId, today, increment, 0);
@@ -84,21 +98,11 @@ public class TrafficThresholdRuleProcessor {
log.debug("[TrafficThreshold] 当日统计累加deviceId={}, identifier={}, increment={}",
deviceId, identifier, increment);
// 4. 以下为工单触发逻辑,需要设备配置支持
// 6. 以下为工单触发逻辑,仅 people_in 参与
if (!"people_in".equals(identifier)) {
return; // people_out 不参与阈值判定
}
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper =
getConfigWrapper(deviceId);
if (configWrapper == null || configWrapper.getConfig() == null) {
log.debug("[TrafficThreshold] 设备无工单触发配置deviceId={}", deviceId);
return;
}
TrafficThresholdConfig thresholdConfig = configWrapper.getConfig().getTrafficThreshold();
if (thresholdConfig == null || !thresholdConfig.getAutoCreateOrder()) {
log.debug("[TrafficThreshold] 未启用客流阈值触发deviceId={}", deviceId);
if (thresholdConfig == null || !Boolean.TRUE.equals(thresholdConfig.getAutoCreateOrder())) {
return;
}
@@ -106,10 +110,57 @@ public class TrafficThresholdRuleProcessor {
handlePeopleIn(deviceId, areaId, increment, today, thresholdConfig, configWrapper);
}
/**
* 从配置包装器中提取客流阈值配置
*
* @return 阈值配置,无配置时返回 null
*/
private TrafficThresholdConfig resolveThresholdConfig(
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) {
if (configWrapper == null || configWrapper.getConfig() == null) {
return null;
}
return configWrapper.getConfig().getTrafficThreshold();
}
/**
* 累计值转增量
* <p>
* 通过 Redis 存储上次上报的累计值,计算差值得到本次增量。
* 处理三种场景:首次上报、正常递增、设备重启归零。
*
* @param deviceId 设备ID
* @param identifier 属性标识符
* @param currentValue 本次上报的累计值
* @return 增量值;首次上报返回 0
*/
private long resolveIncrement(Long deviceId, String identifier, long currentValue) {
Long lastValue = trafficCounterRedisDAO.getLastCumulativeValue(deviceId, identifier);
// 无论是否能算出增量,都记录当前值
trafficCounterRedisDAO.setLastCumulativeValue(deviceId, identifier, currentValue);
if (lastValue == null) {
// 首次上报:无历史基准,不计入统计
log.info("[TrafficThreshold] 累计值设备首次上报建立基准deviceId={}, identifier={}, value={}",
deviceId, identifier, currentValue);
return 0;
}
if (currentValue >= lastValue) {
return currentValue - lastValue;
}
// currentValue < lastValue → 设备重启归零
log.info("[TrafficThreshold] 检测到设备重启deviceId={}, identifier={}, last={}, current={}",
deviceId, identifier, lastValue, currentValue);
return currentValue;
}
/**
* 处理 people_in 增量
*/
private void handlePeopleIn(Long deviceId, Long areaId, Long increment, LocalDate today,
private void handlePeopleIn(Long deviceId, Long areaId, long increment, LocalDate today,
TrafficThresholdConfig thresholdConfig,
CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) {
// 1. 原子累加到阈值计数器,返回累积值(当日统计已在 processPropertyChange 中完成)