From acd7a35e1d029d36d612d92cde44f97edd1f521b Mon Sep 17 00:00:00 2001 From: lzh Date: Fri, 24 Apr 2026 10:59:27 +0800 Subject: [PATCH] =?UTF-8?q?fix(iot):=20=E8=BD=A8=E8=BF=B9=E6=A3=80?= =?UTF-8?q?=E6=B5=8B=E9=98=B2=E6=8A=96=20+=20eventTime=20=E7=94=A8=20repor?= =?UTF-8?q?tTime=20=E9=81=BF=E5=85=8D=E5=9B=9E=E6=94=BE=E6=8C=A4=E5=8E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 事件 eventTime 透传设备 reportTime,修复 TDengine/消息总线恢复后堆积回放导致 ENTER/LEAVE 全部塞进同一秒的问题 - 区域切换加 5dB 滞回 + 进入后 5s 最小停留,压制 RSSI 抖动造成的瞬态 AREA_SWITCH 与 SIGNAL_LOSS - 滞回兜底改用窗口内最近一次非 -999 样本,避免当前信标短暂漏扫时滞回被缺失哨兵破坏 - reportTime 为空时记录 warn,便于追踪上游漏传的调用链 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../rule/clean/CleanRuleProcessorManager.java | 10 +- .../TrajectoryDetectionProcessor.java | 97 ++++++++++++++++--- 2 files changed, 91 insertions(+), 16 deletions(-) diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/CleanRuleProcessorManager.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/CleanRuleProcessorManager.java index acf13bf1..59a163b9 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/CleanRuleProcessorManager.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/CleanRuleProcessorManager.java @@ -10,6 +10,7 @@ import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.time.LocalDateTime; import java.util.Map; /** @@ -60,6 +61,7 @@ public class CleanRuleProcessorManager { } Long deviceId = message.getDeviceId(); + LocalDateTime reportTime = message.getReportTime(); @SuppressWarnings("unchecked") Map data = (Map) message.getParams(); @@ -73,7 +75,7 @@ public class CleanRuleProcessorManager { } else { // 属性上报:直接遍历 key-value data.forEach((identifier, value) -> - processDataSafely(deviceId, identifier, value)); + processDataSafely(deviceId, identifier, value, reportTime)); // 4. 蓝牙信号缺失补偿:当设备上报了属性但不含 bluetoothDevices 时, // 主动注入一次 null 调用,使 BeaconDetectionRuleProcessor 能写入 -999(信号缺失), @@ -81,7 +83,7 @@ public class CleanRuleProcessorManager { if (!data.containsKey("bluetoothDevices")) { beaconDetectionRuleProcessor.processPropertyChange(deviceId, "bluetoothDevices", null); // 轨迹检测同样需要信号丢失补偿,注入 null 使窗口写入 -999 - trajectoryDetectionProcessor.processPropertyChange(deviceId, "bluetoothDevices", null); + trajectoryDetectionProcessor.processPropertyChange(deviceId, "bluetoothDevices", null, reportTime); } } } @@ -127,7 +129,7 @@ public class CleanRuleProcessorManager { * @param identifier 标识符 * @param value 数据值 */ - private void processDataSafely(Long deviceId, String identifier, Object value) { + private void processDataSafely(Long deviceId, String identifier, Object value, LocalDateTime reportTime) { try { switch (identifier) { case "people_in", "people_out" -> @@ -135,7 +137,7 @@ public class CleanRuleProcessorManager { case "bluetoothDevices" -> { beaconDetectionRuleProcessor.processPropertyChange(deviceId, identifier, value); // 轨迹检测:独立于保洁到岗检测,匹配所有已知 Beacon - trajectoryDetectionProcessor.processPropertyChange(deviceId, identifier, value); + trajectoryDetectionProcessor.processPropertyChange(deviceId, identifier, value, reportTime); } default -> { // 其他属性/事件忽略 diff --git a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrajectoryDetectionProcessor.java b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrajectoryDetectionProcessor.java index b6fe4b8f..933e9754 100644 --- a/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrajectoryDetectionProcessor.java +++ b/viewsh-module-iot/viewsh-module-iot-server/src/main/java/com/viewsh/module/iot/service/rule/clean/processor/TrajectoryDetectionProcessor.java @@ -24,6 +24,7 @@ import org.springframework.stereotype.Component; import com.viewsh.framework.tenant.core.context.TenantContextHolder; +import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.TimeUnit; @@ -57,6 +58,17 @@ public class TrajectoryDetectionProcessor { private static final String DEVICE_ENABLED_KEY_PATTERN = "iot:trajectory:device:enabled:%s"; private static final int DEVICE_ENABLED_TTL_SECONDS = 3600; // 1小时 + /** + * 最小停留时长(毫秒):进入区域后至少停留这么久,才允许发布 LEAVE/AREA_SWITCH, + * 用于过滤 RSSI 抖动和批量消息回放导致的瞬态切换 + */ + private static final long MIN_STAY_MILLIS = 5_000L; + + /** + * 区域切换滞回阈值(dB):候选区域 RSSI 必须比当前区域 RSSI 高出此值,才允许 AREA_SWITCH + */ + private static final int RSSI_HYSTERESIS_DB = 5; + @Resource private BeaconRegistryService beaconRegistryService; @@ -84,11 +96,20 @@ public class TrajectoryDetectionProcessor { * @param deviceId 设备ID(工牌) * @param identifier 属性标识符 * @param propertyValue 蓝牙设备列表 + * @param reportTime 设备上报时间(可为 null,为空则用当前时间兜底) */ - public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) { + public void processPropertyChange(Long deviceId, String identifier, Object propertyValue, + LocalDateTime reportTime) { if (!"bluetoothDevices".equals(identifier)) { return; } + LocalDateTime eventTime; + if (reportTime != null) { + eventTime = reportTime; + } else { + eventTime = LocalDateTime.now(); + log.warn("[Trajectory] reportTime 为空,使用当前时间兜底:deviceId={}(若频繁出现请排查上游调用链)", deviceId); + } // 1. 检查设备是否开启轨迹功能 if (!isTrajectoryEnabled(deviceId)) { @@ -125,9 +146,9 @@ public class TrajectoryDetectionProcessor { // 8. 处理区域状态变化 if (currentArea != null) { - processWithCurrentArea(deviceId, currentArea, matchedBeacons, areaConfigIndex); + processWithCurrentArea(deviceId, currentArea, matchedBeacons, areaConfigIndex, eventTime); } else { - processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex); + processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex, eventTime); } } @@ -139,8 +160,11 @@ public class TrajectoryDetectionProcessor { private void processWithCurrentArea(Long deviceId, TrajectoryStateRedisDAO.CurrentAreaInfo currentArea, Map matchedBeacons, - Map areaConfigIndex) { + Map areaConfigIndex, + LocalDateTime eventTime) { Long currentAreaId = currentArea.getAreaId(); + long stayMillis = System.currentTimeMillis() - (currentArea.getEnterTime() != null ? currentArea.getEnterTime() : 0L); + boolean minStayReached = stayMillis >= MIN_STAY_MILLIS; // 6a. 检查当前区域的退出条件 BeaconPresenceConfig currentConfig = areaConfigIndex.get(currentAreaId); @@ -153,16 +177,22 @@ public class TrajectoryDetectionProcessor { AreaState.IN_AREA); if (exitResult == DetectionResult.LEAVE_CONFIRMED) { + if (!minStayReached) { + // 未达到最小停留时长,视为瞬态抖动,忽略本次离开 + log.debug("[Trajectory] 未达最小停留,忽略 LEAVE:deviceId={}, areaId={}, stayMs={}", + deviceId, currentAreaId, stayMillis); + return; + } // 确认离开当前区域 publishLeaveEvent(deviceId, currentAreaId, currentArea.getBeaconMac(), - "SIGNAL_LOSS", currentArea.getEnterTime()); + "SIGNAL_LOSS", currentArea.getEnterTime(), eventTime); stateRedisDAO.clearCurrentArea(deviceId); windowRedisDAO.clearWindow(deviceId, currentAreaId); log.info("[Trajectory] 离开区域:deviceId={}, areaId={}, reason=SIGNAL_LOSS", deviceId, currentAreaId); // 离开后,尝试进入新区域 - processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex); + processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex, eventTime); return; } } @@ -170,15 +200,29 @@ public class TrajectoryDetectionProcessor { // 6b. 当前区域未退出,检查是否有更强区域触发切换 MatchedBeacon bestCandidate = findBestEnterCandidate(deviceId, matchedBeacons, currentAreaId); if (bestCandidate != null && !bestCandidate.areaId.equals(currentAreaId)) { + if (!minStayReached) { + log.debug("[Trajectory] 未达最小停留,忽略 AREA_SWITCH:deviceId={}, from={}, to={}, stayMs={}", + deviceId, currentAreaId, bestCandidate.areaId, stayMillis); + return; + } + // 切换滞回:候选 RSSI 必须显著强于当前区域 RSSI + // 优先取本次匹配值;若当前未匹配到,回退到窗口里最近一次非缺失(-999)样本, + // 避免当前信标短暂漏扫时滞回被 -999 哨兵破坏 + int currentRssi = resolveCurrentAreaRssi(deviceId, currentAreaId, matchedBeacons); + if (bestCandidate.rssi - currentRssi < RSSI_HYSTERESIS_DB) { + log.debug("[Trajectory] 未达滞回阈值,忽略 AREA_SWITCH:deviceId={}, from={}({}dBm), to={}({}dBm)", + deviceId, currentAreaId, currentRssi, bestCandidate.areaId, bestCandidate.rssi); + return; + } // 区域切换:先离开当前区域,再进入新区域 publishLeaveEvent(deviceId, currentAreaId, currentArea.getBeaconMac(), - "AREA_SWITCH", currentArea.getEnterTime()); + "AREA_SWITCH", currentArea.getEnterTime(), eventTime); windowRedisDAO.clearWindow(deviceId, currentAreaId); long now = System.currentTimeMillis(); stateRedisDAO.setCurrentArea(deviceId, bestCandidate.areaId, now, bestCandidate.beaconMac); windowRedisDAO.clearWindow(deviceId, bestCandidate.areaId); - publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi); + publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi, eventTime); log.info("[Trajectory] 区域切换:deviceId={}, from={}, to={}", deviceId, currentAreaId, bestCandidate.areaId); } @@ -189,13 +233,14 @@ public class TrajectoryDetectionProcessor { */ private void processWithoutCurrentArea(Long deviceId, Map matchedBeacons, - Map areaConfigIndex) { + Map areaConfigIndex, + LocalDateTime eventTime) { MatchedBeacon bestCandidate = findBestEnterCandidate(deviceId, matchedBeacons, null); if (bestCandidate != null) { long now = System.currentTimeMillis(); stateRedisDAO.setCurrentArea(deviceId, bestCandidate.areaId, now, bestCandidate.beaconMac); windowRedisDAO.clearWindow(deviceId, bestCandidate.areaId); - publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi); + publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi, eventTime); log.info("[Trajectory] 进入区域:deviceId={}, areaId={}, rssi={}", deviceId, bestCandidate.areaId, bestCandidate.rssi); } @@ -282,6 +327,31 @@ public class TrajectoryDetectionProcessor { } } + /** + * 解析当前区域用于滞回判断的参考 RSSI: + * 1) 本次上报匹配到了当前区域 → 直接用本次 RSSI + * 2) 未匹配到 → 取窗口里最近一次非 -999(缺失哨兵)样本 + * 3) 仍取不到 → 返回 -999(此时滞回天然放行,等价于允许切换, + * 因为当前区域已彻底失去信号) + */ + private int resolveCurrentAreaRssi(Long deviceId, Long currentAreaId, + Map matchedBeacons) { + MatchedBeacon matched = matchedBeacons.get(currentAreaId); + if (matched != null) { + return matched.rssi; + } + List window = windowRedisDAO.getWindow(deviceId, currentAreaId); + if (window != null) { + for (int i = window.size() - 1; i >= 0; i--) { + Integer sample = window.get(i); + if (sample != null && sample != -999) { + return sample; + } + } + } + return -999; + } + /** * 找到信号最强且满足进入条件的候选区域 * @@ -366,7 +436,8 @@ public class TrajectoryDetectionProcessor { // ==================== 事件发布 ==================== - private void publishEnterEvent(Long deviceId, Long areaId, String beaconMac, Integer enterRssi) { + private void publishEnterEvent(Long deviceId, Long areaId, String beaconMac, Integer enterRssi, + LocalDateTime eventTime) { try { IotDeviceDO device = deviceService.getDeviceFromCache(deviceId); TrajectoryEnterEvent event = TrajectoryEnterEvent.builder() @@ -376,6 +447,7 @@ public class TrajectoryDetectionProcessor { .areaId(areaId) .beaconMac(beaconMac) .enterRssi(enterRssi) + .eventTime((eventTime != null ? eventTime : LocalDateTime.now()).toString()) .tenantId(TenantContextHolder.getTenantId()) .build(); @@ -390,7 +462,7 @@ public class TrajectoryDetectionProcessor { } private void publishLeaveEvent(Long deviceId, Long areaId, String beaconMac, - String leaveReason, Long enterTimestamp) { + String leaveReason, Long enterTimestamp, LocalDateTime eventTime) { try { IotDeviceDO device = deviceService.getDeviceFromCache(deviceId); TrajectoryLeaveEvent event = TrajectoryLeaveEvent.builder() @@ -401,6 +473,7 @@ public class TrajectoryDetectionProcessor { .beaconMac(beaconMac) .leaveReason(leaveReason) .enterTimestamp(enterTimestamp) + .eventTime((eventTime != null ? eventTime : LocalDateTime.now()).toString()) .tenantId(TenantContextHolder.getTenantId()) .build();