refactor(iot): 重构客流计数器为增量累加模式,支持 people_out

- 删除旧 TrafficCounterBaseRedisDAO(基准值模式),新增 TrafficCounterRedisDAO
  支持阈值计数器(达标后重置)和当日累积统计(用于报表)
- TrafficThresholdRuleProcessor 改为增量原子累加,消除基准值校准逻辑
- CleanRuleProcessorManager 路由增加 people_out 支持
- TrafficCounterBaseResetJob 改为每日清除阈值计数器,持久化职责移交 Ops 模块
- 使用 SCAN 替代 KEYS 避免阻塞 Redis

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
lzh
2026-02-03 15:34:03 +08:00
parent 6a109954d3
commit 46024fd043
13 changed files with 915 additions and 175 deletions

View File

@@ -0,0 +1,42 @@
package com.viewsh.module.ops.dal.dataobject.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.Map;
/**
 * Work-order business log entry response DTO.
 * <p>
 * Single log line shown in the admin console's work-order timeline.
 *
 * @author lzh
 */
@Schema(description = "管理后台 - 工单业务日志 Response DTO")
@Data
public class OpsOrderBusinessLogRespDTO {

    @Schema(description = "日志ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
    private Long id;

    // Fixed: the description string was missing its opening parenthesis
    // ("日志类型system=.../user=...)"), producing unbalanced text in Swagger UI.
    @Schema(description = "日志类型(system=系统/user=用户)", requiredMode = Schema.RequiredMode.REQUIRED, example = "system")
    private String type;

    @Schema(description = "日志标题", requiredMode = Schema.RequiredMode.REQUIRED, example = "工单自动创建")
    private String title;

    @Schema(description = "日志内容", example = "蓝牙信标触发自动创建保洁工单")
    private String content;

    @Schema(description = "操作人", requiredMode = Schema.RequiredMode.REQUIRED, example = "系统")
    private String operator;

    @Schema(description = "日志时间", requiredMode = Schema.RequiredMode.REQUIRED, example = "2026-01-23 14:30:25")
    private LocalDateTime time;

    @Schema(description = "工单状态", example = "PENDING")
    private String status;

    @Schema(description = "扩展数据", example = "{}")
    private Map<String, Object> extra;
}

View File

@@ -0,0 +1,23 @@
package com.viewsh.module.ops.dal.dataobject.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.List;
/**
 * Work-order business log list response DTO.
 * <p>
 * Wraps all log entries of one work order; {@code logs} holds the individual
 * {@link OpsOrderBusinessLogRespDTO} entries.
 *
 * @author lzh
 */
@Schema(description = "管理后台 - 工单业务日志列表 Response DTO")
@Data
public class OpsOrderBusinessLogsRespDTO {
@Schema(description = "工单ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "10001")
private Long orderId;
@Schema(description = "日志列表", requiredMode = Schema.RequiredMode.REQUIRED)
private List<OpsOrderBusinessLogRespDTO> logs;
}

View File

@@ -0,0 +1,62 @@
package com.viewsh.module.ops.dal.dataobject.statistics;
import com.baomidou.mybatisplus.annotation.KeySequence;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.viewsh.framework.tenant.core.db.TenantBaseDO;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
 * Hourly traffic-statistics aggregate DO.
 * <p>
 * Used by Ops business reporting; the data originates from IoT device people
 * counters and is persisted hourly by TrafficStatisticsPersistJob.
 * <p>
 * NOTE(review): Lombok {@code @Builder} only generates setters for fields
 * declared in THIS class — inherited {@code TenantBaseDO} fields (e.g. the
 * tenant id) cannot be set through {@code builder()}. The mapper's upsert SQL
 * reads {@code #{record.tenantId}}, so confirm the tenant id is populated by
 * some other mechanism before insert, or consider {@code @SuperBuilder} on the
 * hierarchy.
 *
 * @author AI
 */
@TableName("ops_traffic_statistics")
@KeySequence("ops_traffic_statistics_seq")
@Data
@EqualsAndHashCode(callSuper = true)
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OpsTrafficStatisticsDO extends TenantBaseDO {
/**
 * Primary key.
 */
@TableId
private Long id;
/**
 * Device id (source of the counter data).
 */
private Long deviceId;
/**
 * Area id (primary query dimension for reports).
 */
private Long areaId;
/**
 * Statistics hour, truncated to the hour (e.g. 2026-02-03 10:00:00).
 */
private LocalDateTime statHour;
/**
 * Number of people entering during this hour bucket.
 */
private Integer peopleIn;
/**
 * Number of people leaving during this hour bucket.
 */
private Integer peopleOut;
}

View File

@@ -0,0 +1,36 @@
package com.viewsh.module.ops.dal.mysql.statistics;
import com.viewsh.framework.mybatis.core.mapper.BaseMapperX;
import com.viewsh.module.ops.dal.dataobject.statistics.OpsTrafficStatisticsDO;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.time.LocalDateTime;
/**
 * Hourly traffic-statistics mapper.
 *
 * @author AI
 */
@Mapper
public interface OpsTrafficStatisticsMapper extends BaseMapperX<OpsTrafficStatisticsDO> {
/**
 * Inserts or accumulates one statistics record.
 * <p>
 * Implemented via INSERT ... ON DUPLICATE KEY UPDATE in the XML mapper; on a
 * duplicate key the people counts are ADDED to the existing row, so callers
 * must pass per-interval deltas, not running totals.
 * <p>
 * Named {@code upsert} (not {@code insertOrUpdate}) to avoid clashing with
 * MyBatis Plus {@code BaseMapper.insertOrUpdate()}.
 *
 * @param record statistics record carrying the delta counts
 */
void upsert(@Param("record") OpsTrafficStatisticsDO record);
/**
 * Deletes statistics records whose stat hour is strictly before the cutoff.
 *
 * @param beforeTime exclusive cutoff time
 * @return number of rows deleted
 */
int deleteByStatHourBefore(@Param("beforeTime") LocalDateTime beforeTime);
}

View File

@@ -0,0 +1,50 @@
package com.viewsh.module.ops.service.job;
import com.viewsh.module.ops.dal.mysql.statistics.OpsTrafficStatisticsMapper;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
 * Traffic-statistics cleanup job.
 * <p>
 * Runs at 02:00 on the 1st of every month and deletes traffic statistics
 * rows older than {@link #RETENTION_DAYS} days.
 *
 * @author AI
 */
@Slf4j
@Component
public class TrafficStatisticsCleanupJob {

    /** Retention window in days; rows with stat_hour older than this are purged. */
    private static final int RETENTION_DAYS = 30;

    @Resource
    private OpsTrafficStatisticsMapper trafficStatisticsMapper;

    /**
     * Deletes expired traffic-statistics records.
     * <p>
     * XxlJob configuration:
     * - Cron: 0 0 2 1 * ? (02:00 on the 1st of each month)
     *
     * @return human-readable execution result
     */
    @XxlJob("trafficStatisticsCleanupJob")
    public String execute() {
        log.info("[TrafficStatisticsCleanupJob] 开始执行客流统计清理任务");
        try {
            LocalDateTime beforeTime = LocalDateTime.now().minusDays(RETENTION_DAYS);
            int deletedCount = trafficStatisticsMapper.deleteByStatHourBefore(beforeTime);
            // Fixed: the original log pattern had an unbalanced "(" — closing paren added.
            log.info("[TrafficStatisticsCleanupJob] 客流统计清理完成:删除 {} 条记录(截止时间={})",
                    deletedCount, beforeTime);
            return "清理完成:删除 " + deletedCount + " 条记录";
        } catch (Exception e) {
            log.error("[TrafficStatisticsCleanupJob] 客流统计清理失败", e);
            return "清理失败: " + e.getMessage();
        }
    }
}

View File

@@ -0,0 +1,310 @@
package com.viewsh.module.ops.service.job;
import com.viewsh.module.ops.dal.dataobject.statistics.OpsTrafficStatisticsDO;
import com.viewsh.module.ops.dal.mysql.statistics.OpsTrafficStatisticsMapper;
import com.viewsh.module.ops.service.area.OpsBusAreaService;
import com.xxl.job.core.handler.annotation.XxlJob;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * Traffic-statistics persistence job.
 * <p>
 * Runs at the top of every hour and writes the IoT module's daily cumulative
 * counters (kept in Redis) into the Ops business table.
 * <p>
 * For each device/day hash it compares {@code lastPersistedIn/lastPersistedOut}
 * against {@code totalIn/totalOut}, computes the delta since the last run, and
 * upserts the delta into {@code ops_traffic_statistics}.
 *
 * @author AI
 */
@Slf4j
@Component
public class TrafficStatisticsPersistJob {

    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final String LOCK_KEY = "ops:traffic:persist:lock";
    private static final int LOCK_TTL_SECONDS = 300; // 5-minute lock timeout

    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private OpsTrafficStatisticsMapper trafficStatisticsMapper;
    @Resource
    private OpsBusAreaService areaService;

    /**
     * Persists traffic statistics once per hour.
     * <p>
     * XxlJob configuration:
     * - Cron: 0 0 * * * ? (top of every hour)
     *
     * @return human-readable execution result
     */
    @XxlJob("trafficStatisticsPersistJob")
    public String execute() {
        log.info("[TrafficStatisticsPersistJob] 开始执行客流统计持久化任务");
        // P0 fix 1: distributed lock prevents concurrent runs from double-counting
        // deltas. A unique token is stored so only the owner releases it (see finally).
        String lockToken = UUID.randomUUID().toString();
        Boolean locked = stringRedisTemplate.opsForValue()
                .setIfAbsent(LOCK_KEY, lockToken, LOCK_TTL_SECONDS, TimeUnit.SECONDS);
        // Fixed: setIfAbsent may return null (e.g. inside a pipeline/transaction);
        // the original `Boolean.FALSE.equals(locked)` treated null as "acquired".
        if (!Boolean.TRUE.equals(locked)) {
            log.warn("[TrafficStatisticsPersistJob] 任务已在其他节点执行中,跳过本次执行");
            return "任务已在其他节点执行";
        }
        try {
            // P1 fix 3: SCAN instead of KEYS to avoid blocking Redis.
            Set<String> dailyKeys = scanDailyKeys();
            if (dailyKeys.isEmpty()) {
                log.info("[TrafficStatisticsPersistJob] 无统计数据需要持久化");
                return "无数据";
            }
            int successCount = 0;
            int errorCount = 0;
            int skippedCount = 0;
            for (String key : dailyKeys) {
                try {
                    PersistResult result = persistSingleDevice(key);
                    if (result == PersistResult.SUCCESS) {
                        successCount++;
                    } else if (result == PersistResult.SKIPPED) {
                        skippedCount++;
                    } else {
                        errorCount++;
                    }
                } catch (Exception e) {
                    errorCount++;
                    log.error("[TrafficStatisticsPersistJob] 持久化失败key={}", key, e);
                }
            }
            String result = String.format("持久化完成:成功=%d, 跳过=%d, 失败=%d",
                    successCount, skippedCount, errorCount);
            log.info("[TrafficStatisticsPersistJob] {}", result);
            return result;
        } catch (Exception e) {
            log.error("[TrafficStatisticsPersistJob] 持久化任务执行失败", e);
            return "执行失败: " + e.getMessage();
        } finally {
            releaseLock(lockToken);
        }
    }

    /**
     * Releases the distributed lock, but only if this node still owns it.
     * <p>
     * Fixed: the original deleted the lock unconditionally, which could release a
     * lock re-acquired by another node after our TTL expired (a run longer than
     * LOCK_TTL_SECONDS). NOTE(review): this check-then-delete is still not atomic;
     * a Lua compare-and-delete script would close the remaining race window.
     */
    private void releaseLock(String lockToken) {
        try {
            String current = stringRedisTemplate.opsForValue().get(LOCK_KEY);
            if (lockToken.equals(current)) {
                stringRedisTemplate.delete(LOCK_KEY);
            }
        } catch (Exception e) {
            log.warn("[TrafficStatisticsPersistJob] 释放分布式锁失败", e);
        }
    }

    /**
     * Scans all daily statistics keys via SCAN (non-blocking, unlike KEYS).
     */
    private Set<String> scanDailyKeys() {
        Set<String> keys = new HashSet<>();
        ScanOptions options = ScanOptions.scanOptions()
                .match("iot:clean:traffic:daily:*")
                .count(100)
                .build();
        try (Cursor<String> cursor = stringRedisTemplate.scan(options)) {
            cursor.forEachRemaining(keys::add);
        } catch (Exception e) {
            log.error("[TrafficStatisticsPersistJob] SCAN 扫描失败", e);
        }
        return keys;
    }

    /**
     * Persists the statistics of a single device/day hash.
     *
     * @param key Redis hash key in the form iot:clean:traffic:daily:{deviceId}:{date}
     * @return SUCCESS when a row was written, SKIPPED when nothing needed writing,
     *         ERROR on unrecoverable problems (bad key, overflow)
     */
    private PersistResult persistSingleDevice(String key) {
        Long deviceId = parseDeviceIdFromKey(key);
        LocalDate date = parseDateFromKey(key);
        if (deviceId == null || date == null) {
            log.warn("[TrafficStatisticsPersistJob] 解析 key 失败key={}", key);
            return PersistResult.ERROR;
        }
        // Read the whole hash once (totals + last-persisted watermarks).
        Map<Object, Object> data = stringRedisTemplate.opsForHash().entries(key);
        long totalIn = parseLong(data.get("totalIn"));
        long totalOut = parseLong(data.get("totalOut"));
        long lastPersistedIn = parseLong(data.get("lastPersistedIn"));
        long lastPersistedOut = parseLong(data.get("lastPersistedOut"));
        // Delta accumulated since the previous run.
        long deltaIn = totalIn - lastPersistedIn;
        long deltaOut = totalOut - lastPersistedOut;
        // P1 fix 5: negative delta means the counter was reset; re-baseline the
        // watermarks at the current totals instead of writing garbage.
        if (deltaIn < 0 || deltaOut < 0) {
            log.warn("[TrafficStatisticsPersistJob] 检测到计数器重置deviceId={}, date={}, " +
                            "totalIn={}, lastPersistedIn={}, totalOut={}, lastPersistedOut={}",
                    deviceId, date, totalIn, lastPersistedIn, totalOut, lastPersistedOut);
            stringRedisTemplate.opsForHash().put(key, "lastPersistedIn", String.valueOf(totalIn));
            stringRedisTemplate.opsForHash().put(key, "lastPersistedOut", String.valueOf(totalOut));
            log.info("[TrafficStatisticsPersistJob] 已重置持久化标记deviceId={}, date={}", deviceId, date);
            return PersistResult.SKIPPED;
        }
        if (deltaIn <= 0 && deltaOut <= 0) {
            log.debug("[TrafficStatisticsPersistJob] 无新增数据deviceId={}, date={}", deviceId, date);
            return PersistResult.SKIPPED;
        }
        // P1 fix 7: the DO stores counts as int — reject deltas that would overflow.
        if (deltaIn > Integer.MAX_VALUE || deltaOut > Integer.MAX_VALUE) {
            log.error("[TrafficStatisticsPersistJob] 增量值溢出deviceId={}, date={}, deltaIn={}, deltaOut={}",
                    deviceId, date, deltaIn, deltaOut);
            return PersistResult.ERROR;
        }
        // Resolve the reporting area for this device.
        Long areaId = getAreaIdForDevice(deviceId);
        // P1 fix 4: a device with no area mapping would otherwise keep accumulating
        // an ever-growing delta; advance the watermark and skip.
        if (areaId == null) {
            log.warn("[TrafficStatisticsPersistJob] 设备无区域关联更新持久化标记避免数据累积deviceId={}", deviceId);
            stringRedisTemplate.opsForHash().put(key, "lastPersistedIn", String.valueOf(totalIn));
            stringRedisTemplate.opsForHash().put(key, "lastPersistedOut", String.valueOf(totalOut));
            // TODO: park the delta in a dead-letter store so it can be replayed once
            // the area mapping is fixed.
            return PersistResult.SKIPPED;
        }
        // P0 fix 2: assign the delta to an hour bucket (see calculateStatHour).
        LocalDateTime statHour = calculateStatHour(date);
        OpsTrafficStatisticsDO record = OpsTrafficStatisticsDO.builder()
                .deviceId(deviceId)
                .areaId(areaId)
                .statHour(statHour)
                .peopleIn((int) deltaIn)
                .peopleOut((int) deltaOut)
                .build();
        trafficStatisticsMapper.upsert(record);
        // Advance the watermarks only after the DB write succeeded, so a failed
        // upsert is retried with the same delta on the next run.
        stringRedisTemplate.opsForHash().put(key, "lastPersistedIn", String.valueOf(totalIn));
        stringRedisTemplate.opsForHash().put(key, "lastPersistedOut", String.valueOf(totalOut));
        log.debug("[TrafficStatisticsPersistJob] 持久化成功deviceId={}, areaId={}, statHour={}, deltaIn={}, deltaOut={}",
                deviceId, areaId, statHour, deltaIn, deltaOut);
        return PersistResult.SUCCESS;
    }

    /**
     * P0 fix 2: computes the hour bucket for a day's delta.
     * <p>
     * Rules:
     * - Today's data is assigned to the CURRENT hour.
     * - Yesterday's (cross-midnight run) or older data is assigned to 23:00 of
     *   its own day.
     * <p>
     * NOTE(review): since the job runs at the top of hour H+1, the delta that
     * accrued during hour H is stamped with hour H+1 — confirm this off-by-one
     * bucketing matches reporting expectations, or stamp the previous hour.
     */
    private LocalDateTime calculateStatHour(LocalDate dataDate) {
        LocalDate today = LocalDate.now();
        int currentHour = LocalDateTime.now().getHour();
        if (dataDate.equals(today)) {
            return LocalDateTime.of(dataDate, LocalTime.of(currentHour, 0));
        } else {
            // Historical data (yesterday or earlier): bucket into 23:00 of that day.
            return LocalDateTime.of(dataDate, LocalTime.of(23, 0));
        }
    }

    /**
     * Looks up the area id a device belongs to; returns null on error or no mapping.
     */
    private Long getAreaIdForDevice(Long deviceId) {
        try {
            return areaService.getAreaIdByDeviceId(deviceId);
        } catch (Exception e) {
            log.error("[TrafficStatisticsPersistJob] 查询设备区域失败deviceId={}", deviceId, e);
            return null;
        }
    }

    /**
     * Parses the device id out of a key of the form
     * iot:clean:traffic:daily:{deviceId}:{date}; returns null if malformed.
     */
    private Long parseDeviceIdFromKey(String key) {
        String[] parts = key.split(":");
        if (parts.length >= 6) {
            try {
                return Long.parseLong(parts[4]);
            } catch (NumberFormatException e) {
                return null;
            }
        }
        return null;
    }

    /**
     * Parses the yyyyMMdd date out of a key of the form
     * iot:clean:traffic:daily:{deviceId}:{date}; returns null if malformed.
     */
    private LocalDate parseDateFromKey(String key) {
        String[] parts = key.split(":");
        if (parts.length >= 6) {
            try {
                return LocalDate.parse(parts[5], DATE_FORMATTER);
            } catch (Exception e) {
                return null;
            }
        }
        return null;
    }

    /**
     * Parses any hash value into a long; null/unparseable values count as 0.
     */
    private long parseLong(Object value) {
        if (value == null) {
            return 0L;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        try {
            return Long.parseLong(value.toString());
        } catch (NumberFormatException e) {
            return 0L;
        }
    }

    /**
     * Outcome of persisting one device/day hash.
     */
    private enum PersistResult {
        SUCCESS, // row written and watermark advanced
        SKIPPED, // nothing to do (no delta, counter reset, missing area)
        ERROR    // bad key or overflow
    }
}

View File

@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.viewsh.module.ops.dal.mysql.statistics.OpsTrafficStatisticsMapper">
    <!-- Upsert one hourly bucket. On a duplicate key the new counts are ADDED to
         the existing row, so callers must pass per-interval deltas, not running
         totals. Presumably the unique key is (device_id, stat_hour) or
         (device_id, area_id, stat_hour) - confirm against the table DDL.
         NOTE(review): tenant_id is read from #{record.tenantId}, but the DO is
         built via Lombok @Builder which does not cover inherited TenantBaseDO
         fields - verify tenant_id is not inserted as NULL.
         NOTE(review): the VALUES() function is deprecated since MySQL 8.0.20
         (row-alias syntax is preferred) but still functions. -->
    <insert id="upsert">
        INSERT INTO ops_traffic_statistics (device_id, area_id, stat_hour, people_in, people_out, tenant_id)
        VALUES (#{record.deviceId}, #{record.areaId}, #{record.statHour}, #{record.peopleIn}, #{record.peopleOut}, #{record.tenantId})
        ON DUPLICATE KEY UPDATE
        people_in = people_in + VALUES(people_in),
        people_out = people_out + VALUES(people_out),
        update_time = NOW()
    </insert>
    <!-- Physical purge of old rows. Only rows with deleted = 0 are removed;
         NOTE(review): logically-deleted rows (deleted = 1) are never purged by
         this statement - confirm whether they should be included. -->
    <delete id="deleteByStatHourBefore">
        DELETE FROM ops_traffic_statistics
        WHERE stat_hour &lt; #{beforeTime}
        AND deleted = 0
    </delete>
</mapper>