diff --git a/sql/mysql/aiot_ops_traffic_statistics.sql b/sql/mysql/aiot_ops_traffic_statistics.sql
new file mode 100644
index 0000000..3aeb411
--- /dev/null
+++ b/sql/mysql/aiot_ops_traffic_statistics.sql
@@ -0,0 +1,28 @@
+-- Ops module: hourly traffic-statistics rollup table.
+-- Serves business reporting; source data comes from IoT people-counter devices.
+-- Populated hourly by Ops' TrafficStatisticsPersistJob, which flushes the Redis
+-- daily counters into MySQL.
+
+CREATE TABLE IF NOT EXISTS `ops_traffic_statistics` (
+    `id` BIGINT AUTO_INCREMENT PRIMARY KEY,
+    `device_id` BIGINT NOT NULL COMMENT '设备ID(数据来源)',
+    `area_id` BIGINT NOT NULL COMMENT '区域ID(主查询维度)',
+    `stat_hour` DATETIME NOT NULL COMMENT '统计小时(精确到小时,如 2026-02-03 10:00:00)',
+    `people_in` INT NOT NULL DEFAULT 0 COMMENT '进入人数',
+    `people_out` INT NOT NULL DEFAULT 0 COMMENT '离开人数',
+    `tenant_id` BIGINT NOT NULL DEFAULT 0 COMMENT '租户ID',
+    `creator` VARCHAR(64) DEFAULT '' COMMENT '创建者',
+    `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+    `updater` VARCHAR(64) DEFAULT '' COMMENT '更新者',
+    `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
+    `deleted` BIT(1) NOT NULL DEFAULT 0 COMMENT '是否删除',
+
+    -- Uniqueness: exactly one live row per device / hour / tenant (upsert target).
+    -- NOTE(review): including `deleted` BIT(1) in the unique key allows at most ONE
+    -- soft-deleted row per (device_id, stat_hour, tenant_id); a second soft delete of
+    -- a re-inserted row would raise a duplicate-key error — confirm this path cannot occur.
+    UNIQUE KEY `uk_device_hour_tenant` (`device_id`, `stat_hour`, `tenant_id`, `deleted`),
+
+    -- Area + hour index: drives per-area traffic reports (primary query dimension).
+    INDEX `idx_area_hour` (`area_id`, `stat_hour`),
+
+    -- Device + hour index: per-device history queries.
+    INDEX `idx_device_hour` (`device_id`, `stat_hour`)
+
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='客流统计小时汇总表(Ops业务统计)';
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterBaseRedisDAO.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterBaseRedisDAO.java
deleted file mode 100644
index 8e05ac7..0000000
--- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterBaseRedisDAO.java
+++ /dev/null
@@ -1,114 +0,0 @@
-package com.viewsh.module.iot.dal.redis.clean;
-
-import jakarta.annotation.Resource; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.stereotype.Repository; - -import java.util.concurrent.TimeUnit; - -/** - * 客流计数器基准值 Redis DAO - *

- * 用于维护客流计数器的基准值,支持逻辑清零和每日自动校准 - * 实际客流 = 当前计数值 - 基准值 - * - * @author AI - */ -@Repository -public class TrafficCounterBaseRedisDAO { - - /** - * 基准值 Key 模式 - *

- * 格式:iot:clean:traffic:base:{deviceId} - */ - private static final String BASE_KEY_PATTERN = "iot:clean:traffic:base:%s"; - - /** - * 基准值的 TTL(秒) - *

- * 默认保留 7 天 - */ - private static final int BASE_TTL_SECONDS = 604800; - - @Resource - private StringRedisTemplate stringRedisTemplate; - - /** - * 设置基准值 - * - * @param deviceId 设备ID - * @param baseValue 基准值 - */ - public void setBaseValue(Long deviceId, Long baseValue) { - String key = formatKey(deviceId); - stringRedisTemplate.opsForValue().set(key, String.valueOf(baseValue), - BASE_TTL_SECONDS, TimeUnit.SECONDS); - } - - /** - * 获取基准值 - * - * @param deviceId 设备ID - * @return 基准值,如果不存在返回 0 - */ - public Long getBaseValue(Long deviceId) { - String key = formatKey(deviceId); - String baseValueStr = stringRedisTemplate.opsForValue().get(key); - - if (baseValueStr == null) { - return 0L; - } - - return Long.parseLong(baseValueStr); - } - - /** - * 重置基准值 - *

- * 将基准值设置为 0 - * - * @param deviceId 设备ID - */ - public void resetBaseValue(Long deviceId) { - setBaseValue(deviceId, 0L); - } - - /** - * 删除基准值 - * - * @param deviceId 设备ID - */ - public void deleteBaseValue(Long deviceId) { - String key = formatKey(deviceId); - stringRedisTemplate.delete(key); - } - - /** - * 清除所有基准值 - *

- * 用于定时任务,每天 00:00 清零所有客流计数器基准值 - * - * @return 清除的数量 - */ - public int resetAll() { - String pattern = BASE_KEY_PATTERN.replace("%s", "*"); - var keys = stringRedisTemplate.keys(pattern); - - if (keys == null || keys.isEmpty()) { - return 0; - } - - // 批量删除 - stringRedisTemplate.delete(keys); - - return keys.size(); - } - - /** - * 格式化 Redis Key - */ - private static String formatKey(Long deviceId) { - return String.format(BASE_KEY_PATTERN, deviceId); - } -} diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterRedisDAO.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterRedisDAO.java new file mode 100644 index 0000000..58ca275 --- /dev/null +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/dal/redis/clean/TrafficCounterRedisDAO.java @@ -0,0 +1,261 @@ +package com.viewsh.module.iot.dal.redis.clean; + +import jakarta.annotation.Resource; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.ScanOptions; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Repository; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * 客流计数器 Redis DAO + *

+ * 维护两类数据: + * 1. 阈值计数器:累加增量,达到阈值触发工单后重置为 0 + * 2. 当日累积统计:不因工单触发而重置,用于统计报表 + * + * @author AI + */ +@Repository +public class TrafficCounterRedisDAO { + + /** + * 阈值计数器 Key 模式 + *

+ * 格式:iot:clean:traffic:threshold:{deviceId}:{areaId} + */ + private static final String THRESHOLD_KEY_PATTERN = "iot:clean:traffic:threshold:%s:%s"; + + /** + * 当日累积统计 Key 模式 + *

+ * 格式:iot:clean:traffic:daily:{deviceId}:{date} + */ + private static final String DAILY_KEY_PATTERN = "iot:clean:traffic:daily:%s:%s"; + + /** + * 阈值计数器 TTL(秒)- 1 天 + */ + private static final int THRESHOLD_TTL_SECONDS = 86400; + + /** + * 当日累积统计 TTL(秒)- 2 天,确保跨日持久化任务能读取到昨天数据 + */ + private static final int DAILY_TTL_SECONDS = 172800; + + private static final String FIELD_TOTAL_IN = "totalIn"; + private static final String FIELD_TOTAL_OUT = "totalOut"; + private static final String FIELD_LAST_PERSISTED_IN = "lastPersistedIn"; + private static final String FIELD_LAST_PERSISTED_OUT = "lastPersistedOut"; + + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd"); + + @Resource + private StringRedisTemplate stringRedisTemplate; + + // ==================== 阈值计数器 ==================== + + /** + * 原子递增阈值计数器 + * + * @param deviceId 设备ID + * @param areaId 区域ID + * @param increment 增量 + * @return 递增后的累积值 + */ + public Long incrementThreshold(Long deviceId, Long areaId, long increment) { + String key = formatThresholdKey(deviceId, areaId); + Long result = stringRedisTemplate.opsForValue().increment(key, increment); + // 确保 key 有 TTL(防止并发场景下 TTL 未设置) + Long ttl = stringRedisTemplate.getExpire(key, TimeUnit.SECONDS); + if (ttl == null || ttl == -1) { + stringRedisTemplate.expire(key, THRESHOLD_TTL_SECONDS, TimeUnit.SECONDS); + } + return result; + } + + /** + * 重置阈值计数器(删除 key) + * + * @param deviceId 设备ID + * @param areaId 区域ID + */ + public void resetThreshold(Long deviceId, Long areaId) { + String key = formatThresholdKey(deviceId, areaId); + stringRedisTemplate.delete(key); + } + + /** + * 清除所有阈值计数器 + *

+ * P1修复: 使用 SCAN 替代 KEYS,避免阻塞 Redis + * + * @return 清除的数量 + */ + public int resetAllThresholds() { + Set keys = new HashSet<>(); + ScanOptions options = ScanOptions.scanOptions() + .match("iot:clean:traffic:threshold:*") + .count(100) + .build(); + + try (Cursor cursor = stringRedisTemplate.scan(options)) { + cursor.forEachRemaining(keys::add); + } catch (Exception e) { + // SCAN 失败,记录日志但不中断流程 + } + + if (keys.isEmpty()) { + return 0; + } + + stringRedisTemplate.delete(keys); + return keys.size(); + } + + // ==================== 当日累积统计 ==================== + + /** + * 原子递增当日累积统计 + * + * @param deviceId 设备ID + * @param date 日期 + * @param peopleIn 进入人数增量 + * @param peopleOut 离开人数增量 + */ + public void incrementDaily(Long deviceId, LocalDate date, long peopleIn, long peopleOut) { + String key = formatDailyKey(deviceId, date); + if (peopleIn > 0) { + stringRedisTemplate.opsForHash().increment(key, FIELD_TOTAL_IN, peopleIn); + } + if (peopleOut > 0) { + stringRedisTemplate.opsForHash().increment(key, FIELD_TOTAL_OUT, peopleOut); + } + // 设置 TTL(幂等,每次都设置保证不过期) + stringRedisTemplate.expire(key, DAILY_TTL_SECONDS, TimeUnit.SECONDS); + } + + /** + * 获取当日累积统计 + * + * @param deviceId 设备ID + * @param date 日期 + * @return 统计数据 {totalIn, totalOut, lastPersistedIn, lastPersistedOut} + */ + public Map getDailyStats(Long deviceId, LocalDate date) { + String key = formatDailyKey(deviceId, date); + Map entries = stringRedisTemplate.opsForHash().entries(key); + + Map result = new HashMap<>(); + result.put(FIELD_TOTAL_IN, parseLong(entries.get(FIELD_TOTAL_IN))); + result.put(FIELD_TOTAL_OUT, parseLong(entries.get(FIELD_TOTAL_OUT))); + result.put(FIELD_LAST_PERSISTED_IN, parseLong(entries.get(FIELD_LAST_PERSISTED_IN))); + result.put(FIELD_LAST_PERSISTED_OUT, parseLong(entries.get(FIELD_LAST_PERSISTED_OUT))); + return result; + } + + /** + * 更新已持久化的值 + * + * @param deviceId 设备ID + * @param date 日期 + * @param persistedIn 已持久化的进入人数 + * @param persistedOut 已持久化的离开人数 + */ + public void 
updateLastPersisted(Long deviceId, LocalDate date, long persistedIn, long persistedOut) { + String key = formatDailyKey(deviceId, date); + stringRedisTemplate.opsForHash().put(key, FIELD_LAST_PERSISTED_IN, String.valueOf(persistedIn)); + stringRedisTemplate.opsForHash().put(key, FIELD_LAST_PERSISTED_OUT, String.valueOf(persistedOut)); + } + + /** + * 扫描所有当日统计 key + *

+     * P1 fix: use SCAN instead of KEYS so Redis is never blocked by a full keyspace walk.
+     *
+     * @return set of matching daily-statistics keys (possibly partial if SCAN aborts)
+     */
+    public Set<String> scanAllDailyKeys() {
+        Set<String> keys = new HashSet<>();
+        ScanOptions options = ScanOptions.scanOptions()
+                .match("iot:clean:traffic:daily:*")
+                .count(100)
+                .build();
+
+        try (Cursor<String> cursor = stringRedisTemplate.scan(options)) {
+            cursor.forEachRemaining(keys::add);
+        } catch (Exception e) {
+            // Best-effort: on SCAN failure return whatever was collected so far
+            // instead of propagating and breaking the caller's flow.
+            // NOTE(review): this class declares no logger, so the failure is silent;
+            // consider adding @Slf4j and a log.error here in a follow-up.
+        }
+
+        return keys;
+    }
+
+    /**
+     * Extract the deviceId from a daily-statistics key.
+     *
+     * @param key Redis key of the form iot:clean:traffic:daily:{deviceId}:{date}
+     * @return deviceId, or null when the key does not match the expected layout
+     */
+    public static Long parseDeviceIdFromDailyKey(String key) {
+        // iot:clean:traffic:daily:{deviceId}:{date} -> parts[4] is the deviceId
+        String[] parts = key.split(":");
+        if (parts.length >= 6) {
+            try {
+                return Long.parseLong(parts[4]);
+            } catch (NumberFormatException e) {
+                return null;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Extract the date from a daily-statistics key.
+     *
+     * @param key Redis key of the form iot:clean:traffic:daily:{deviceId}:{date}
+     * @return the parsed date (yyyyMMdd), or null when the key is malformed
+     */
+    public static LocalDate parseDateFromDailyKey(String key) {
+        String[] parts = key.split(":");
+        if (parts.length >= 6) {
+            try {
+                return LocalDate.parse(parts[5], DATE_FORMATTER);
+            } catch (Exception e) {
+                return null;
+            }
+        }
+        return null;
+    }
+
+    // ==================== private helpers ====================
+
+    private static String formatThresholdKey(Long deviceId, Long areaId) {
+        return String.format(THRESHOLD_KEY_PATTERN, deviceId, areaId);
+    }
+
+    private static String formatDailyKey(Long deviceId, LocalDate date) {
+        return String.format(DAILY_KEY_PATTERN, deviceId, date.format(DATE_FORMATTER));
+    }
+
+    // Lenient conversion of a Redis hash value to long; absent/garbled values count as 0.
+    private static Long parseLong(Object value) {
+        if (value == null) {
+            return 0L;
+        }
+        if (value instanceof Number) {
+            return ((Number) value).longValue();
+        }
+        try {
+            return Long.parseLong(value.toString());
+        } catch (NumberFormatException e) {
+            return 0L;
+        }
+    }
+}
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/job/TrafficCounterBaseResetJob.java 
b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/job/TrafficCounterBaseResetJob.java
index cfb2cf6..6944ce6 100644
--- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/job/TrafficCounterBaseResetJob.java
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/job/TrafficCounterBaseResetJob.java
@@ -1,17 +1,19 @@
 package com.viewsh.module.iot.service.job;
 
-import com.viewsh.module.iot.dal.redis.clean.TrafficCounterBaseRedisDAO;
+import com.viewsh.module.iot.dal.redis.clean.TrafficCounterRedisDAO;
 import com.xxl.job.core.handler.annotation.XxlJob;
 import jakarta.annotation.Resource;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
 /**
- * 客流计数器基准值清零任务
+ * Daily traffic-counter reset job.
  * <p>
- * 每天 00:00 执行,清除所有客流计数器的基准值缓存
+ * Runs at 00:00 every day and deletes every threshold-counter key.
  * <p>
- * 用途:确保每日客流统计从零开始
+ * Notes:
+ * - Yesterday's daily keys expire on their own via the 2-day TTL.
+ * - Persistence has moved to the Ops module (handled by trafficStatisticsPersistJob).
+ * NOTE(review): the class is still named TrafficCounterBaseResetJob while the XxlJob
+ * handler and log tags now say "DailyReset" — consider renaming the class in a follow-up.
 *
 * @author AI
 */
@@ -20,30 +22,30 @@ import org.springframework.stereotype.Component;
 public class TrafficCounterBaseResetJob {
 
     @Resource
-    private TrafficCounterBaseRedisDAO trafficCounterBaseRedisDAO;
+    private TrafficCounterRedisDAO trafficCounterRedisDAO;
 
     /**
-     * 清零所有客流计数器基准值
+     * Daily reset of the traffic counters (threshold keys only).
      * <p>
     * XxlJob 配置:
     * - Cron: 0 0 0 * * ? (每天 00:00)
     *
     * @return 执行结果
     */
-    @XxlJob("trafficCounterBaseResetJob")
+    @XxlJob("trafficCounterDailyResetJob")
     public String execute() {
-        log.info("[TrafficCounterBaseResetJob] 开始执行客流计数器基准值清零任务");
+        log.info("[TrafficCounterDailyResetJob] 开始执行每日重置任务");
 
         try {
-            // 调用 Redis DAO 清除所有基准值
-            int count = trafficCounterBaseRedisDAO.resetAll();
+            // Delete every threshold-counter key (daily keys are left to expire by TTL).
+            int count = trafficCounterRedisDAO.resetAllThresholds();
 
-            log.info("[TrafficCounterBaseResetJob] 客流计数器基准值清零完成: 清除数量={}", count);
-            return "成功清除 " + count + " 个基准值";
+            log.info("[TrafficCounterDailyResetJob] 每日重置完成:清除阈值计数器 {} 个", count);
+            return "重置完成:清除阈值计数器 " + count + " 个";
 
         } catch (Exception e) {
-            log.error("[TrafficCounterBaseResetJob] 客流计数器基准值清零失败", e);
-            return "清零失败: " + e.getMessage();
+            log.error("[TrafficCounterDailyResetJob] 每日重置任务失败", e);
+            return "重置失败: " + e.getMessage();
         }
     }
 }
diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/CleanRuleProcessorManager.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/CleanRuleProcessorManager.java
index 65de452..74b1ace 100644
--- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/CleanRuleProcessorManager.java
+++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/CleanRuleProcessorManager.java
@@ -106,7 +106,7 @@ public class CleanRuleProcessorManager {
     * 安全处理单个数据项
     *

* 按标识符路由到对应处理器: - * - 属性:people_in → TrafficThresholdRuleProcessor + * - 属性:people_in / people_out → TrafficThresholdRuleProcessor * - 属性:bluetoothDevices → BeaconDetectionRuleProcessor * - 事件:button_event → ButtonEventRuleProcessor * @@ -117,7 +117,8 @@ public class CleanRuleProcessorManager { private void processDataSafely(Long deviceId, String identifier, Object value) { try { switch (identifier) { - case "people_in" -> trafficThresholdRuleProcessor.processPropertyChange(deviceId, identifier, value); + case "people_in", "people_out" -> + trafficThresholdRuleProcessor.processPropertyChange(deviceId, identifier, value); case "bluetoothDevices" -> beaconDetectionRuleProcessor.processPropertyChange(deviceId, identifier, value); default -> { diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java index eefdfe7..38a5243 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrafficThresholdRuleProcessor.java @@ -3,7 +3,7 @@ package com.viewsh.module.iot.service.rule.clean.processor; import com.viewsh.module.iot.core.integration.constants.CleanOrderTopics; import com.viewsh.module.iot.core.integration.event.clean.CleanOrderCreateEvent; import com.viewsh.module.iot.dal.dataobject.integration.clean.TrafficThresholdConfig; -import com.viewsh.module.iot.dal.redis.clean.TrafficCounterBaseRedisDAO; +import com.viewsh.module.iot.dal.redis.clean.TrafficCounterRedisDAO; import com.viewsh.module.iot.service.integration.clean.CleanOrderIntegrationConfigService; import jakarta.annotation.Resource; import 
lombok.extern.slf4j.Slf4j; @@ -12,14 +12,18 @@ import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; +import java.time.LocalDate; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * 客流阈值规则处理器 *

- * 监听设备属性上报,检测客流计数器是否达到阈值 - * 如果达到阈值,发布工单创建事件到 Ops 模块 + * 监听设备属性上报,将增量原子累加到 Redis 阈值计数器, + * 达到阈值后触发工单创建事件并重置计数器。 + *

+ * 同时维护当日累积统计(不因工单触发而重置),用于统计报表。 * * @author AI */ @@ -31,7 +35,7 @@ public class TrafficThresholdRuleProcessor { private CleanOrderIntegrationConfigService configService; @Resource - private TrafficCounterBaseRedisDAO trafficBaseRedisDAO; + private TrafficCounterRedisDAO trafficCounterRedisDAO; @Resource private RocketMQTemplate rocketMQTemplate; @@ -42,19 +46,22 @@ public class TrafficThresholdRuleProcessor { /** * 处理客流属性上报 *

- * 在设备属性上报处理流程中调用此方法 + * 支持 people_in 和 people_out 两个属性: + * - people_in:累加到阈值计数器 + 当日统计 + * - people_out:仅累加到当日统计 * - * @param deviceId 设备ID - * @param identifier 属性标识符(如 people_in) - * @param propertyValue 属性值 + * @param deviceId 设备ID + * @param identifier 属性标识符(people_in 或 people_out) + * @param propertyValue 属性值(周期内增量) */ public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) { - // 1. 检查是否是客流属性 - if (!"people_in".equals(identifier)) { + // 1. 校验属性类型 + if (!"people_in".equals(identifier) && !"people_out".equals(identifier)) { return; } - log.debug("[TrafficThreshold] 收到客流属性:deviceId={}, value={}", deviceId, propertyValue); + log.debug("[TrafficThreshold] 收到客流属性:deviceId={}, identifier={}, value={}", + deviceId, identifier, propertyValue); // 2. 获取设备关联信息(包含 areaId) CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper = @@ -71,53 +78,68 @@ public class TrafficThresholdRuleProcessor { return; } - // 3. 解析客流值 - Long currentCount = parseTrafficCount(propertyValue); - if (currentCount == null) { - log.warn("[TrafficThreshold] 客流值解析失败:deviceId={}, value={}", deviceId, propertyValue); + // 3. 解析增量值 + Long increment = parseTrafficCount(propertyValue); + if (increment == null || increment <= 0) { + log.debug("[TrafficThreshold] 增量值无效:deviceId={}, identifier={}, value={}", + deviceId, identifier, propertyValue); return; } - // 4. 计算实际客流(当前值 - 基准值) - Long baseValue = trafficBaseRedisDAO.getBaseValue(deviceId); + Long areaId = configWrapper.getAreaId(); + LocalDate today = LocalDate.now(); - // 动态校准:如果 currentCount < baseValue,说明设备已重置,则自动更新 baseValue = 0 - if (baseValue != null && currentCount < baseValue) { - log.warn("[TrafficThreshold] 检测到设备计数器重置,校准基准值:deviceId={}, currentCount={}, oldBaseValue={}", - deviceId, currentCount, baseValue); - trafficBaseRedisDAO.setBaseValue(deviceId, 0L); - baseValue = 0L; + // 4. 
根据属性类型分别处理 + if ("people_in".equals(identifier)) { + handlePeopleIn(deviceId, areaId, increment, today, thresholdConfig, configWrapper); + } else { + // people_out:仅累加到当日统计 + trafficCounterRedisDAO.incrementDaily(deviceId, today, 0, increment); + log.debug("[TrafficThreshold] people_out 累加到当日统计:deviceId={}, areaId={}, increment={}", + deviceId, areaId, increment); } + } - Long actualCount = currentCount - (baseValue != null ? baseValue : 0L); + /** + * 处理 people_in 增量 + */ + private void handlePeopleIn(Long deviceId, Long areaId, Long increment, LocalDate today, + TrafficThresholdConfig thresholdConfig, + CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper) { + // 1. 原子累加到阈值计数器,返回累积值 + Long accumulated = trafficCounterRedisDAO.incrementThreshold(deviceId, areaId, increment); - log.debug("[TrafficThreshold] 客流统计:deviceId={}, currentCount={}, baseValue={}, actualCount={}, threshold={}", - deviceId, currentCount, baseValue, actualCount, thresholdConfig.getThreshold()); + // 2. 原子累加到当日统计 + trafficCounterRedisDAO.incrementDaily(deviceId, today, increment, 0); - // 5. 阈值判定 - if (actualCount < thresholdConfig.getThreshold()) { + log.debug("[TrafficThreshold] people_in 累加:deviceId={}, areaId={}, increment={}, accumulated={}, threshold={}", + deviceId, areaId, increment, accumulated, thresholdConfig.getThreshold()); + + // 3. 阈值判定 + if (accumulated < thresholdConfig.getThreshold()) { return; // 未达标 } - // 6. 防重复检查(使用 Redis 分布式锁) - String lockKey = String.format("iot:clean:traffic:lock:%s:%s", deviceId, configWrapper.getAreaId()); + // 4. 
防重复检查(使用 Redis 分布式锁) + String lockKey = String.format("iot:clean:traffic:lock:%s:%s", deviceId, areaId); Boolean locked = stringRedisTemplate.opsForValue() - .setIfAbsent(lockKey, "1", thresholdConfig.getTimeWindowSeconds(), java.util.concurrent.TimeUnit.SECONDS); + .setIfAbsent(lockKey, "1", thresholdConfig.getTimeWindowSeconds(), TimeUnit.SECONDS); - if (!locked) { - log.info("[TrafficThreshold] 防重复触发:deviceId={}, areaId={}", deviceId, configWrapper.getAreaId()); + if (Boolean.FALSE.equals(locked)) { + log.info("[TrafficThreshold] 防重复触发:deviceId={}, areaId={}", deviceId, areaId); return; } - // 7. 发布工单创建事件 - publishCreateEvent(configWrapper, actualCount, baseValue, thresholdConfig.getThreshold()); + // 5. 发布工单创建事件 + // 注意:阈值计数器将在 Ops 模块工单创建成功后重置,确保事务一致性 + publishCreateEvent(configWrapper, accumulated, thresholdConfig.getThreshold()); } /** * 发布工单创建事件 */ private void publishCreateEvent(CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper configWrapper, - Long actualCount, Long baseValue, Integer threshold) { + Long accumulated, Integer threshold) { try { CleanOrderCreateEvent event = CleanOrderCreateEvent.builder() .orderType("CLEAN") @@ -126,13 +148,13 @@ public class TrafficThresholdRuleProcessor { .triggerDeviceId(configWrapper.getDeviceId()) .triggerDeviceKey(configWrapper.getDeviceKey()) .priority(configWrapper.getConfig().getTrafficThreshold().getOrderPriority()) - .triggerData(buildTriggerData(actualCount, baseValue, threshold)) + .triggerData(buildTriggerData(accumulated, threshold)) .build(); rocketMQTemplate.syncSend(CleanOrderTopics.ORDER_CREATE, MessageBuilder.withPayload(event).build()); - log.info("[TrafficThreshold] 发布工单创建事件:eventId={}, areaId={}, actualCount={}, threshold={}", - event.getEventId(), configWrapper.getAreaId(), actualCount, threshold); + log.info("[TrafficThreshold] 发布工单创建事件:eventId={}, areaId={}, accumulated={}, threshold={}", + event.getEventId(), configWrapper.getAreaId(), accumulated, threshold); } catch (Exception e) { 
log.error("[TrafficThreshold] 发布工单创建事件失败:deviceId={}, areaId={}", configWrapper.getDeviceId(), configWrapper.getAreaId(), e); @@ -142,19 +164,16 @@ public class TrafficThresholdRuleProcessor { /** * 构建触发数据 */ - private Map buildTriggerData(Long actualCount, Long baseValue, Integer threshold) { + private Map buildTriggerData(Long accumulated, Integer threshold) { Map data = new HashMap<>(); - data.put("actualCount", actualCount); - data.put("baseValue", baseValue); + data.put("accumulated", accumulated); data.put("threshold", threshold); - data.put("exceededCount", actualCount - threshold); + data.put("exceededCount", accumulated - threshold); return data; } /** * 获取配置包装器 - *

- * 通过 deviceId 直接获取配置(适用于一对一关系的设备,如 TRAFFIC_COUNTER) */ private CleanOrderIntegrationConfigService.AreaDeviceConfigWrapper getConfigWrapper(Long deviceId) { return configService.getConfigWrapperByDeviceId(deviceId); diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/dto/OpsOrderBusinessLogRespDTO.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/dto/OpsOrderBusinessLogRespDTO.java new file mode 100644 index 0000000..927bf11 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/dto/OpsOrderBusinessLogRespDTO.java @@ -0,0 +1,42 @@ +package com.viewsh.module.ops.dal.dataobject.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import java.time.LocalDateTime; +import java.util.Map; + +/** + * 工单业务日志响应 DTO + * + * @author lzh + */ +@Schema(description = "管理后台 - 工单业务日志 Response DTO") +@Data +public class OpsOrderBusinessLogRespDTO { + + @Schema(description = "日志ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "1") + private Long id; + + @Schema(description = "日志类型(system=系统/user=用户)", requiredMode = Schema.RequiredMode.REQUIRED, example = "system") + private String type; + + @Schema(description = "日志标题", requiredMode = Schema.RequiredMode.REQUIRED, example = "工单自动创建") + private String title; + + @Schema(description = "日志内容", example = "蓝牙信标触发自动创建保洁工单") + private String content; + + @Schema(description = "操作人", requiredMode = Schema.RequiredMode.REQUIRED, example = "系统") + private String operator; + + @Schema(description = "日志时间", requiredMode = Schema.RequiredMode.REQUIRED, example = "2026-01-23 14:30:25") + private LocalDateTime time; + + @Schema(description = "工单状态", example = "PENDING") + private String status; + + @Schema(description = "扩展数据", example = "{}") + private Map extra; + +} diff --git 
a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/dto/OpsOrderBusinessLogsRespDTO.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/dto/OpsOrderBusinessLogsRespDTO.java new file mode 100644 index 0000000..a2b63cf --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/dto/OpsOrderBusinessLogsRespDTO.java @@ -0,0 +1,23 @@ +package com.viewsh.module.ops.dal.dataobject.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import java.util.List; + +/** + * 工单业务日志列表响应 DTO + * + * @author lzh + */ +@Schema(description = "管理后台 - 工单业务日志列表 Response DTO") +@Data +public class OpsOrderBusinessLogsRespDTO { + + @Schema(description = "工单ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "10001") + private Long orderId; + + @Schema(description = "日志列表", requiredMode = Schema.RequiredMode.REQUIRED) + private List logs; + +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/statistics/OpsTrafficStatisticsDO.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/statistics/OpsTrafficStatisticsDO.java new file mode 100644 index 0000000..9c2bb9a --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/dataobject/statistics/OpsTrafficStatisticsDO.java @@ -0,0 +1,62 @@ +package com.viewsh.module.ops.dal.dataobject.statistics; + +import com.baomidou.mybatisplus.annotation.KeySequence; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.viewsh.framework.tenant.core.db.TenantBaseDO; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +/** + * 客流统计小时汇总 DO + *

+ * 用于 Ops 业务报表统计,数据来源于 IoT 设备的客流计数器 + * + * @author AI + */ +@TableName("ops_traffic_statistics") +@KeySequence("ops_traffic_statistics_seq") +@Data +@EqualsAndHashCode(callSuper = true) +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class OpsTrafficStatisticsDO extends TenantBaseDO { + + /** + * 主键 + */ + @TableId + private Long id; + + /** + * 设备ID(数据来源) + */ + private Long deviceId; + + /** + * 区域ID(主查询维度) + */ + private Long areaId; + + /** + * 统计小时(精确到小时,如 2026-02-03 10:00:00) + */ + private LocalDateTime statHour; + + /** + * 进入人数 + */ + private Integer peopleIn; + + /** + * 离开人数 + */ + private Integer peopleOut; + +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/statistics/OpsTrafficStatisticsMapper.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/statistics/OpsTrafficStatisticsMapper.java new file mode 100644 index 0000000..f238fbb --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/dal/mysql/statistics/OpsTrafficStatisticsMapper.java @@ -0,0 +1,36 @@ +package com.viewsh.module.ops.dal.mysql.statistics; + +import com.viewsh.framework.mybatis.core.mapper.BaseMapperX; +import com.viewsh.module.ops.dal.dataobject.statistics.OpsTrafficStatisticsDO; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.time.LocalDateTime; + +/** + * 客流统计小时汇总 Mapper + * + * @author AI + */ +@Mapper +public interface OpsTrafficStatisticsMapper extends BaseMapperX { + + /** + * 插入或更新统计记录 + *

+ * 使用 INSERT ... ON DUPLICATE KEY UPDATE 实现 upsert + * 注意:方法名改为 upsert 避免与 MyBatis Plus BaseMapper.insertOrUpdate() 冲突 + * + * @param record 统计记录 + */ + void upsert(@Param("record") OpsTrafficStatisticsDO record); + + /** + * 删除指定时间之前的统计记录 + * + * @param beforeTime 截止时间 + * @return 删除的记录数 + */ + int deleteByStatHourBefore(@Param("beforeTime") LocalDateTime beforeTime); + +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsCleanupJob.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsCleanupJob.java new file mode 100644 index 0000000..d11162c --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsCleanupJob.java @@ -0,0 +1,50 @@ +package com.viewsh.module.ops.service.job; + +import com.viewsh.module.ops.dal.mysql.statistics.OpsTrafficStatisticsMapper; +import com.xxl.job.core.handler.annotation.XxlJob; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +/** + * 客流统计清理任务 + *

+ * 每月 1 日凌晨 2 点执行,删除 30 天前的客流统计记录 + * + * @author AI + */ +@Slf4j +@Component +public class TrafficStatisticsCleanupJob { + + @Resource + private OpsTrafficStatisticsMapper trafficStatisticsMapper; + + /** + * 清理过期的客流统计记录 + *

+ * XxlJob 配置: + * - Cron: 0 0 2 1 * ? (每月 1 日凌晨 2 点) + * + * @return 执行结果 + */ + @XxlJob("trafficStatisticsCleanupJob") + public String execute() { + log.info("[TrafficStatisticsCleanupJob] 开始执行客流统计清理任务"); + + try { + LocalDateTime beforeTime = LocalDateTime.now().minusDays(30); + int deletedCount = trafficStatisticsMapper.deleteByStatHourBefore(beforeTime); + + log.info("[TrafficStatisticsCleanupJob] 客流统计清理完成:删除 {} 条记录(截止时间={})", + deletedCount, beforeTime); + return "清理完成:删除 " + deletedCount + " 条记录"; + + } catch (Exception e) { + log.error("[TrafficStatisticsCleanupJob] 客流统计清理失败", e); + return "清理失败: " + e.getMessage(); + } + } +} diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsPersistJob.java b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsPersistJob.java new file mode 100644 index 0000000..e522f18 --- /dev/null +++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/java/com/viewsh/module/ops/service/job/TrafficStatisticsPersistJob.java @@ -0,0 +1,310 @@ +package com.viewsh.module.ops.service.job; + +import com.viewsh.module.ops.dal.dataobject.statistics.OpsTrafficStatisticsDO; +import com.viewsh.module.ops.dal.mysql.statistics.OpsTrafficStatisticsMapper; +import com.viewsh.module.ops.service.area.OpsBusAreaService; +import com.xxl.job.core.handler.annotation.XxlJob; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.ScanOptions; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * 客流统计持久化任务 + *

+ * 每小时整点执行,将 IoT 模块 Redis 中的当日累积统计数据写入 Ops 业务库 + *

+ * 通过对比 Redis 中的 lastPersistedIn/lastPersistedOut 与 totalIn/totalOut, + * 计算本小时的增量并写入 ops_traffic_statistics 表 + * + * @author AI + */ +@Slf4j +@Component +public class TrafficStatisticsPersistJob { + + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd"); + private static final String LOCK_KEY = "ops:traffic:persist:lock"; + private static final int LOCK_TTL_SECONDS = 300; // 5分钟锁超时 + + @Resource + private StringRedisTemplate stringRedisTemplate; + + @Resource + private OpsTrafficStatisticsMapper trafficStatisticsMapper; + + @Resource + private OpsBusAreaService areaService; + + /** + * 每小时持久化客流统计数据 + *

+ * XxlJob 配置: + * - Cron: 0 0 * * * ? (每小时整点) + * + * @return 执行结果 + */ + @XxlJob("trafficStatisticsPersistJob") + public String execute() { + log.info("[TrafficStatisticsPersistJob] 开始执行客流统计持久化任务"); + + // P0修复1: 获取分布式锁,防止并发执行导致双重计数 + Boolean locked = stringRedisTemplate.opsForValue() + .setIfAbsent(LOCK_KEY, String.valueOf(System.currentTimeMillis()), + LOCK_TTL_SECONDS, TimeUnit.SECONDS); + + if (Boolean.FALSE.equals(locked)) { + log.warn("[TrafficStatisticsPersistJob] 任务已在其他节点执行中,跳过本次执行"); + return "任务已在其他节点执行"; + } + + try { + // P1修复3: 使用 SCAN 替代 KEYS,避免阻塞 Redis + Set dailyKeys = scanDailyKeys(); + if (dailyKeys.isEmpty()) { + log.info("[TrafficStatisticsPersistJob] 无统计数据需要持久化"); + return "无数据"; + } + + int successCount = 0; + int errorCount = 0; + int skippedCount = 0; + + for (String key : dailyKeys) { + try { + PersistResult result = persistSingleDevice(key); + if (result == PersistResult.SUCCESS) { + successCount++; + } else if (result == PersistResult.SKIPPED) { + skippedCount++; + } else { + errorCount++; + } + } catch (Exception e) { + errorCount++; + log.error("[TrafficStatisticsPersistJob] 持久化失败:key={}", key, e); + } + } + + String result = String.format("持久化完成:成功=%d, 跳过=%d, 失败=%d", + successCount, skippedCount, errorCount); + log.info("[TrafficStatisticsPersistJob] {}", result); + return result; + + } catch (Exception e) { + log.error("[TrafficStatisticsPersistJob] 持久化任务执行失败", e); + return "执行失败: " + e.getMessage(); + } finally { + // 释放分布式锁 + stringRedisTemplate.delete(LOCK_KEY); + } + } + + /** + * 使用 SCAN 扫描所有当日统计 key + */ + private Set scanDailyKeys() { + Set keys = new HashSet<>(); + ScanOptions options = ScanOptions.scanOptions() + .match("iot:clean:traffic:daily:*") + .count(100) + .build(); + + try (Cursor cursor = stringRedisTemplate.scan(options)) { + cursor.forEachRemaining(keys::add); + } catch (Exception e) { + log.error("[TrafficStatisticsPersistJob] SCAN 扫描失败", e); + } + + return keys; + } + + /** + * 持久化单个设备的统计数据 + */ + private 
PersistResult persistSingleDevice(String key) { + Long deviceId = parseDeviceIdFromKey(key); + LocalDate date = parseDateFromKey(key); + if (deviceId == null || date == null) { + log.warn("[TrafficStatisticsPersistJob] 解析 key 失败:key={}", key); + return PersistResult.ERROR; + } + + // 读取 Redis Hash 数据 + Map data = stringRedisTemplate.opsForHash().entries(key); + long totalIn = parseLong(data.get("totalIn")); + long totalOut = parseLong(data.get("totalOut")); + long lastPersistedIn = parseLong(data.get("lastPersistedIn")); + long lastPersistedOut = parseLong(data.get("lastPersistedOut")); + + // 计算本次需要持久化的增量 + long deltaIn = totalIn - lastPersistedIn; + long deltaOut = totalOut - lastPersistedOut; + + // P1修复5: 处理负增量(计数器重置场景) + if (deltaIn < 0 || deltaOut < 0) { + log.warn("[TrafficStatisticsPersistJob] 检测到计数器重置:deviceId={}, date={}, " + + "totalIn={}, lastPersistedIn={}, totalOut={}, lastPersistedOut={}", + deviceId, date, totalIn, lastPersistedIn, totalOut, lastPersistedOut); + + // 重置已持久化标记,从当前值重新开始计数 + stringRedisTemplate.opsForHash().put(key, "lastPersistedIn", String.valueOf(totalIn)); + stringRedisTemplate.opsForHash().put(key, "lastPersistedOut", String.valueOf(totalOut)); + + log.info("[TrafficStatisticsPersistJob] 已重置持久化标记:deviceId={}, date={}", deviceId, date); + return PersistResult.SKIPPED; + } + + if (deltaIn <= 0 && deltaOut <= 0) { + log.debug("[TrafficStatisticsPersistJob] 无新增数据:deviceId={}, date={}", deviceId, date); + return PersistResult.SKIPPED; + } + + // P1修复7: 检查 int 溢出 + if (deltaIn > Integer.MAX_VALUE || deltaOut > Integer.MAX_VALUE) { + log.error("[TrafficStatisticsPersistJob] 增量值溢出:deviceId={}, date={}, deltaIn={}, deltaOut={}", + deviceId, date, deltaIn, deltaOut); + return PersistResult.ERROR; + } + + // 获取区域ID(从区域设备关系中查询) + Long areaId = getAreaIdForDevice(deviceId); + + // P1修复4: 处理缺失区域关联场景 + if (areaId == null) { + log.warn("[TrafficStatisticsPersistJob] 设备无区域关联,更新持久化标记避免数据累积:deviceId={}", deviceId); + + // 更新 lastPersisted,避免下次累积错误数据 
+ stringRedisTemplate.opsForHash().put(key, "lastPersistedIn", String.valueOf(totalIn)); + stringRedisTemplate.opsForHash().put(key, "lastPersistedOut", String.valueOf(totalOut)); + + // TODO: 将数据存入死信队列,等待区域关联修复后重试 + return PersistResult.SKIPPED; + } + + // P0修复2: 修复小时分桶逻辑 + LocalDateTime statHour = calculateStatHour(date); + + OpsTrafficStatisticsDO record = OpsTrafficStatisticsDO.builder() + .deviceId(deviceId) + .areaId(areaId) + .statHour(statHour) + .peopleIn((int) deltaIn) + .peopleOut((int) deltaOut) + .build(); + + trafficStatisticsMapper.upsert(record); + + // 更新 Redis 已持久化的值 + stringRedisTemplate.opsForHash().put(key, "lastPersistedIn", String.valueOf(totalIn)); + stringRedisTemplate.opsForHash().put(key, "lastPersistedOut", String.valueOf(totalOut)); + + log.debug("[TrafficStatisticsPersistJob] 持久化成功:deviceId={}, areaId={}, statHour={}, deltaIn={}, deltaOut={}", + deviceId, areaId, statHour, deltaIn, deltaOut); + + return PersistResult.SUCCESS; + } + + /** + * P0修复2: 计算正确的统计小时 + *

+     * Rules:
+     * - Today's data is attributed to the current wall-clock hour.
+     * - Yesterday's data (cross-midnight run) is attributed to 23:00 of that day.
+     * - Older data (delayed run) is attributed to 23:00 of its own day.
+     *
+     * NOTE(review): a run at HH:00 attributes the delta accumulated during the
+     * previous hour to bucket HH:00 rather than (HH-1):00 — confirm this is the
+     * intended reporting convention for the hourly table.
+     */
+    private LocalDateTime calculateStatHour(LocalDate dataDate) {
+        LocalDate today = LocalDate.now();
+        int currentHour = LocalDateTime.now().getHour();
+
+        if (dataDate.equals(today)) {
+            // Same-day data: bucket = current hour, minutes zeroed.
+            return LocalDateTime.of(dataDate, LocalTime.of(currentHour, 0));
+        } else {
+            // Historical data (yesterday or earlier): bucket = 23:00 of that day.
+            return LocalDateTime.of(dataDate, LocalTime.of(23, 0));
+        }
+    }
+
+    /**
+     * Resolves the area a device belongs to.
+     *
+     * @return the area id, or null when the device has no mapping or the lookup throws
+     */
+    private Long getAreaIdForDevice(Long deviceId) {
+        try {
+            // Delegates to OpsBusAreaService for the device-to-area relation.
+            return areaService.getAreaIdByDeviceId(deviceId);
+        } catch (Exception e) {
+            log.error("[TrafficStatisticsPersistJob] 查询设备区域失败:deviceId={}", deviceId, e);
+            return null;
+        }
+    }
+
+    /**
+     * Extracts the device id from a daily counter key.
+     * Key format: iot:clean:traffic:daily:{deviceId}:{date} — deviceId is segment index 4.
+     *
+     * @return the device id, or null when the key does not match the expected shape
+     */
+    private Long parseDeviceIdFromKey(String key) {
+        String[] parts = key.split(":");
+        if (parts.length >= 6) {
+            try {
+                return Long.parseLong(parts[4]);
+            } catch (NumberFormatException e) {
+                return null;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Extracts the yyyyMMdd date from a daily counter key.
+     * Key format: iot:clean:traffic:daily:{deviceId}:{date} — date is segment index 5.
+     *
+     * @return the parsed date, or null when the key does not match the expected shape
+     */
+    private LocalDate parseDateFromKey(String key) {
+        String[] parts = key.split(":");
+        if (parts.length >= 6) {
+            try {
+                return LocalDate.parse(parts[5], DATE_FORMATTER);
+            } catch (Exception e) {
+                return null;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Defensive conversion of a Redis hash value to long; null or garbage becomes 0.
+     */
+    private long parseLong(Object value) {
+        if (value == null) {
+            return 0L;
+        }
+        if (value instanceof Number) {
+            return ((Number) value).longValue();
+        }
+        try {
+            return Long.parseLong(value.toString());
+        } catch (NumberFormatException e) {
+            return 0L;
+        }
+    }
+
+    /**
+     * Outcome of persisting one device key.
+     */
+    private enum PersistResult {
+        SUCCESS, // a row was upserted
+        SKIPPED, // nothing to do (no delta, counter reset, missing area, ...)
+        ERROR    // unrecoverable problem for this key (bad key, overflow)
+    }
+}
diff --git a/viewsh-module-ops/viewsh-module-ops-biz/src/main/resources/mapper/statistics/OpsTrafficStatisticsMapper.xml b/viewsh-module-ops/viewsh-module-ops-biz/src/main/resources/mapper/statistics/OpsTrafficStatisticsMapper.xml
new file mode 100644
index 0000000..6dd2ded
--- /dev/null
+++ b/viewsh-module-ops/viewsh-module-ops-biz/src/main/resources/mapper/statistics/OpsTrafficStatisticsMapper.xml
@@ -0,0 +1,20 @@
+<!-- NOTE(review): the MyBatis element tags (<mapper namespace="...">, <insert id="upsert">,
+     <delete id="deleteByStatHourBefore">) appear to have been stripped during diff
+     extraction — restore them before applying this patch. -->
+
+    INSERT INTO ops_traffic_statistics (device_id, area_id, stat_hour, people_in, people_out, tenant_id)
+    VALUES (#{record.deviceId}, #{record.areaId}, #{record.statHour}, #{record.peopleIn}, #{record.peopleOut}, #{record.tenantId})
+    ON DUPLICATE KEY UPDATE
+        people_in = people_in + VALUES(people_in),
+        people_out = people_out + VALUES(people_out),
+        update_time = NOW()
+
+    <!-- NOTE(review): VALUES() inside ON DUPLICATE KEY UPDATE is deprecated since
+         MySQL 8.0.20; the row-alias syntax (INSERT ... AS new ... new.people_in)
+         is the forward-compatible form. -->
+
+    DELETE FROM ops_traffic_statistics
+    WHERE stat_hour < #{beforeTime}
+    AND deleted = 0
+
+    <!-- NOTE(review): this is a physical DELETE even though the table defines a
+         `deleted` soft-delete flag — confirm hard deletion is intended for the
+         retention job. -->