fix(iot): 轨迹检测防抖 + eventTime 用 reportTime 避免回放挤压
- 事件 eventTime 透传设备 reportTime,修复 TDengine/消息总线恢复后堆积回放导致 ENTER/LEAVE 全部塞进同一秒的问题 - 区域切换加 5dB 滞回 + 进入后 5s 最小停留,压制 RSSI 抖动造成的瞬态 AREA_SWITCH 与 SIGNAL_LOSS - 滞回兜底改用窗口内最近一次非 -999 样本,避免当前信标短暂漏扫时滞回被缺失哨兵破坏 - reportTime 为空时记录 warn,便于追踪上游漏传的调用链 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -10,6 +10,7 @@ import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
@@ -60,6 +61,7 @@ public class CleanRuleProcessorManager {
|
||||
}
|
||||
|
||||
Long deviceId = message.getDeviceId();
|
||||
LocalDateTime reportTime = message.getReportTime();
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> data = (Map<String, Object>) message.getParams();
|
||||
|
||||
@@ -73,7 +75,7 @@ public class CleanRuleProcessorManager {
|
||||
} else {
|
||||
// 属性上报:直接遍历 key-value
|
||||
data.forEach((identifier, value) ->
|
||||
processDataSafely(deviceId, identifier, value));
|
||||
processDataSafely(deviceId, identifier, value, reportTime));
|
||||
|
||||
// 4. 蓝牙信号缺失补偿:当设备上报了属性但不含 bluetoothDevices 时,
|
||||
// 主动注入一次 null 调用,使 BeaconDetectionRuleProcessor 能写入 -999(信号缺失),
|
||||
@@ -81,7 +83,7 @@ public class CleanRuleProcessorManager {
|
||||
if (!data.containsKey("bluetoothDevices")) {
|
||||
beaconDetectionRuleProcessor.processPropertyChange(deviceId, "bluetoothDevices", null);
|
||||
// 轨迹检测同样需要信号丢失补偿,注入 null 使窗口写入 -999
|
||||
trajectoryDetectionProcessor.processPropertyChange(deviceId, "bluetoothDevices", null);
|
||||
trajectoryDetectionProcessor.processPropertyChange(deviceId, "bluetoothDevices", null, reportTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -127,7 +129,7 @@ public class CleanRuleProcessorManager {
|
||||
* @param identifier 标识符
|
||||
* @param value 数据值
|
||||
*/
|
||||
private void processDataSafely(Long deviceId, String identifier, Object value) {
|
||||
private void processDataSafely(Long deviceId, String identifier, Object value, LocalDateTime reportTime) {
|
||||
try {
|
||||
switch (identifier) {
|
||||
case "people_in", "people_out" ->
|
||||
@@ -135,7 +137,7 @@ public class CleanRuleProcessorManager {
|
||||
case "bluetoothDevices" -> {
|
||||
beaconDetectionRuleProcessor.processPropertyChange(deviceId, identifier, value);
|
||||
// 轨迹检测:独立于保洁到岗检测,匹配所有已知 Beacon
|
||||
trajectoryDetectionProcessor.processPropertyChange(deviceId, identifier, value);
|
||||
trajectoryDetectionProcessor.processPropertyChange(deviceId, identifier, value, reportTime);
|
||||
}
|
||||
default -> {
|
||||
// 其他属性/事件忽略
|
||||
|
||||
@@ -24,6 +24,7 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@@ -57,6 +58,17 @@ public class TrajectoryDetectionProcessor {
|
||||
private static final String DEVICE_ENABLED_KEY_PATTERN = "iot:trajectory:device:enabled:%s";
|
||||
private static final int DEVICE_ENABLED_TTL_SECONDS = 3600; // 1小时
|
||||
|
||||
/**
|
||||
* 最小停留时长(毫秒):进入区域后至少停留这么久,才允许发布 LEAVE/AREA_SWITCH,
|
||||
* 用于过滤 RSSI 抖动和批量消息回放导致的瞬态切换
|
||||
*/
|
||||
private static final long MIN_STAY_MILLIS = 5_000L;
|
||||
|
||||
/**
|
||||
* 区域切换滞回阈值(dB):候选区域 RSSI 必须比当前区域 RSSI 高出此值,才允许 AREA_SWITCH
|
||||
*/
|
||||
private static final int RSSI_HYSTERESIS_DB = 5;
|
||||
|
||||
@Resource
|
||||
private BeaconRegistryService beaconRegistryService;
|
||||
|
||||
@@ -84,11 +96,20 @@ public class TrajectoryDetectionProcessor {
|
||||
* @param deviceId 设备ID(工牌)
|
||||
* @param identifier 属性标识符
|
||||
* @param propertyValue 蓝牙设备列表
|
||||
* @param reportTime 设备上报时间(可为 null,为空则用当前时间兜底)
|
||||
*/
|
||||
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
|
||||
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue,
|
||||
LocalDateTime reportTime) {
|
||||
if (!"bluetoothDevices".equals(identifier)) {
|
||||
return;
|
||||
}
|
||||
LocalDateTime eventTime;
|
||||
if (reportTime != null) {
|
||||
eventTime = reportTime;
|
||||
} else {
|
||||
eventTime = LocalDateTime.now();
|
||||
log.warn("[Trajectory] reportTime 为空,使用当前时间兜底:deviceId={}(若频繁出现请排查上游调用链)", deviceId);
|
||||
}
|
||||
|
||||
// 1. 检查设备是否开启轨迹功能
|
||||
if (!isTrajectoryEnabled(deviceId)) {
|
||||
@@ -125,9 +146,9 @@ public class TrajectoryDetectionProcessor {
|
||||
|
||||
// 8. 处理区域状态变化
|
||||
if (currentArea != null) {
|
||||
processWithCurrentArea(deviceId, currentArea, matchedBeacons, areaConfigIndex);
|
||||
processWithCurrentArea(deviceId, currentArea, matchedBeacons, areaConfigIndex, eventTime);
|
||||
} else {
|
||||
processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex);
|
||||
processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex, eventTime);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,8 +160,11 @@ public class TrajectoryDetectionProcessor {
|
||||
private void processWithCurrentArea(Long deviceId,
|
||||
TrajectoryStateRedisDAO.CurrentAreaInfo currentArea,
|
||||
Map<Long, MatchedBeacon> matchedBeacons,
|
||||
Map<Long, BeaconPresenceConfig> areaConfigIndex) {
|
||||
Map<Long, BeaconPresenceConfig> areaConfigIndex,
|
||||
LocalDateTime eventTime) {
|
||||
Long currentAreaId = currentArea.getAreaId();
|
||||
long stayMillis = System.currentTimeMillis() - (currentArea.getEnterTime() != null ? currentArea.getEnterTime() : 0L);
|
||||
boolean minStayReached = stayMillis >= MIN_STAY_MILLIS;
|
||||
|
||||
// 6a. 检查当前区域的退出条件
|
||||
BeaconPresenceConfig currentConfig = areaConfigIndex.get(currentAreaId);
|
||||
@@ -153,16 +177,22 @@ public class TrajectoryDetectionProcessor {
|
||||
AreaState.IN_AREA);
|
||||
|
||||
if (exitResult == DetectionResult.LEAVE_CONFIRMED) {
|
||||
if (!minStayReached) {
|
||||
// 未达到最小停留时长,视为瞬态抖动,忽略本次离开
|
||||
log.debug("[Trajectory] 未达最小停留,忽略 LEAVE:deviceId={}, areaId={}, stayMs={}",
|
||||
deviceId, currentAreaId, stayMillis);
|
||||
return;
|
||||
}
|
||||
// 确认离开当前区域
|
||||
publishLeaveEvent(deviceId, currentAreaId, currentArea.getBeaconMac(),
|
||||
"SIGNAL_LOSS", currentArea.getEnterTime());
|
||||
"SIGNAL_LOSS", currentArea.getEnterTime(), eventTime);
|
||||
stateRedisDAO.clearCurrentArea(deviceId);
|
||||
windowRedisDAO.clearWindow(deviceId, currentAreaId);
|
||||
|
||||
log.info("[Trajectory] 离开区域:deviceId={}, areaId={}, reason=SIGNAL_LOSS", deviceId, currentAreaId);
|
||||
|
||||
// 离开后,尝试进入新区域
|
||||
processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex);
|
||||
processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex, eventTime);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -170,15 +200,29 @@ public class TrajectoryDetectionProcessor {
|
||||
// 6b. 当前区域未退出,检查是否有更强区域触发切换
|
||||
MatchedBeacon bestCandidate = findBestEnterCandidate(deviceId, matchedBeacons, currentAreaId);
|
||||
if (bestCandidate != null && !bestCandidate.areaId.equals(currentAreaId)) {
|
||||
if (!minStayReached) {
|
||||
log.debug("[Trajectory] 未达最小停留,忽略 AREA_SWITCH:deviceId={}, from={}, to={}, stayMs={}",
|
||||
deviceId, currentAreaId, bestCandidate.areaId, stayMillis);
|
||||
return;
|
||||
}
|
||||
// 切换滞回:候选 RSSI 必须显著强于当前区域 RSSI
|
||||
// 优先取本次匹配值;若当前未匹配到,回退到窗口里最近一次非缺失(-999)样本,
|
||||
// 避免当前信标短暂漏扫时滞回被 -999 哨兵破坏
|
||||
int currentRssi = resolveCurrentAreaRssi(deviceId, currentAreaId, matchedBeacons);
|
||||
if (bestCandidate.rssi - currentRssi < RSSI_HYSTERESIS_DB) {
|
||||
log.debug("[Trajectory] 未达滞回阈值,忽略 AREA_SWITCH:deviceId={}, from={}({}dBm), to={}({}dBm)",
|
||||
deviceId, currentAreaId, currentRssi, bestCandidate.areaId, bestCandidate.rssi);
|
||||
return;
|
||||
}
|
||||
// 区域切换:先离开当前区域,再进入新区域
|
||||
publishLeaveEvent(deviceId, currentAreaId, currentArea.getBeaconMac(),
|
||||
"AREA_SWITCH", currentArea.getEnterTime());
|
||||
"AREA_SWITCH", currentArea.getEnterTime(), eventTime);
|
||||
windowRedisDAO.clearWindow(deviceId, currentAreaId);
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
stateRedisDAO.setCurrentArea(deviceId, bestCandidate.areaId, now, bestCandidate.beaconMac);
|
||||
windowRedisDAO.clearWindow(deviceId, bestCandidate.areaId);
|
||||
publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi);
|
||||
publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi, eventTime);
|
||||
|
||||
log.info("[Trajectory] 区域切换:deviceId={}, from={}, to={}", deviceId, currentAreaId, bestCandidate.areaId);
|
||||
}
|
||||
@@ -189,13 +233,14 @@ public class TrajectoryDetectionProcessor {
|
||||
*/
|
||||
private void processWithoutCurrentArea(Long deviceId,
|
||||
Map<Long, MatchedBeacon> matchedBeacons,
|
||||
Map<Long, BeaconPresenceConfig> areaConfigIndex) {
|
||||
Map<Long, BeaconPresenceConfig> areaConfigIndex,
|
||||
LocalDateTime eventTime) {
|
||||
MatchedBeacon bestCandidate = findBestEnterCandidate(deviceId, matchedBeacons, null);
|
||||
if (bestCandidate != null) {
|
||||
long now = System.currentTimeMillis();
|
||||
stateRedisDAO.setCurrentArea(deviceId, bestCandidate.areaId, now, bestCandidate.beaconMac);
|
||||
windowRedisDAO.clearWindow(deviceId, bestCandidate.areaId);
|
||||
publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi);
|
||||
publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi, eventTime);
|
||||
|
||||
log.info("[Trajectory] 进入区域:deviceId={}, areaId={}, rssi={}", deviceId, bestCandidate.areaId, bestCandidate.rssi);
|
||||
}
|
||||
@@ -282,6 +327,31 @@ public class TrajectoryDetectionProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析当前区域用于滞回判断的参考 RSSI:
|
||||
* 1) 本次上报匹配到了当前区域 → 直接用本次 RSSI
|
||||
* 2) 未匹配到 → 取窗口里最近一次非 -999(缺失哨兵)样本
|
||||
* 3) 仍取不到 → 返回 -999(此时滞回天然放行,等价于允许切换,
|
||||
* 因为当前区域已彻底失去信号)
|
||||
*/
|
||||
private int resolveCurrentAreaRssi(Long deviceId, Long currentAreaId,
|
||||
Map<Long, MatchedBeacon> matchedBeacons) {
|
||||
MatchedBeacon matched = matchedBeacons.get(currentAreaId);
|
||||
if (matched != null) {
|
||||
return matched.rssi;
|
||||
}
|
||||
List<Integer> window = windowRedisDAO.getWindow(deviceId, currentAreaId);
|
||||
if (window != null) {
|
||||
for (int i = window.size() - 1; i >= 0; i--) {
|
||||
Integer sample = window.get(i);
|
||||
if (sample != null && sample != -999) {
|
||||
return sample;
|
||||
}
|
||||
}
|
||||
}
|
||||
return -999;
|
||||
}
|
||||
|
||||
/**
|
||||
* 找到信号最强且满足进入条件的候选区域
|
||||
*
|
||||
@@ -366,7 +436,8 @@ public class TrajectoryDetectionProcessor {
|
||||
|
||||
// ==================== 事件发布 ====================
|
||||
|
||||
private void publishEnterEvent(Long deviceId, Long areaId, String beaconMac, Integer enterRssi) {
|
||||
private void publishEnterEvent(Long deviceId, Long areaId, String beaconMac, Integer enterRssi,
|
||||
LocalDateTime eventTime) {
|
||||
try {
|
||||
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
|
||||
TrajectoryEnterEvent event = TrajectoryEnterEvent.builder()
|
||||
@@ -376,6 +447,7 @@ public class TrajectoryDetectionProcessor {
|
||||
.areaId(areaId)
|
||||
.beaconMac(beaconMac)
|
||||
.enterRssi(enterRssi)
|
||||
.eventTime((eventTime != null ? eventTime : LocalDateTime.now()).toString())
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.build();
|
||||
|
||||
@@ -390,7 +462,7 @@ public class TrajectoryDetectionProcessor {
|
||||
}
|
||||
|
||||
private void publishLeaveEvent(Long deviceId, Long areaId, String beaconMac,
|
||||
String leaveReason, Long enterTimestamp) {
|
||||
String leaveReason, Long enterTimestamp, LocalDateTime eventTime) {
|
||||
try {
|
||||
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
|
||||
TrajectoryLeaveEvent event = TrajectoryLeaveEvent.builder()
|
||||
@@ -401,6 +473,7 @@ public class TrajectoryDetectionProcessor {
|
||||
.beaconMac(beaconMac)
|
||||
.leaveReason(leaveReason)
|
||||
.enterTimestamp(enterTimestamp)
|
||||
.eventTime((eventTime != null ? eventTime : LocalDateTime.now()).toString())
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.build();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user